Tuesday, February 28, 2012

The proper aggregation, part 6: additive

In some cases the aggregation values don't have to be calculated by going through all the rows from scratch every time. If you aggregate a sum of a field, you can as well add the value of the field to the sum when a row is inserted and subtract it when a row is deleted. Not surprisingly, this is called an "additive aggregation".
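
To illustrate the idea outside of Triceps, here is a minimal sketch in plain Perl (the hash-based row representation, the field "price", and the functions onInsert and onDelete are made up for this sketch):

my $sum = 0; # the running sum, adjusted as the rows come and go

sub onInsert # ($row)
{
  my ($row) = @_;
  $sum += $row->{price}; # add the inserted row's value
}

sub onDelete # ($row)
{
  my ($row) = @_;
  $sum -= $row->{price}; # subtract the deleted row's value
}

No matter how large the group grows, each update costs one addition or subtraction instead of a re-scan of the whole group.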

The averaging can also be done as an additive aggregation: it amounts to a sum divided by a count. The sum can obviously be done additively. The count is potentially additive too, but even better, we have the shortcut of $context->groupSize(). Well, at least for the same definition of count that has been used in the non-additive example: the SQL definition of count (and of average) includes only the non-NULL values, but here we go with the Perl approach, where a NULL is taken to have the same meaning as 0. The proper SQL count could not use the shortcut but would still be additive.

Triceps provides a way to implement the additive aggregation too. It calls the aggregation computation function for each changed row, giving it an opportunity to react. The argument $aggop indicates what has happened. Here is the same example rewritten in an additive way:

sub computeAverage # (table, context, aggop, opcode, rh, state, args...)
{
  my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;
  my $rowchg;

  if ($aggop == &Triceps::AO_BEFORE_MOD) { 
    $context->send($opcode, $state->{lastrow}) or die "$!";
    return;
  } elsif ($aggop == &Triceps::AO_AFTER_DELETE) { 
    $rowchg = -1; 
  } elsif ($aggop == &Triceps::AO_AFTER_INSERT) { 
    $rowchg = 1;
  } else { # AO_COLLAPSE, with the opcode OP_NOP
    return;
  } 

  $state->{price_sum} += $rowchg * $rh->getRow()->get("price");

  return if ($context->groupSize()==0
    || $opcode == &Triceps::OP_NOP);

  my $rLast = $context->last()->getRow() or die "$!";
  my $count = $context->groupSize();
  my $avg = $state->{price_sum}/$count;
  my $res = $context->resultType()->makeRowHash(
    symbol => $rLast->get("symbol"), 
    id => $rLast->get("id"), 
    price => $avg
  ) or die "$!";
  $state->{lastrow} = $res;

  $context->send($opcode, $res) or die "$!";
}

sub initAverage #  (@args)
{
  return { lastrow => undef, price_sum => 0 };
}

Also, the trick of keeping an extra row in the group can not be used with the additive aggregation. An additive aggregation relies on Triceps to tell it which rows are deleted and which are inserted, so it can not easily do any extra skipping. The index for the aggregation has to be defined with the correct limits. If we want an average of the last 2 rows, we set the limit to 2:

      Triceps::IndexType->newFifo(limit => 2)
      ->setAggregator(Triceps::AggregatorType->new(
        $rtAvgPrice, "aggrAvgPrice", \&initAverage, \&computeAverage)
      )
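
For reference, here is approximately how this index would nest into the whole table type, in the same way as it has been done in the previous parts of this series (the trade row type $rtTrade and the index names are assumptions carried over from those examples):

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("byId",
    Triceps::IndexType->newHashed(key => [ "id" ])
  )
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
    ->addSubIndex("last2",
      Triceps::IndexType->newFifo(limit => 2)
      ->setAggregator(Triceps::AggregatorType->new(
        $rtAvgPrice, "aggrAvgPrice", \&initAverage, \&computeAverage)
      )
    )
  )
or die "$!";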

The aggregation state has grown: now it includes not only the last sent row but also the running sum of the price, both kept together in a hash. The last sent row doesn't really have to be kept, and I'll show another example without it, but for now let's look at how things are done when it is kept.

The argument $aggop describes why the computation is being called. Note that Triceps doesn't know whether the aggregation is additive or not; it makes the calls the same way in every case. It's just that in the previous examples we weren't interested in this information and didn't look at it. $aggop contains one of the constant values:

&Triceps::AO_BEFORE_MOD: the group is about to be modified, so a DELETE of the old aggregated row needs to be sent. The argument $opcode will always be OP_DELETE.

&Triceps::AO_AFTER_DELETE: the group has been modified by deleting a row from it. The argument $rh will refer to the row handle being deleted. The $opcode may be either OP_NOP or OP_INSERT. A single operation on a table may affect multiple rows: for example, an insert may trigger the replacement policy in the indexes and cause one or more rows to be deleted. If multiple rows are deleted or inserted in a group, the additive aggregator needs to know about all of them to keep its state correct, but it does not need to (and even must not) send a new result until the last one of them has been processed. The call for the last modification will have the opcode of OP_INSERT; the preceding intermediate ones will have the opcode of OP_NOP. An important point: even though a row is being deleted from the group, the aggregator opcode is OP_INSERT, because it inserts the new aggregator state!

&Triceps::AO_AFTER_INSERT: the group has been modified by inserting a row into it. Same as for AO_AFTER_DELETE, $rh will refer to the row handle being inserted, and $opcode will be OP_NOP or OP_INSERT.

&Triceps::AO_COLLAPSE: called after the last row is deleted from the group, just before the whole group is collapsed and deleted. This allows the aggregator to destroy its state properly. For most aggregators there is nothing special to be done. The only case when you'd want to do something is if your state contains circular references: Perl doesn't free circular references until the whole interpreter exits, so you'd have to break the circle to let them be freed immediately (a sketch of this follows after the list). The aggregator should not produce any results on this call. In version 0.99 this aggregation operation carried the opcode of OP_INSERT but, after some thinking, that didn't make a whole lot of sense, so for 1.0 I've changed it to OP_NOP. It doesn't matter a whole lot, because the aggregator computation doesn't produce any result on this call and should not care. But for abstract aesthetic reasons OP_NOP looks better.
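
For example, if the state did contain a reference back to itself, the final branch in computeAverage above could break the circle on the collapse like this (a sketch; the field "self" in the state is hypothetical):

  } elsif ($aggop == &Triceps::AO_COLLAPSE) {
    # hypothetical: the state refers back to itself; break the
    # circular reference to let Perl free the memory immediately
    undef $state->{self};
    return;
  }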

The computation reacts accordingly: for the before-modification it re-sends the old result with the new opcode, for the collapse it does nothing, and for the after-modification it calculates the sign: whether the value from $rh needs to be added to or subtracted from the sum. I'm actually thinking that maybe this sign should be passed as a separate argument too, and then both the aggregation operation constants AO_AFTER_* could be merged into one. We'll see; maybe it will be changed in the future.

Then the addition/subtraction is done and the state updated.

After that, if the row does not need to be sent (the opcode is OP_NOP or the group size is 0), the function can as well return right away, without constructing the new row.

If the row needs to be produced, continue with the same logic as in the non-additive aggregator, only without the iteration through the group. The field "id" in the result is produced by what amounts to the SQL LAST() operator. LAST() and FIRST() are not additive: they refer to the values in the last or first row in the group's order, and simply can not be calculated by looking at which rows are being inserted and deleted, without knowing their order in the group. But they are fast as they are and do not require iteration. The same goes for the row count (as long as we don't care about excluding NULLs, violating the SQL semantics). And for the averaging there is one last step to do after the additive part is done: divide the sum by the count.
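
By the way, if the result needed the SQL FIRST() instead, the first row in the group's order could be found just as cheaply with the same aggregator context API (a sketch):

  my $rFirst = $context->begin()->getRow() or die "$!";
  my $firstId = $rFirst->get("id"); # FIRST(id) in the SQL terms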

All these non-additive steps are done in this last section, then the result row is constructed, remembered and sent.

Not all the aggregation operations can be expressed in an additive way. It may even vary with the data. For MAX(), the insertion of a row can always be done additively: just compare the new value with the remembered maximum, and replace it if the new value is greater. The deletion can also compare the deleted value with the remembered maximum. If the deleted value is less, the maximum is unchanged. But if the deleted value is equal to the maximum, MAX() has to iterate through all the remaining values and find the new maximum.
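
Sketched in plain Perl, outside of Triceps, the MAX() logic described above would look like this (@group stands for the values remaining in the group after the deletion):

sub maxAfterInsert # ($state, $value)
{
  my ($state, $value) = @_;
  # additive: just compare with the remembered maximum
  $state->{max} = $value
    if (!defined $state->{max} || $value > $state->{max});
}

sub maxAfterDelete # ($state, $value, @group)
{
  my ($state, $value, @group) = @_;
  return if ($value < $state->{max}); # the maximum is unchanged
  # the deleted value was the maximum: re-find it by iteration
  # (if the group became empty, the maximum becomes undef)
  my $max;
  foreach my $v (@group) {
    $max = $v if (!defined $max || $v > $max);
  }
  $state->{max} = $max;
}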

Some functions may be even trickier. The calculation of the standard deviation requires finding the mean (the same average, just named in a more scientific way) in an additive or iterative way, and after that iterating again to find the deviation from that mean.
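
As a plain-Perl sketch, with the whole group's values passed in as a list (using the sample formula with n-1; the population formula with plain n would work the same way):

sub stddev # (@values)
{
  my @values = @_;
  my $n = scalar(@values);
  return undef if ($n < 2);
  my $sum = 0;
  $sum += $_ foreach (@values);
  my $mean = $sum / $n; # the first pass; could also be kept additively
  my $sqdev = 0;
  $sqdev += ($_ - $mean) ** 2 foreach (@values); # the second pass
  return sqrt($sqdev / ($n - 1));
}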

There is also an issue with the floating-point precision in the additive aggregation. It's not such a big issue if the rows are only added and never deleted from the group, but it can get much worse with the deletions. Let me show it with a sample run of the additive code:

OP_INSERT,1,AAA,1,10
tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="1" price="1" 
OP_INSERT,2,AAA,1e20,20
tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="1" price="1" 
tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="2" price="5e+19" 
OP_INSERT,3,AAA,2,10
tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="2" price="5e+19" 
tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="3" price="5e+19" 
OP_INSERT,4,AAA,3,10
tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="3" price="5e+19" 
tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="4" price="1.5" 

Why is the last result 1.5 when it should have been (2+3)/2 = 2.5? Because adding together 1e20 and 2 pushed the 2 beyond the precision of a floating-point number: 1e20+2 = 1e20. So when the row with 1e20 was deleted from the group and its value subtracted from the sum, that left 0, which then got averaged with 3.
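
This effect is easy to reproduce in plain Perl, without any aggregation at all:

my $sum = 0;
$sum += 1e20;
$sum += 2; # the 2 is lost right here: 1e20+2 == 1e20
$sum -= 1e20;
print $sum, "\n"; # prints 0, not 2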

Of course, the real stock prices won't have that much variation. But subtler errors will still accumulate over time, and you have to expect them and plan accordingly.
