Wednesday, March 14, 2012

SimpleAggregator

Since I've started the more user-friendly approach with the ordered index, that gave me ideas for the aggregation too. After all, even though the manual aggregation gives the flexibility, it's too much work for the simple cases. And here we go, I've added the SimpleAggregator to make things easier. Here is the same example as before written with the SimpleAggregator:

my $uTrades = Triceps::Unit->new("uTrades") or die "$!";

# the input data
my $rtTrade = Triceps::RowType->new(
  id => "int32", # trade unique id
  symbol => "string", # symbol traded
  price => "float64",
  size => "float64", # number of shares traded
) or die "$!";

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("byId", 
    Triceps::IndexType->newHashed(key => [ "id" ])
  ) 
  ->addSubIndex("bySymbol", 
    Triceps::IndexType->newHashed(key => [ "symbol" ])
    ->addSubIndex("last2",
      Triceps::IndexType->newFifo(limit => 2)
    )   
  ) 
or die "$!";

# the aggregation result
my $rtAvgPrice;
my $compText; # for debugging

Triceps::SimpleAggregator::make(
  tabType => $ttWindow,
  name => "aggrAvgPrice",
  idxPath => [ "bySymbol", "last2" ],
  result => [ 
    symbol => "string", "last", sub {$_[0]->get("symbol");},
    id => "int32", "last", sub {$_[0]->get("id");},
    price => "float64", "avg", sub {$_[0]->get("price");},
  ],
  saveRowTypeTo => \$rtAvgPrice,
  saveComputeTo => \$compText,
) or die "$!";

$ttWindow->initialize() or die "$!";
my $tWindow = $uTrades->makeTable($ttWindow,
  &Triceps::EM_CALL, "tWindow") or die "$!";

# label to print the result of aggregation
my $lbAverage = $uTrades->makeLabel($rtAvgPrice, "lbAverage",
  undef, sub { # (label, rowop)
    print($_[1]->printP(), "\n");
  }) or die "$!";
$tWindow->getAggregatorLabel("aggrAvgPrice")->chain($lbAverage)
  or die "$!";

while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a string opcode
  $uTrades->makeArrayCall($tWindow->getInputLabel(), @data)
    or die "$!";
  $uTrades->drainFrame(); # just in case, for completeness
}

The main loop and the printing is the same as before. The result produced is also exactly the same as before.

But the aggregator is created with Triceps::SimpleAggregator::make(). Its arguments are in the option format: the option name-value pairs, in any order. Most of these "options" are actually mandatory. The aggregator type is connected to the table type with the options:

tabType - table type to put the aggregator on

idxPath - an a reference to an array of index names, forming the path to the index where the aggregator type will be set

name - the aggregator type name

The result row type and computation is defined with the option "result": each group of four values in that array defines one result field: the field name, its type, the aggregation function name, and a closure that extracts the aggregation function argument from the row (well, it can be any function reference, doesn't have to be an anonymous closure). That function gets the row as the argument $_[0] and returns the extracted value to run the aggregation on. The field name is by convention separated from its definition fields by "=>". Remember, it's just a convention, for Perl a "=>" is just as good as a comma.

SimpleAggregator::make() automatically generates the  result row type and aggregation function, creates an aggregator type from them, and sets it on the index type. It returns back the table type on success,  but for this example it was good enough to check the result for undef. The information about the aggregation result can be found by traversing through the index type tree, or by constructing a table and getting the row type from the aggregator result label. However it's much easier to save it during construction, and the option (this time an optional one!) "saveRowTypeTo" allows to do this. Give it a reference to a variable, and the row type will be placed into that variable.

Most of the time the things would just work. However if they don't and something dies in the aggregator, you will need the source code of the compute function to make sense of these errors. The option "saveComputeTo" gives a variable to save that source code for future perusal and other entertainment. Here is what gets produced by this example:

sub {
  use strict;
  my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;
  return if ($context->groupSize()==0 || $opcode == &Triceps::OP_NOP);
  my $v2_count = 0;
  my $v2_sum = 0;
  my $npos = 0;
  for (my $rhi = $context->begin(); !$rhi->isNull(); $rhi = $context->next($rhi)) {
    my $row = $rhi->getRow();
    # field price=avg
    my $a2 = $args[2]($row);
    { if (defined $a2) { $v2_sum += $a2; $v2_count++; }; }
    $npos++;
  }
  my $rowLast = $context->last()->getRow();
  my $l0 = $args[0]($rowLast);
  my $l1 = $args[1]($rowLast);
  $context->makeArraySend($opcode,
    ($l0), # symbol
    ($l1), # id
    (($v2_count == 0? undef : $v2_sum / $v2_count)), # price
  );
}

At the moment the compute function is quite straightforward and just does the aggregation from scratch every time. It's only smart enough to skip the iteration if all the result consists of only "first", "last" and "count_star". It gets the closures for the argument extraction as arguments.

The aggregation functions available at the moment are:

  • first - value from the first row in the group.
  • last - value from the last row in the group.
  • count_star - number of rows in the group, like SQL COUNT(*). Since there is no argument for this function, use undef instead of the argument closure.
  • sum - sum of the values.
  • max - the maximum value.
  • min - the minimum value.
  • avg - the average of all the non-null values.
  • avg_perl - the average of all values, with null values treated in Perl fashion as zeroes. So, technically when this example used "avg", it works the same as the previous versions only for the non-null fields. To be really the same, it should have used "avg_perl".
  • nth_simple - the Nth value from the start of the group. This is a tricky function because it needs two arguments: the value of N and the field selector. Multiple direct arguments will be supported in the future but right now it works through a workaround: the argument closure must return not just the extracted field but a reference to array with two values, the N and the field. For example, sub { [1, $_[0]->get("id")];}. The N is counted starting from 0, so the value of 1 will return the second record. This function works in a fairly simple-minded and inefficient way at the moment.
There will be more functions to come, and you can even already add your own. More on that in the next installment.

I can think of many ways the SimpleAggregator can be improved, but for now they have been pushed into the future to keep it simple.

No comments:

Post a Comment