Friday, April 27, 2012

JoinTwo joins tables

Another piece of the join infrastructure looks ready for the prime time now, the JoinTwo template. As the name suggests, it joins two tables.

Fundamentally, joining the two tables is kind of like the two symmetrical copies of LookupJoin. And it really is, under the hood JoinTwo is translated into two LookupJoins. But there is a good deal more stuff going on.

For all I can tell, the CEP systems with the insert-only stream model tend to start with the assumption that the LookupJoin (or whetever they call it) is good enough. Then it turns out that manually writing the join twice where it can be done once is a pain. So the table-to-table join gets added. Then the interesting nuances crop up. Then it turns out that it would be real convenient to propagate the deletes through the join, and that gets added as a special feature behind the scenes.

So in Triceps the JoinTwo takes care of the details of joining the tables. In a common database a join query causes a join plan to be created: with what table to start, and which to look up in next. A CEP system deals with the changing data, and a join has to react to data changes on each of its input tables. It must have multiple plans, one for starting from each of the tables. And essentially a LookupJoin embodies such a plan, and JoinTwo makes two of them.

Why only two? Because it's the minimal usable number. The join logic is tricky, so it's better to work out the kinks on something simpler. And it still can be scaled to many tables by joining them in stages. It's not quite as efficient as a direct join of multiple tables, because the result of each stage has to be put into a table, but does the job.

For the application example, I'll be using one from the area of stock lending. Think of a large multinational broker that wants to keep track of its lending activities. It has many customers to whom the stock can be loaned or from whom it can be borrowed. This information comes as the records of positions, of how many shares are loaned or borrowed for each customer, and at what contractual price. And since the clients are from all around the world, the prices may be in different currencies. A simplified and much shortened version of position information may look like this:

our $rtPosition = Triceps::RowType->new( # a customer account position
  date => "int32", # as of which date, in format YYYYMMDD
  customer => "string", # customer account id
  symbol => "string", # stock symbol
  quantity => "float64", # number of shares
  price => "float64", # share price in local currency
  currency => "string", # currency code of the price
) or die "$!";


Then we want to aggregate these data in different ways, getting the broker-wide summaries by the symbol, by customer etc. The aggregation is updated as the business day goes on. At the end of the business day the state of the day freezes,
 and the new day's initial data is loaded. That's why the business date is part of the schema. If you wonder, the next day's initial data is usually the same as at the end of the previous day, except where some contractual conditions change. The detailed position data is thrown away after a few days, or even right at the end of the day, but the aggregation results from the end of the day are kept for a longer history.

There is a problem with summing up the monetary values: they come in different currencies and can not be added up directly. If we want to get this kind of summaries, we have to translate all of them to a single reference currency. That's what the sample joins will be doing: finding the translation rates to the US dollars. The currency rates come in the translation schema:

our $rtToUsd = Triceps::RowType->new( # a currency conversion to USD
  date => "int32", # as of which date, in format YYYYMMDD
  currency => "string", # currency code
  toUsd => "float64", # multiplier to convert this currency to USD
) or die "$!";


Since the currency rates change all the time, to make sense of a previous day's position the previous day's rates need to also be kept, and so the rates are also marked with a date.

Having the mood set, here is the first example of an inner join:

# exchange rates, to convert all currencies to USD
our $ttToUsd = Triceps::TableType->new($rtToUsd)
  ->addSubIndex("primary",
    Triceps::IndexType->newHashed(key => [ "date", "currency" ])
  ) 
  ->addSubIndex("byDate", # for cleaning by date
    Triceps::SimpleOrderedIndex->new(date => "ASC")
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  ) 
or die "$!";
$ttToUsd->initialize() or die "$!";

# the positions in the original currency
our $ttPosition = Triceps::TableType->new($rtPosition)
  ->addSubIndex("primary",
    Triceps::IndexType->newHashed(key => [ "date", "customer", "symbol" ])
  ) 
  ->addSubIndex("currencyLookup", # for joining with currency conversion
    Triceps::IndexType->newHashed(key => [ "date", "currency" ])
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  ) 
  ->addSubIndex("byDate", # for cleaning by date
    Triceps::SimpleOrderedIndex->new(date => "ASC")
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  ) 
or die "$!";
$ttPosition->initialize() or die "$!";

our $uJoin = Triceps::Unit->new("uJoin") or die "$!";

our $tToUsd = $uJoin->makeTable($ttToUsd,
  &Triceps::EM_CALL, "tToUsd") or die "$!";
our $tPosition = $uJoin->makeTable($ttPosition,
  &Triceps::EM_CALL, "tPosition") or die "$!";

our $join = Triceps::JoinTwo->new(
  name => "join",
  leftTable => $tPosition,
  leftIdxPath => [ "currencyLookup" ],
  rightTable => $tToUsd,
  rightIdxPath => [ "primary" ],
  type => "inner",
); # would die by itself on an error
# label to print the changes to the detailed stats
makePrintLabel("lbPrint", $join->getOutputLabel());

while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "cur") {
    $uJoin->makeArrayCall($tToUsd->getInputLabel(), @data)
      or die "$!";
  } elsif ($type eq "pos") {
    $uJoin->makeArrayCall($tPosition->getInputLabel(), @data)
      or die "$!";
  }
  $uJoin->drainFrame(); # just in case, for completeness
}


The example just does the joining, leaving the aggregation to the imagination of the reader.  The result of a JoinTwo is not stored in a table. It is a stream of ephemeral updates, same as for LookupJoin. If you want to keep them, you can put them into a table yourself (and maybe do the aggregation in the same table).

Both the joined tables must provide an index for the efficient joining. The index may be leaf (selecting one row per key) or non-leaf (containing multiple rows per key) but it must be there. This makes sure that the joins are always efficient and you don't have to hunt for why your model is suddenly so slow.

The indexes also provide the default way of finding the join condition: the key fields in the indexes are paired up together, in the order they go in the index specifications. Once again, the fields are paired not by name but by order. If the indexes are nested, the outer indexes precede in the order. For example, the $ttToUsd could have the same index done in a nested way and it would work just as well:

  ->addSubIndex("byDate",
    Triceps::IndexType->newHashed(key => [ "date" ])
    ->addSubIndex("primary",
      Triceps::IndexType->newHashed(key => [ "currency" ])
    )
  ) 


Same as with LookupJoin, currently only the Hashed indexes are supported, and must go through all the path. The outer index "byDate" here can not be a Sorted/Ordered index, that would be an error and the join will refuse to accept it.

But if the order of fields in it were changed in $ttToUsd to be different from $ttPosition, like this

  ->addSubIndex("primary",
    Triceps::IndexType->newHashed(key => [ "currency", "date" ])
  ) 

then it would be a mess. The wrong fields would be matched up in the join condition, which would become (tPosition.date == tToUsd.currency && tPosition.currency == tToUsd.date), and everything would go horribly wrong.

Incidentally, this situation in this particular case would be caught because JoinTwo is much less lenient than LookupJoin as the key field types go. It requires the types of the matching fields to be exactly the same. Partially, for the reasons of catching the wrong order, partially for the sake of the result consistency. JoinTwo does the look-ups in both directions. And think about what happens if a string and an int32 field get matched up, and then the non-numeric strings turn up, containing "abc" and "qwerty". Those strings on the left side will match the rows with numeric 0 on the right side. But then if the row with 0 on the right side changes, it would look for the string "0" on the left, which would not find either "abcd" or "qwerty". The state of the join will become a mess. So no automatic key type conversions here.

If you wonder, there are ways to specify the key fields match explicitly, they will be shown later.

The option "type" selects the inner join mode. The inner join is the default, and would have been used even if this option was not specified.

The results here include all the fields from both sides by default. The JoinTwo is smart and knows how to exclude the duplicating key fields.

The joins are currently not equipped to actually compute the translated prices directly. They can only look up the information for it, and the computation can be done later, before or during the aggregation.

That's enough explanations for now, let's look at the result (with the input rows shown as usual in italics):
cur,OP_INSERT,20120310,USD,1
cur,OP_INSERT,20120310,GBP,2
cur,OP_INSERT,20120310,EUR,1.5
 
Inserting the reference currencies produces no result, since it's an inner join and they have no matching positions yet.

pos,OP_INSERT,20120310,one,AAA,100,15,USD
join.leftLookup.out OP_INSERT date="20120310" customer="one"
 symbol="AAA" quantity="100" price="15" currency="USD" toUsd="1" 
pos,OP_INSERT,20120310,two,AAA,100,8,GBP
join.leftLookup.out OP_INSERT date="20120310" customer="two"
 symbol="AAA" quantity="100" price="8" currency="GBP" toUsd="2" 

Now the positions arrive and find the matching translations to USD. The label names on the output are an interesting artifact of all the chained labels receiving the original rowop that refers to the first label in the chain. Which happens to be the output label of a LookupJoin inside JoinTwo. The name of that LookupJoin shows whether the row that initiated the table came from the left or right side of the JoinTwo.

pos,OP_INSERT,20120310,three,AAA,100,300,RUR

This position is out of luck: no translation for its currency. The inner join is actually not a good choice here. If a row does not pass through because of the lack of translation, it gets excluded even from the aggregations that do not require the translation, such as those that total up the quantity of a particular symbol across all the customers. A left join would be better suited.

pos,OP_INSERT,20120310,three,BBB,200,80,GBP
join.leftLookup.out OP_INSERT date="20120310" customer="three"
 symbol="BBB" quantity="200" price="80" currency="GBP" toUsd="2" 

Another position arrives, same as before.

cur,OP_INSERT,20120310,RUR,0.04
join.rightLookup.out OP_INSERT date="20120310" customer="three"
 symbol="AAA" quantity="100" price="300" currency="RUR" toUsd="0.04" 

The translation for RUR finally comes in. The position in RUR can now find its match and propagates through.

cur,OP_DELETE,20120310,GBP,2
join.rightLookup.out OP_DELETE date="20120310" customer="two"
 symbol="AAA" quantity="100" price="8" currency="GBP" toUsd="2" 
join.rightLookup.out OP_DELETE date="20120310" customer="three"
 symbol="BBB" quantity="200" price="80" currency="GBP" toUsd="2" 
cur,OP_INSERT,20120310,GBP,2.2
join.rightLookup.out OP_INSERT date="20120310" customer="two"
 symbol="AAA" quantity="100" price="8" currency="GBP" toUsd="2.2" 
join.rightLookup.out OP_INSERT date="20120310" customer="three"
 symbol="BBB" quantity="200" price="80" currency="GBP" toUsd="2.2" 

An exchange rate update for GBP arrives. It amounts to "delete the old translation and then insert a new one". Each of these operations updates the state of the join: the disappearing translation causes all the GBP positions to be deleted from the result, and the new translation inserts them back, with the new value of toUsd.

pos,OP_DELETE,20120310,one,AAA,100,15,USD
join.leftLookup.out OP_DELETE date="20120310" customer="one"
 symbol="AAA" quantity="100" price="15" currency="USD" toUsd="1" 
pos,OP_INSERT,20120310,one,AAA,200,16,USD
join.leftLookup.out OP_INSERT date="20120310" customer="one"
 symbol="AAA" quantity="200" price="16" currency="USD" toUsd="1" 

A position update arrives. Again, it's a delete-and-insert, and propagates through the join as such.

Monday, April 23, 2012

LookupJoin key by pattern

While working on the table-to-table joins, I've been also retrofitting some of the features to the LookupJoin. So, here are two changes, both in the department of the join condition:

First, now it checks that the fields in the right side of the option "by" are actually all matching the keys fields in right-side index.

Second, now the join condition may also be specified in a pattern form, the same as the result. For example the condition from the previous examples

by => [ "acctSrc" => "source", "acctXtrId" => "external" ],

can be also specified as:

byLeft => [ "acctSrc/source", "acctXtrId/external" ],

The option name "byLeft" says that the pattern specification is for the fields on the left side (there is no symmetric "byRight"). The substitutions produce the matching field names for the right side. Unlike the result pattern, here the fields that do not find a match do not get included in the key. It's as if an implicit "!.*" gets added at the end. In fact, "!.*" really does get added implicitly at the end.

Of course, for this example either option doesn't make much difference. It starts making the difference when the key fields follow a pattern. For example, if the key fields on both sides have the names "acctSrc" and "acctXtrId", the specification with the new option becomes a little simpler:

byLeft => [ "acctSrc", "acctXtrId" ],

Even more so if the key is long, common on both sides, and all the fields have a common prefix. For example:

k_AccountSystem
k_AccountId
k_InstrumentSystem
k_InstrumentId
k_TransactionDate
k_SettlementDate

Then the key can be specified simply as

byLeft => [ "k_.*" ],

If the settlement date doesn't matter, it can be excluded:

byLeft => [ "!k_SettlementDate", "k_.*" ],

If the right side represents a swap of securities, it might have two parts to it, each describing its half with its key:

BorrowAccountSystem
BorrowAccountId
BorrowInstrumentSystem
BorrowInstrumentId
BorrowTransactionDate
BorrowSettlementDate
LoanAccountSystem
LoanAccountId
LoanInstrumentSystem
LoanInstrumentId
LoanTransactionDate
LoanSettlementDate

Then the join with its borrow part can be done with

byLeft => [ 'k_(.*)/Borrow$1' ],

Makes the long keys easier to drag around.

Friday, April 20, 2012

The key fields of LookupJoin

I think I haven't mentioned it explicitly before, but LookupJoin has an interesting property: its key fields don't have to be of the same type on the left and on the right side. Since the key building for lookup is done through Perl, the key values get automatically converted as needed.

A caveat is that the conversion might be not exactly direct. If a string gets converted to a number, then any string values that do not look like numbers will be converted to 0. A conversion between a string and a floating-point number, in either direction, is likely to lose precision. A conversion between int64 and int32 may cause the upper bits to be truncated. So what gets looked up may be not what you expect.

I'm not sure yet if I should add the requirement for the types being exactly the same. The automatic conversions seem to be convenient, just use them with care. I suppose, when the joins will get eventually implemented in the C++ code, this freedom would go away because it's much easier and more efficient in C++ to copy the field values as-is than to convert them.

The only thing currently checked is whether a field is represented in Perl as a scalar or an array, and that must match on the left and on the right. Note that the array uint8[] gets represented in Perl as a scalar string, so an uint8[] field can be matched with other scalars but not with the other arrays.

Monday, April 16, 2012

A peek inside LookupJoin

I won't be describing in the details the internals of LookupJoin. They seem a bit too big and complicated. Partially it's because the code is of an older origin, and not using the newer shortcuts. Partially it's because when I wrote it, I've tried to optimize by translating the rows to an array format instead of referring to the fields by names, and that made the code more tricky. Partially, the code has grown more complex due to all the added options. And partially the functionality just is a little tricky by itself.

But, for debugging purposes, the LookupJoin constructor can return the auto-generated code of the joiner function. It's done with the option "saveJoinerTo":

  saveJoinerTo => \$code,

This snippet will cause the auto-generated code to be placed into the variable $code. This provides a glimpse into the internal workings of the joiner.

This is the joiner code from the first example:

sub # ($inLabel, $rowop, $self)
{
  my ($inLabel, $rowop, $self) = @_;
  #print STDERR "DEBUGX LookupJoin " . $self->{name} . " in: ", $rowop->printP(), "\n";

  my $opcode = $rowop->getOpcode(); # pass the opcode
  my $row = $rowop->getRow();

  my @leftdata = $row->toArray();

  my $resRowType = $self->{resultRowType};
  my $resLabel = $self->{outputLabel};

  my $lookuprow = $self->{rightRowType}->makeRowHash(
    source => $leftdata[1],
    external => $leftdata[2],
    );
  
  #print STDERR "DEBUGX " . $self->{name} . " lookup: ", $lookuprow->printP(), "\n";
  my $rh = $self->{rightTable}->findIdx($self->{rightIdxType}, $lookuprow);
  Carp::confess("$!") unless defined $rh;

  my @rightdata; # fields from the right side, defaults to all-undef, if no data found
  my @result; # the result rows will be collected here

  if (!$rh->isNull()) {
    #print STDERR "DEBUGX " . $self->{name} . " found data: " . $rh->getRow()->printP() . "\n";
    @rightdata = $rh->getRow()->toArray();
  }

    my @resdata = ($leftdata[0],
    $leftdata[1],
    $leftdata[2],
    $leftdata[3],
    $rightdata[2],
    );
    my $resrowop = $resLabel->makeRowop($opcode, $resRowType->makeRowArray(@resdata));
    #print STDERR "DEBUGX " . $self->{name} . " +out: ", $resrowop->printP(), "\n";
    Carp::confess("$!") unless defined $resrowop;
    Carp::confess("$!") 
      unless $resLabel->getUnit()->call($resrowop);
    
}

This is the joiner code from the example with the manual iteration:

sub  # ($self, $row)
{
  my ($self, $row) = @_;

  #print STDERR "DEBUGX LookupJoin " . $self->{name} . " in: ", $row->printP(), "\n";

  my @leftdata = $row->toArray();

  my $lookuprow = $self->{rightRowType}->makeRowHash(
    source => $leftdata[1],
    external => $leftdata[2],
    );
  
  #print STDERR "DEBUGX " . $self->{name} . " lookup: ", $lookuprow->printP(), "\n";
  my $rh = $self->{rightTable}->findIdx($self->{rightIdxType}, $lookuprow);
  Carp::confess("$!") unless defined $rh;

  my @rightdata; # fields from the right side, defaults to all-undef, if no data found
  my @result; # the result rows will be collected here

  if (!$rh->isNull()) {
    #print STDERR "DEBUGX " . $self->{name} . " found data: " . $rh->getRow()->printP() . "\n";
    @rightdata = $rh->getRow()->toArray();
  }

    my @resdata = ($leftdata[0],
    $leftdata[1],
    $leftdata[2],
    $leftdata[3],
    $rightdata[2],
    );
    push @result, $self->{resultRowType}->makeRowArray(@resdata);
    #print STDERR "DEBUGX " . $self->{name} . " +out: ", $result[$#result]->printP(), "\n";
  return @result;
}

It takes different arguments because now it's not an input label handler but a common function that gets called from both the label handler and the lookup() method. And it collects the rows in an array to be returned instead of immediately passing them on.

This is the joiner code from the example with multiple rows matching on the right side:

sub # ($inLabel, $rowop, $self)
{
  my ($inLabel, $rowop, $self) = @_;
  #print STDERR "DEBUGX LookupJoin " . $self->{name} . " in: ", $rowop->printP(), "\n";

  my $opcode = $rowop->getOpcode(); # pass the opcode
  my $row = $rowop->getRow();

  my @leftdata = $row->toArray();

  my $resRowType = $self->{resultRowType};
  my $resLabel = $self->{outputLabel};

  my $lookuprow = $self->{rightRowType}->makeRowHash(
    source => $leftdata[1],
    external => $leftdata[2],
    );
  
  #print STDERR "DEBUGX " . $self->{name} . " lookup: ", $lookuprow->printP(), "\n";
  my $rh = $self->{rightTable}->findIdx($self->{rightIdxType}, $lookuprow);
  Carp::confess("$!") unless defined $rh;

  my @rightdata; # fields from the right side, defaults to all-undef, if no data found
  my @result; # the result rows will be collected here

  if ($rh->isNull()) {
    #print STDERR "DEBUGX " . $self->{name} . " found NULL\n";

    my @resdata = ($leftdata[0],
    $leftdata[1],
    $leftdata[2],
    $leftdata[3],
    $rightdata[2],
    );
    my $resrowop = $resLabel->makeRowop($opcode, $resRowType->makeRowArray(@resdata));
    #print STDERR "DEBUGX " . $self->{name} . " +out: ", $resrowop->printP(), "\n";
    Carp::confess("$!") unless defined $resrowop;
    Carp::confess("$!") 
      unless $resLabel->getUnit()->call($resrowop);
    
  } else {
    #print STDERR "DEBUGX " . $self->{name} . " found data: " . $rh->getRow()->printP() . "\n";
    my $endrh = $self->{rightTable}->nextGroupIdx($self->{iterIdxType}, $rh);
    for (; !$rh->same($endrh); $rh = $self->{rightTable}->nextIdx($self->{rightIdxType}, $rh)) {
      @rightdata = $rh->getRow()->toArray();
    my @resdata = ($leftdata[0],
    $leftdata[1],
    $leftdata[2],
    $leftdata[3],
    $rightdata[2],
    );
    my $resrowop = $resLabel->makeRowop($opcode, $resRowType->makeRowArray(@resdata));
    #print STDERR "DEBUGX " . $self->{name} . " +out: ", $resrowop->printP(), "\n";
    Carp::confess("$!") unless defined $resrowop;
    Carp::confess("$!") 
      unless $resLabel->getUnit()->call($resrowop);
    
    }
  }
}

It's more complicated in two ways: If a match is found, it has to iterate through the whole matching group. And if the match is not found, it still has to produce a result row for a left join with a separate code fragment.

Patterns in the join results

So far I've given only the examples where the fields were renamed only with literals. But the patterns allow more, they allow substituting the parts of the matched value.

Suppose, we want to just copy all the fields from both the left and right sides, but many of them happen to have the same names. If there are many fields, renaming each of them looks like a tiresome proposition. But we can just give the unique prefixes to the fields from the left and right side (or maybe to just one of the sides):

  leftFields => [ '.*/left_$&' ],
  rightFields => [ '.*/right_$&' ],

The "$&" in the substitution gets replaced with the whole matched field name. Or the parts of the field names can be selected by the parenthesis. For example, let's add an underscore after "acct_", passing the rest of the fields unchanged:

  rightFields => [ 'acct(.*)/acct_$1', '.*' ],

As usual in the Perl regexps, the parenthesized patterns are numbered left to right, starting with $1.

Manual iteration with LookupJoin

Sometimes you might want to just get the list of the resulting rows from LookupJoin and iterate over them by yourself, rather than have it call the labels. To be honest, this looked kind of important when I wrote LookupJoin first, but by now I don't see a whole lot of use in it. By now, if you want to do a manual iteration, calling findBy() and then iterating looks like a more useful option. But at the time there was no findBy(), and this feature came to exist. Here is an example:

our $join = Triceps::LookupJoin->new(
  unit => $uJoin,
  name => "join",
  leftRowType => $rtInTrans,
  rightTable => $tAccounts,  rightIdxPath => ["lookupSrcExt"],
  rightFields => [ "internal/acct" ],
  by => [ "acctSrc" => "source", "acctXtrId" => "external" ],
  automatic => 0,
); # would die by itself on an error

# label to print the changes to the detailed stats
my $lbPrintPackets = makePrintLabel("lbPrintPackets", $join->getOutputLabel());

while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "acct") {
    $uJoin->makeArrayCall($tAccounts->getInputLabel(), @data)
      or die "$!";
  } elsif ($type eq "trans") {
    my $op = shift @data; # drop the opcode field
    my $trans = $rtInTrans->makeRowArray(@data) or die "$!";
    my @rows = $join->lookup($trans);
    foreach my $r (@rows) {
      $uJoin->call($lbPrintPackets->makeRowop($op, $r)) or die "$!";
    }
  }
  $uJoin->drainFrame(); # just in case, for completeness
}

It copies the first LookupJoin example, only now manually. Once the option "automatic" is set to 0 for the join, the method $join->lookup() becomes available to perform the lookup and return the result rows in an array (the data sent to the input label keeps working as usual, sending the resutl rows to the output label). This involves the extra overhead of keeping all the result rows (and there might be lots of them) in an array, so by default the join is compiled in an automatic-only mode.

Since lookup() knows nothing about the opcodes, those had to be sent separately around the lookup. 

The result is the same as for the first example, only the name of the result label differs:

acct,OP_INSERT,source1,999,1
acct,OP_INSERT,source1,2011,2
acct,OP_INSERT,source2,ABCD,1
trans,OP_INSERT,1,source1,999,100
lbPrintPackets OP_INSERT id="1" acctSrc="source1" acctXtrId="999"
 amount="100" acct="1" 
trans,OP_INSERT,2,source2,ABCD,200
lbPrintPackets OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD"
 amount="200" acct="1" 
trans,OP_INSERT,3,source2,QWERTY,200
lbPrintPackets OP_INSERT id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" 
acct,OP_INSERT,source2,QWERTY,2
trans,OP_DELETE,3,source2,QWERTY,200
lbPrintPackets OP_DELETE id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" acct="2" 
acct,OP_DELETE,source1,999,1

LookupJoin with multiple matches

The next example loses all connection with reality, it just serves to demonstrate another ability of LookupJoin: matching multiple rows on the right side for an incoming row. The situation itself is obviously useful and normal, just I was too lazy to invent another realistically-looking example.

So, here we go:

our $ttAccounts2 = Triceps::TableType->new($rtAccounts)
  ->addSubIndex("iterateSrc", # for iteration in order grouped by source
    Triceps::IndexType->newHashed(key => [ "source" ])
    ->addSubIndex("lookupSrcExt",
      Triceps::IndexType->newHashed(key => [ "external" ])
      ->addSubIndex("grouping", Triceps::IndexType->newFifo())
    )
  )
or die "$!";
$ttAccounts2->initialize() or die "$!";

our $tAccounts = $uJoin->makeTable($ttAccounts2,
  &Triceps::EM_CALL, "tAccounts") or die "$!";

our $join = Triceps::LookupJoin->new(
  unit => $uJoin,
  name => "join",
  leftRowType => $rtInTrans,
  rightTable => $tAccounts,
  rightIdxPath => [ "iterateSrc", "lookupSrcExt" ],
  rightFields => [ "internal/acct" ],
  by => [ "acctSrc" => "source", "acctXtrId" => "external" ],
); # would die by itself on an error

And the main loop is unchanged from the first LookupJoin example, so I wont' copy it here. Just for something different, the join index here is nested, and its path consists of two elements. It's not a leaf index either, with one FIFO level under it. And when the isLeft is not specified explicitly, it defaults to 1, maeking it a left join.

The run example uses a bit different input, highlighting the ability to match multiple rows:

acct,OP_INSERT,source1,999,1
acct,OP_INSERT,source1,2011,2
acct,OP_INSERT,source2,ABCD,1
acct,OP_INSERT,source2,ABCD,10
acct,OP_INSERT,source2,ABCD,100
trans,OP_INSERT,1,source1,999,100
join.out OP_INSERT id="1" acctSrc="source1" acctXtrId="999"
 amount="100" acct="1" 
trans,OP_INSERT,2,source2,ABCD,200
join.out OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD"
 amount="200" acct="1" 
join.out OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD"
 amount="200" acct="10" 
join.out OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD"
 amount="200" acct="100" 
trans,OP_INSERT,3,source2,QWERTY,200
join.out OP_INSERT id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" 
acct,OP_INSERT,source2,QWERTY,2
trans,OP_DELETE,3,source2,QWERTY,200
join.out OP_DELETE id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" acct="2" 
acct,OP_DELETE,source1,999,1

When a row matches multiple rows in the table, it gets multiplied. The join function iterates through the whole matching row group, and for each found row creates a result row and calls the output label with it.

Now, what if you don't want to get multiple rows back even if they are found? Of course, the best way is to just use a leaf index. But once in a while you get into situations with the denormalized data in the lookup table. You might know in advance that for each row in an index group a certain field would be the same. Or you might not care, what exact value you get as long as it's from the right group. But you might really not want the input rows to multiply when they go through the join. LookupJoin has a solution:

our $join = Triceps::LookupJoin->new(
  unit => $uJoin,
  name => "join",
  leftRowType => $rtInTrans,
  rightTable => $tAccounts,
  rightIdxPath => [ "iterateSrc", "lookupSrcExt" ],
  rightFields => [ "internal/acct" ],
  by => [ "acctSrc" => "source", "acctXtrId" => "external" ],
  limitOne => 1,
); # would die by itself on an error

The option "limitOne" changes the processing logic to pick only the first matching row. It also optimizes the join function. If limitOne is not specified explicitly, the join constructor deduces it magically by looking at whether the join index is a leaf or not. Actually, for a leaf index it would always override limitOne to 1, even if you explicitly set it to 0.

With the limit, the same input produces a different output:

acct,OP_INSERT,source1,999,1
acct,OP_INSERT,source1,2011,2
acct,OP_INSERT,source2,ABCD,1
acct,OP_INSERT,source2,ABCD,10
acct,OP_INSERT,source2,ABCD,100
trans,OP_INSERT,1,source1,999,100
join.out OP_INSERT id="1" acctSrc="source1" acctXtrId="999"
 amount="100" acct="1" 
trans,OP_INSERT,2,source2,ABCD,200
join.out OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD"
 amount="200" acct="1" 
trans,OP_INSERT,3,source2,QWERTY,200
join.out OP_INSERT id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" 
acct,OP_INSERT,source2,QWERTY,2
trans,OP_DELETE,3,source2,QWERTY,200
join.out OP_DELETE id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" acct="2" 
acct,OP_DELETE,source1,999,1

Now it just picks the first matching row instead of multiplying the rows.

More of LookupJoin

Let's look at more ways the LookupJoin can be used. Here is another example:

our $lbTrans = $uJoin->makeDummyLabel($rtInTrans, "lbTrans");

our $join = Triceps::LookupJoin->new(
  name => "join",
  leftFromLabel => $lbTrans,
  rightTable => $tAccounts,
  rightIdxPath => ["lookupSrcExt"],
  leftFields => [ "id", "amount" ],
  fieldsLeftFirst => 0,
  rightFields => [ "internal/acct" ],
  by => [ "acctSrc" => "source", "acctXtrId" => "external" ],
  isLeft => 0,
); # would die by itself on an error

# label to print the changes to the detailed stats
makePrintLabel("lbPrintPackets", $join->getOutputLabel());

while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "acct") {
    $uJoin->makeArrayCall($tAccounts->getInputLabel(), @data)
      or die "$!";
  } elsif ($type eq "trans") {
    $uJoin->makeArrayCall($lbTrans, @data)
      or die "$!";
  }
  $uJoin->drainFrame(); # just in case, for completeness
}

This specifies the left-side data in another way: the option "leftFromLabel" provides a label which in turn provides both the input row type and the unit. You can still specify the unit option as well but it must match the one in the label. The join still has its own input label but it gets automatically chained to the one in the option.

The other options demonstrate the possibilities described in the last post. This time it's an inner join, the result has the right-side fields going first, and the left-side fields are filtered in the result.

Another way to achieve the same filtering of the left-side fields would be by throwing away everything starting with "acct" and passing through the rest:

  leftFields => [ "!acct.*", ".*" ],

And here is an example of a run:

acct,OP_INSERT,source1,999,1
acct,OP_INSERT,source1,2011,2
acct,OP_INSERT,source2,ABCD,1
trans,OP_INSERT,1,source1,999,100
join.out OP_INSERT acct="1" id="1" amount="100" 
trans,OP_INSERT,2,source2,ABCD,200
join.out OP_INSERT acct="1" id="2" amount="200" 
trans,OP_INSERT,3,source2,QWERTY,200
acct,OP_INSERT,source2,QWERTY,2
trans,OP_DELETE,3,source2,QWERTY,200
join.out OP_DELETE acct="2" id="3" amount="200" 
acct,OP_DELETE,source1,999,1

The input data is the same as the last time, but the result is different. Since it's an inner join, the rows that don't find a match don't pass through. And of course the fields are ordered and subsetted differently in the result.

Sunday, April 15, 2012

The LookupJoin template

When a join has to produce the new rows, with the data from both the incoming row and the ones looked up in the reference table, this can also be done manually but may be more convenient to do with the LookupJoin template. The translation of account to the internal ids can be done like this:

our $join = Triceps::LookupJoin->new(
  unit => $uJoin,
  name => "join",
  leftRowType => $rtInTrans,
  rightTable => $tAccounts,
  rightIdxPath => ["lookupSrcExt"],
  rightFields => [ "internal/acct" ],
  by => [ "acctSrc" => "source", "acctXtrId" => "external" ],
  isLeft => 1,
); # would die by itself on an error

# label to print the changes to the detailed stats
makePrintLabel("lbPrintPackets", $join->getOutputLabel());

while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "acct") {
    $uJoin->makeArrayCall($tAccounts->getInputLabel(), @data)
      or die "$!";
  } elsif ($type eq "trans") {
    $uJoin->makeArrayCall($join->getInputLabel(), @data)
      or die "$!";
  }
  $uJoin->drainFrame(); # just in case, for completeness
}

The join gets defined in the option name-value format. The unit and name are as usual.

The incoming rows are always on the left side, the table on the right. LookupJoin can do either the inner join or the left outer join (since it does not react to thr changes of the right table and has no access to the start of the left side, the full and right outer joins are out). In this case the option "isLeft => 1" selects the left outer join.

The left side is described by leftRowType, and causes the join's input label of this row type to be created. The input label can be found with $join->getInputLabel().

The right side is a table, specified in rightTable. The lookups in the table are done using a combination of an index and the field pairing. The option "by" provides the field pairing. It contains the pairs of field names, one from the left, and one from the right, for the equal fields. They can be separated by "," to, but "=>" feels more idiomatic to me. These fields from the left are translated to the right and are used for look-up through the index. The index here is specified through the path in the option "rightIdxPath". If this option is missing, LookupJoin will just try to find the first top-level Hash index. Either way, the index must be a Hash index.

There is no particular reason for it not being a Sorted index, other that the getKey() call does not work for the Sorted indexes yet, and that's what the LookupJoin uses to check that the right-side index key matches the join key in "by".

The index may be either a leaf (as in this example) or non-leaf. If it's a leaf, it could look up no more than one row per key, and LookupJoin uses this internally for a little optimization.

Finally, there is the result row. It is built out of the two original rows by picking the fields according to the options leftFields and rightFields. If either option is missing, that means "take all the fields". Otherwise it gives the patterns of the fields to let through. The patterns may be either the explicit field names or regular expressions implicitly anchored at both front and end. There is also a bit extra modification possible:

  • !pattern - skip the fields matching the pattern
  • pattern/substitution - pass the matching fields and rename them according to the substitution

So in this example [ "internal/acct" ] means: pass the field "internal" but rename it to "acct". If a specification element refers to a literal field, like here, LookupJoin chacks that the field is actually present in the original row type, catching the typos. For the general regular expressions it doesn't check whether the pattern matched anything. It's not difficult to check but that would preclude the reuse of the same patterns on the varying row types, and I'm not sure yet, what is more important.

The way this whole thing works is that each field gets tested against each pattern in order. The first pattern that match determines what happens to this field. If none of the patterns matches, the field gets ignored. An important consequence about the skipping patterns is that they don't automatically pass through the non-matching fields. You need to add an explicit positive pattern at the end of the list to pass the fields through. For example, to pass everything except the field "source", the specification would be [ "!source", ".*" ]. The "!source" will catch and throw away the field "source", and ".*" will pass through the rest of the fields.

Another important point is that the field names in the result must not duplicate. It's an error. So if the duplications happen, use the substitution syntax to rename some of the fields. I'll show more patterns later.

There is the option fieldsLeftFirst that determines, which side will go first in the result. By default it's set to 1 (as in this example), and the left side goes first. If set to 0, the right side would go first.

This setup for the result row types is somewhat clumsy but it's a reasonable first attempt.

Now, having gone through the description, an example of how it works:

acct,OP_INSERT,source1,999,1
acct,OP_INSERT,source1,2011,2
acct,OP_INSERT,source2,ABCD,1
trans,OP_INSERT,1,source1,999,100
join.out OP_INSERT id="1" acctSrc="source1" acctXtrId="999"
 amount="100" acct="1" 
trans,OP_INSERT,2,source2,ABCD,200
join.out OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD"
 amount="200" acct="1" 
trans,OP_INSERT,3,source2,QWERTY,200
join.out OP_INSERT id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" 
acct,OP_INSERT,source2,QWERTY,2
trans,OP_DELETE,3,source2,QWERTY,200
join.out OP_DELETE id="3" acctSrc="source2" acctXtrId="QWERTY"
 amount="200" acct="2" 
acct,OP_DELETE,source1,999,1

Same as before, first the accounts table gets populated, then the transactions are sent. If an account is not found, this left outer join still passes through the original fields from the left side. Adding an account later doesn't help the rowops that already went through but the new rowops will see it. The same goes for deleting an account, it doesn't affect the past rowops either.

The lookup join, done manually

First let's look at a lookup done manually. It would also establish the baseline for the further joins.

For the background of the model, let's consider the trade information coming in from multiple sources. Each source system has its own designation of the accounts on which the trades happen but ultimately they are the same accounts. So there is a table that contains the translation from the account designations of various external systems to our system's own internal account identifier. This gets described with the row types:

our $rtInTrans = Triceps::RowType->new( # a transaction received
  id => "int32", # the transaction id
  acctSrc => "string", # external system that sent us a transaction
  acctXtrId => "string", # its name of the account of the transaction
  amount => "int32", # the amount of transaction (int is easier to check)
) or die "$!";

our $rtAccounts = Triceps::RowType->new( # account translation map
  source => "string", # external system that sent us a transaction
  external => "string", # its name of the account of the transaction
  internal => "int32", # our internal account id
) or die "$!";

Other than those basics, the rest of information is only minimal, to keep the examples smaller. Even the trade ids are expected to be global and not per the source systems (which is not realistic but saves another little bit of work).

The accounts table can be indexed in multiple ways for multiple purposes, say:

our $ttAccounts = Triceps::TableType->new($rtAccounts)
  ->addSubIndex("lookupSrcExt", # quick look-up by source and external id
    Triceps::IndexType->newHashed(key => [ "source", "external" ])
  )
  ->addSubIndex("iterateSrc", # for iteration in order grouped by source
    Triceps::IndexType->newHashed(key => [ "source" ])
    ->addSubIndex("iterateSrcExt",
      Triceps::IndexType->newHashed(key => [ "external" ])
    )
  )
  ->addSubIndex("lookupIntGroup", # quick look-up by internal id (to multiple externals)
    Triceps::IndexType->newHashed(key => [ "internal" ])
    ->addSubIndex("lookupInt", Triceps::IndexType->newFifo())
  )
or die "$!";
$ttAccounts->initialize() or die "$!";

For our purpose of joining, the first, primary key is the way to go. Using the primary key also has the advantage of making sure that there is no more than one row for each key value.

The manual lookup will do the filtering: find, whether there is a match in the translation table, and if so then passing the row through. The example goes as follows:

our $uJoin = Triceps::Unit->new("uJoin") or die "$!";

our $tAccounts = $uJoin->makeTable($ttAccounts,
  &Triceps::EM_CALL, "tAccounts") or die "$!";

my $lbFilterResult = $uJoin->makeDummyLabel($rtInTrans, "lbFilterResult");
my $lbFilter = $uJoin->makeLabel($rtInTrans, "lbFilter", undef, sub {
  my ($label, $rowop) = @_;
  my $row = $rowop->getRow();
  my $rh = $tAccounts->findBy(
    source => $row->get("acctSrc"),
    external => $row->get("acctXtrId"),
  );
  if (!$rh->isNull()) {
    $uJoin->call($lbFilterResult->makeRowop($rowop->getOpcode(), $row));
  }
}) or die "$!";

# label to print the changes to the detailed stats
makePrintLabel("lbPrintPackets", $lbFilterResult);

while(<STDIN>) {
  chomp;
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "acct") {
    $uJoin->makeArrayCall($tAccounts->getInputLabel(), @data)
      or die "$!";
  } elsif ($type eq "trans") {
    $uJoin->makeArrayCall($lbFilter, @data)
      or die "$!";
  }
  $uJoin->drainFrame(); # just in case, for completeness
}

The findBy() is where the join actually happens: the lookup of the data in a table by values from a different row. Very similar to what the basic window example was doing before. After that the fact of successful or unsuccessful lookup is used to pass the original row through or throw it away. If the found row were used to pick some fields from it and stick them into the result, that would be a more complete join, more like what you often expect to see.

And here is an example of the input processing:

acct,OP_INSERT,source1,999,1
acct,OP_INSERT,source1,2011,2
acct,OP_INSERT,source2,ABCD,1
trans,OP_INSERT,1,source1,999,100
lbFilterResult OP_INSERT id="1" acctSrc="source1" acctXtrId="999" amount="100" 
trans,OP_INSERT,2,source2,ABCD,200
lbFilterResult OP_INSERT id="2" acctSrc="source2" acctXtrId="ABCD" amount="200" 
trans,OP_INSERT,3,source2,QWERTY,200
acct,OP_INSERT,source2,QWERTY,2
trans,OP_DELETE,3,source2,QWERTY,200
lbFilterResult OP_DELETE id="3" acctSrc="source2" acctXtrId="QWERTY" amount="200" 
acct,OP_DELETE,source1,999,1

It starts with populating the account table. Then the transactions that find the match pass, and those who don't find don't pass. If more of the account translations get added later, the transactions for them start passing but as you can see, the result might be slightly unexpected: you may get a DELETE that had no matching previous import. This happens because the lookup join keeps no history on its left side and can't react properly to the changes to the table on the right. Because of this, the lookup joins work best when the reference table gets pre-populated in advance and then stays stable.

Friday, April 13, 2012

The variety of joins

The joins are quite important for the relational data processing, and come in many varieties. And the CEP systems have their own specifics. Basically, in CEP you want the joins to be processed fast. The CEP systems deal with the changing model state, and have to process these changes incrementally.

A small change should be handled fast. It has to use the indexes to find and update all the related result rows. Even though you can make it just go sequentially through all the rows and find the relevant ones, like in a common database, that's not what you normally want. When something like this happens, the usual reaction is "wtf is my model suddenly so slow?" following by an annoyingly long investigation into the reasons of the slowness, and then rewriting the model to make it work faster. It's better to just prevent the slowness in the first place and make sure that the joins always use an index. And since you don't have to deal much with the ad-hoc queries when you write a CEP model, you can provide all the needed indexes in advance very easily.

A particularly interesting kind of joins in this regard is the equi-joins: ones that join the rows by the equality of the fields in them. They allow a very efficient index look-up. Because of this, they are popular in the CEP world. Some systems, like Aleri, support only the equi-joins to start with. The other systems are much more efficient on the equi-joins than on the other kinds of joins. At the moment Triceps follows the fashion of having the advanced support only the equi-joins. Even though the sorted indexes in Triceps should allow the range-based comparisons to be efficient too, at the moment there are no table methods for the look-up of ranges, they are left for the future work. Of course, nothing stops you from copying an equi-join template and modifying it to work by a dumb iteration. Just it would be slow, and I didn't see much point in it.

There also are three common patterns of the join usage.

In the first pattern the rows sort of go by and get enriched by looking up some information from a table and tacking it onto these rows. Sometimes not even tacking it on but maybe just filtering the data: passing through some of the rows and throwing away the rest, or directing the rows into the different kinds of processing, based on the looked-up data. For a reference, in the Coral8 CCL this situation is called "stream-to-window joins". In Triceps there are no streams and no windows, so I just call them the "lookup joins".

In the second pattern multiple stateful tables are joined together. Whenever any of the tables changes, the join result also changes, and the updates get propagated through. This can be done through lookups, but in reality it turns out that defining manually the lookups for the every possible table change becomes tedious pretty quickly. This has to be addressed by the automation.

In the third pattern the same table gets joined recursively, essentially traversing a representation of a tree stored in that table. This actually doesn't work well with the classic SQL unless the recursion depth is strictly limited. There are SQL extensions for the recursive joins in the modern databases but I haven't seen them in the CEP systems yet. Anyway, the procedural approach tends to work for this situation much better than the SQLy one, so the templates tend to be of not much help. Instead I'll show a manual example of this kind.

Thursday, April 12, 2012

An update on Collapse

While cleaning up the joins, I've also added a feature on Collapse: now if you specify the option "fromLabel" in a data set, you don't have to specify the option "unit" any more. You can but you don't have to. By default the unit will be taken from that label.

Monday, April 9, 2012

A better index type lookup

I've been recently distracted with other things, and also I've been working on the joins, before writing about them. The joins were actually some of the first templates I wrote, but that's also why they look a bit dated compared to the newer stuff. I probably won't do all I wanted to do with them, but I'm at least cleaning them up a bit and adding some of the more recent features. This rework takes time.

Along that way, I've extracted the logic that looks up the index types by path from SimpleAggregator and put it for reuse into TableType.  Now it can get used like this:

$indexType = $tableType->findIndexPath("primary", "fifo");

The arguments form a path of names in the index type tree. If the path is not found, the function would confess (that is die, with a stack trace). An empty path is also illegal and would cause the same result.