Thursday, February 16, 2012

More of the manual aggregation

The last table example prints the aggregated information (the average price of the last two records). This is fairly easy to change so that the information is put into rows and sent on as rowops to a label. The function printAverage() morphs into computeAverage():

my $rtAvgPrice = Triceps::RowType->new(
  symbol => "string", # symbol traded
  id => "int32", # last trade's id
  price => "float64", # avg price of the last 2 trades
) or die "$!";

# place to send the average: could be a dummy label, but to keep the
# code smaller also print the rows here, instead of in a separate label
my $lbAverage = $uTrades->makeLabel($rtAvgPrice, "lbAverage",
  undef, sub { # (label, rowop)
    print($_[1]->printP(), "\n");
  }) or die "$!";

# Send the average price of the symbol in the last modified row
sub computeAverage # (row)
{
  return unless defined $rLastMod;
  my $rhFirst = $tWindow->findIdx($itSymbol, $rLastMod) or die "$!";
  my $rhEnd = $rhFirst->nextGroupIdx($itLast2) or die "$!";
  print("Contents:\n");
  my $avg;
  my ($sum, $count);
  my $rhLast;
  for (my $rhi = $rhFirst;
      !$rhi->same($rhEnd); $rhi = $rhi->nextIdx($itLast2)) {
    print("  ", $rhi->getRow()->printP(), "\n");
    $rhLast = $rhi;
    $count++;
    $sum += $rhi->getRow()->get("price");
  }
  if ($count) {
    $avg = $sum/$count;
    $uTrades->call($lbAverage->makeRowop(&Triceps::OP_INSERT,
      $rtAvgPrice->makeRowHash(
        symbol => $rhLast->getRow()->get("symbol"),
        id => $rhLast->getRow()->get("id"),
        price => $avg
      )
    ));
  }
}

For the demonstration, the aggregated records sent to $lbAverage get printed. The records being aggregated are printed during the iteration too. And here is a sample run's result, with the input records being the comma-separated lines that start with an opcode:

OP_INSERT,1,AAA,10,10
Contents:
  id="1" symbol="AAA" price="10" size="10" 
lbAverage OP_INSERT symbol="AAA" id="1" price="10" 
OP_INSERT,3,AAA,20,20
Contents:
  id="1" symbol="AAA" price="10" size="10" 
  id="3" symbol="AAA" price="20" size="20" 
lbAverage OP_INSERT symbol="AAA" id="3" price="15" 
OP_INSERT,5,AAA,30,30
Contents:
  id="3" symbol="AAA" price="20" size="20" 
  id="5" symbol="AAA" price="30" size="30" 
lbAverage OP_INSERT symbol="AAA" id="5" price="25" 
OP_DELETE,3
Contents:
  id="5" symbol="AAA" price="30" size="30" 
lbAverage OP_INSERT symbol="AAA" id="5" price="30" 
OP_DELETE,5
Contents:

There are a couple of things to notice about it: it produces only the INSERT rowops, no DELETEs, and when the last record of the group is removed, that event produces nothing.

The first item is mildly problematic because the processing downstream from here might not be able to handle the updates properly without the DELETE rowops. It can be worked around fairly easily by connecting another table, with the same primary key as the aggregation key, to store the aggregation results. That table would automatically transform the repeated INSERTs on the same key to a DELETE-INSERT sequence.
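
The replacement behavior described above can be sketched in plain Perl. This is only a minimal model under stated assumptions, not the Triceps API: the hash %stored, the array @out and the function insert_row() are hypothetical names that stand in for a table keyed by symbol and for whatever is connected to its output.

```perl
use strict;
use warnings;

# Hypothetical plain-Perl model of a keyed table (not the Triceps API):
# an INSERT on a key that is already present first emits a DELETE of the
# old row, then the INSERT of the new one.
my %stored;   # symbol => row currently held in the model table
my @out;      # rowops seen downstream, each as [opcode, row]

sub insert_row {
    my ($row) = @_;
    my $old = $stored{ $row->{symbol} };
    push @out, [ "OP_DELETE", $old ] if defined $old;
    $stored{ $row->{symbol} } = $row;
    push @out, [ "OP_INSERT", $row ];
}

# Two INSERTs on the same key, as the aggregation code would send them.
insert_row({ symbol => "AAA", id => 1, price => 10 });
insert_row({ symbol => "AAA", id => 3, price => 15 });

for my $op (@out) {
    my ($opcode, $row) = @$op;
    print "$opcode symbol=$row->{symbol} id=$row->{id} price=$row->{price}\n";
}
```

In this model the second INSERT comes out as a DELETE of the old row followed by the INSERT of the new one, which is the sequence the downstream processing needs.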

The second item is actually pretty bad because it means that the last record deleted gets stuck in the aggregation results. The Coral8 solution for this situation is to send a row with all non-key fields set to NULL, to reset them (interestingly, it's a relatively recent addition; that bug took Coral8 years to notice). But with the opcodes available, we can just as well send a DELETE rowop with similar contents: only the key field is set, the helper table fills in the rest of the fields from the row it has stored, and produces a clean DELETE.
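
The way a key-only DELETE gets completed can also be modeled in plain Perl. Again this is a rough sketch and not the Triceps API: %stored and delete_row() are hypothetical names, and the choice to produce nothing when the key is not found is an assumption of the model.

```perl
use strict;
use warnings;

# Hypothetical plain-Perl model, not the Triceps API: the model table
# holds the last aggregated row per symbol.
my %stored = (AAA => { symbol => "AAA", id => 5, price => 30 });

# A DELETE request carries only the key; the stored row supplies the
# rest of the fields for the rowop that goes downstream.
sub delete_row {
    my ($request) = @_;
    my $old = delete $stored{ $request->{symbol} };
    return unless defined $old;   # assumption: unknown key produces nothing
    return [ "OP_DELETE", $old ];
}

my $op  = delete_row({ symbol => "AAA" });
my $row = $op->[1];
print "$op->[0] symbol=$row->{symbol} id=$row->{id} price=$row->{price}\n";
```

The request names only the symbol, yet the rowop that comes out carries the id and price of the stored row, so the downstream logic sees a complete DELETE.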

All this can be done with the following changes. Add the table and remember its input label in $lbAvgPriceHelper. This label will be used as the destination for the aggregated rows instead of $lbAverage.

my $ttAvgPrice = Triceps::TableType->new($rtAvgPrice)
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
  )
or die "$!";
$ttAvgPrice->initialize() or die "$!";
my $tAvgPrice = $uTrades->makeTable($ttAvgPrice,
  &Triceps::EM_CALL, "tAvgPrice") or die "$!";
my $lbAvgPriceHelper = $tAvgPrice->getInputLabel() or die "$!";

Then still use $lbAverage to print the records coming out, but now connect it after the helper table:

$tAvgPrice->getOutputLabel()->chain($lbAverage) or die "$!";

And in computeAverage() change the destination label and add the case for when the group becomes empty:

...
  if ($count) {
    $avg = $sum/$count;
    $uTrades->call($lbAvgPriceHelper->makeRowop(&Triceps::OP_INSERT,
      $rtAvgPrice->makeRowHash(
        symbol => $rhLast->getRow()->get("symbol"),
        id => $rhLast->getRow()->get("id"),
        price => $avg
      )
    ));
  } else {
    $uTrades->call($lbAvgPriceHelper->makeRowop(&Triceps::OP_DELETE,
      $rtAvgPrice->makeRowHash(
        symbol => $rLastMod->get("symbol"),
      )
    ));
  }
...

Then the output of the same example becomes:

OP_INSERT,1,AAA,10,10
Contents:
  id="1" symbol="AAA" price="10" size="10" 
tAvgPrice.out OP_INSERT symbol="AAA" id="1" price="10" 
OP_INSERT,3,AAA,20,20
Contents:
  id="1" symbol="AAA" price="10" size="10" 
  id="3" symbol="AAA" price="20" size="20" 
tAvgPrice.out OP_DELETE symbol="AAA" id="1" price="10" 
tAvgPrice.out OP_INSERT symbol="AAA" id="3" price="15" 
OP_INSERT,5,AAA,30,30
Contents:
  id="3" symbol="AAA" price="20" size="20" 
  id="5" symbol="AAA" price="30" size="30" 
tAvgPrice.out OP_DELETE symbol="AAA" id="3" price="15" 
tAvgPrice.out OP_INSERT symbol="AAA" id="5" price="25" 
OP_DELETE,3
Contents:
  id="5" symbol="AAA" price="30" size="30" 
tAvgPrice.out OP_DELETE symbol="AAA" id="5" price="25" 
tAvgPrice.out OP_INSERT symbol="AAA" id="5" price="30" 
OP_DELETE,5
Contents:
tAvgPrice.out OP_DELETE symbol="AAA" id="5" price="30" 

All fixed, the proper DELETEs are coming out.
