Friday, March 30, 2012

Collapsed updates

Sometimes the exact sequence of updates to a row at a particular key does not matter; the only interesting part is the end result. This is what the OUTPUT EVERY statement in CCL or the pulsed subscription in Aleri do. It doesn't have to be time-driven either: if the data comes in batches, it makes sense to collapse the modifications from the whole batch into one and send it at the end of the batch.

To do this in Triceps, I've made a template. Here is an example of its use with interspersed comments:

my $unit = Triceps::Unit->new("unit") or die "$!";

our $rtData = Triceps::RowType->new(
  # mostly copied from the traffic aggregation example
  local_ip => "string",
  remote_ip => "string",
  bytes => "int64",
) or die "$!";

The meaning of the rows is not particularly important for this example. It just uses a pair of IP addresses as the collapse key. The collapse absolutely needs a primary key, since it has to track and collapse multiple updates to the same row.

my $collapse = Triceps::Collapse->new(
  unit => $unit,
  name => "collapse",
  data => [
    name => "idata",
    rowType => $rtData,
    key => [ "local_ip", "remote_ip" ],
  ],
) or die "$!";

Most of the options are self-explanatory. The dataset is defined with nested options to make the API extensible, to allow multiple datasets to be defined in the future. But at the moment only one is allowed. A dataset collapses the data at one label: an input label and an output label get defined for it, just as for the table. The data arrives at the input label, gets collapsed by the primary key, and then stays in the Collapse until the flush. When the Collapse gets flushed, the data is sent out of its output label. After the flush, the Collapse has no data in it, and starts collecting the updates again from scratch. The labels get named by connecting the names of the Collapse element, of the dataset, and "in" or "out". For this Collapse, the label names will be "collapse.idata.in" and "collapse.idata.out".

Note that the dataset options are specified in an array reference, not a hash! If you try to use a hash, it will fail. When specifying the dataset options, put the "name" first. It's used in the error messages about any issues in the dataset, and the code really expects the name to go first.
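One plausible reason for the array reference is ordering: a Perl array keeps the options in the order they were written, while a hash does not, so "name first" can only be checked on an array. Here is a minimal sketch of such a check in plain Perl. The function parse_dataset_opts() and its error messages are hypothetical and are not part of Triceps; they only illustrate the idea.

```perl
use strict;
use warnings;

# Hypothetical helper, NOT part of Triceps: walks a list of option
# pairs in order and insists that "name" comes first.
sub parse_dataset_opts {
    my ($opts) = @_;
    die "dataset options must be an array reference\n"
        unless ref($opts) eq "ARRAY";
    die "dataset options must be option => value pairs\n"
        if @$opts % 2;
    die "the first dataset option must be 'name'\n"
        unless @$opts && $opts->[0] eq "name";
    my %parsed;
    for (my $i = 0; $i < @$opts; $i += 2) {
        my ($opt, $value) = @{$opts}[$i, $i + 1];
        $parsed{$opt} = $value;
    }
    return \%parsed;
}

# An array reference with "name" first is accepted.
my $ok = parse_dataset_opts([
    name => "idata",
    key  => [ "local_ip", "remote_ip" ],
]);
print "parsed dataset '$ok->{name}'\n";

# A hash reference is rejected: a hash does not preserve the order
# in which its keys were written, so the order could not be checked.
my $err = eval { parse_dataset_opts({ name => "idata" }); 1 } ? "" : $@;
print "rejected: $err";
```

The real Collapse does its own option parsing; the sketch only shows why an ordered list of pairs is a natural fit when one option has to be seen before the others.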

my $lbPrint = makePrintLabel("print", $collapse->getOutputLabel("idata"));

To print the result, a print label is created in this example in the same way as in the previous ones. The print label gets connected to the Collapse's output label. The method to get the Collapse's output label is very much like the table's, except that it takes the dataset name as an argument.

sub mainloop($$$) # ($unit, $datalabel, $collapse)
{
  my $unit = shift;
  my $datalabel = shift;
  my $collapse = shift;
  while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "data") {
      my $rowop = $datalabel->makeRowopArray(@data)
        or die "$!";
      $unit->call($rowop) or die "$!";
      $unit->drainFrame(); # just in case, for completeness
    } elsif ($type eq "flush") {
      $collapse->flush();
    }
  }
}

&mainloop($unit, $collapse->getInputLabel("idata"), $collapse);

There will be a second example, so I've placed the main loop into a function. It works in the same way as in the examples before: extracts the data from the CSV format and sends it to a label. The first column is used as a command: "data" sends the data, and "flush" flushes the Collapse. The flush marks the end of the batch. Here is an example of a run; the input lines are the ones that start with "data" or "flush", the rest is the output:

data,OP_INSERT,1.2.3.4,5.6.7.8,100
data,OP_INSERT,1.2.3.4,6.7.8.9,1000
data,OP_DELETE,1.2.3.4,6.7.8.9,1000
flush
collapse.idata.out OP_INSERT local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100" 
data,OP_DELETE,1.2.3.4,5.6.7.8,100
data,OP_INSERT,1.2.3.4,5.6.7.8,200
data,OP_INSERT,1.2.3.4,6.7.8.9,2000
flush
collapse.idata.out OP_DELETE local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100" 
collapse.idata.out OP_INSERT local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="200" 
collapse.idata.out OP_INSERT local_ip="1.2.3.4" remote_ip="6.7.8.9" bytes="2000" 
data,OP_DELETE,1.2.3.4,6.7.8.9,2000
data,OP_INSERT,1.2.3.4,6.7.8.9,3000
data,OP_DELETE,1.2.3.4,6.7.8.9,3000
data,OP_INSERT,1.2.3.4,6.7.8.9,4000
data,OP_DELETE,1.2.3.4,6.7.8.9,4000
flush
collapse.idata.out OP_DELETE local_ip="1.2.3.4" remote_ip="6.7.8.9" bytes="2000" 

You can trace and make sure that the flushed data is the cumulative result of the data that went in.
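To do this tracing mechanically, here is a minimal sketch of the collapse logic in plain Perl, with two hashes standing in for the collected deletes and inserts. It is not the Triceps implementation, and the names collapse_input(), collapse_flush() and run_batch() are made up for this illustration. It replays the last two batches of the run above and prints the rows in a simplified CSV form rather than in the print label format.

```perl
use strict;
use warnings;

# A plain-hash stand-in for one Collapse dataset (NOT the Triceps code).
# The key fields are local_ip and remote_ip, the first two data columns.
my (%deletes, %inserts);

# Remember one update; @row is (local_ip, remote_ip, bytes).
sub collapse_input {
    my ($opcode, @row) = @_;
    my $key = join("\0", @row[0, 1]);
    if ($opcode eq "OP_INSERT") {
        # The latest insert for a key always wins.
        $inserts{$key} = [@row];
    } elsif ($opcode eq "OP_DELETE") {
        # A delete cancels an insert seen in this batch. Otherwise it
        # refers to a row sent out in an earlier batch and must be kept.
        my $cancelled = delete $inserts{$key};
        if (!$cancelled) {
            $deletes{$key} = [@row] unless exists $deletes{$key};
        }
    }
}

# Return the collapsed updates (deletes first, then inserts, each in
# key order) and start the next batch from scratch.
sub collapse_flush {
    my @out;
    for my $key (sort keys %deletes) {
        push @out, join(",", "OP_DELETE", @{ $deletes{$key} });
    }
    for my $key (sort keys %inserts) {
        push @out, join(",", "OP_INSERT", @{ $inserts{$key} });
    }
    %deletes = ();
    %inserts = ();
    return @out;
}

# Feed a batch of "opcode,local_ip,remote_ip,bytes" lines, then flush.
sub run_batch {
    for my $line (@_) {
        collapse_input(split /,/, $line);
    }
    return collapse_flush();
}

my @out2 = run_batch(
    "OP_DELETE,1.2.3.4,5.6.7.8,100",
    "OP_INSERT,1.2.3.4,5.6.7.8,200",
    "OP_INSERT,1.2.3.4,6.7.8.9,2000",
);
my @out3 = run_batch(
    "OP_DELETE,1.2.3.4,6.7.8.9,2000",
    "OP_INSERT,1.2.3.4,6.7.8.9,3000",
    "OP_DELETE,1.2.3.4,6.7.8.9,3000",
    "OP_INSERT,1.2.3.4,6.7.8.9,4000",
    "OP_DELETE,1.2.3.4,6.7.8.9,4000",
);
print "$_\n" for @out2, "--", @out3;
```

In the last batch every insert is cancelled by the delete that follows it, so only the very first delete, the one for the row sent out in the previous batch, survives to the flush.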

The Collapse also lets you specify the row type and the input connection for a dataset in a different way:

my $lbInput = $unit->makeDummyLabel($rtData, "lbInput");

my $collapse = Triceps::Collapse->new(
  unit => $unit,
  name => "collapse",
  data => [
    name => "idata",
    fromLabel => $lbInput,
    key => [ "local_ip", "remote_ip" ],
  ],
) or die "$!";

&mainloop($unit, $lbInput, $collapse);

Normally $lbInput would be not a dummy label but the output label of some element. The option "fromLabel" says that the dataset input will be coming from that label. So the Collapse can automatically both copy the label's row type for the dataset and chain the dataset's input label to that label. It's a pure convenience that lets you skip the manual steps. In the future it should probably take a whole list of source labels and chain itself to all of them, but for now only one is supported.

This example produces exactly the same output as the previous one, so there is no use in copying it again.

For the last item that hasn't been shown yet, you can get the list of dataset names (well, currently only one name):

@names = $collapse->getDatasets();

And the very last thing to tell about the use of Collapse: when something goes wrong, it will die (and confess). There is no need to follow its methods with "or die".
