    sub computeAverage # (table, context, aggop, opcode, rh, state, args...)
    {
        my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;

        # don't send the NULL record after the group becomes empty
        return if ($context->groupSize()==0
            || $opcode == &Triceps::OP_NOP);
        if ($opcode == &Triceps::OP_DELETE) {
            $context->send($opcode, $$state) or die "$!";
            return;
        }

        my $sum = 0;
        my $count = 0;
        for (my $rhi = $context->begin(); !$rhi->isNull();
                $rhi = $context->next($rhi)) {
            $count++;
            $sum += $rhi->getRow()->get("price");
        }
        my $rLast = $context->last()->getRow() or die "$!";
        my $avg = $sum/$count;

        my $res = $context->resultType()->makeRowHash(
            symbol => $rLast->get("symbol"),
            id => $rLast->get("id"),
            price => $avg
        ) or die "$!";
        ${$state} = $res;
        $context->send($opcode, $res) or die "$!";
    }

    sub initRememberLast # (@args)
    {
        my $refvar;
        return \$refvar;
    }

    my $ttWindow = Triceps::TableType->new($rtTrade)
        ->addSubIndex("byId",
            Triceps::IndexType->newHashed(key => [ "id" ])
        )
        ->addSubIndex("bySymbol",
            Triceps::IndexType->newHashed(key => [ "symbol" ])
            ->addSubIndex("last2",
                Triceps::IndexType->newFifo(limit => 2)
                ->setAggregator(Triceps::AggregatorType->new(
                    $rtAvgPrice, "aggrAvgPrice", \&initRememberLast, \&computeAverage)
                )
            )
        )
    or die "$!";
The rest of the example stays the same, so it's not shown. Even in the part that is shown, very little has changed.
The aggregator type now has an initialization function. (This function is NOT of the same kind as for the sorted index!) This function gets called every time a new aggregation group gets created, before the first row is inserted into it. It initializes the aggregator group's Perl state (the state is per aggregator type, so if there are two parallel index types, each with an aggregator, each aggregator will have its own group state).
The state is stored in the group as a single Perl variable, so it is usually a reference. In this case the returned value is a reference to a variable that will contain a Row reference. (Ironically, the simplest case looks a bit more confusing than if it were a reference to an array or hash.) Returning a reference to a "my" variable is a way to create a reference to an anonymous value: each time "my" executes, it creates a new value, which is then kept alive through the reference after the initialization function returns. The next time the function executes, "my" creates another new value.
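The behavior of "my" described above can be checked in plain Perl, without Triceps. This is only a small sketch: the function has the same body as initRememberLast, while the variables $stateA and $stateB and the strings stored in them are made up for the illustration.

```perl
use strict;
use warnings;

# Same body as initRememberLast in the text: each call's "my" creates
# a fresh scalar, and the returned reference keeps that scalar alive.
sub initRememberLast
{
    my $refvar;
    return \$refvar;
}

# Hypothetical stand-ins for the states of two aggregation groups.
my $stateA = initRememberLast();
my $stateB = initRememberLast();

${$stateA} = "result for group A";
${$stateB} = "result for group B";

# The two references point to different scalars, so the values
# stored through them do not interfere.
print "A: ${$stateA}\n";
print "B: ${$stateB}\n";
print "the references are distinct\n" if $stateA != $stateB;
```

Each call produces an independent scalar, which is why every group can get its own place for the remembered result.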
The computation function is passed that state as an argument and now makes use of it. It has two small additions. Before sending a new result row, it remembers that row through the state reference. And before doing any computation, it checks whether the requested opcode is DELETE, and if so, simply resends the last remembered result with the new opcode. Remember, the rows are not copied but reference-counted, so this is fairly cheap.
The extra level of referencing is used because simply assigning to $state would only change the local variable and not the value kept in the group.
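The difference can be seen in a plain-Perl sketch. The two functions below are hypothetical stand-ins for a computation function, with only the state argument modeled; they are not part of Triceps.

```perl
use strict;
use warnings;

# Copies the argument into a local variable and assigns to the copy.
sub rememberInCopy
{
    my ($state) = @_;
    $state = "new result";     # changes only the local copy
}

# Receives a reference and assigns through it.
sub rememberThroughRef
{
    my ($state) = @_;
    ${$state} = "new result";  # changes the scalar the reference points to
}

my $plain;                     # plays the role of a state kept by value
rememberInCopy($plain);
print "plain: ", (defined $plain ? $plain : "undef"), "\n";

my $cell;
my $groupState = \$cell;       # plays the role of a state kept as a reference
rememberThroughRef($groupState);
print "through reference: $cell\n";
```

The first call leaves $plain undefined, while the second one changes $cell, because the reference itself is copied but still points to the same scalar.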
However, if you assign to the argument of the function directly, that is, to the element of @_ (here $_[5]) rather than to a copy of it, that changes the value kept in the group (similar to changing the loop variable in a foreach loop). So you can save a bit of overhead by eliminating the extra indirection. The changes will be:
    sub computeAverage # (table, context, aggop, opcode, rh, state, args...)
    {
        ...
        if ($opcode == &Triceps::OP_DELETE) {
            $context->send($opcode, $state) or die "$!";
            return;
        }
        ...
        $_[5] = $res;
        $context->send($opcode, $res) or die "$!";
    }

    sub initRememberLast # (@args)
    {
        return undef;
    }
Even though the initialization function returns undef, it still must be present. If it's not present, the state argument of the computation function will contain a special hardcoded and unmodifiable undef constant, and nothing could be remembered.
And here is an example of it at work:
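The aliasing of the function arguments that this version relies on is a general Perl feature, and it can be tried in isolation. The sketch below is plain Perl; rememberInPlace is a hypothetical function that takes the state as its first argument, so it assigns to $_[0] where the computation function assigns to $_[5].

```perl
use strict;
use warnings;

# The elements of @_ are aliases for the caller's variables, so an
# assignment to $_[0] is visible to the caller.
sub rememberInPlace
{
    $_[0] = "new result";
}

my $groupState;                # starts out undefined, like the state above
rememberInPlace($groupState);
print "state: $groupState\n";

# The same kind of aliasing happens with the loop variable of foreach:
my @values = (1, 2, 3);
$_ *= 10 foreach @values;
print "values: @values\n";
```

After the call $groupState holds "new result", and @values becomes (10, 20, 30), with no references involved in either case.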
    OP_INSERT,1,AAA,10,10
    tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="1" price="10"
    OP_INSERT,2,BBB,100,100
    tWindow.aggrAvgPrice OP_INSERT symbol="BBB" id="2" price="100"
    OP_INSERT,3,AAA,20,20
    tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="1" price="10"
    tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="3" price="15"
    OP_INSERT,4,BBB,200,200
    tWindow.aggrAvgPrice OP_DELETE symbol="BBB" id="2" price="100"
    tWindow.aggrAvgPrice OP_INSERT symbol="BBB" id="4" price="150"
    OP_INSERT,5,AAA,30,30
    tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="3" price="15"
    tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="5" price="25"
    OP_DELETE,3
    tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="5" price="25"
    tWindow.aggrAvgPrice OP_INSERT symbol="AAA" id="5" price="30"
    OP_DELETE,5
    tWindow.aggrAvgPrice OP_DELETE symbol="AAA" id="5" price="30"
Since the rows are grouped by the symbol, the symbols "AAA" and "BBB" will have separate aggregation states.
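To see how the per-group state produces this kind of output, here is a rough plain-Perl model of the logic. It is not Triceps code: the hash %groups, the functions sendResult and insertTrade, and the array @sent are all hypothetical, only insertions are modeled, and the real table drives the aggregator in a more detailed way. The model only keeps the last two rows per symbol, remembers the last result per group, and resends that result as a DELETE before sending the new INSERT.

```perl
use strict;
use warnings;

my %groups;  # symbol => { rows => [...], last => remembered result }
my @sent;    # the "sent" results, collected as text lines

sub sendResult
{
    my ($opcode, $res) = @_;
    push @sent,
        "$opcode symbol=$res->{symbol} id=$res->{id} price=$res->{price}";
}

sub insertTrade
{
    my ($id, $symbol, $price) = @_;
    # every symbol gets its own group with its own remembered state
    my $g = $groups{$symbol} ||= { rows => [], last => undef };

    # DELETE: resend the remembered result instead of recomputing it
    sendResult("OP_DELETE", $g->{last}) if defined $g->{last};

    push @{$g->{rows}}, { id => $id, symbol => $symbol, price => $price };
    shift @{$g->{rows}} if scalar(@{$g->{rows}}) > 2;  # like a FIFO with limit 2

    my $sum = 0;
    $sum += $_->{price} foreach @{$g->{rows}};
    my $rLast = $g->{rows}[-1];
    my $res = {
        symbol => $rLast->{symbol},
        id => $rLast->{id},
        price => $sum / scalar(@{$g->{rows}}),
    };
    $g->{last} = $res;  # remember the result before sending it
    sendResult("OP_INSERT", $res);
}

insertTrade(1, "AAA", 10);
insertTrade(2, "BBB", 100);
insertTrade(3, "AAA", 20);
print "$_\n" foreach @sent;
```

The four lines printed by this model correspond to the first four result lines of the output above (without the label prefix and the quotes): the insert for "BBB" does not trigger a DELETE for "AAA", because each symbol remembers its own last result.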