Thursday, April 11, 2013

Multithreaded pipeline, part 4

Let's look at the aggregation by the hour. First, the short version that skips over the actual logic and concentrates on how the nexuses are connected.

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # ... create the table and aggregation ...

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  # ... connect the input nexus to the table ...
  # ... create the table dump logic ...

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

This function inherits the options from Triead::start() as usual and adds its own option "from". That option's value is then used as the name of the nexus to import for reading. The row types of the labels from the imported facet are then used to create the table and aggregation. But they aren't connected to the input labels yet.
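
For context, here is a minimal sketch of how such a thread might be launched; the app, thread, and nexus names are made up for illustration, the real pipeline example passes its own values. The extra options given to Triead::start() get forwarded to the thread's main function:

Triceps::Triead::start(
  app => "traffic",
  thread => "raw_to_hourly",
  main => \&RawToHourlyMain,
  from => "read/data", # nexus "data" exported by the thread "read"
);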

First, the output nexus is created. Its creation passes all the incoming data through, short-circuiting the input to the output, and adds the extra label for the aggregated output. After that the rest of the logic can be connected to the inputs (and to the outputs too).

The reason why this connection order is important is that the labels get called in the order they are chained from the input label. And when this thread reacts to some event, we want the original event to pass through to the output first, and only then send our reaction after it.
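
Here is a tiny stand-alone sketch of that chaining-order rule (the unit, row type and label names are made up for illustration): the labels chained from the same parent get called in the order in which they were chained.

my $u = Triceps::Unit->new("u");
my $rt = Triceps::RowType->new(v => "int32") or confess "$!";
my $parent = $u->makeDummyLabel($rt, "parent");
$parent->makeChained("passthrough", undef, sub {
  print "the pass-through sees the event first\n";
});
$parent->makeChained("reaction", undef, sub {
  print "our reaction runs second\n";
});
$u->call($parent->makeRowopHash("OP_INSERT", v => 1));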

And after that it's all the usual readyReady() and mainLoop().

The full text of the function follows. The logic is based on the previous example from chapter 13, and the only big change is the use of SimpleAggregator instead of a manually-built aggregator. The HourlyToDailyMain() is very similar, so I won't even show it, you can find the full text in SVN.

# compute an hour-rounded timestamp (in microseconds)
sub hourStamp # (time)
{
  return $_[0] - ($_[0] % (1000*1000*3600));
}
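
For example, a timestamp 1.5 seconds into the third hour rounds down to the start of that hour:

  my $t = 2*3600*1000*1000 + 1_500_000; # 2 hours and 1.5 seconds, in microseconds
  &hourStamp($t); # returns 7200000000, i.e. exactly the 2-hour mark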

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  # The current hour stamp that keeps being updated;
  # any aggregated data will be propagated when it is in the
  # current hour (to avoid the propagation of the aggregator clearing).
  my $currentHour;

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # the full stats for the recent time
  my $ttPackets = Triceps::TableType->new($faIn->getLabel("packet")->getRowType())
    ->addSubIndex("byHour",
      Triceps::IndexType->newPerlSorted("byHour", undef, sub {
        return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
      })
      ->addSubIndex("byIP",
        Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
        ->addSubIndex("group",
          Triceps::IndexType->newFifo()
        )
      )
    )
  or confess "$!";

  # type for a periodic summary, used for hourly, daily etc. updates
  my $rtSummary;
  Triceps::SimpleAggregator::make(
    tabType => $ttPackets,
    name => "hourly",
    idxPath => [ "byHour", "byIP", "group" ],
    result => [
      # time period's (here hour's) start timestamp, microseconds
      time => "int64", "last", sub {&hourStamp($_[0]->get("time"));},
      local_ip => "string", "last", sub {$_[0]->get("local_ip");},
      remote_ip => "string", "last", sub {$_[0]->get("remote_ip");},
      # bytes sent in a time period, here an hour
      bytes => "int64", "sum", sub {$_[0]->get("bytes");},
    ],
    saveRowTypeTo => \$rtSummary,
  );

  $ttPackets->initialize() or confess "$!";
  my $tPackets = $unit->makeTable($ttPackets,
    &Triceps::EM_CALL, "tPackets") or confess "$!";

  # Filter the aggregator output to match the current hour.
  my $lbHourlyFiltered = $unit->makeDummyLabel($rtSummary, "hourlyFiltered");
  $tPackets->getAggregatorLabel("hourly")->makeChained("hourlyFilter", undef, sub {
    if ($_[1]->getRow()->get("time") == $currentHour) {
      $unit->call($lbHourlyFiltered->adopt($_[1]));
    }
  });

  # It's important to connect the pass-through data first,
  # before chaining anything to the labels of the faIn, to
  # make sure that any requests and raw inputs get through before
  # our reactions to them.
  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  my $lbPrint = $faOut->getLabel("print");

  # update the notion of the current hour before the table
  $faIn->getLabel("packet")->makeChained("processPackets", undef, sub {
    my $row = $_[1]->getRow();
    $currentHour = &hourStamp($row->get("time"));
    # skip the timestamp updates without data
    if (defined $row->get("bytes")) {
      $unit->call($tPackets->getInputLabel()->adopt($_[1]));
    }
  });

  # the dump request processing
  $tPackets->getDumpLabel()->makeChained("printDump", undef, sub {
    $unit->makeArrayCall($lbPrint, "OP_INSERT", $_[1]->getRow()->printP() . "\n");
  });
  $faIn->getLabel("dumprq")->makeChained("dump", undef, sub {
    if ($_[1]->getRow()->get("what") eq "packets") {
      $tPackets->dumpAll();
    }
  });

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}
