Saturday, February 18, 2012

The proper aggregation, part1

Since the manual aggregation is error-prone, Triceps can manage it for you and do it right. The only thing you need to do is do the actual iteration and computation. Here is the rewrite of the same example with a Triceps aggregator:

my $uTrades = Triceps::Unit->new("uTrades") or die "$!";

# the input data
my $rtTrade = Triceps::RowType->new(
  id => "int32", # trade unique id
  symbol => "string", # symbol traded
  price => "float64",
  size => "float64", # number of shares traded
) or die "$!";

# the aggregation result
my $rtAvgPrice = Triceps::RowType->new(
  symbol => "string", # symbol traded
  id => "int32", # last trade's id
  price => "float64", # avg price of the last 2 trades
) or die "$!";

# aggregation handler: recalculate the average each time the easy way
sub computeAverage # (table, context, aggop, opcode, rh, state, args...)
  my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;

  # don't send the NULL record after the group becomes empty
  return if ($context->groupSize()==0
    || $opcode == &Triceps::OP_NOP);

  my $sum = 0;
  my $count = 0;
  for (my $rhi = $context->begin(); !$rhi->isNull();
      $rhi = $context->next($rhi)) {
    $sum += $rhi->getRow()->get("price");
  my $rLast = $context->last()->getRow() or die "$!";
  my $avg = $sum/$count;

  my $res = $context->resultType()->makeRowHash(
    symbol => $rLast->get("symbol"),
    id => $rLast->get("id"),
    price => $avg
  ) or die "$!";
  $context->send($opcode, $res) or die "$!";
my $ttWindow = Triceps::TableType->new($rtTrade)
    Triceps::IndexType->newHashed(key => [ "id" ])
    Triceps::IndexType->newHashed(key => [ "symbol" ])
      Triceps::IndexType->newFifo(limit => 2)
        $rtAvgPrice, "aggrAvgPrice", undef, \&computeAverage)
or die "$!";
$ttWindow->initialize() or die "$!";
my $tWindow = $uTrades->makeTable($ttWindow,
  &Triceps::EM_CALL, "tWindow") or die "$!";

# label to print the result of aggregation
my $lbAverage = $uTrades->makeLabel($rtAvgPrice, "lbAverage",
  undef, sub { # (label, rowop)
    print($_[1]->printP(), "\n");
  }) or die "$!";
  or die "$!";

while(<STDIN>) {
  my @data = split(/,/); # starts with a string opcode
  $uTrades->makeArrayCall($tWindow->getInputLabel(), @data)
    or die "$!";
  $uTrades->drainFrame(); # just in case, for completeness

What changed in this code? The things got rearranged a bit.The aggregator is now defined as a part of the table type, so the aggregation result row type and its computational function had to be moved up.

The AggregatorType object holds the information about the aggregator. In the table type, the aggregator type gets attached to an index type with setAggregator(). In this case, to the FIFO index type.  At present an index type may have no more than one aggregator type attached to it. There is no particular reason for that, other than that it was slightly easier to implement, and that I can't think yet of a real-word situation where multiple aggregators on the same index would be needed. If this situation will ever occur, this support can be added. However a table type may have multiple aggregator types in it, on different indexes.  You can save a reference to an aggregator type in a variable and reuse it in the different table types too (though not multiple times in the same table, since that would cause a naming conflict).

The aggregator type is created with the arguments of result row type, aggregator name, group initialization Perl function (which may be undef, as in this example), group computation Perl function, and the optional arguments for the functions. Note that there is a difference in naming between the aggregator types and index types: an aggregator type knows its name, while an index type does not. An index type is given a name only in its hierarchy inside the table type, but it does not know its name.

When a table is created, it finds all the aggregator types in it, and creates an output label for each of them. The names of the aggregator types are used as suffixes to the table name. In this example the aggregator will have its output label named "tWindow.aggrAvgPrice". This puts all the aggregator types in the table into the same namespace, so make sure to give them different names in the same table type. Also avoid the names "in" and "out" because these are already taken by the table's own labels. The aggregator labels in the table can be found with

$aggLabel = $table->getAggregatorLabel("aggName") or die "$!";

The aggregator types are theoretically multithreaded, but for all I can tell, they will not integrate with the Perl multithreading well, due to the way the Perl objects (the execution methods!) are tied to each thread's separate interpreter. In the future expect that the table types with aggregators could not be shared between the threads.

After the logic is moved into a managed aggregator, the main loop becomes simpler. This new main loop also takes advantage of makeArrayCall() to become a little shorter yet. The label $lbAverage now reverts to just printing the rowops going through it, and gets chained to the aggregator output label.

The computation function gets a lot more arguments than it used to. The most interesting and most basic ones are $context, $opcode, and $rh. The rest are useful in the more complex cases only.

The aggregator type is exactly that: a type. It doesn't know, on which table or index, or even index type it will be used, and indeed, it might be used on multiple tables and index types. But to do the iteration on the rows, the computation function needs to get this information somehow. And it does, in the form of aggregator context. The manual aggregation used the last table output row to find, on which exact group to iterate. The managed aggregator gets the last modified row handle as the argument $rh. But our simple aggregator doesn't even need to consult $rh because the context takes care of finding the group too: it knows the exact group and exact index that needs to be aggregated (look at the index tree drawings for the difference between an index type and an index).

The context provides its own begin() and next() methods. They are actually slightly more efficient than the usual table iteration methods because they take advantage of that exact known index. The most important part, they work differently.


returns a NULL row handle when it reaches the end of the group. Do not, I repeat, DO NOT use the $rhi->next() in the aggregators, or you'll get some very wrong results.

The context also has a bit more of its own magic.


returns the last row handle in the group. This comes very handy because in most of the cases you want the data from the last row to fill the fields that haven't been aggregated as such. This is like the SQL function LAST(). Using the fields from the argument $rh, unless they are the key fields for this group, is generally not a good idea because it adds an extra dependency on the order of modifications to the table. The FIRST() or LAST() (i.e. the context's begin() or last()) are much better and not any more expensive.


returns the number of rows in the group. It's your value of COUNT(*) in SQL terms, and if that's all you need, you don't need to iterate.

$context->send($opcode, $row)

constructs a result rowop and sends it to the aggregator's output label. Remember, the aggregator type as such knows nothing about this label, so the path through the context is the only path. Note also that it takes a row and not a rowop, because a label is needed to construct the rowop in the first place.


provides the result row type needed to construct the result row. For the version 1.0 I've added a couple of convenience methods that combine the row construction and sending, that can be used instead:

$context->makeHashSend ($opcode, $fieldName => $fieldValue, ...)
$context->makeArraySend($opcode, $fieldValue, ...)

The final thing about the aggregator context: it works only inside the aggregator computation function. Once the function returns, all its methods start returning undefs. So there is no point in trying to save it for later in a global variable or such, don't do that.

As you can see, computeAverage() is has the same logic as before, only now uses the aggregation context. And I've removed the debugging printout of the rows in the group.

The last unexplained piece is the opcode handling and that comparison to OP_NOP.  Basically, the table calls the aggregator computation every time something changes in its index. It describes the reason for the call in the argument $aggop ("aggregation operation"). Depending on how clever an aggregator wants to be, it may do something useful on all of these occasions, or only on some of them. The simple aggregator that doesn't try any smart optimizations but just goes and iterates through the rows every time only needs to react in some of the cases. To make its life easier, Triceps pre-computes the opcode that should be used for the result and puts it into the argument $opcode.  So to ignore the non-interesting calls, the simple aggregator computation can just return if it sees the opcode OP_NOP.

Why does it also check for the group  size being 0? Again, Triceps provides flexibility in the aggregators. Among others, it allows to implement the logic like Coral8, when on deletion of the last row in the group the aggregator would send a row with all non-key fields set to NULL (it can take the key fields from the argument $rh). So for this specific purpose the computation function gets called with all rows deleted from the group, and $opcode set to OP_INSERT. And, by the way, a true Coral8-styled aggregator would ignore all the calls where the $opcode is not OP_INSERT. But the normal aggregators need to avoid doing this kind of crap, so they have to ignore the calls where $context->groupSize()==0.

No comments:

Post a Comment