Thursday, April 11, 2013

Multithreaded pipeline, part 1

The Perl API for the threads is pretty much done now, time for the examples.

The first one re-does a variation of already shown example, the traffic data aggregation from the Chapter 13. The short recap is that it gets the data for each network packet going through and keeps it for a some time, aggregates the data by the hour and keeps it for a longer time, and aggregates it by the day and keeps for a longer time yet. This multi-stage computation naturally matches the pipeline approach.

Since this new example highlights different features than the one in chapter 13, I've changed it logic a little too: it updates both the hourly and daily summaries on every packet received. And I didn't bother to implement the part with the automatic cleaning of the old data, it doesn't add anything to the pipeline works.

The pipeline topologies are quite convenient for working with the threads. The parallel computations create a possibility of things happening in an unpredictable order and producing unpredictable results. The pipeline topology allows the parallelism and at the same time also keeps the data in the same predictable order, with no possibility of rows overtaking each other.

The computation can be split into the following threads:

  • read the input, convert and send the data into the model
  • store the recent data and aggregate it by the hour
  • store the hourly data and aggregate it by the day
  • store the daily data
  • get the data at the end of the pipeline and print it

Technically, each next stage only needs the data from the previous stage, but to get the updates to the printing stage, they all go all the way through.

Dumping the contents of the tables also requires some special support. Each table is local to its thread and can't be access from the other threads. To dump its contents, the dump request needs to be sent to its thread, which would extract the data and send it through. There are multiple ways to deal with the dump results. One is to have a special label for each table's dump and propagate it to the last stage to print. If all that is needed is text, one label that allows to send strings is good enough, all the dumps can send the data converted to text into it, and it would go all the way through the pipeline. For this example I've picked the last approach.

And now is time to show some code. The main part goes like this:

Triceps::Triead::startHere(
  app => "traffic",
  thread => "print",
  main => \&PrintMain,
);


The startHere() creates an App and starts a Triead in the current OS thread. "traffic" is the app name, "print" the thread name. This thread will be the end of the pipeline, and it will create the rest of the threads. This is a convenient pattern when the results of the model need to be fed back to the current thread, and it works out very conveniently for the unit tests. PrintMain() is the body function of this printing thread:

sub PrintMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  Triceps::Triead::start(
    app => $opts->{app},
    thread => "read",
    main => \&ReaderMain,
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "raw_hour",
    main => \&RawToHourlyMain,
    from => "read/data",
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "hour_day",
    main => \&HourlyToDailyMain,
    from => "raw_hour/data",
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "day",
    main => \&StoreDailyMain,
    from => "hour_day/data",
  );

  my $faIn = $owner->importNexus(
    from => "day/data",
    as => "input",
    import => "reader",
  );

  $faIn->getLabel("print")->makeChained("print", undef, sub {
    print($_[1]->getRow()->get("text"));
  });
  for my $tag ("packet", "hourly", "daily") {
    makePrintLabel($tag, $faIn->getLabel($tag));
  }

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

startHere() accepts a number of fixed options plus arbitrary options that itself doesn't care about but passes to the thread's main function, which are then the responsibility of the main function to parse. To reiterate, the main function gets all the options from the call of startHere(), both these that startHere() parses and these that it simply passes through. startHere() also adds one more option on its own: owner containing the TrieadOwner object that the thread uses to communicate with the rest of the App.

In this case PrintMain() doesn't have any extra options on its own, it's just happy to get startHere()'s standard set that it takes all together from @Triceps::Triead::opts.

It gets the TrieadOwner object $owner from the option appended by startHere(). Each TrieadOwner is created with its own Unit, so the unit is obtained from it to create the thread's model in it. Incidentally, the TrieadOwner  also acts as a clearing trigger object for the Unit, so when the TrieadOwner is destroyed, it properly clears the Unit.

Then it goes and creates all the threads of the pipeline. The start() works very much like startHere(), only it actually creates a new thread and starts the main function in it. The main function can be the same whether it runs through start() or startHere(). The special catch is that the options to start() must contain only the plain Perl values, not Triceps objects. It has to do with how Perl works with threads: it makes a copy of every value for the new thread, and it cant's copy the XS objects, so they simply become undefined in the new thread.

All but the first thread in the pipeline have the extra option from: it tells the input nexus for this thread, and each thread creates an output nexus "data". As mentioned before, the nexus namespaces are per thread that created it, so when the option from says "read/data", it's the nexus "data" created by the thread "read".

So, the pipeline gets all connected sequentially until eventually PrintMain() imports the nexus at its tail. importNexus() returns a facet, which is the thread's API to the nexus. A facet looks very much like an FnReturn for most purposes, with a few additions. It even has a real FnReturn in it, and you work with the labels of that FnReturn. The option as of importNexus gives the name to the facet and to its same-named FnReturn (without it the facet would be named the same as the short name of the nexus, in this case "data"). The option import tells whether this thread will be reading or writing to the nexus, and in this case it's reading.

By the time the pipeline gets to the last stage, it connects a few label:

  • print - carries the direct text lines to print in its field "text", and its contents gets printed
  • dumprq - carries the dump requests to the tables, so the printing thread doesn't care about it
  • packet - carries the raw data about the packets
  • hourly - carries the hourly summaries
  • daily - carries the daily summaries

The last three get also printed but this time as whole rows.

And after everything is connected, the thread both tells that it's ready and waits for all the other threads to be ready by calling readyReady(). Then its the run time, and mainLoop() takes care of it: until it's told to shutdown, it keeps reading data from the nexus and processes it. The shutdown will be controlled by the file reading thread at the start of the pipeline.The processing is done by getting the rowops from the nexus and calling them on the appropriate label in the facet, which then calls the the labels chained from it, and that gets all the rest of the thread's model running.

No comments:

Post a Comment