Friday, April 27, 2012

JoinTwo joins tables

Another piece of the join infrastructure looks ready for the prime time now, the JoinTwo template. As the name suggests, it joins two tables.

Fundamentally, joining two tables is like running two symmetrical copies of LookupJoin. And it really is: under the hood JoinTwo is translated into two LookupJoins. But there is a good deal more going on.

For all I can tell, the CEP systems with the insert-only stream model tend to start with the assumption that the LookupJoin (or whatever they call it) is good enough. Then it turns out that manually writing the join twice where it can be done once is a pain. So the table-to-table join gets added. Then the interesting nuances crop up. Then it turns out that it would be real convenient to propagate the deletes through the join, and that gets added as a special feature behind the scenes.

So in Triceps the JoinTwo takes care of the details of joining the tables. In a common database a join query causes a join plan to be created: with what table to start, and which to look up in next. A CEP system deals with the changing data, and a join has to react to data changes on each of its input tables. It must have multiple plans, one for starting from each of the tables. And essentially a LookupJoin embodies such a plan, and JoinTwo makes two of them.
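The idea of having one plan per input side can be sketched in plain Perl, with hashes standing in for the tables. This is only an illustration of the principle under simplifying assumptions (inserts only, one rate per key), not how Triceps implements it. The names %pos_by_key, %rate_by_key, key_of(), on_position() and on_rate() are made up for the sketch.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the two tables, both keyed by "date,currency".
my %pos_by_key;   # key => list of position rows (many rows per key)
my %rate_by_key;  # key => one rate row (one row per key)
my @out;          # the joined rows produced so far

sub key_of { my ($row) = @_; return "$row->{date},$row->{currency}"; }

# Plan 1: a position arrives on the left, look up the rate on the right.
sub on_position {
    my ($pos) = @_;
    push @{ $pos_by_key{ key_of($pos) } }, $pos;
    my $rate = $rate_by_key{ key_of($pos) } or return; # inner join: no match, no output
    push @out, { %$pos, toUsd => $rate->{toUsd} };
}

# Plan 2: a rate arrives on the right, look up the positions on the left.
sub on_rate {
    my ($rate) = @_;
    $rate_by_key{ key_of($rate) } = $rate;
    for my $pos (@{ $pos_by_key{ key_of($rate) } || [] }) {
        push @out, { %$pos, toUsd => $rate->{toUsd} };
    }
}

on_rate({ date => 20120310, currency => "USD", toUsd => 1 });
on_position({ date => 20120310, currency => "USD", customer => "one", quantity => 100 });
on_position({ date => 20120310, currency => "RUR", customer => "three", quantity => 100 });
on_rate({ date => 20120310, currency => "RUR", toUsd => 0.04 });
printf "%s %s\n", $_->{customer}, $_->{toUsd} for @out;
```

The RUR position produces nothing when it arrives, and shows up in the result only after its rate is inserted through the second plan.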

Why only two? Because it's the minimal usable number. The join logic is tricky, so it's better to work out the kinks on something simpler. And it still can be scaled to many tables by joining them in stages. It's not quite as efficient as a direct join of multiple tables, because the result of each stage has to be put into a table, but it does the job.

For the application example, I'll be using one from the area of stock lending. Think of a large multinational broker that wants to keep track of its lending activities. It has many customers to whom the stock can be loaned or from whom it can be borrowed. This information comes as the records of positions, of how many shares are loaned or borrowed for each customer, and at what contractual price. And since the clients are from all around the world, the prices may be in different currencies. A simplified and much shortened version of position information may look like this:

our $rtPosition = Triceps::RowType->new( # a customer account position
  date => "int32", # as of which date, in format YYYYMMDD
  customer => "string", # customer account id
  symbol => "string", # stock symbol
  quantity => "float64", # number of shares
  price => "float64", # share price in local currency
  currency => "string", # currency code of the price
) or die "$!";

Then we want to aggregate these data in different ways, getting the broker-wide summaries by the symbol, by customer etc. The aggregation is updated as the business day goes on. At the end of the business day the state of the day freezes, and the new day's initial data is loaded. That's why the business date is part of the schema. If you wonder, the next day's initial data is usually the same as at the end of the previous day, except where some contractual conditions change. The detailed position data is thrown away after a few days, or even right at the end of the day, but the aggregation results from the end of the day are kept for a longer history.

There is a problem with summing up the monetary values: they come in different currencies and can not be added up directly. If we want to get summaries of this kind, we have to translate all of them to a single reference currency. That's what the sample joins will be doing: finding the translation rates to US dollars. The currency rates come in the translation schema:

our $rtToUsd = Triceps::RowType->new( # a currency conversion to USD
  date => "int32", # as of which date, in format YYYYMMDD
  currency => "string", # currency code
  toUsd => "float64", # multiplier to convert this currency to USD
) or die "$!";

Since the currency rates change all the time, to make sense of a previous day's position the previous day's rates need to also be kept, and so the rates are also marked with a date.

Having the mood set, here is the first example of an inner join:

# exchange rates, to convert all currencies to USD
our $ttToUsd = Triceps::TableType->new($rtToUsd)
  ->addSubIndex("primary",
    Triceps::IndexType->newHashed(key => [ "date", "currency" ])
  )
  ->addSubIndex("byDate", # for cleaning by date
    Triceps::SimpleOrderedIndex->new(date => "ASC")
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  )
or die "$!";
$ttToUsd->initialize() or die "$!";

# the positions in the original currency
our $ttPosition = Triceps::TableType->new($rtPosition)
  ->addSubIndex("primary",
    Triceps::IndexType->newHashed(key => [ "date", "customer", "symbol" ])
  )
  ->addSubIndex("currencyLookup", # for joining with currency conversion
    Triceps::IndexType->newHashed(key => [ "date", "currency" ])
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  )
  ->addSubIndex("byDate", # for cleaning by date
    Triceps::SimpleOrderedIndex->new(date => "ASC")
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  )
or die "$!";
$ttPosition->initialize() or die "$!";

our $uJoin = Triceps::Unit->new("uJoin") or die "$!";

our $tToUsd = $uJoin->makeTable($ttToUsd,
  &Triceps::EM_CALL, "tToUsd") or die "$!";
our $tPosition = $uJoin->makeTable($ttPosition,
  &Triceps::EM_CALL, "tPosition") or die "$!";

our $join = Triceps::JoinTwo->new(
  name => "join",
  leftTable => $tPosition,
  leftIdxPath => [ "currencyLookup" ],
  rightTable => $tToUsd,
  rightIdxPath => [ "primary" ],
  type => "inner",
); # would die by itself on an error
# label to print the changes to the detailed stats
makePrintLabel("lbPrint", $join->getOutputLabel());

while(<STDIN>) {
  chomp; # drop the newline so that it doesn't end up in the last field
  my @data = split(/,/); # starts with a command, then string opcode
  my $type = shift @data;
  if ($type eq "cur") {
    $uJoin->makeArrayCall($tToUsd->getInputLabel(), @data)
      or die "$!";
  } elsif ($type eq "pos") {
    $uJoin->makeArrayCall($tPosition->getInputLabel(), @data)
      or die "$!";
  }
  $uJoin->drainFrame(); # just in case, for completeness
}

The example just does the joining, leaving the aggregation to the imagination of the reader.  The result of a JoinTwo is not stored in a table. It is a stream of ephemeral updates, same as for LookupJoin. If you want to keep them, you can put them into a table yourself (and maybe do the aggregation in the same table).

Both the joined tables must provide an index for the efficient joining. The index may be leaf (selecting one row per key) or non-leaf (containing multiple rows per key) but it must be there. This makes sure that the joins are always efficient and you don't have to hunt for why your model is suddenly so slow.

The indexes also provide the default way of finding the join condition: the key fields in the indexes are paired up together, in the order they go in the index specifications. Once again, the fields are paired not by name but by order. If the indexes are nested, the outer indexes precede in the order. For example, the $ttToUsd could have the same index done in a nested way and it would work just as well:

    ->addSubIndex("byDate",
      Triceps::IndexType->newHashed(key => [ "date" ])
      ->addSubIndex("primary",
        Triceps::IndexType->newHashed(key => [ "currency" ])))

Same as with LookupJoin, currently only the Hashed indexes are supported, and they must be used along the whole path. The outer index "byDate" here can not be a Sorted/Ordered index: that would be an error, and the join would refuse to accept it.

But if the order of fields in it were changed in $ttToUsd to be different from $ttPosition, like this

    Triceps::IndexType->newHashed(key => [ "currency", "date" ])

then it would be a mess. The wrong fields would be matched up in the join condition, which would become (tPosition.date == tToUsd.currency && tPosition.currency == tToUsd.date), and everything would go horribly wrong.
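To see how pairing by position goes wrong, here is a small sketch in plain Perl. It only imitates the pairing rule described above; pair_keys() is a made-up helper, not a Triceps function.

```perl
use strict;
use warnings;

# Pair the key fields of two indexes by position, not by name.
# pair_keys() is a hypothetical helper for this sketch.
sub pair_keys {
    my ($left, $right) = @_;
    die "key counts differ\n" unless @$left == @$right;
    return map { "tPosition.$left->[$_] == tToUsd.$right->[$_]" } 0 .. $#$left;
}

# Same field order on both sides: the condition makes sense.
my @good = pair_keys([ "date", "currency" ], [ "date", "currency" ]);
# Field order swapped on the right side: the wrong fields get compared.
my @bad  = pair_keys([ "date", "currency" ], [ "currency", "date" ]);

print join(" && ", @good), "\n";
print join(" && ", @bad), "\n";
```

The second line printed compares the date with the currency and the currency with the date, which is exactly the kind of condition nobody wants.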

Incidentally, in this particular case the situation would be caught, because JoinTwo is much less lenient than LookupJoin about the key field types. It requires the types of the matching fields to be exactly the same, partially to catch the wrong order and partially for the sake of result consistency. JoinTwo does the look-ups in both directions. Think about what happens if a string field and an int32 field get matched up, and then non-numeric strings turn up, containing "abc" and "qwerty". Those strings on the left side will match the rows with numeric 0 on the right side. But then if the row with 0 on the right side changes, it would look for the string "0" on the left, which would find neither "abc" nor "qwerty". The state of the join will become a mess. So no automatic key type conversions here.
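The asymmetry is easy to reproduce in plain Perl, whose own string-to-number conversion is used here as a stand-in for the kind of automatic conversion a lenient join might do. This is an illustration only; the arrays and the look-up code are made up for the sketch.

```perl
use strict;
use warnings;
no warnings 'numeric'; # the sketch converts non-numeric strings on purpose

my @left  = ("abc", "qwerty"); # string keys on the left side
my @right = (0);               # numeric keys on the right side

# Left-to-right look-up: the string gets converted to a number first.
my @l2r = grep { my $s = $_; grep { $s == $_ } @right } @left;

# Right-to-left look-up: the number gets converted to a string first.
my @r2l = grep { my $n = $_; grep { "$n" eq $_ } @left } @right;

print "left rows that find a match: @l2r\n";
print "right rows that find a match: @r2l\n";
```

Both strings on the left find the 0 on the right, but the 0 on the right finds nothing on the left, so the two directions of the join disagree about what is matched.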

If you wonder, there are ways to specify the key fields match explicitly, they will be shown later.

The option "type" selects the inner join mode. The inner join is the default, and would have been used even if this option was not specified.

The results here include all the fields from both sides by default. The JoinTwo is smart and knows how to exclude the duplicate key fields.

The joins are currently not equipped to actually compute the translated prices directly. They can only look up the information for it, and the computation can be done later, before or during the aggregation.
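The later computation itself is simple. A minimal sketch, assuming the joined row is available as a plain Perl hash with the field names from the row types above (usd_value() is a made-up helper, not part of Triceps):

```perl
use strict;
use warnings;

# Compute the USD value of a position from a joined row:
# number of shares * local price * multiplier to USD.
sub usd_value {
    my ($row) = @_;
    return $row->{quantity} * $row->{price} * $row->{toUsd};
}

# A joined row in the shape produced by the example join.
my %joined = (date => 20120310, customer => "two", symbol => "AAA",
    quantity => 100, price => 8, currency => "GBP", toUsd => 2);

print usd_value(\%joined), "\n"; # 100 shares at 8 GBP, 2 USD per GBP
```

In a real model this kind of computation would sit in a label or an aggregator downstream of the join.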

That's enough explanations for now, let's look at the result (with the input rows shown as usual in italics):
Inserting the reference currencies produces no result, since it's an inner join and they have no matching positions yet.

join.leftLookup.out OP_INSERT date="20120310" customer="one"
 symbol="AAA" quantity="100" price="15" currency="USD" toUsd="1" 
join.leftLookup.out OP_INSERT date="20120310" customer="two"
 symbol="AAA" quantity="100" price="8" currency="GBP" toUsd="2" 

Now the positions arrive and find the matching translations to USD. The label names on the output are an interesting artifact: all the chained labels receive the original rowop, which refers to the first label in the chain, and that happens to be the output label of a LookupJoin inside JoinTwo. The name of that LookupJoin shows whether the row that initiated the update came from the left or right side of the JoinTwo.


This position is out of luck: no translation for its currency. The inner join is actually not a good choice here. If a row does not pass through because of the lack of translation, it gets excluded even from the aggregations that do not require the translation, such as those that total up the quantity of a particular symbol across all the customers. A left join would be better suited.

join.leftLookup.out OP_INSERT date="20120310" customer="three"
 symbol="BBB" quantity="200" price="80" currency="GBP" toUsd="2" 

Another position arrives, same as before.

join.rightLookup.out OP_INSERT date="20120310" customer="three"
 symbol="AAA" quantity="100" price="300" currency="RUR" toUsd="0.04" 

The translation for RUR finally comes in. The position in RUR can now find its match and propagates through.

join.rightLookup.out OP_DELETE date="20120310" customer="two"
 symbol="AAA" quantity="100" price="8" currency="GBP" toUsd="2" 
join.rightLookup.out OP_DELETE date="20120310" customer="three"
 symbol="BBB" quantity="200" price="80" currency="GBP" toUsd="2" 
join.rightLookup.out OP_INSERT date="20120310" customer="two"
 symbol="AAA" quantity="100" price="8" currency="GBP" toUsd="2.2" 
join.rightLookup.out OP_INSERT date="20120310" customer="three"
 symbol="BBB" quantity="200" price="80" currency="GBP" toUsd="2.2" 

An exchange rate update for GBP arrives. It amounts to "delete the old translation and then insert a new one". Each of these operations updates the state of the join: the disappearing translation causes all the GBP positions to be deleted from the result, and the new translation inserts them back, with the new value of toUsd.
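The same sequence can be imitated in plain Perl. This is a simplified sketch with hashes instead of tables and with the positions assumed to be already loaded; send_rate() is a made-up helper that plays the role of the right-side look-up.

```perl
use strict;
use warnings;

# Positions already in the left table, both in GBP (plain hashes for the sketch).
my @positions = (
    { customer => "two",   symbol => "AAA", currency => "GBP" },
    { customer => "three", symbol => "BBB", currency => "GBP" },
);
my @out; # the stream of result changes

# Hypothetical helper: send one rate change through the right-side look-up.
sub send_rate {
    my ($op, $rate) = @_;
    for my $pos (grep { $_->{currency} eq $rate->{currency} } @positions) {
        push @out, "$op $pos->{customer} $pos->{symbol} toUsd=$rate->{toUsd}";
    }
}

# An update of the GBP rate from 2 to 2.2 is a delete followed by an insert.
send_rate("OP_DELETE", { currency => "GBP", toUsd => 2 });
send_rate("OP_INSERT", { currency => "GBP", toUsd => 2.2 });
print "$_\n" for @out;
```

Every matching position is first removed from the result with the old rate and then put back with the new one, in the same order as in the output above.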

join.leftLookup.out OP_DELETE date="20120310" customer="one"
 symbol="AAA" quantity="100" price="15" currency="USD" toUsd="1" 
join.leftLookup.out OP_INSERT date="20120310" customer="one"
 symbol="AAA" quantity="200" price="16" currency="USD" toUsd="1" 

A position update arrives. Again, it's a delete-and-insert, and propagates through the join as such.
