Thursday, April 11, 2013

Multithreaded pipeline, part 2

The reader thread drives the pipeline:

sub ReaderMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $rtPacket = Triceps::RowType->new(
    time => "int64", # packet's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    local_port => "int32",
    remote_port => "int32",
    bytes => "int32", # size of the packet
  ) or confess "$!";

  my $rtPrint = Triceps::RowType->new(
    text => "string", # the text to print (including \n)
  ) or confess "$!";

  my $rtDumprq = Triceps::RowType->new(
    what => "string", # identifies what to dump
  ) or confess "$!";

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      packet => $rtPacket,
      print => $rtPrint,
      dumprq => $rtDumprq,
    ],
    import => "writer",
  );

  my $lbPacket = $faOut->getLabel("packet");
  my $lbPrint = $faOut->getLabel("print");
  my $lbDumprq = $faOut->getLabel("dumprq");

  $owner->readyReady();

  while(<STDIN>) {
    chomp;
    # print the input line, as a debugging exercise
    $unit->makeArrayCall($lbPrint, "OP_INSERT", "! $_\n");

    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
      $unit->makeArrayCall($lbPacket, @data);
    } elsif ($type eq "dump") {
      $unit->makeArrayCall($lbDumprq, "OP_INSERT", $data[0]);
    } else {
      $unit->makeArrayCall($lbPrint, "OP_INSERT", "Unknown command '$type'\n");
    }
    $owner->flushWriters();
  }

  {
    # drain the pipeline before shutting down
    my $ad = Triceps::AutoDrain::makeShared($owner);
    $owner->app()->shutdown();
  }
}

It starts by creating the nexus with the initial set of labels: one for the data about the network packets, one for the lines to be printed at the end of the pipeline, and one for the dump requests to the tables in the other threads. The nexus gets exported for the other threads to import, and is also imported right back into this thread, for writing. Then the setup is done, readyReady() is called, and the processing starts.

It reads the CSV lines, splits them, decides whether each one is a data line or a dump request, and one way or the other sends it into the nexus. The data sent to a facet doesn't get immediately forwarded to the nexus. It's collected internally in a tray, and then flushWriters() sends it on. The mainLoop() shown in the last post calls flushWriters() automatically after every tray it processes from the input. But when reading from a file you've got to do it yourself. Of course, it's more efficient to send multiple rows through at once, so a smarter implementation would check whether multiple lines are available from the file and send them in larger bundles.
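As a rough illustration of the bundling idea, here is a minimal sketch in plain Perl. It is not the Triceps code: instead of checking how many lines are available, it uses the simpler approach of flushing after a fixed number of lines, and the names process_in_batches, $send and $flush are made up for this example. In a real reader thread $send would do the makeArrayCall() work and $flush would call $owner->flushWriters(). The input lines are made up too.

```perl
use strict;
use warnings;

# Read lines from $fh, hand each one to $send, and call $flush after
# every $batch_size lines, plus once more for a partial last batch.
# $send and $flush are hypothetical callbacks standing in for
# makeArrayCall() and flushWriters().
sub process_in_batches {
  my ($fh, $batch_size, $send, $flush) = @_;
  my $pending = 0; # lines sent since the last flush
  while (my $line = <$fh>) {
    chomp $line;
    $send->($line);
    if (++$pending >= $batch_size) {
      $flush->();
      $pending = 0;
    }
  }
  $flush->() if $pending > 0; # don't leave a partial batch unsent
}

# Demonstration on an in-memory file with made-up input lines.
my $input = join("", map { "new,OP_INSERT,$_\n" } 1..5);
open(my $fh, "<", \$input) or die "Cannot open in-memory file: $!";
my @log;
process_in_batches($fh, 2,
  sub { push @log, "send" },
  sub { push @log, "flush" });
close($fh);
print join(" ", @log), "\n";
# prints: send send flush send send flush send flush
```

The trade-off of a fixed batch size is latency: with interactive input the last few lines would sit in the tray until the batch fills up or the input ends, which is why checking for available input would be the better approach there.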

The last part is the shutdown. After the end of file is reached, it's time to shut down the application. You can't just shut it down right away because there might still be data in the pipeline, and that data would be lost. The right way is to drain the pipeline first, and then do the shutdown when the app is drained. AutoDrain::makeShared() creates a scoped drain: the drain request for all the threads is started when this object is created, and the object construction completes when the drain succeeds. When the object is destroyed, that lifts the drain. So in this case the drain succeeds and then the app gets shut down.
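The scoping is what makes the ordering work, and it can be shown with a small sketch in plain Perl. This is only an illustration of the scoped-object pattern, not how Triceps implements it: ScopedDrain is a made-up stand-in class that just records events, where the real AutoDrain would wait for the pipeline to empty.

```perl
use strict;
use warnings;

our @events; # records the order in which things happen

package ScopedDrain; # hypothetical stand-in, not the Triceps class

sub new {
  my ($class) = @_;
  push @main::events, "drain requested";
  # a real drain would block here until the pipeline is empty
  push @main::events, "drained";
  return bless {}, $class;
}

# Perl calls DESTROY when the last reference to the object goes away.
sub DESTROY {
  push @main::events, "drain lifted";
}

package main;

{
  my $guard = ScopedDrain->new(); # returns only after the "drain"
  push @events, "shutdown";       # stands in for app()->shutdown()
} # $guard goes out of scope here, so DESTROY runs

print join(", ", @events), "\n";
# prints: drain requested, drained, shutdown, drain lifted
```

Since Perl destroys the object as soon as the block ends, the drain cannot be left in place by accident, even if the code in the block dies.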

The shutdown causes the mainLoop() calls in all the other threads to return, and the threads to exit. The startHere() call in the first thread has special logic in it that joins all the started threads after its own main function returns and before it completes. After that the script continues on its way and is free to exit.
