Saturday, March 31, 2012

The guts of Collapse

The Collapse implementation is fairly small, and is another worthy example for the docs. It's a template, and a "normal" one too: no code generation whatsoever, just a combination of ready components. As with SimpleAggregator, the current Collapse is quite simple and will grow more features over time, so I've copied the original simple version into t/xCollapse.t to stay there unchanged.

The most notable thing about Collapse is that it took only about an hour to write the first version and another three hours or so to test it. That is a lot less than similar code took in the Aleri or Coral8 code base. The reason is that Triceps provides fairly flexible base data structures that can easily be combined directly in a scripting language. There is no need to redo a lot from scratch every time: just take something and add a little bit on top.

So here it is, with the interspersed comments.

package Triceps::Collapse;
use Carp;
use strict;

sub new # ($class, $optName => $optValue, ...)
{
  my $class = shift;
  my $self = {};

  &Triceps::Opt::parse($class, $self, { 
    unit => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::Unit") } ],
    name => [ undef, \&Triceps::Opt::ck_mandatory ],
    data => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
  }, @_);

  # parse the data element
  my $dataref = $self->{data};
  my $dataset = {};
  # dataref->[1] is the best guess for the dataset name, in case if the option "name" goes first
  &Triceps::Opt::parse("$class data set (" . $dataref->[1] . ")", $dataset, { 
    name => [ undef, \&Triceps::Opt::ck_mandatory ],
    key => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY", "") } ],
    rowType => [ undef, sub { &Triceps::Opt::ck_ref(@_, "Triceps::RowType"); } ],
    fromLabel => [ undef, sub { &Triceps::Opt::ck_ref(@_, "Triceps::Label"); } ],
  }, @$dataref);

The options parsing goes as usual. The option "data" is parsed again for the options inside it, and those are placed into the hash %$dataset.

  # save the dataset for the future
  $self->{datasets}{$dataset->{name}} = $dataset;
  # check the options
  confess "The data set (" . $dataset->{name} . ") must have only one of options rowType or fromLabel"
    if (defined $dataset->{rowType} && defined $dataset->{fromLabel});
  confess "The data set (" . $dataset->{name} . ") must have exactly one of options rowType or fromLabel"
    if (!defined $dataset->{rowType} && !defined $dataset->{fromLabel});

The dataset options "rowType" and "fromLabel" are both optional but exactly one of them must be present, to be sufficient and non-conflicting. So the code makes sure of it.

  my $lbFrom = $dataset->{fromLabel};
  if (defined $lbFrom) {
    confess "The unit of the Collapse and the unit of its data set (" . $dataset->{name} . ") fromLabel must be the same"
      unless ($self->{unit}->same($lbFrom->getUnit()));
    $dataset->{rowType} = $lbFrom->getType();
  }

If "fromLabel" is used, the row type is taken from it. This looks like a pretty good pattern that I plan to spread to the other elements in the future. The unit could also be taken from it.

  # create the tables
  $dataset->{tt} = Triceps::TableType->new($dataset->{rowType})
    ->addSubIndex("primary", # (assumed index name)
      Triceps::IndexType->newHashed(key => $dataset->{key})
    );
  $dataset->{tt}->initialize()
    or confess "Collapse table type creation error for dataset '" . $dataset->{name} . "':\n$! ";

  $dataset->{tbInsert} = $self->{unit}->makeTable($dataset->{tt}, "EM_CALL", $self->{name} . "." . $dataset->{name} . ".tbInsert")
    or confess "Collapse internal error: insert table creation for dataset '" . $dataset->{name} . "':\n$! ";
  $dataset->{tbDelete} = $self->{unit}->makeTable($dataset->{tt}, "EM_CALL", $self->{name} . "." . $dataset->{name} . ".tbDelete")
    or confess "Collapse internal error: delete table creation for dataset '" . $dataset->{name} . "':\n$! ";

The state is kept in two tables. The reason is this: after collapsing, the Collapse may send for each key either a single INSERT rowop (if the row was not there before and got inserted), a single DELETE rowop (if the row was there before and then got deleted), or a DELETE followed by an INSERT (if the row was there but then changed its value). So one table contains the DELETE part and the other the INSERT part for each key, and either part may be empty (or both, if the row at that key has not been changed). After each flush both tables become empty, and then start collecting the modifications again.

  # create the labels
  $dataset->{lbIn} = $self->{unit}->makeLabel($dataset->{rowType}, $self->{name} . "." . $dataset->{name} . ".in",
    undef, \&_handleInput, $self, $dataset)
      or confess "Collapse internal error: input label creation for dataset '" . $dataset->{name} . "':\n$! ";
  $dataset->{lbOut} = $self->{unit}->makeDummyLabel($dataset->{rowType}, $self->{name} . "." . $dataset->{name} . ".out")
    or confess "Collapse internal error: output label creation for dataset '" . $dataset->{name} . "':\n$! ";

The input and output labels get created. The input label has the function with the processing logic set as its handler. The output label is just a dummy. Note that the tables don't get connected anywhere; they are just used as storage, without any immediate reactions to their modifications.

  # chain the input label, if any
  if (defined $lbFrom) {
    $lbFrom->chain($dataset->{lbIn})
      or confess "Collapse internal error: input label chaining for dataset '" . $dataset->{name} . "' to '" . $lbFrom->getName() . "' failed:\n$! ";
    delete $dataset->{fromLabel}; # no need to keep the reference any more
  }

And if the fromLabel is used, the Collapse gets connected to it. After that there is no good reason to keep a separate reference to that label, especially considering that it creates a reference loop and would mess with the memory management. So it gets deleted.

  bless $self, $class;
  return $self;
}

The final blessing is boilerplate. The constructor creates the data structures but doesn't implement any logic. The logic goes next:

sub _handleInput # ($label, $rop, $self, $dataset)
{
  my $label = shift;
  my $rop = shift;
  my $self = shift;
  my $dataset = shift;

  if ($rop->isInsert()) {
    $dataset->{tbInsert}->insert($rop->getRow())
      or confess "Collapse " . $self->{name} . " internal error: dataset '" . $dataset->{name} . "' failed an insert-table-insert:\n$! ";

The Collapse element knows nothing about the data that went through it before. After each flush it starts again from scratch. It expects the stream of rows to be self-consistent, and draws conclusions about the previous data from the new data it sees. An INSERT rowop may mean one of two things: either there was no previous record with this key, or there was a previous record with this key and then it got deleted. The Delete table can be used to differentiate these situations: if there was a row that was then deleted, the Delete table would contain that row. But for the INSERT it doesn't matter: in either case it just inserts the new row into the Insert table. If there was no such row before, it would be the new INSERT. If there was such a row before, it would be an INSERT following a DELETE.

Incidentally, this logic happens to work for the insert-only streams of data too, when the rows get replaced by simply sending another row with the same key. Then if there was a previous row in the Insert table, it would simply get replaced by a new one, and eventually at the flush time the last row would go through.

  } elsif($rop->isDelete()) {
    if (! $dataset->{tbInsert}->deleteRow($rop->getRow())) {
      confess "Collapse " . $self->{name} . " internal error: dataset '" . $dataset->{name} . "' failed an insert-table-delete:\n$! "
        if ($! ne "");
      $dataset->{tbDelete}->insert($rop->getRow())
        or confess "Collapse " . $self->{name} . " internal error: dataset '" . $dataset->{name} . "' failed a delete-table-insert:\n$! ";
    }
  }
}

The DELETE case is more interesting. If we see a DELETE rowop, this means that either there was an INSERT sent before the last flush and now that INSERT becomes undone, or that there was an INSERT after the flush, which also becomes undone. The actions for these cases are different: if the INSERT was before the flush, this row should go into the Delete table, and eventually propagate as a DELETE during the next flush. If the last INSERT was after the flush, then its row would be stored in the Insert table, and now we just need to delete that row and pretend that it never was.

That's what the logic does: first it tries to remove the row from the Insert table. If that succeeds, the row came from an INSERT after the flush that is now undone, and there is nothing more to do. If there was no row to delete, the INSERT must have happened before the last flush, so the row gets remembered in the Delete table and passed on at the next flush.

Note that this logic is not resistant to incorrect data sequences. If there are ever two DELETEs for the same key in a row (which should never happen in a correct sequence), the second DELETE will end up in the Delete table.
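To check the reasoning above, the bookkeeping can be modeled without Triceps at all. This is a minimal sketch, assuming that plain Perl hashes keyed by the primary key are a fair stand-in for the two hashed-index tables; %ins, %del, collapse_input() and collapse_flush() are hypothetical names, not part of Triceps. It also covers the insert-only case, where the last row for a key wins.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the two Collapse tables: plain hashes keyed by
# the primary key (a model of the logic, not Triceps objects).
my %ins;   # the INSERT part for each key (like tbInsert)
my %del;   # the DELETE part for each key (like tbDelete)

sub collapse_input {
    my ($op, $key, $row) = @_;
    if ($op eq 'INSERT') {
        # New key, or an INSERT after a DELETE: either way the row goes
        # into the Insert part, replacing any row already there.
        $ins{$key} = $row;
    } elsif ($op eq 'DELETE') {
        if (exists $ins{$key}) {
            # Inserted after the last flush: pretend it never happened.
            delete $ins{$key};
        } else {
            # Sent out before the last flush: remember it for the next flush.
            $del{$key} = $row;
        }
    }
    return;
}

sub collapse_flush {
    # All the DELETEs go before all the INSERTs, then both parts are cleared.
    my @out = map { [ 'DELETE', $del{$_} ] } sort keys %del;
    push @out, map { [ 'INSERT', $ins{$_} ] } sort keys %ins;
    %del = ();
    %ins = ();
    return @out;
}

# A row inserted, then changed in the next batch.
collapse_input('INSERT', 'a', 100);
my @batch1 = collapse_flush();     # ([INSERT, 100])
collapse_input('DELETE', 'a', 100);
collapse_input('INSERT', 'a', 200);
my @batch2 = collapse_flush();     # ([DELETE, 100], [INSERT, 200])

# Insert-only updates: the last row for the key wins.
collapse_input('INSERT', 'b', 1);
collapse_input('INSERT', 'b', 2);
my @batch3 = collapse_flush();     # ([INSERT, 2])
```

An INSERT followed by a DELETE within the same batch leaves nothing to flush at all, which is exactly the "pretend that it never was" case.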

sub flush # ($self)
{
  my $self = shift;
  my $unit = $self->{unit};
  my $OP_INSERT = &Triceps::OP_INSERT;
  my $OP_DELETE = &Triceps::OP_DELETE;
  foreach my $dataset (values %{$self->{datasets}}) {
    my $tbIns = $dataset->{tbInsert};
    my $tbDel = $dataset->{tbDelete};
    my $lbOut = $dataset->{lbOut};
    my $next;
    # send the deletes always before the inserts
    for (my $rh = $tbDel->begin(); !$rh->isNull(); $rh = $next) {
      $next = $rh->next(); # advance the iterator before removing
      $tbDel->remove($rh);
      $unit->call($lbOut->makeRowop($OP_DELETE, $rh->getRow()));
    }
    for (my $rh = $tbIns->begin(); !$rh->isNull(); $rh = $next) {
      $next = $rh->next(); # advance the iterator before removing
      $tbIns->remove($rh);
      $unit->call($lbOut->makeRowop($OP_INSERT, $rh->getRow()));
    }
  }
}

The flushing is fairly straightforward: first it sends all the DELETEs, then all the INSERTs, clearing the tables along the way. At first I thought of matching the DELETEs and INSERTs together, sending them next to each other when both are available for some key. It's not that difficult to do. But then I realized that it doesn't matter and just did it the simple way.

sub getInputLabel($$) # ($self, $dsetname)
{
  my ($self, $dsetname) = @_;
  confess "Unknown dataset '$dsetname'"
    unless exists $self->{datasets}{$dsetname};
  return $self->{datasets}{$dsetname}{lbIn};
}

sub getOutputLabel($$) # ($self, $dsetname)
{
  my ($self, $dsetname) = @_;
  confess "Unknown dataset '$dsetname'"
    unless exists $self->{datasets}{$dsetname};
  return $self->{datasets}{$dsetname}{lbOut};
}

sub getDatasets($) # ($self)
{
  my $self = shift;
  return keys %{$self->{datasets}};
}

The getter functions are fairly simple. The only catch is that the code has to check with exists before it reads the value of $self->{datasets}{$dsetname}{lbOut}. Otherwise, if an incorrect $dsetname is used, the read would return undef but along the way would autovivify an unpopulated $self->{datasets}{$dsetname}. That would then cause a crash when flush() iterates through the datasets and finds the dataset options missing.
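The autovivification trap is easy to reproduce in plain Perl. A minimal sketch, with a hypothetical %datasets hash standing in for $self->{datasets} and a hypothetical get_output() mirroring the checked getter:

```perl
use strict;
use warnings;

# Hypothetical stand-in for $self->{datasets}: one fully populated dataset.
my %datasets = (idata => { lbIn => 'in-label', lbOut => 'out-label' });

# Unchecked read with a wrong name: it yields undef, but Perl autovivifies
# the intermediate $datasets{bogus} as an empty hash along the way.
my $unchecked = $datasets{bogus}{lbOut};
my $vivified  = exists $datasets{bogus};    # true

# Checked read in the style of getOutputLabel(): test with exists first.
sub get_output {
    my ($dsets, $name) = @_;
    die "Unknown dataset '$name'\n" unless exists $dsets->{$name};
    return $dsets->{$name}{lbOut};
}

my $ok      = eval { get_output(\%datasets, 'other'); 1 };   # undef: it died
my $created = exists $datasets{other};                       # false
```

After the unchecked read, a loop over the values of %datasets would meet an empty hash with no lbOut in it, which is the same kind of half-built dataset that would trip up flush().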

That's it, Collapse in a nutshell!

Friday, March 30, 2012

Collapsed updates

Sometimes the exact sequence of how a row at a particular key was updated does not matter; the only interesting part is the end result. That is what the OUTPUT EVERY statement in CCL or the pulsed subscription in Aleri provides. It doesn't have to be time-driven either: if the data comes in as batches, it makes sense to collapse the modifications from the whole batch into one, and send it at the end of the batch.

To do this in Triceps, I've made a template. Here is an example of its use with interspersed comments:

my $unit = Triceps::Unit->new("unit") or die "$!";

our $rtData = Triceps::RowType->new(
  # mostly copied from the traffic aggregation example
  local_ip => "string",
  remote_ip => "string",
  bytes => "int64",
) or die "$!";

The meaning of the rows is not particularly important for this example. It just uses a pair of the IP addresses as the collapse key. The collapse absolutely needs a primary key, since it has to track and collapse multiple updates to the same row.

my $collapse = Triceps::Collapse->new(
  unit => $unit,
  name => "collapse",
  data => [
    name => "idata",
    rowType => $rtData,
    key => [ "local_ip", "remote_ip" ],
  ],
) or die "$!";

Most of the options are self-explanatory. The dataset is defined with nested options to make the API extensible, to allow multiple datasets to be defined in the future. But at the moment only one is allowed. A dataset collapses the data at one label: an input label and an output label get defined for it, just as for a table. The data arrives at the input label, gets collapsed by the primary key, and then stays in the Collapse until the flush. When the Collapse gets flushed, the data is sent out of its output label. After the flush, the Collapse has no data in it, and starts collecting the updates again from scratch. The labels get named by joining the names of the Collapse element, of the dataset, and "in" or "out". For this Collapse, the label names will be "collapse.idata.in" and "collapse.idata.out".

Note that the dataset options are specified in a referenced array, not a hash! If you try to use a hash, it will fail. When specifying the dataset options, put the "name" first. It's used in the error messages about any issues in the dataset, and the code really expects the name to go first.
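The reason for putting "name" first is visible in the constructor, which takes element [1] of the array as its best guess for the dataset name before the options get parsed. A small plain-Perl illustration; guess_name() is a hypothetical helper that mimics just that one expression:

```perl
use strict;
use warnings;

# Dataset options as an array of name/value pairs, with and without
# "name" in the first position.
my $good = [ name => "idata", key => [ "local_ip", "remote_ip" ] ];
my $late = [ key => [ "local_ip", "remote_ip" ], name => "idata" ];

# Hypothetical helper: the same expression the constructor uses
# ($dataref->[1]) to guess the dataset name for error messages.
sub guess_name { my ($dataref) = @_; return $dataref->[1] }

my $g1 = guess_name($good);    # "idata"
my $g2 = guess_name($late);    # an ARRAY reference, not a name

# A hash reference fails the ARRAY check that the "data" option requires.
my $hash = { name => "idata" };
my $is_array = ref($hash) eq 'ARRAY';    # false
```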

my $lbPrint = makePrintLabel("print", $collapse->getOutputLabel("idata"));

To print the result, a print label is created in this example in the same way as in the previous ones, and connected to the Collapse's output label. The method to get the Collapse's output label is very much like the table's, except that it takes the dataset name as an argument.

sub mainloop($$$) # ($unit, $datalabel, $collapse)
{
  my $unit = shift;
  my $datalabel = shift;
  my $collapse = shift;
  while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "data") {
      my $rowop = $datalabel->makeRowopArray(@data)
        or die "$!";
      $unit->call($rowop) or die "$!";
      $unit->drainFrame(); # just in case, for completeness
    } elsif ($type eq "flush") {
      $collapse->flush();
    }
  }
}

&mainloop($unit, $collapse->getInputLabel("idata"), $collapse);

There will be a second example, so I've placed the main loop into a function. It works in the same way as in the examples before: it extracts the data from the CSV format and sends it to a label. The first column is used as a command: "data" sends the data, and "flush" performs the flush from the Collapse. The flush marks the end of the batch. Here is an example of a run, with the input lines shown as usual in italics:

collapse.idata.out OP_INSERT local_ip="" remote_ip="" bytes="100" 
collapse.idata.out OP_DELETE local_ip="" remote_ip="" bytes="100" 
collapse.idata.out OP_INSERT local_ip="" remote_ip="" bytes="200" 
collapse.idata.out OP_INSERT local_ip="" remote_ip="" bytes="2000" 
collapse.idata.out OP_DELETE local_ip="" remote_ip="" bytes="2000" 

You can trace it and make sure that the flushed data is the cumulative result of the data that went in.

The Collapse also allows specifying the row type and the input connection for a dataset in a different way:

my $lbInput = $unit->makeDummyLabel($rtData, "lbInput");

my $collapse = Triceps::Collapse->new(
  unit => $unit,
  name => "collapse",
  data => [
    name => "idata",
    fromLabel => $lbInput,
    key => [ "local_ip", "remote_ip" ],
  ],
) or die "$!";

&mainloop($unit, $lbInput, $collapse);

Normally $lbInput would be not a dummy label but the output label of some element. The option "fromLabel" specifies that the dataset input will be coming from that label. So the Collapse can automatically both copy its row type for the dataset and chain the dataset's input label to that label. It's a pure convenience, allowing you to skip the manual steps. In the future it should probably take a whole list of source labels and chain itself to all of them, but for now it takes only one.

This example produces exactly the same output as the previous one, so there is no point in copying it again.

For the last item that hasn't been shown yet, you can get the list of dataset names (well, currently only one name):

@names = $collapse->getDatasets();

And the very last thing to tell about the use of Collapse: when something goes wrong, it will die (with confess). There is no need to follow its methods with "or die".

Monday, March 26, 2012

The dreaded diamond and insert-only updates

The second problem with the diamond topology (see the diagram in the previous post) happens when the blocks B and C keep state, and the input data gets updated by simply re-sending a record with the same key. This kind of update is typical for systems that do not have the concept of opcodes.

Consider a CCL example (approximate, since I can't test it) that gets the securities borrow and loan event reports, differentiated by the sign of the quantity, and sums up the borrows and loans separately:

create schema s_A (
  id integer, 
  symbol string,
  quantity long
);
create input stream i_A schema s_A;

create schema s_D (
  symbol string,
  borrowed boolean, // flag: loaned or borrowed
  quantity long
);
create public window w_D schema s_D
keep last per symbol, borrowed;

create public window w_B schema s_A keep last per id;
create public window w_C schema s_A keep last per id;

insert when quantity < 0
  then w_B
  else w_C
select * from i_A; 

insert into w_D
group by symbol
from w_B;

insert into w_D
group by symbol 
from w_C;

It works OK until a row with the same id gets updated to a different sign of quantity:


If the quantity kept the same sign, the new row would simply replace the old one in w_B or w_C, and the aggregation result would be right again. But when the sign changes, the new row goes into a different direction than the previous one. Now it ends up with both w_B and w_C having rows with the same id: one old and one new!

In this case the problem really is at the "fork" part of the "diamond"; the merging part is just along for the ride, carrying the incorrect results.

This problem does not happen in the systems that have both inserts and deletes. Then the data sequence becomes


The DELETE goes along the same branch as the first insert and undoes its effect, then the second INSERT goes into the other branch.

Since Triceps has both INSERT and DELETE opcodes, it's immune to this problem, as long as the input data has the correct DELETEs in it.
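The effect of the two kinds of updates on the fork can be modeled with two plain hashes standing in for the "keep last per id" windows. This is a sketch, assuming that routing by the sign of the quantity is all the fork does; %w_B, %w_C, route_insert() and route_delete() are hypothetical names, and the sample id and quantities are made up:

```perl
use strict;
use warnings;

# Hypothetical model of the fork: two "keep last per id" windows, with the
# row routed by the sign of its quantity (w_B for negative, w_C otherwise).
my (%w_B, %w_C);

sub route_insert {
    my ($id, $qty) = @_;
    if ($qty < 0) { $w_B{$id} = $qty } else { $w_C{$id} = $qty }
}

sub route_delete {
    my ($id, $qty) = @_;
    # A DELETE follows the same branch as the INSERT it undoes.
    if ($qty < 0) { delete $w_B{$id} } else { delete $w_C{$id} }
}

# Insert-only update: id 1 changes from +10 to -10.
route_insert(1, 10);
route_insert(1, -10);
my $stale = exists $w_B{1} && exists $w_C{1};    # true: id 1 is in both

# Start over, this time with an explicit DELETE before the new INSERT.
%w_B = ();
%w_C = ();
route_insert(1, 10);
route_delete(1, 10);
route_insert(1, -10);
my $clean = exists $w_B{1} && !exists $w_C{1};   # true: only the new row
```

The DELETE carries the old row, so it can be routed by the old sign; that is what lets it find the branch holding the old row.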

If you wonder, the CCL example can be fixed too, but in a more roundabout way, by adding a couple of statements before the "insert-when" statement:

on i_A
delete from w_B
  where i_A.id = w_B.id;

on i_A
delete from w_C
  where i_A.id = w_C.id;

This generates the matching deletes. Of course, if you want, you can use this approach with Triceps too.
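The same fix can be sketched in plain Perl. Here on_input() is a hypothetical handler that clears the id from both window stand-ins (hypothetical hashes %w_B and %w_C) before routing the new row. It models only the window contents, not the DELETE rowops that a real engine would also pass on to the aggregation downstream:

```perl
use strict;
use warnings;

# Hypothetical window stand-ins, keyed by id.
my (%w_B, %w_C);

# Equivalent of the two "on i_A delete ..." statements followed by the
# insert-when routing: drop the id from both windows, then route the row.
sub on_input {
    my ($id, $qty) = @_;
    delete $w_B{$id};
    delete $w_C{$id};
    if ($qty < 0) { $w_B{$id} = $qty } else { $w_C{$id} = $qty }
}

on_input(1, 10);
on_input(1, -10);    # the sign flips, but the old row is cleared first
```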

The dreaded diamond and the execution order

The "diamond" is a particular topology of the data flow, when the computation separates based on some condition and then merges again. Like this:

It is also known as "fork-join" (the "join" here has nothing to do with the SQL join, it just means that the arrows merge to the same block).

This topology is a known source of two problems. The first problem is about the execution order.

To make things easier to see, let's consider a simple example.

Suppose the rows come into the block A with the schema:

key string
value int32

And come out of the blocks B and C into D with schema

key string
value int32
negative int32

With the logic in the blocks:

A: if value < 0 then B else C
B: negative = 1
C: negative = 0

Yes, this is a very dumb example that can usually be handled by a conditional expression in a single block. But that's to keep it simple. A real example would often include some SQL joins, with different joins done depending on the condition.

Suppose A then gets the input, in CSV form:


What arrives at D should be


And with the first four rows this is not a problem: they follow the same path and are queued sequentially, so the order is preserved. But the last row follows a different path. And the last two rows logically represent a single update and would likely arrive close together. The last row might happen to overtake the one before it, and D would see the incorrect result:


If all these input rows arrive closely one after another, the last row might overtake even more of them and produce an even more interesting result like


Such misorderings may also happen between the rows with different keys. Those are usually less of a problem, because if D keeps a table, the rows with different keys may be updated in any order without losing the meaning. But if D keeps a FIFO index (say, for a window based on a row count), and the two keys fall into the same FIFO bucket, their misordering would also affect the logic.

The reasons for this can be subdivided further into two classes:
  • asynchronous execution
  • incorrect scheduling in the synchronous execution

If each block executes asynchronously in its own thread, there is no way to predict in which order they will actually execute. If some data is sent to B and C at about the same time, it becomes a race between them. One of the paths might also be longer than the other, making one alternative always win the race. This kind of problem is fairly common for the Aleri system, which is highly multithreaded. But it is a problem for absolutely any CEP engine if you split the execution across multiple threads or processes.

But the single-threaded execution is not necessarily a cure either. Then the order of execution is up to the scheduler. And if the scheduler gets all these rows close together, and then decides to process all the input of A, then all the input of B, of C and of D, then D will receive the rows in the order:


Which is typical for, say, Coral8 if all the input rows arrive in a single bundle (see the separate post on bundling too).
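The effect of such batch scheduling is easy to reproduce in a few lines of plain Perl. This sketch assumes a scheduler that drains each block's whole queue in the fixed order A, B, C before D gets anything; the blocks are modeled as plain loops, and the input values are made up, all with the same key:

```perl
use strict;
use warnings;

# Made-up input: [key, value]; only the last row takes the other branch.
my @input = map { [ 'k', $_ ] } (1, 2, 3, 4, -5);

# Hypothetical batch scheduler: each block drains its whole queue before
# the next block runs, in the fixed order A, B, C.
my (@qB, @qC, @atD);
for my $r (@input) {                         # block A: the fork
    if ($r->[1] < 0) { push @qB, $r } else { push @qC, $r }
}
push @atD, map { [ @$_, 1 ] } @qB;           # block B: negative = 1
push @atD, map { [ @$_, 0 ] } @qC;           # block C: negative = 0

my @seen  = map { $_->[1] } @atD;            # (-5, 1, 2, 3, 4)
my $final = $atD[-1][1];                     # 4, although the last input was -5
```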

At the moment Triceps does not directly support the multithreaded execution, so that renders the first sub-case moot. But I have ideas on how to get this working without too much trouble.

As far as the single-threaded scheduling is concerned, Triceps provides two answers.

First, the conditional logic can often be expressed procedurally:

if ($a->get("value") < 0) {
  D($rtD->makeRowHash($a->toHash(), negative => 1));
} else {
  D($rtD->makeRowHash($a->toHash(), negative => 0));
}

The procedural if-else logic can easily handle not only the simple expressions but things like look-ups and modifications in the tables.

Second, if the logic is broken into the separate labels, the label call semantics provides the same ordering as well:

$lbA = $unit->makeLabel($rtA, "A", undef, sub {
  my $rop = $_[1]; 
  my $op = $rop->getOpcode(); my $a = $rop->getRow();
  if ($a->get("value") < 0) { 
    $unit->call($lbB->makeRowop($op, $a));
  } else { 
    $unit->call($lbC->makeRowop($op, $a));
  }
}) or die "$!";

$lbB = $unit->makeLabel($rtA, "B", undef, sub {
  my $rop = $_[1]; 
  my $op = $rop->getOpcode(); my $a = $rop->getRow();
  $unit->makeHashCall($lbD, $op, $a->toHash(), negative => 1)
    or die "$!";
}) or die "$!";

$lbC = $unit->makeLabel($rtA, "C", undef, sub {
  my $rop = $_[1]; 
  my $op = $rop->getOpcode(); my $a = $rop->getRow();
  $unit->makeHashCall($lbD, $op, $a->toHash(), negative => 0)
    or die "$!";
}) or die "$!";

When the label A calls the label B or C, which calls the label D, A does not get to see its next input row until the whole chain of calls to D and beyond completes. B and C may be replaced with the label chains of arbitrary complexity, including loops, without disturbing the logic.
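The call semantics can be modeled in plain Perl as well. In this sketch the labels are hypothetical code references ($lbA, $lbB, $lbC, $lbD), not Triceps labels, and each one calls the next directly; the input values are made up, with only the last row going through B:

```perl
use strict;
use warnings;

# Hypothetical labels as code references; each calls the next one
# directly, so a row reaches D before A handles its next row.
my @atD;
my $lbD = sub { push @atD, [@_] };
my $lbB = sub { $lbD->(@_, 1) };             # negative = 1
my $lbC = sub { $lbD->(@_, 0) };             # negative = 0
my $lbA = sub {
    my ($key, $value) = @_;
    if ($value < 0) { $lbB->($key, $value) } else { $lbC->($key, $value) }
};

$lbA->('k', $_) for (1, 2, 3, 4, -5);

my @seen = map { $_->[1] } @atD;             # (1, 2, 3, 4, -5)
```

Since each call returns only after the whole chain below it has finished, the order at D is the input order no matter which branch a row takes.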

Sunday, March 25, 2012

AggregatorContext reference

AggregatorContext is one of the arguments passed to the aggregator computation function. It encapsulates the iteration through the aggregation group, in the order of the index on which the aggregator is defined. After the computation function returns, the context becomes invalidated and stops working, so there is no point in saving it between the calls. There is no way to construct the aggregator context directly.

It provides the following methods:

$result = $ctx->groupSize();

Returns the number of rows in the group. This is currently a unique feature available only in an aggregator, not in the normal iteration through the table.

$rowType = $ctx->resultType();

Returns the row type of the aggregation result.

$rh = $ctx->begin();

The first row handle of the iteration. For an empty group, returns a null handle.

$rh = $ctx->next($rh);

The next row handle in order. If the argument handle was the last row in the group, returns a null handle. So the iteration through the group with a context is similar to iteration through the whole table: it ends when begin() or next() returns a null handle.

$rh = $ctx->last();

The last row handle in the group. If the group is empty, returns a null handle.

$rh = $ctx->beginIdx($idxType);

The first row in the group, according to a specific index type's order. The index type must belong to the group, otherwise the result is undefined. If the group is empty, will return the same value as endIdx(). If $idxType is non-leaf, the effect is the same as if its first leaf were used.

$rh = $ctx->endIdx($idxType);

The handle past the last row in the group, according to a specific index type's order. The index type must belong to the group, otherwise the result is undefined and might even result in an endless iteration loop. If $idxType is non-leaf, the effect is the same as if its first leaf were used. This kind of iteration uses the table's $t->nextIdx($idxType, $rh) or $rh->nextIdx($idxType) to advance the position. The Table reference post said that not every possible order is iterable. Well, with the aggregation context, every order is iterable. You can pick any index in the group and iterate in its order. And aggregation is where this ability counts the most.

If the group happens to be the last group of this index type (not of $idxType but of the index on which the aggregator is defined) in the table, endIdx() would return a null row handle. If it's also empty, beginIdx() would also return a null handle, and in general, for an empty group beginIdx() would return the same value as endIdx(). If the group is not the last one, endIdx() returns the handle of the first row in the next group.
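The half-open range described by beginIdx() and endIdx() can be modeled with array positions. This is a sketch under simplifying assumptions: @rows, begin_idx(), end_idx(), next_idx() and same() are hypothetical stand-ins working on a plain array, not the Triceps API, and undef plays the role of a null row handle:

```perl
use strict;
use warnings;

# Hypothetical model: rows as [group, value] in the order of the
# aggregator's index; a "handle" is an array position, undef is null.
my @rows = ( [ 'a', 1 ], [ 'a', 2 ], [ 'b', 3 ], [ 'b', 4 ] );

sub begin_idx {    # position of the first row of the group
    my ($g) = @_;
    for my $i (0 .. $#rows) {
        return $i if $rows[$i][0] eq $g;
    }
    return undef;
}

sub end_idx {      # first row past the group, undef if it is the last group
    my ($g) = @_;
    my $i = begin_idx($g);
    return undef unless defined $i;
    $i++ while $i <= $#rows && $rows[$i][0] eq $g;
    return $i <= $#rows ? $i : undef;
}

sub next_idx {     # next row in order, undef past the end of the table
    my ($i) = @_;
    return $i < $#rows ? $i + 1 : undef;
}

sub same {         # both null, or both pointing at the same position
    my ($x, $y) = @_;
    return 1 if !defined $x && !defined $y;
    return defined $x && defined $y && $x == $y;
}

# Iterate over [begin, end), the same shape as
#   for ($rh = $ctx->beginIdx($it); !$rh->same($end); $rh = $rh->nextIdx($it))
sub group_values {
    my ($g) = @_;
    my $end = end_idx($g);
    my @vals;
    for (my $i = begin_idx($g); !same($i, $end); $i = next_idx($i)) {
        push @vals, $rows[$i][1];
    }
    return @vals;
}
```

For group 'a' the end is the first row of 'b'; for the last group 'b' the end is null, and the loop stops when next_idx() runs off the table.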

$rh = $ctx->lastIdx($idxType);

The last row in the group according to a particular index type's order. The index type must belong to the group, otherwise the result is undefined. If the group is empty, returns a null handle.

$ctx->send($opcode, $row) or die "$!";

Constructs a result rowop for the aggregator and arranges for it to be sent to the aggregator's output label. The actual sending is delayed: it will be done only after all the aggregators run, then the table's changes are sent to the table's normal output label, then each aggregator's changes are sent to its label. Note that the aggregator's output label is not visible in the computation function, so the rowop can not be constructed directly. Instead send() takes care of it. The row must be of a type at least matching the aggregator's result type (and of course the normal practice is to use the aggregator's result type to construct the row). On success returns 1, on error returns undef and the error message.

$ctx->makeHashSend($opcode, $fieldName => $fieldValue, ...) or die "$!";

A convenience function that produces the row from pairs of field names and values and sends it. A combination of makeRowHash() and send().

$ctx->makeArraySend($opcode, @fields) or die "$!";

A convenience function that produces the row from the array of field values and sends it. A combination of makeRowArray() and send().

Note that an aggregator must never change the table. Any attempt to change the table is a fatal error.

Friday, March 23, 2012

RowHandle reference

A RowHandle is essentially the glue that keeps a row in the table. A row's handle keeps the position of the row in the table and allows navigating from it in the direction of every index. It also keeps the helper information for the indexes. For example, the Hashed index calculates the hash value for the row's fields once and remembers it in the handle. The table always operates on the handles, never directly on the rows. The table methods that accept rows as arguments implicitly wrap them into handles before doing any operations.

A row handle always belongs to a particular table, and can not be mixed between the tables, even if the tables are of the same type. Even before a row handle has been inserted into the table and after it has been removed, it still belongs to that table and can not be inserted into any other one.

Just as the tables are single-threaded, the row handles are single-threaded.

A RowHandle is created by the table's factory:

$rh = $table->makeRowHandle($row) or die "$!";

The newly created row handle is not inserted in the table. To find out whether the row handle is actually inserted in the table, use

$result = $rh->isInTable();

As a special case, a row handle may be null. It pretty much means that there is only the Perl wrapper layer of RowHandle but no actual RowHandle under it. This happens to be much more convenient than dealing with undefined values at Perl level. The null row handles are returned by certain table calls to indicate that the requested data was not found in the table. A row handle can be checked for being null:

$result = $rh->isNull();

A null row handle may also be explicitly created with

$rh = $table->makeNullRowHandle();

As usual, the row handle references can be compared for the sameness of the actual row handle they contain:

$result = $rh1->same($rh2);

The row can be extracted from the row handle:

$row = $rh->getRow() or die "$!";

If the row handle is null, getRow() will return an undef and an error message.

The rest of the row handle methods are just a syntactic sugar for the table's iteration methods:

$rh = $rh->next();
$rh = $rh->nextIdx($idxType);
$rh = $rh->firstOfGroupIdx($idxType);
$rh = $rh->nextGroupIdx($idxType);

They work in exactly the same way as the table methods.

Table reference

The tables are created from table types:

$t = $unit->makeTable($tabType, $enqMode, "tableName") or die "$!";

The table type must be initialized before it can be used to create tables. The tables are strictly single-threaded.

The enqueueing mode can be specified as a string or a Triceps constant. However, in modern practice you should use "EM_CALL" or &Triceps::EM_CALL. This argument is likely to be removed altogether in the future and become fixed to EM_CALL. The table name will be used for the error messages and to create the table labels.

As usual, the references to the tables can be compared for sameness with

$result = $t1->same($t2);

Each table creates an input label, an output label, and a label for each aggregator defined in its type. They can be reached with:

$lb = $t->getInputLabel();
$lb = $t->getOutputLabel();
$lb = $t->getAggregatorLabel("aggName") or die "$!";

With an invalid name, getAggregatorLabel() returns an error. The table can also return its type, unit, row type, and name:

$tt = $t->getType();
$u = $t->getUnit();
$rt = $t->getRowType();
$name = $t->getName();

The number of rows in the table is read with

$result = $t->size();

The table stores rows wrapped in the row handles. The row handles are created with:

$rh = $t->makeRowHandle($row) or die "$!";
$rh = $t->makeNullRowHandle();

The row must be of a matching type. A null row handle is a handle without a row in it. It can not be placed into a table, but this kind of row handle gets returned by table operations to indicate things not found. In case you want to fool some of your code by slipping it a null handle, makeNullRowHandle() provides a way to do it. The row handles belong to a particular table and can not be mixed between them, even if the tables are of the same type.

The table operations can be done by either sending the rowops to the table's input label or by calling the operations directly.

$result = $t->insert($row_or_rh [, $copyTray]) or die "$!";

Inserts a row or row handle into the table. The row handle must not be in the table before the call; it may be either freshly created or previously removed from the table. If a row is used as an argument, it is internally wrapped in a fresh row handle, and then that row handle is inserted. An insert may trigger the replacement policy in the table's indexes and have some rows removed before the insert is done. The optional copy tray can be used to collect a copy of all the row updates that happen in the table as a result of the insert, both on the table output label and on all its aggregator labels. Returns 1 on success, 0 if the insert can not be done (the row handle is already in the table or is null), or undef and an error message on an incorrect argument.

$result = $t->remove($rh [, $copyTray]) or die "$!";

Removes a row handle from the table. The row handle must be previously inserted in the table, and either found in it or a reference to it remembered from before. An attempt to remove a newly created row handle will have no effect. The optional copy tray works in the same way as for insert(). The result is 1 on success (even if the row handle was not in the table), or undef and error message on an incorrect argument.

$result = $t->deleteRow($row [, $copyTray]) or die "$!";

Finds the handle of the matching row by the table's first leaf index and removes it. Returns 1 on success, 0 if the row was not found, undef and error message on an incorrect argument. Unlike insert(), the deletion methods for a row handle and a row are named differently to emphasize their difference. The method remove() must get a reference to the exactly same row handle that was previously inserted. The method deleteRow() does not have to get the same row as was previously inserted, instead it will find a row handle of the row that has the same key as the argument, according to the first leaf index. deleteRow() never deletes more than one row. If the index contains multiple matching rows (for example, if the first leaf is a FIFO index), only one of them will be removed, usually the first one (the exact choice depends on what row gets found by the index).
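To make the difference between remove() and deleteRow() concrete, here is a small pure-Perl toy model that does not touch Triceps at all. The array @table and the subs makeRowHandle(), insert(), remove() and deleteRow() are hypothetical stand-ins, assuming a table whose first leaf is a FIFO index, so that a lookup scans for the first exactly equal row.

```perl
use strict;
use warnings;

# Hypothetical toy model, not Triceps: the table's first leaf index is
# taken to be a FIFO, so finding a row means scanning for an equal one.
my @table;    # ordered list of row handles

# A "row handle" wraps a row, here just a list of field values.
sub makeRowHandle { return { row => [@_] }; }

sub insert { my $rh = shift; push @table, $rh; return 1; }

# remove(): takes out this exact handle; returns 1 even if it was absent.
sub remove {
  my $rh = shift;
  @table = grep { $_ != $rh } @table;
  return 1;
}

# deleteRow(): finds the first handle with an equal row and removes only it.
sub deleteRow {
  my $key = join("\0", @_);
  for my $i (0 .. $#table) {
    next unless join("\0", @{ $table[$i]{row} }) eq $key;
    splice(@table, $i, 1);
    return 1;
  }
  return 0;    # no matching row found
}

my $h1 = makeRowHandle("AAA", 10);
my $h2 = makeRowHandle("AAA", 10);    # an equal row in a separate handle
insert($h1);
insert($h2);
remove(makeRowHandle("AAA", 10));     # fresh handle, never inserted: no effect
my $deleted = deleteRow("AAA", 10);   # removes $h1 only, $h2 stays
my $missing = deleteRow("BBB", 1);    # nothing matches
```

Both handles carry equal rows, yet remove() acts only on the very handle it is given, while deleteRow() picks the first match and stops there.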

The row handles can be found in the table by indexes:

$rh = $t->find($row_or_rh);
$rh = $t->findIdx($idxType, $row_or_rh);

The default find() works using the first leaf index type, i.e. the following two are equivalent:

$t->find($r)
$t->findIdx($t->getType()->getFirstLeaf(), $r)

but the find() version is slightly more efficient because it handles the index types inside the C++ code and does not create the Perl wrappers for them. The index type in all the table operations must be exactly the one from the table's type, not a copy. Since the index types are copied into a table type when it is constructed, the only way to find the correct index type is to construct the whole table type and then get the index type from it using findSubIndex().

The find() operation is also used internally by deleteRow() and to process the rowops received at the table's input label.

If a row is used as an argument for find, a temporary row handle is internally created for it, and then the find is performed on it. Note that if you have a row handle that is already in the table, there is generally no point in calling find on it: you will just get the same row handle back (except for the case of multi-valued indexes, where you will get back some matching row handle, usually the first one, which may or may not be the same). The normal use is to create a new row handle, and then find a match for it.

If the matching row is not found, find methods would return a null row handle. They return undef and an error message on an argument error.

A findIdx() with a non-leaf index argument is a special case: it returns the first row handle of the group that has the key matching the argument. The order of "first" in this case is defined according to that index's first leaf sub-index.

There also are convenience methods that construct a row from the field arguments and then find it:

$rh = $t->findBy("fieldName" => $fieldValue, ...);
$rh = $t->findIdxBy($idxType, "fieldName" => $fieldValue, ...);

If the row creation fails, these methods die.

The table can be iterated using the methods

$rh = $t->begin();
$rh = $t->next($rh); 
$rh = $t->beginIdx($idxType);
$rh = $t->nextIdx($idxType, $rh);

As usual, the versions without an explicit index type use the first leaf index type. The begin methods return the first row handle according to an index's order, and the next methods advance to the next row handle. When the end of the table is reached, these methods return a null row handle. The next methods also return a null row handle if their argument row handle is null or not in the table. So, if you iterate and remove the row handles, make sure to advance the iterator first and only then remove the current row handle.
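The advance-then-remove rule can be seen in a toy pure-Perl model (not Triceps): the ToyTable class and its begin(), next() and remove() methods are hypothetical, and undef plays the role of the null row handle. The point is that next() on a handle that has already been removed finds nothing to advance from.

```perl
use strict;
use warnings;

# Hypothetical toy model, not Triceps: a table as an ordered list of row
# handles, where next() on a handle that is not in the table returns undef
# (standing in for the null row handle).
package ToyTable;

sub new {
  my ($class, @values) = @_;
  return bless { rows => [ map { +{ v => $_ } } @values ] }, $class;
}

sub begin { my $self = shift; return $self->{rows}[0]; }

# Position of a handle in the table, or undef if it is not there.
sub _pos {
  my ($self, $rh) = @_;
  for my $i (0 .. $#{ $self->{rows} }) {
    return $i if $self->{rows}[$i] == $rh;
  }
  return undef;
}

sub next {
  my ($self, $rh) = @_;
  return undef unless defined $rh;
  my $i = $self->_pos($rh);
  return defined $i ? $self->{rows}[$i + 1] : undef;
}

sub remove {
  my ($self, $rh) = @_;
  my $i = $self->_pos($rh);
  splice(@{ $self->{rows} }, $i, 1) if defined $i;
  return 1;
}

sub contents { my $self = shift; return map { $_->{v} } @{ $self->{rows} }; }

package main;

# Correct: advance the iterator first, then remove the current handle.
sub purgeOdd {
  my $t = shift;
  my $rh = $t->begin();
  while (defined $rh) {
    my $nextrh = $t->next($rh);
    $t->remove($rh) if $rh->{v} % 2;
    $rh = $nextrh;
  }
}

# Broken: after remove(), next() gets a handle that is no longer in the table.
sub purgeOddBroken {
  my $t = shift;
  my $rh = $t->begin();
  while (defined $rh) {
    $t->remove($rh) if $rh->{v} % 2;
    $rh = $t->next($rh);
  }
}

my $good = ToyTable->new(1 .. 6);
purgeOdd($good);
my $bad = ToyTable->new(1 .. 6);
purgeOddBroken($bad);
print join(" ", $good->contents), "\n";   # 2 4 6
print join(" ", $bad->contents), "\n";    # 2 3 4 5 6
```

The broken version quietly stops after the first removal, which is exactly the kind of bug the rule above guards against.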

If the index argument is non-leaf, it's equivalent to its first leaf.

To iterate through only a group, use findIdx() on the parent index type of the group to find the first row of the group. Then things become tricky: take the first index type one level below it to determine the iteration order (a group may have multiple indexes in it, defining different iteration orders). Use that index type with the usual nextIdx() to advance the iterator. However the end of the group will not be signaled by a null row handle. Instead first find the end marker handle of the group by using

$endrh = $t->nextGroupIdx($subIdxType, $firstrh);

The $subIdxType here is the same index type as used for nextIdx(). Then each row handle can be compared with the end marker with $rh->same($endrh).

The value $endrh is actually the first row handle of the next group, so it can also be used to jump quickly to the next group, and essentially iterate by groups. After the last group, nextGroupIdx() will return a null row handle. Which is OK for iteration, because at the end of the last group nextIdx() will also return a null row handle.
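The group iteration described above can be sketched with another toy model, again pure Perl rather than Triceps. The rows sit in one array ordered by a grouping field, a "row handle" is just an array index, and undef stands for the null row handle; nextIdx(), findGroup(), nextGroupIdx() and same() are hypothetical helpers that imitate the corresponding table and row handle methods.

```perl
use strict;
use warnings;

# Hypothetical toy model, not the Triceps API: rows ordered by the grouping
# field "sym"; a row handle is an array index, undef is the null handle.
my @rows = (
  { sym => "AAA", price => 10 },
  { sym => "AAA", price => 11 },
  { sym => "BBB", price => 20 },
  { sym => "CCC", price => 30 },
  { sym => "CCC", price => 31 },
);

# Advance to the next row handle, or undef past the end.
sub nextIdx { my $i = shift; return $i < $#rows ? $i + 1 : undef; }

# Like findIdx() on the parent index: the first row handle of the group.
sub findGroup {
  my $sym = shift;
  for my $i (0 .. $#rows) { return $i if $rows[$i]{sym} eq $sym; }
  return undef;
}

# Like nextGroupIdx(): first row of the next group, undef after the last one.
sub nextGroupIdx {
  my $i = shift;
  my $sym = $rows[$i]{sym};
  for (my $j = $i; defined $j; $j = nextIdx($j)) {
    return $j if $rows[$j]{sym} ne $sym;
  }
  return undef;
}

# Like $rh->same($endrh): two null handles count as the same.
sub same {
  my ($x, $y) = @_;
  return 1 if !defined($x) && !defined($y);
  return defined($x) && defined($y) && $x == $y;
}

# Iterate one group: from its first row up to (not including) the end marker.
sub groupPrices {
  my $sym = shift;
  my $rh = findGroup($sym);
  return () unless defined $rh;
  my $endrh = nextGroupIdx($rh);
  my @prices;
  for (; !same($rh, $endrh); $rh = nextIdx($rh)) {
    push @prices, $rows[$rh]{price};
  }
  return @prices;
}

# Iterate group by group, jumping from one end marker to the next.
my @groups;
for (my $g = 0; defined $g; $g = nextGroupIdx($g)) {
  push @groups, $rows[$g]{sym};
}
print "@groups\n";                          # AAA BBB CCC
print join(" ", groupPrices("CCC")), "\n";  # 30 31
```

For the last group the end marker is the null handle, and the iteration still stops at the right place because nextIdx() runs into the null handle at the same moment.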

What if a group has a whole sub-tree of indexes in it, and you want to iterate it in the order of a sub-index other than the first one? Still use findIdx() in the same way to find a row handle in the desired group. But then convert it to the first row handle in the desired order:

$beginrh = $t->firstOfGroupIdx($subIdxType, $rh);

After that proceed as before: get the end marker with nextGroupIdx() on the same sub-index, and iterate with nextIdx() on it.

This group iteration is somewhat messy and tricky, and maybe something better can be done with it in the future. If you look closely, you can also see that it doesn't allow iterating the groups in every possible order. For example, if you have an index type hierarchy

| +-D
| | +-G
| | +-H 
| +-E 

and you want to iterate on the group inside B, you can go in the order of D or G (which is the same as D, since G is the first leaf of D) or of E, but you can not go in the order of H. But for most practical purposes it should be good enough.

Tuesday, March 20, 2012

AggregatorType reference

The aggregator type gets created by the constructor

$at = Triceps::AggregatorType->new($resultRowType, "aggName", $initFunc,
  $handlerFunc, @args...) or die "$!";

The rows created by the aggregator are of $resultRowType. The aggregator name is used to name the aggregator result label in the table, "tableName.aggName". It is also used to get the reference to that label from the table.

The optional args are passed to both the init and handler functions (to which $initFunc and $handlerFunc are references). The init function is called when the row group (contained in an index of the type on which this aggregator type is set) is created. It initializes the group's aggregation state. The handler function gets called on the changes to the group, as described at length previously.

The methods for comparison, printing and copying work similarly to the index types:

$result = $at1->same($at2);
$result = $at1->equals($at2);
$result = $at1->match($at2);
$result = $at->print();
$atCopy = $at->copy();

The matching aggregator types may differ in the aggregator name and in the field names of the result row type. However the function references and their arguments must be the same.

IndexType reference

The index types in Triceps are available in the following kinds:
  • Hashed: Provides quick random access based on the key formed from the fields of the row in the table. May be leaf or non-leaf. The order of rows in the index will be repeatable between the runs of the same program on the same machine architecture, but not easily predictable. Internally the rows are stored in a tree, but the comparisons of the rows are accelerated by pre-calculating a hash value from the key fields and keeping it in the row handle.
  • FIFO: Keeps the rows in the order they were received. There is no efficient way to find a particular row in this index, the search in it works by going through all the rows sequentially and comparing the rows for exact equality. It provides the expiration policies based on the row count. It may only be a leaf.
  • PerlSorted: Provides random access based on the key field comparison, expressed as a Perl function. This results in a predictable order of rows, but the execution of the Perl code makes it slower than the Hashed index. May be leaf or non-leaf. There is also a SimpleOrdered index implementation done in Perl on top of the PerlSorted index, which allows specifying the keys in a more convenient way.

The hashed index is created with:

$it = Triceps::IndexType->newHashed($optionName => $optionValue, ...)
  or die "$!";

The only available option is "key", and it's mandatory. Its argument is a reference to an array of strings that specify the names of the key fields.

The FIFO index is created with:

$it = Triceps::IndexType->newFifo($optionName => $optionValue, ...)
  or die "$!";

The options are:
  • limit: sets the limit value for the replacement policy. Once the number of rows attempts to grow beyond this value, the older records get removed. Setting it to 0 disables the replacement policy, which is the default. Don't try to set it to negative values: they will be treated as unsigned, and thus become some very large positive ones.
  • jumping: determines the variation of the replacement policy in effect. If set to 0 (default), implements the sliding window policy, removing the older rows one by one. If non-0, implements the jumping window policy, removing all the older rows when a new row causes the limit overflow.
  • reverse: if non-0, the iteration on this index goes in the reverse order. However the expiration policy still works in the direct order! The default is 0.
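A rough model of how these three options interact, written as plain Perl for illustration only (fifoInsert() and fifoRows() are hypothetical helpers, not Triceps code): with a limit of 3, the sliding window keeps the last three rows, the jumping window throws out everything older and starts over from the new row, and reverse changes only the iteration order.

```perl
use strict;
use warnings;

# Hypothetical toy model of the FIFO index options (limit, jumping, reverse);
# it mimics the described behavior and is not the Triceps implementation.
sub fifoInsert {
  my ($fifo, $opts, $row) = @_;
  my $limit = $opts->{limit} // 0;    # 0 means no limit
  if ($limit > 0 && @$fifo >= $limit) {
    if ($opts->{jumping}) {
      @$fifo = ();                    # jumping window: drop all older rows at once
    } else {
      shift @$fifo;                   # sliding window: drop only the oldest row
    }
  }
  push @$fifo, $row;
}

# Iteration order; "reverse" affects only this, never which rows expire.
sub fifoRows {
  my ($fifo, $opts) = @_;
  return $opts->{reverse} ? reverse(@$fifo) : @$fifo;
}

my %result;
for my $mode (qw(sliding jumping)) {
  my $opts = { limit => 3, jumping => ($mode eq 'jumping' ? 1 : 0), reverse => 1 };
  my @fifo;
  fifoInsert(\@fifo, $opts, $_) for 1 .. 4;
  $result{$mode} = [ fifoRows(\@fifo, $opts) ];
}
print "sliding: @{$result{sliding}}\n";    # sliding: 4 3 2
print "jumping: @{$result{jumping}}\n";    # jumping: 4
```

Even with reverse set, the sliding window still expired row 1, the oldest one, which matches the note that the expiration works in the direct order.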

The PerlSorted index is created as:

$it = Triceps::IndexType->newPerlSorted($sortName, $initFunc,
  $compareFunc, @args...) or die "$!";

The $sortName is a string describing the sorting order, used in print() and error messages. $initFunc is a function reference that can be used to generate the comparison function dynamically at the table type initialization time (or use undef if using a fixed comparison function). $compareFunc is the fixed comparison function, if preferred (or use undef if it will be generated dynamically by the init function). The args are the optional extra arguments for the initialization and/or comparison function. See the details in the dedicated post.

The index types are connected in a table type to form a tree. To nest $indexType1 under $indexType2, use:

$indexType2->addSubIndex("indexName", $indexType1) or die "$!";

It returns the reference to the same $indexType2, so these calls can be conveniently chained, to add multiple sub-indexes under it. If $indexType2 can not be non-leaf, the call will fail. The added sub-index is actually a copy of $indexType1, the same as when adding an index type under a table type.

The same introspection methods of the nested index types are available as in the table type:

$itSub = $it->findSubIndex("indexName") or die "$!";
$itSub = $it->findSubIndexById($indexTypeId) or die "$!";

@itSubs = $it->getSubIndexes();
$itSub = $it->getFirstLeaf();

If the index type is already a leaf, getFirstLeaf() will return itself.

An aggregator type can be set on an index type. It will create aggregators that run on the rows stored within the indexes of this type.

$it->setAggregator($aggType) or die "$!";

The value returned is the same index type reference $it, allowing the calls to be chained, along with addSubIndex(). Only one aggregator type is allowed on an index type. Calling setAggregator() repeatedly will replace the aggregator type. The aggregator type can be read back with

$aggType = $it->getAggregator();

The returned value may be undef (but "$!" not set) if no aggregator type has been set.

The index type gets initialized when the table type where it belongs gets initialized. After an index type has been initialized, it can not be changed any more, and any methods that change it will return an error. Whether an index type has been initialized can be found with

$result = $it->isInitialized();

An index type can be copied with

$itCopy = $it->copy();

The copy reverts to the un-initialized state. It's always a deep copy, with all the nested index and aggregator types copied. All of these copies are un-initialized.

When an index type becomes initialized, it becomes tied to a particular table type. This table type can be read with

$tabType = $it->getTabtype() or die "$!";

If the index type is not initialized yet, this will return an error.

The usual sameness comparisons and print methods are available (and the print method has the usual optional arguments):

$result = $it1->same($it2);
$result = $it1->equals($it2); 
$result = $it1->match($it2);
$result = $it->print();

The matching index types may differ in the names of nested indexes. In the matching PerlSorted index types, the descriptive names of the types may also differ.

An index type can be checked for being a leaf:

$result = $it->isLeaf();

The index type id (see the explanation of that in the TableType reference) can be read with

$id = $it->getIndexId();

A special method that works only on the Hashed index types

@keys = $it->getKey();

returns the array of field names forming the key. On the other index types it returns an empty array, though better support will probably be available for the PerlSorted indexes in the future.

A special method that works only on the PerlSorted index types

$it->setComparator($compareFunc, @args...) or die "$!";

allows setting an auto-generated comparator function and its optional arguments from an initializer function at the table initialization time. On success it returns 1. For all other index types this method returns an error.

TableType reference

I've shown quite a few examples that use the tables. And there will be more, since the tables are such a centerpiece of processing. But some of the more obscure details have been skipped over. So, this starts a series of posts that list out the table-related features.

The TableType gets created from a row type as

$tt = Triceps::TableType->new($rowType) or die "$!";

After that it can be configured by adding the index types. Eventually it has to be initialized and that freezes the table type and makes it immutable. All the steps up to and including the initialization must be done from a single thread, after initialization a table type may be shared between multiple threads.

The index types are added with

$tt->addSubIndex("indexName", $indexType) or die "$!";

The result is the same table type (unless it's an undef signifying an error), so the index type additions can be chained with each other and with the construction:

$tt = Triceps::TableType->new($rowType) 
  ->addSubIndex("indexName1", $indexType1) 
  ->addSubIndex("indexName2", $indexType2)
  or die "$!";

Since the table type initialization freezes not only the table type itself but also all the index types in it, that would make things difficult if the same index type is added to two table types. To avoid these issues, addSubIndex() adds not the actual argument index type but first creates a fresh uninitialized copy of it, and then adds it. The initialization is done with:

$tt->initialize() or die "$!";

The index types check most of their arguments at the initialization time, so that's where most of the errors will be reported. Calling initialize() repeatedly will have no effect and just return the same result again and again. It's possible to check whether the table type has been initialized:

$result = $tt->isInitialized();

The other piece of introspection is the row type:

$rowType = $tt->rowType();

There are multiple ways to get back the index types. To find an index type by name, use:

$indexType = $tt->findSubIndex("indexName") or die "$!";

This is symmetric with addSubIndex(), so it works only for the top-level index types, to get the nested ones, repeat the same call on the found index types.

There is also a way to find the first index type of a particular kind. It's called somewhat confusingly

$indexType = $tt->findSubIndexById($indexTypeId) or die "$!";

where $indexTypeId is one of the following strings or Triceps constants:
  • "IT_HASHED" or &Triceps::IT_HASHED
  • "IT_FIFO" or &Triceps::IT_FIFO
  • "IT_SORTED" or &Triceps::IT_SORTED
Technically, there is also IT_ROOT but it's of little use for this situation since it's the root of the index type tree hidden inside the table type, and would never be a sub-index type. It's possible to iterate through all the possible index type ids as

for ($i = 0; $i < &Triceps::IT_LAST; $i++) { ... }

The conversion between the strings and constants for index type ids is done with

$intId = &Triceps::stringIndexId($stringId);
$stringId = &Triceps::indexIdString($intId);

If an invalid value is supplied, the conversion functions will return undef.

The first leaf index type (the one used for the default look-ups and iteration) can be found with

$indexType = $tt->getFirstLeaf();

And all the top-level indexes can be found with

@indexTypes = $tt->getSubIndexes();

The resulting array contains the pairs of names and index types. If the order is not important but you want to perform the look-ups by name, the result can be stored directly into a hash:

%indexTypes = $tt->getSubIndexes();

The usual reference comparison methods are:

$result = $tt1->same($tt2);
$result = $tt1->equals($tt2);
$result = $tt1->match($tt2); 

And finally the content of a table type can be converted to a human-readable description with

$res = $tt->print();

with the usual optional arguments.

Monday, March 19, 2012

The copy tray for table operations

The aggregators are now wrapped up, so moving on to the last feature of the tables. The table methods insert(), remove() and deleteRow() have an extra optional argument: the copy tray.

If used, it will put a copy of all the rowops produced during the operation (including the output of the aggregators) into that tray. The idea here is to use it in cases where you don't want to connect the output labels of the table directly, but instead collect and process the rowops afterwards manually. Like this:

$ctr = $u->makeTray();
$t->insert($row, $ctr);
foreach my $rop ($ctr->toArray()) {
  # process the collected rowop $rop here
}

However in reality it didn't work out so well. The processing loop would have to have all the lengthy if-else sequences to branch first by the label (if there are any aggregators) and then by opcode. It looks too difficult. Well, it could work in the simple situations but not more than that.

In the future this feature will likely be deprecated, and I already have a better idea. Because of this, I see no point in going into the more extended examples.


The SimpleAggregator was one of the examples that uses the class Triceps::Opt to parse its arguments formatted as options. There is actually a similar option parser in CPAN but it didn't do everything I wanted, and considering how tiny it is, it's easier to write a new one from scratch than to extend that one. I also like to avoid the extra dependencies.

The heart of it is the method Triceps::Opt::parse(). Normally it would be called from a constructor of another class to parse the constructor's options; the SimpleAggregator was somewhat abusing it. It does the following:

  • Checks that all the options are known.
  • Checks that the values are acceptable.
  • Copies the values into the instance hash of the target class.
  • Provides the default values for the unspecified options.

If anything goes wrong, it dies with a reasonable message. The arguments tell the class name for the messages (since, remember, it normally is expected to be called from the class constructor), the reference to object instance hash where to copy the options, the descriptions of the supported options, and the actual key-value pairs. A normal call looks like this:

package MyClass;

sub new # (class, option => value, ...)
{
  my $class = shift;
  my $self = {};

  &Triceps::Opt::parse($class, $self, {
      opt1 => [ 0 ],
      opt2 => [ undef, \&Triceps::Opt::ck_mandatory ],
      opt3 => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
    }, @_);

  bless $self, $class;
  return $self;
}

At the end of it, if all went well, the hash in $self would have the values at keys "opt1" and so on.

The options descriptions go in pairs of option name and an array reference with description. The array contains the default value and the checking function, either of which may be undefined. The checking function returns if everything went fine or dies on any errors. To die happily with a proper message,  it gets not only the value to check but more, altogether:
  • The value to check.
  • The name of the option.
  • The name of the class.
  • The object instance ($self), just in case.

If you want to do multiple checks, you just make a closure and call all the checks in sequence, passing @_ to them all, like shown here for opt3. If more arguments need to be passed to the checking function, just add them after @_ (or, if you prefer, before it).

You can create any checking functions, but a few ready ones are provided:

  • Triceps::Opt::ck_mandatory checks that the value is defined.
  • Triceps::Opt::ck_ref checks that the value is a reference to a particular class. Just give the class name as the extra argument. Or, to check that the reference is to array or hash, make the argument "ARRAY" or "HASH". Or an empty string "" to check that it's not a reference at all. For the arrays and hashes it can also check the values in there for being references to the correct types: give that type as the second extra argument. But it doesn't go deeper than that, just one nesting level.
  • Triceps::Opt::ck_refscalar checks that the value is a reference to a scalar. This is designed to check the arguments which are used to return data back to the caller, and it would accept any previous value in that scalar: an actual scalar value, an undef or a reference.
The ck_ref and ck_refscalar allow the value to be undefined, so they can safely be used on the optional options. When I come up with more of the usable check functions, I'll add them.
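To show how the pieces fit together, here is a hypothetical, much reduced re-implementation of the described behavior in plain Perl. The package name MiniOpt is made up; this is not the Triceps::Opt source, and its ck_ref() is simplified to an exact ref() comparison, without the subclass or element checks.

```perl
use strict;
use warnings;

# A hypothetical sketch of the behavior described above, NOT the actual
# Triceps::Opt source; the package name MiniOpt is made up.
package MiniOpt;
use Carp;

sub parse { # ($class, $instance, \%descriptions, optName => optValue, ...)
  my ($class, $instance, $descr, @opts) = @_;
  confess "$class: the options must go in name-value pairs" if @opts % 2;
  my %given = @opts;
  foreach my $name (sort keys %given) {           # all options must be known
    confess "$class: unknown option '$name'" unless exists $descr->{$name};
  }
  foreach my $name (sort keys %$descr) {
    my ($default, $check) = @{ $descr->{$name} };
    my $value = exists $given{$name} ? $given{$name} : $default;
    # the check gets the value, option name, class name and instance
    $check->($value, $name, $class, $instance) if defined $check;
    $instance->{$name} = $value;                  # copy into the instance hash
  }
}

sub ck_mandatory { # ($value, $optName, $class, $instance)
  my ($value, $name, $class) = @_;
  confess "$class: option '$name' must be specified" unless defined $value;
}

# Simplified: compares ref() exactly, so no subclasses and no element checks.
sub ck_ref { # ($value, $optName, $class, $instance, $refType)
  my ($value, $name, $class, $instance, $type) = @_;
  return unless defined $value;                   # undef is allowed
  confess "$class: option '$name' must be a reference to '$type', got '" . ref($value) . "'"
    unless ref($value) eq $type;
}

package main;

my $self = {};
MiniOpt::parse("MyClass", $self, {
    opt1 => [ 0 ],
    opt2 => [ undef, \&MiniOpt::ck_mandatory ],
    opt3 => [ undef, sub { MiniOpt::ck_mandatory(@_); MiniOpt::ck_ref(@_, "ARRAY") } ],
  }, opt2 => "abc", opt3 => [ 1, 2 ]);
# $self now holds opt1 => 0 (the default), opt2 => "abc", opt3 => [1, 2]

my $ok = eval { MiniOpt::parse("MyClass", {}, { opt1 => [ 0 ] }, opt9 => 1); 1 };
print "rejected: $@" unless $ok;    # the message names the unknown option 'opt9'
```

Passing "ARRAY" after @_ in the opt3 closure is the same trick as in the example above: the extra argument simply lands after the four standard ones.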

Sunday, March 18, 2012

The guts of SimpleAggregator, part 2

The functions that translate the $%variable names follow the same pattern but have different built-in variables:

# @param varname - variable to replace
# @param func - function name, for error messages
# @param vars - definitions of the function's vars
# @param id - the unique id of this field
# @param argCount - the argument count declared by the function
sub replaceStep # ($varname, $func, $vars, $id, $argCount)
{
  my ($varname, $func, $vars, $id, $argCount) = @_;

  if ($varname eq 'argiter') {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', step computation refers to 'argiter' but the function declares no arguments"
      unless ($argCount > 0);
    return "\$a${id}";
  } elsif ($varname eq 'niter') {
    return "\$npos";
  } elsif ($varname eq 'groupsize') {
    return "\$context->groupSize()";
  } elsif (exists $vars->{$varname}) {
    return "\$v${id}_${varname}";
  } else {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', step computation refers to an unknown variable '$varname'";
  }
}

sub replaceResult # ($varname, $func, $vars, $id, $argCount)
{
  my ($varname, $func, $vars, $id, $argCount) = @_;

  if ($varname eq 'argfirst') {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', result computation refers to '$varname' but the function declares no arguments"
      unless ($argCount > 0);
    return "\$f${id}";
  } elsif ($varname eq 'arglast') {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', result computation refers to '$varname' but the function declares no arguments"
      unless ($argCount > 0);
    return "\$l${id}";
  } elsif ($varname eq 'groupsize') {
    return "\$context->groupSize()";
  } elsif (exists $vars->{$varname}) {
    return "\$v${id}_${varname}";
  } else {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', result computation refers to an unknown variable '$varname'";
  }
}
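The substitution that drives these functions is a single s///ge over the definition string: each $%name is captured and replaced by whatever the function returns. A cut-down, hypothetical toyReplaceStep() shows the effect on the step of the avg function, as if it were the field with id 2.

```perl
use strict;
use warnings;

# A cut-down, hypothetical variant of replaceStep(): it maps $%argiter and
# the function's own variables to names made unique by the field id.
sub toyReplaceStep { # ($varname, $vars, $id)
  my ($varname, $vars, $id) = @_;
  return "\$a${id}" if $varname eq 'argiter';
  return "\$v${id}_${varname}" if exists $vars->{$varname};
  die "unknown variable '$varname'\n";
}

# The step of the avg function, as if it were the field with id 2.
my $id = 2;
my $vars = { sum => 0, count => 0 };
my $step = 'if (defined $%argiter) { $%sum += $%argiter; $%count++; }';
$step =~ s/\$\%(\w+)/toyReplaceStep($1, $vars, $id)/ge;
print "$step\n";
# prints: if (defined $a2) { $v2_sum += $a2; $v2_count++; }
```

Because every variable gets the field id in its name, several fields can use the same function (or functions with the same variable names) in one generated sub without clashing.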

And finally the definition of the aggregation functions:

our $FUNCTIONS = {
  first => {
    result => '$%argfirst',
  },
  last => {
    result => '$%arglast',
  },
  count_star => {
    argcount => 0,
    result => '$%groupsize',
  },
  count => {
    vars => { count => 0 },
    step => '$%count++ if (defined $%argiter);',
    result => '$%count',
  },
  sum => {
    vars => { sum => 0 },
    step => '$%sum += $%argiter;',
    result => '$%sum',
  },
  max => {
    vars => { max => 'undef' },
    step => '$%max = $%argiter if (!defined $%max || $%argiter > $%max);',
    result => '$%max',
  },
  min => {
    vars => { min => 'undef' },
    step => '$%min = $%argiter if (!defined $%min || $%argiter < $%min);',
    result => '$%min',
  },
  avg => {
    vars => { sum => 0, count => 0 },
    step => 'if (defined $%argiter) { $%sum += $%argiter; $%count++; }',
    result => '($%count == 0? undef : $%sum / $%count)',
  },
  avg_perl => { # Perl-like: treat the NULLs as 0s
    vars => { sum => 0 },
    step => '$%sum += $%argiter;',
    result => '$%sum / $%groupsize',
  },
  nth_simple => { # inefficient, need proper multi-args for better efficiency
    vars => { n => 'undef', tmp => 'undef', val => 'undef' },
    step => '($%n, $%tmp) = @$%argiter; if ($%n == $%niter) { $%val = $%tmp; }',
    result => '$%val',
  },
};

You can use them as the starting point for building your own. As you can see, this very first simple version of SimpleAggregator didn't include the user-provided functions, but the real one already does.

That's it, the whole aggregator generation.

The guts of SimpleAggregator, part 1

The implementation of the SimpleAggregator has turned out to be surprisingly small. Maybe a little biggish for a single post but still small. I've liked it so much that I've even saved the original small version in the file xSimpleAggregator.t. As more features will be added, the "official" version of the SimpleAggregator will grow (and already did) but that example file will stay small and simple.

I'll put the commentary interleaved with the code. So, here we go.

package MySimpleAggregator;
use Carp;

use strict;

sub make # (optName => optValue, ...)
{
  my $opts = {}; # the parsed options
  my $myname = "MySimpleAggregator::make";

  &Triceps::Opt::parse("MySimpleAggregator", $opts, { 
      tabType => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TableType") } ],
      name => [ undef, \&Triceps::Opt::ck_mandatory ],
      idxPath => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY", "") } ],
      result => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
      saveRowTypeTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
      saveInitTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
      saveComputeTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
    }, @_);

  # reset the saved source code
  ${$opts->{saveInitTo}} = undef if (defined($opts->{saveInitTo}));
  ${$opts->{saveComputeTo}} = undef if (defined($opts->{saveComputeTo}));
  ${$opts->{saveRowTypeTo}} = undef if (defined($opts->{saveRowTypeTo}));

Triceps::Opt is a class that deals with parsing and doing the basic checks on the options. I'll describe it in detail in a separate post. For now, the important part is that the checked options are copied into the hash pointed to by $opts. If this were a proper object constructor, it would have been $self instead of $opts.

  # find the index type, on which to build the aggregator
  my $idx;
  {
    my @path = @{$opts->{idxPath}};
    confess "$myname: idxPath must be an array of non-zero length"
      unless ($#path >= 0);
    my $cur = $opts->{tabType}; # the root of the tree
    my $progress = '';
    foreach my $p (@path) {
      $progress .= $p;
      $cur = $cur->findSubIndex($p)
        or confess("$myname: unable to find the index type at path '$progress', table type is:\n" . $opts->{tabType}->print() . " ");
      $progress .= '.';
    }
    $idx = $cur;
  }
  confess "$myname: the index type is already initialized, can not add an aggregator on it"
    if ($idx->isInitialized());

Since the SimpleAggregator uses an existing table with an existing index, it doesn't require the aggregation key: it just takes an index that forms the group, and whatever key leads to this index becomes the aggregation key. The lookup of an index by path should probably become a standard method on a table type, but here it's implemented directly. Obviously, an aggregator can not be added on an already initialized index.
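As a standalone aside, the path walk can be tried out on plain Perl data. Here a nested hash is a hypothetical stand-in for the table type and its findSubIndex(), so only the lookup itself and the progress string used in the error message are modeled.

```perl
use strict;
use warnings;
use Carp;

# Hypothetical stand-in for a table type: each node maps sub-index names
# to nested nodes, roughly like findSubIndex() on the real index types.
my $tabType = {
  bySymbol => { last2 => {} },
  byId     => {},
};

sub findIndexByPath { # ($root, @path)
  my ($root, @path) = @_;
  confess "idxPath must be an array of non-zero length" unless @path;
  my $cur = $root;
  my $progress = '';    # the part of the path walked so far, for error messages
  foreach my $p (@path) {
    $progress .= $p;
    $cur = $cur->{$p}
      or confess "unable to find the index type at path '$progress'";
    $progress .= '.';
  }
  return $cur;
}

my $found = findIndexByPath($tabType, "bySymbol", "last2");
my $ok = eval { findIndexByPath($tabType, "bySymbol", "none"); 1 };
print "error: $@" unless $ok;    # the message mentions path 'bySymbol.none'
```

Building $progress one element at a time means the error points at the first path element that could not be resolved, not just at the full path.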

  # check the result definition and build the result row type and code snippets for the computation
  my $rtRes;
  my $needIter = 0; # flag: some of the functions require iteration
  my $needfirst = 0; # the result needs the first row of the group
  my $needlast = 0; # the result needs the last row of the group
  my $codeInit = ''; # code for function initialization
  my $codeStep = ''; # code for iteration
  my $codeResult = ''; # code to compute the intermediate values for the result
  my $codeBuild = ''; # code to build the result row
  my @compArgs; # the field functions are passed as args to the computation
  {
    my $grpstep = 4; # definition grouped by 4 items per result field
    my @resopt = @{$opts->{result}};
    my @rtdefRes; # field definition for the result
    my $id = 0; # numeric id of the field

    while ($#resopt >= 0) {
      confess "$myname: the values in the result definition must go in groups of 4"
        unless ($#resopt >= 3);
      my $fld = shift @resopt;
      my $type = shift @resopt;
      my $func = shift @resopt;
      my $funcarg = shift @resopt;

      confess("$myname: the result field name must be a string, got a " . ref($fld) . " ")
        unless (ref($fld) eq '');
      confess("$myname: the result field type must be a string, got a " . ref($type) . " for field '$fld'")
        unless (ref($type) eq '');
      confess("$myname: the result field function must be a string, got a " . ref($func) . " for field '$fld'")
        unless (ref($func) eq '');

This starts the loop that goes over the result fields and builds the code to create them. The code will be built in multiple snippets that will eventually be combined to produce the compute function. Since the arguments go in groups of 4, it becomes fairly easy to miss one element somewhere, and then everything gets really confusing. So the code attempts to check the types of the arguments, in hopes of catching these off-by-ones as early as possible. The variable $id will be used to produce the unique prefixes for the function's variables.

      my $funcDef = $FUNCTIONS->{$func}
        or confess("$myname: function '" . $func . "' is unknown");

      my $argCount = $funcDef->{argcount};
      $argCount = 1 # 1 is the default value
        unless defined($argCount);
      confess("$myname: in field '$fld' function '$func' requires an argument computation that must be a Perl sub reference")
        unless ($argCount == 0 || ref $funcarg eq 'CODE');
      confess("$myname: in field '$fld' function '$func' requires no argument, use undef as a placeholder")
        unless ($argCount != 0 || !defined $funcarg);

      push(@rtdefRes, $fld, $type);

      push(@compArgs, $funcarg)
        if (defined $funcarg);

The definitions of the "standard" aggregation functions are kept in $FUNCTIONS (it will be shown later). They are defined in exactly the same way as the vwap function shown before. The types of the fields are collected for the row definition, and the aggregation argument computation closures (or, technically, functions) are also collected, to be passed later as the arguments of the compute function.

      # add to the code snippets

      ### initialization
      my $vars = $funcDef->{vars};
      if (defined $vars) {
        foreach my $v (keys %$vars) {
          # the variable names are given a unique prefix;
          # the initialization values are constants, no substitutions
          $codeInit .= "  my \$v${id}_${v} = " . $vars->{$v} . ";\n";
        }
      } else {
        $vars = { }; # a dummy
      }

      ### iteration
      my $step = $funcDef->{step};
      if (defined $step) {
        $needIter = 1;
        $codeStep .= "    # field $fld=$func\n";
        if (defined $funcarg) {
          # compute the function argument from the current row
          $codeStep .= "    my \$a${id} = \$args[" . $#compArgs ."](\$row);\n";
        }
        # substitute the variables in $step
        $step =~ s/\$\%(\w+)/&replaceStep($1, $func, $vars, $id, $argCount)/ge;
        $codeStep .= "    { $step; }\n";
      }

The initialization and iteration code is generated only if the function defines it. The code remembers in $needIter whether any of the functions involved needs iteration. Each iteration step is placed into its own block. An extra ";" is added just in case: it doesn't hurt, and it helps if the semicolon was forgotten in the function definition.
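Why the block and the extra ";" are harmless can be checked with a plain string eval. In this sketch the two step snippets and the wrap_step helper are hypothetical, and the "group" is just the argument list of the generated sub: the doubled ";;" is a legal empty statement, and each block gives its step a private scope, so both steps can declare their own my $tmp.

```perl
use strict;
use warnings;

# Hypothetical helper: wrap a step snippet the way the generated code
# does, in its own block with an extra ";" at the end.
sub wrap_step {
  my ($step) = @_;
  return "    { $step; }\n";
}

# The first step ends with ";", the second one does not;
# both declare their own scope-local $tmp.
my @steps = (
  'my $tmp = $row * 2; $sum += $tmp;',
  'my $tmp = $row; $max = $tmp if (!defined $max || $tmp > $max)',
);

my $code = "sub {\n  use strict;\n  my (\$sum, \$max) = (0, undef);\n"
  . "  foreach my \$row (\@_) {\n"
  . join('', map { wrap_step($_) } @steps)
  . "  }\n  return (\$sum, \$max);\n}\n";

my $fn = eval $code
  or die "compilation failed: $@\n$code";
my ($sum, $max) = $fn->(1, 5, 3);
print "sum=$sum max=$max\n";   # prints "sum=18 max=5"
```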

      ### result building
      my $result = $funcDef->{result};
      confess "$myname: internal error in definition of aggregation function '$func', missing result computation"
        unless (defined $result);
      # substitute the variables in $result
      if ($result =~ /\$\%argfirst/) {
        $needfirst = 1;
        $codeResult .= "  my \$f${id} = \$args[" . $#compArgs ."](\$rowFirst);\n";
      }
      if ($result =~ /\$\%arglast/) {
        $needlast = 1;
        $codeResult .= "  my \$l${id} = \$args[" . $#compArgs ."](\$rowLast);\n";
      }
      $result =~ s/\$\%(\w+)/&replaceResult($1, $func, $vars, $id, $argCount)/ge;
      $codeBuild .= "    ($result), # $fld\n";

      $id++;
    }
    $rtRes = Triceps::RowType->new(@rtdefRes)
      or confess "$myname: invalid result row type definition: $!";
  ${$opts->{saveRowTypeTo}} = $rtRes if (defined($opts->{saveRowTypeTo}));

The result computation is built in the same way, remembering whether any function wanted the values from the first or last row. Finally, the result row type is created. Next the compute function gets assembled:

  # build the computation function
  my $compText = "sub {\n";
  $compText .= "  use strict;\n";
  $compText .= "  my (\$table, \$context, \$aggop, \$opcode, \$rh, \$state, \@args) = \@_;\n";
  $compText .= "  return if (\$context->groupSize()==0 || \$opcode == &Triceps::OP_NOP);\n";
  $compText .= $codeInit;
  if ($needIter) {
    $compText .= "  my \$npos = 0;\n";
    $compText .= "  for (my \$rhi = \$context->begin(); !\$rhi->isNull(); \$rhi = \$context->next(\$rhi)) {\n";
    $compText .= "    my \$row = \$rhi->getRow();\n";
    $compText .= $codeStep;
    $compText .= "    \$npos++;\n";
    $compText .= "  }\n";
  }
  if ($needfirst) {
    $compText .= "  my \$rowFirst = \$context->begin()->getRow();\n";
  }
  if ($needlast) {
    $compText .= "  my \$rowLast = \$context->last()->getRow();\n";
  }
  $compText .= $codeResult;
  $compText .= "  \$context->makeArraySend(\$opcode,\n";
  $compText .= $codeBuild;
  $compText .= "  );\n";
  $compText .= "}\n";

  ${$opts->{saveComputeTo}} = $compText if (defined($opts->{saveComputeTo}));

The optional parts get included only if some of the functions needed them.

  # compile the computation function
  my $compFun = eval $compText
    or confess "$myname: error in compilation of the aggregation computation:\n  $@\nfunction text:\n$compText ";

  # build and add the aggregator
  my $agg = Triceps::AggregatorType->new($rtRes, $opts->{name}, undef, $compFun, @compArgs)
    or confess "$myname: internal error: failed to build an aggregator type: $! ";

  $idx->setAggregator($agg)
    or confess "$myname: failed to set the aggregator in the index type: $! ";

  return $opts->{tabType};

Then the compute function is compiled. If the compilation fails, the error message includes both the compilation error and the text of the auto-generated function; otherwise there would be no way to know what exactly went wrong. Well, since no user code is included into the auto-generated function, it should never fail, except if there is some bad code in the aggregation function definitions. The compiled function and the collected closures are then used to create the aggregator.
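To see the whole technique end to end without Triceps, here is a much reduced generator in plain Perl. It is a sketch under several assumptions: rows are plain hash references instead of Triceps rows, the group is passed as the generated sub's argument list, the result is returned as a list instead of being sent with makeArraySend(), and the three function definitions (avg, last, count_star) are written in the style of the vwap definition rather than copied from the real $FUNCTIONS table. The names make_compute and %DEFS are hypothetical.

```perl
use strict;
use warnings;
use Carp;

# Hypothetical function definitions, in the style of the SimpleAggregator
# ones ($%name marks the variables and the predefined values).
my %DEFS = (
  avg => {
    vars => { sum => 0, count => 0 },
    step => 'if (defined $%argiter) { $%sum += $%argiter; $%count++; }',
    result => '($%count == 0? undef : $%sum / $%count)',
  },
  last => { result => '$%arglast' },
  count_star => { argcount => 0, result => '$%groupsize' },
);

# Build a compute function from (field name, function name, argument closure)
# triplets. Rows are plain hash refs; the group is passed as the sub's @_.
sub make_compute {
  my @spec = @_;
  my ($init, $step, $build) = ('', '', '');
  my @args;        # argument closures, passed in rather than pasted as text
  my $needIter = 0;
  my $id = 0;      # numeric id of the field, used to mangle the names
  while (my ($fld, $func, $arg) = splice(@spec, 0, 3)) {
    my $def = $DEFS{$func} or confess "unknown function '$func'";
    my $vars = $def->{vars} || {};
    push @args, $arg;
    my $argi = $#args;

    # what each $%name turns into for this field
    my %subst = (
      argiter => "\$a$id",
      arglast => sprintf('$args[%d]($_[-1])', $argi),
      groupsize => 'scalar(@_)',
    );
    $subst{$_} = "\$v${id}_$_" foreach keys %$vars;
    my $fix = sub {
      my ($t) = @_;
      $t =~ s{\$\%(\w+)}{
        exists $subst{$1} ? $subst{$1} : confess("unknown placeholder \$%$1")
      }ge;
      return $t;
    };

    # sorted only to make the generated text reproducible
    foreach my $v (sort keys %$vars) {
      $init .= "  my \$v${id}_$v = $vars->{$v};\n";
    }
    if (defined $def->{step}) {
      $needIter = 1;
      $step .= sprintf("    my \$a%d = \$args[%d](\$row);\n", $id, $argi);
      $step .= "    { " . $fix->($def->{step}) . "; }\n";
    }
    $build .= "    (" . $fix->($def->{result}) . "), # $fld\n";
    $id++;
  }

  my $text = "sub {\n  use strict;\n  return () unless \@_;\n" . $init;
  $text .= "  foreach my \$row (\@_) {\n" . $step . "  }\n" if ($needIter);
  $text .= "  return (\n" . $build . "  );\n}\n";

  # on failure, report both the error and the generated text
  my $fn = eval $text
    or confess "error in compilation of the computation:\n  $@\nfunction text:\n$text";
  return ($fn, $text);
}
```

As with @compArgs in make(), the closures stay in @args and only their indexes get pasted into the generated text, so no user-supplied closure ever has to be turned into source code.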

Friday, March 16, 2012

The ubiquitous VWAP

Every CEP supplier loves an example of VWAP calculation: it's small, it's about that quintessential CEP activity, aggregation, and it sounds like something from the real world.

A quick sidebar: what is the VWAP? It's the Volume-Weighted Average Price: the average price for the shares traded during some period of time, usually a day. If you take the price of every share traded during the day and calculate the average, you get the VWAP. What is the volume-weighted part? The shares don't usually get sold one by one. They're sold in variable-sized lots. If you think in terms of lots and not individual shares, you have to weigh the trade prices (not to be confused with costs) for the lots proportionally to the number of shares in them.
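In formula form, with s_i the number of shares and p_i the price of trade i during the period:

```latex
\mathrm{VWAP} = \frac{\sum_i s_i \, p_i}{\sum_i s_i}
```

For example, trades of 100 shares at 10 and 300 shares at 20 give (100*10 + 300*20) / (100 + 300) = 7000 / 400 = 17.5, while the plain average of the two trade prices would be 15.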

I've been using VWAP for trying out the approaches to the aggregation templates. The cutest so far is actually not a template at all: it's simply a user-defined aggregation function for the SimpleAggregator. Here is how it goes:

# VWAP function definition
my $myAggFunctions = {
  myvwap => {
    vars => { sum => 0, count => 0, size => 0, price => 0 },
    step => '($%size, $%price) = @$%argiter; '
      . 'if (defined $%size && defined $%price) '
        . '{$%count += $%size; $%sum += $%size * $%price;}',
    result => '($%count == 0? undef : $%sum / $%count)',
  },
};

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("byId",
    Triceps::IndexType->newHashed(key => [ "id" ])
  )
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
    ->addSubIndex("fifo", Triceps::IndexType->newFifo())
  )
or die "$!";

# the aggregation result
my $rtVwap;
my $compText; # for debugging

Triceps::SimpleAggregator::make(
  tabType => $ttWindow,
  name => "aggrVwap",
  idxPath => [ "bySymbol", "fifo" ],
  result => [
    symbol => "string", "last", sub {$_[0]->get("symbol");},
    id => "int32", "last", sub {$_[0]->get("id");},
    volume => "float64", "sum", sub {$_[0]->get("size");},
    vwap => "float64", "myvwap", sub { [$_[0]->get("size"), $_[0]->get("price")];},
  ],
  functions => $myAggFunctions,
  saveRowTypeTo => \$rtVwap,
  saveComputeTo => \$compText,
) or die "$!";

The rest of the example is the same as for the previous examples of the trades aggregation.

The option "functions" of Triceps::SimpleAggregator::make() lets you add custom aggregation functions. They're actually defined in the same way as the "standard" functions that come with the SimpleAggregator. The argument of that option is a reference to a hash, with the names of functions as the keys and references to the function definitions as the values. Each definition is again a hash, containing up to 4 keys:

  • argcount - Defines the number of arguments of the function, which may currently be 0 or 1, with 1 being the default.
  • vars - Defines the variables used to keep the context of this function.
  • step - The computation of a single step of iteration.
  • result - The computation of the result of the function. This key is mandatory. The rest can be skipped if not needed.
The vwap function actually has two arguments per row: the trade size and the price. But at most one argument is supported. So it works in the same way as "nth_simple": it leaves the argcount as the default 1 and packs its two arguments into one, combining them into a single array returned by reference. That's why the closure for this field is

sub { [$_[0]->get("size"), $_[0]->get("price")];}

The single array reference becomes the closure's result and the vwap function's single argument, later unpacked by its code. By the way, the order of the elements in this array is important, first size and then price, not the other way around, or your results will be wrong.

The value of "vars" is a reference to yet another hash that maps the variable names to their initial values. The variables are always scalars; I haven't yet found anything that would require a non-scalar. If the need for arrays or hashes arises, you can just create a reference and put it into a scalar. The initial values are strings that are substituted into the generated code as is. For numbers, you can usually just put them in as numbers and they will work fine: that's what vwap does with its 0s. If you don't particularly want to initialize with anything, put "undef" there, in quotes. If you want to use a string constant, quote it twice, like "'initial'" or '"initial"'. If you ever need an array or hash reference, that would be a "[]" or "{}". The namespace of the variables is local to the function, and when SimpleAggregator generates the code with them, it adds a unique mangled prefix to make sure that the variables from different fields don't conflict with each other.
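Since the initial values are pasted into the code as text, the quoting rules follow from how Perl compiles the result. Here is a small plain-Perl check with a hypothetical init_value helper that pastes a value into a declaration and compiles it under "use strict", like the generated compute function does: numbers, "undef", "[]" and "{}" work as is, a doubly quoted string becomes a string constant, and a singly quoted word becomes a bareword that strict rejects.

```perl
use strict;
use warnings;

# Hypothetical helper: paste an initial value string into a declaration,
# compile it under "use strict", and return the value
# (or die with the compilation error).
sub init_value {
  my ($init) = @_;
  my $code = "use strict; my \$x = $init; \$x";
  my $val = eval $code;
  die "cannot compile '$code': $@" if $@;
  return $val;
}

print init_value("'initial'"), "\n";   # prints "initial"
```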

The vwap computation defines four variables: two to build the aggregation result and two to temporarily keep the trade size and price extracted from the current row.

The presence of "step" is what tells the SimpleAggregator that this function needs to iterate through the rows. Its value is a string defining the code snippet that will be placed into the iteration loop. The step computation can refer to the function's variables through the syntax of "$%varname". All such occurrences just get globally replaced, like in a K&R C macro. This combination occurs fairly rarely in normal Perl code, so there should be little confusion. If you ever need to pass this sequence through as a literal, just break it up: depending on the circumstances, use '$\%' or '${%...}'.

There are also a few predefined variables that can be used in "step" (make sure that your variable names don't conflict with them):
  • $%argiter - The function's argument extracted from the current row.
  • $%niter - The number of the current row in the group, starting from 0.
  • $%groupsize - The size of the group ($context->groupSize()).
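The replacement itself is a global regexp substitution with an evaluated right-hand side, like the s/\$\%(\w+)/.../ge lines in the code of make(). Here is a stand-alone sketch applied to the vwap step. The mangle_step helper and the exact names it produces ($v3_size, and $a3 for the per-row argument, with an arbitrary field id of 3) are assumptions modeled on the generated code, not the real replaceStep(); it only handles the function's own variables and $%argiter.

```perl
use strict;
use warnings;
use Carp;

# Hypothetical helper: replace every $%name in a step snippet.
# The function's variables get the mangled prefix "v<id>_", and the
# predefined $%argiter becomes "$a<id>", the argument precomputed per row.
sub mangle_step {
  my ($step, $id, $vars) = @_;
  $step =~ s{\$\%(\w+)}{
    $1 eq 'argiter'      ? "\$a$id"
    : exists $vars->{$1} ? "\$v${id}_$1"
    : confess("unknown variable \$%$1")
  }ge;
  return $step;
}
```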

The step of vwap first extracts the size and price from the current row. It uses @$%argiter, which naturally means "the array to which $%argiter refers". If you prefer explicit braces, you can also write @{$%argiter} instead. Then it updates the current count of shares and the sum of size times price over all trades. The check for undef helps keep things consistent, taking into consideration only the rows where both size and price are defined. Without this check, if some row has only the price undefined, its size would still be added to the count and throw off the result.
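The effect of the undef check is easy to see on plain data. This sketch computes the vwap over an array of [size, price] pairs, packed in the same order as the argument closure above, once with the check and once without; the function names are hypothetical. The third trade has no price: the checked version ignores it, while the unchecked one still adds its 200 shares to the count and drags the average down.

```perl
use strict;
use warnings;

# vwap over [size, price] pairs, skipping pairs with an undefined member
sub vwap_checked {
  my ($sum, $count) = (0, 0);
  foreach my $arg (@_) {
    my ($size, $price) = @$arg;   # same order as the argument closure
    if (defined $size && defined $price) {
      $count += $size;
      $sum += $size * $price;
    }
  }
  return ($count == 0 ? undef : $sum / $count);
}

# the same without the check: undef silently becomes 0
sub vwap_unchecked {
  no warnings 'uninitialized';
  my ($sum, $count) = (0, 0);
  foreach my $arg (@_) {
    my ($size, $price) = @$arg;
    $count += $size;
    $sum += $size * $price;
  }
  return ($count == 0 ? undef : $sum / $count);
}

my @trades = ([100, 10], [300, 20], [200, undef]);
printf("checked: %s, unchecked: %.4f\n",
  vwap_checked(@trades), vwap_unchecked(@trades));
# prints "checked: 17.5, unchecked: 11.6667"
```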

The step's code gets enclosed in a block when the code is generated, so it can safely define some scope-local variables in it. Those get used as the normal $var or @var or %var. Here size and price could have been done as scope-local variables instead of being function-wide:

my ($size, $price) = @$%argiter; ...

The value of $%argiter gets actually computed in the generated code in advance, once per row, so it's safe to use it in the "step" code multiple times.

The "result" is very much like "step", only it's mandatory, its string value defines an expression and not a set of statements, and it has the different pre-defined variables:

  • $%argfirst - The function argument from the first row of the group.
  • $%arglast - The function argument from the last row of the group.
  • $%groupsize - Still the size of the group.
For vwap the result is a straightforward division, with a little precaution against division by 0.

And that's it, a home-grown aggregation function for vwap. When debugging your aggregation functions, the ability to look at the generated code, as saved with the option saveComputeTo, comes in pretty handy. If the error happens during the compilation of the generated code, its source text gets printed automatically in the error message. In the future I plan to add syntax checks for the code snippets of the functions even before embedding them into a full compute function, but things haven't gotten that far yet.

Wednesday, March 14, 2012


Starting the more user-friendly approach with the ordered index gave me ideas for the aggregation too. After all, even though the manual aggregation gives flexibility, it's too much work for the simple cases. So here we go: I've added the SimpleAggregator to make things easier. Here is the same example as before, written with the SimpleAggregator:

my $uTrades = Triceps::Unit->new("uTrades") or die "$!";

# the input data
my $rtTrade = Triceps::RowType->new(
  id => "int32", # trade unique id
  symbol => "string", # symbol traded
  price => "float64",
  size => "float64", # number of shares traded
) or die "$!";

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("byId",
    Triceps::IndexType->newHashed(key => [ "id" ])
  )
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
    ->addSubIndex("last2",
      Triceps::IndexType->newFifo(limit => 2)
    )
  )
or die "$!";

# the aggregation result
my $rtAvgPrice;
my $compText; # for debugging

Triceps::SimpleAggregator::make(
  tabType => $ttWindow,
  name => "aggrAvgPrice",
  idxPath => [ "bySymbol", "last2" ],
  result => [
    symbol => "string", "last", sub {$_[0]->get("symbol");},
    id => "int32", "last", sub {$_[0]->get("id");},
    price => "float64", "avg", sub {$_[0]->get("price");},
  ],
  saveRowTypeTo => \$rtAvgPrice,
  saveComputeTo => \$compText,
) or die "$!";

$ttWindow->initialize() or die "$!";
my $tWindow = $uTrades->makeTable($ttWindow,
  &Triceps::EM_CALL, "tWindow") or die "$!";

# label to print the result of aggregation
my $lbAverage = $uTrades->makeLabel($rtAvgPrice, "lbAverage",
  undef, sub { # (label, rowop)
    print($_[1]->printP(), "\n");
  }) or die "$!";
$tWindow->getAggregatorLabel("aggrAvgPrice")->chain($lbAverage)
  or die "$!";

while(<STDIN>) {
  my @data = split(/,/); # starts with a string opcode
  $uTrades->makeArrayCall($tWindow->getInputLabel(), @data)
    or die "$!";
  $uTrades->drainFrame(); # just in case, for completeness
}

The main loop and the printing are the same as before. The result produced is also exactly the same as before.

But the aggregator is created with Triceps::SimpleAggregator::make(). Its arguments are in the option format: the option name-value pairs, in any order. Most of these "options" are actually mandatory. The aggregator type is connected to the table type with the options:

tabType - table type to put the aggregator on

idxPath - a reference to an array of index names, forming the path to the index where the aggregator type will be set

name - the aggregator type name

The result row type and computation are defined with the option "result": each group of four values in that array defines one result field: the field name, its type, the aggregation function name, and a closure that extracts the aggregation function argument from the row (well, it can be any function reference; it doesn't have to be an anonymous closure). That function gets the row as the argument $_[0] and returns the extracted value to run the aggregation on. By convention, the field name is separated from its definition fields by "=>". Remember, it's just a convention: for Perl a "=>" is just as good as a comma.
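A quick plain-Perl check of that convention: "=>" behaves as a comma, except that it also quotes a bare word on its left, which is why the field names need no quotes. The closure here is a placeholder.

```perl
use strict;
use warnings;

my $extract = sub { $_[0] };   # placeholder argument closure

# the conventional spelling of one result group...
my @fancy = (price => "float64", "avg", $extract);
# ...and the same list written with plain commas
my @plain = ("price", "float64", "avg", $extract);

print scalar(@fancy), " items, identical: ",
  (join("|", @fancy) eq join("|", @plain) ? "yes" : "no"), "\n";
# prints "4 items, identical: yes"
```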

SimpleAggregator::make() automatically generates the result row type and aggregation function, creates an aggregator type from them, and sets it on the index type. It returns the table type on success, but for this example it was good enough to check the result for undef. The result row type can be found by traversing the index type tree, or by constructing a table and getting the row type from the aggregator result label. However, it's much easier to save it during construction, and the option "saveRowTypeTo" (this time an optional one!) allows you to do this. Give it a reference to a variable, and the row type will be placed into that variable.
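This is a common Perl idiom: the caller passes a reference to its own variable, and the callee assigns through it, as in the line ${$opts->{saveRowTypeTo}} = $rtRes in the code of make(). A stand-alone sketch with a hypothetical make_thing() function and hypothetical option names:

```perl
use strict;
use warnings;

# Hypothetical function with two optional "save to" output options.
sub make_thing {
  my %opts = @_;
  my $result = "thing-" . $opts{name};
  my $text = "generated text for $opts{name}";
  # assign through the references only if the caller asked for it
  ${$opts{saveResultTo}} = $result if (defined $opts{saveResultTo});
  ${$opts{saveTextTo}} = $text if (defined $opts{saveTextTo});
  return 1;
}

my ($saved, $savedText);
make_thing(name => "x", saveResultTo => \$saved, saveTextTo => \$savedText)
  or die "make_thing failed";
print "$saved / $savedText\n";   # prints "thing-x / generated text for x"
```

Because the options are optional, a caller that doesn't care about the intermediate results simply leaves them out.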

Most of the time things will just work. However, if they don't and something dies in the aggregator, you will need the source code of the compute function to make sense of the errors. The option "saveComputeTo" gives a variable to save that source code into, for future perusal and other entertainment. Here is what gets produced by this example:

sub {
  use strict;
  my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;
  return if ($context->groupSize()==0 || $opcode == &Triceps::OP_NOP);
  my $v2_count = 0;
  my $v2_sum = 0;
  my $npos = 0;
  for (my $rhi = $context->begin(); !$rhi->isNull(); $rhi = $context->next($rhi)) {
    my $row = $rhi->getRow();
    # field price=avg
    my $a2 = $args[2]($row);
    { if (defined $a2) { $v2_sum += $a2; $v2_count++; }; }
    $npos++;
  }
  my $rowLast = $context->last()->getRow();
  my $l0 = $args[0]($rowLast);
  my $l1 = $args[1]($rowLast);
  $context->makeArraySend($opcode,
    ($l0), # symbol
    ($l1), # id
    (($v2_count == 0? undef : $v2_sum / $v2_count)), # price
  );
}

At the moment the compute function is quite straightforward and just does the aggregation from scratch every time. It's only smart enough to skip the iteration if the result consists only of "first", "last" and "count_star". It gets the closures for the argument extraction as its arguments.

The aggregation functions available at the moment are:

  • first - value from the first row in the group.
  • last - value from the last row in the group.
  • count_star - number of rows in the group, like SQL COUNT(*). Since there is no argument for this function, use undef instead of the argument closure.
  • sum - sum of the values.
  • max - the maximum value.
  • min - the minimum value.
  • avg - the average of all the non-null values.
  • avg_perl - the average of all values, with null values treated in Perl fashion as zeroes. So, technically, since this example uses "avg", it works the same as the previous versions only for the non-null fields. To be really the same, it should have used "avg_perl".
  • nth_simple - the Nth value from the start of the group. This is a tricky function because it needs two arguments: the value of N and the field selector. Multiple direct arguments will be supported in the future, but right now it works through a workaround: the argument closure must return not just the extracted field but a reference to an array with two values, the N and the field. For example, sub { [1, $_[0]->get("id")];}. The N is counted starting from 0, so the value of 1 will return the second record. This function works in a fairly simple-minded and inefficient way at the moment.
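The difference between avg and avg_perl, and the packed [N, value] argument of nth_simple, can be illustrated on plain Perl lists. These are hypothetical re-implementations of the semantics described above, not the generated code; in particular, taking N from the first row of the group is an assumption of this sketch.

```perl
use strict;
use warnings;

# Sum that treats undef as 0, the Perl way.
sub sum_of {
  no warnings 'uninitialized';
  my $s = 0;
  $s += $_ foreach @_;
  return $s;
}

# avg: the average of the defined values only.
sub avg {
  my @v = grep { defined } @_;
  return @v ? sum_of(@v) / @v : undef;
}

# avg_perl: undef counts as 0 but still counts as a value.
sub avg_perl {
  return @_ ? sum_of(@_) / @_ : undef;
}

# nth_simple: each argument is an [N, value] pair; N (counted from 0)
# is taken from the first row, an assumption of this sketch.
sub nth_simple {
  my @args = @_;
  return undef unless @args;
  my $n = $args[0][0];
  return ($n >= 0 && $n < @args) ? $args[$n][1] : undef;
}

print avg(10, undef, 20), " vs ", avg_perl(10, undef, 20), "\n";   # prints "15 vs 10"
```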
There will be more functions to come, and you can even already add your own. More on that in the next installment.

I can think of many ways the SimpleAggregator can be improved, but for now they have been pushed into the future to keep it simple.

By the way, where are the examples

I've been talking about all these examples, but I can't remember if I ever said where to find them in the code.

Overall, the examples live together with unit tests, under perl/Triceps/t. The files with names starting with x contain the examples, like xWindow.t. Usually there are multiple related examples in the same file. And they are not quite exactly the same as the ones in the posts, because they are plugged into the unit test infrastructure: rather than reading from stdin and writing to stdout, they take the inputs from variables, put the results into variables, and have the results checked for correctness. This way the examples stay working and don't suffer from bit rot when something changes.

The other unit tests in the .t files are interesting too, since they contain all the possible usages of everything and can be used as a reference. However, they tend to be much messier and harder to read, precisely because they contain lots of tiny snippets that do everything.

I've been coming up with the new examples and features for 1.0 as I'm writing the docs. How to get those? By checking out the trunk of the development tree directly from svn:

svn co

You don't need any login for check-out. You can keep it current with latest changes by periodically running "svn update". After you've checked out the trunk, you can build it as usual, and find the examples in the usual location under perl/Triceps/t.