Sunday, February 5, 2012

A window is a FIFO

A fairly typical situation in the CEP world is when a model needs to keep a limited history of events. For a simple example, let's discuss how to remember the last two trades per stock symbol. The size of two has been chosen to keep the sample inputs and outputs small.

This is normally called window logic, with a sliding window. You can think of it in a mechanical analogy: as the trades become available, they get printed on a long tape. However, the tape is covered with a masking plate. The plate has a window cut in it that lets you see only the last two trades.

Some CEP systems have special data structures that implement this logic, called windows. Triceps instead has a feature that makes a table work as a window. It's not unique in this department: for example, Coral8 does the opposite and calls everything a window, even if some windows are really tables in every regard but name.

In Triceps it's done like this (the usual preamble is not shown):

my $uTrades = Triceps::Unit->new("uTrades") or die "$!";
my $rtTrade = Triceps::RowType->new(
  id => "int32", # trade unique id
  symbol => "string", # symbol traded
  price => "float64",
  size => "float64", # number of shares traded
) or die "$!";

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
      ->addSubIndex("last2",
        Triceps::IndexType->newFifo(limit => 2)
      )
  )
or die "$!";
$ttWindow->initialize() or die "$!";
my $tWindow = $uTrades->makeTable($ttWindow,
  &Triceps::EM_CALL, "tWindow") or die "$!";

# remember the index type by symbol, for searching on it
my $itSymbol = $ttWindow->findSubIndex("bySymbol") or die "$!";
# remember the FIFO index, for finding the start of the group
my $itLast2 = $itSymbol->findSubIndex("last2") or die "$!";

# print out the changes to the table as they happen
my $lbWindowPrint = $uTrades->makeLabel($rtTrade, "lbWindowPrint",
  undef, sub { # (label, rowop)
    print($_[1]->printP(), "\n"); # print the change
  }) or die "$!";
$tWindow->getOutputLabel()->chain($lbWindowPrint) or die "$!";
while(<STDIN>) {
  chomp;
  my $rTrade = $rtTrade->makeRowArray(split(/,/)) or die "$!";
  my $rhTrade = $tWindow->makeRowHandle($rTrade) or die "$!";
  $tWindow->insert($rhTrade) or die "$!"; # return of 0 is an error here
  # There are two ways to find the first record for this
  # symbol. Use one way for the symbol AAA and the other for the rest.
  my $rhFirst;
  if ($rTrade->get("symbol") eq "AAA") {
    $rhFirst = $tWindow->findIdx($itSymbol, $rTrade) or die "$!";
  } else  {
    # $rhTrade is now in the table but it's the last record
    $rhFirst = $rhTrade->firstOfGroupIdx($itLast2) or die "$!";
  }
  my $rhEnd = $rhFirst->nextGroupIdx($itLast2) or die "$!";
  print("New contents:\n");
  for (my $rhi = $rhFirst;
      !$rhi->same($rhEnd); $rhi = $rhi->nextIdx($itLast2)) {
    print("  ", $rhi->getRow()->printP(), "\n");
  }
}

This example reads the trade records in CSV format, inserts them into the table, and then prints the actual modifications reported by the table and the new state of the window for this symbol. Here is a sample log; the input lines are the bare CSV records, and everything else is output:

1,AAA,10,10
tWindow.out OP_INSERT id="1" symbol="AAA" price="10" size="10" 
New contents:
  id="1" symbol="AAA" price="10" size="10" 
2,BBB,100,100
tWindow.out OP_INSERT id="2" symbol="BBB" price="100" size="100" 
New contents:
  id="2" symbol="BBB" price="100" size="100" 
3,AAA,20,20
tWindow.out OP_INSERT id="3" symbol="AAA" price="20" size="20" 
New contents:
  id="1" symbol="AAA" price="10" size="10" 
  id="3" symbol="AAA" price="20" size="20" 
4,BBB,200,200
tWindow.out OP_INSERT id="4" symbol="BBB" price="200" size="200" 
New contents:
  id="2" symbol="BBB" price="100" size="100" 
  id="4" symbol="BBB" price="200" size="200" 
5,AAA,30,30
tWindow.out OP_DELETE id="1" symbol="AAA" price="10" size="10" 
tWindow.out OP_INSERT id="5" symbol="AAA" price="30" size="30" 
New contents:
  id="3" symbol="AAA" price="20" size="20" 
  id="5" symbol="AAA" price="30" size="30" 
6,BBB,300,300
tWindow.out OP_DELETE id="2" symbol="BBB" price="100" size="100" 
tWindow.out OP_INSERT id="6" symbol="BBB" price="300" size="300" 
New contents:
  id="4" symbol="BBB" price="200" size="200" 
  id="6" symbol="BBB" price="300" size="300" 

The first thing to notice in the code is that the table type has two indexes (strictly speaking, index types, but most of the time they can be called indexes without creating confusion) in it. Unlike in your typical database, the indexes in this example are nested.

TableType
+-IndexType Hash "bySymbol"
  +-IndexType Fifo "last2"

If you look closely, you can see that the first call of addSubIndex() adds an index type to the table type, while the textually second addSubIndex() adds an index type to the previous index type.

The same can also be written out in multiple separate calls:

$itLast2 = Triceps::IndexType->newFifo(limit => 2);
$itSymbol = Triceps::IndexType->newHashed(key => [ "symbol" ]);
$itSymbol->addSubIndex("last2", $itLast2);
$ttWindow = Triceps::TableType->new($rtTrade);
$ttWindow->addSubIndex("bySymbol", $itSymbol);

I'm not perfectly happy with the way the table types are constructed with the index types right now, since the parenthesis levels have turned out a bit hard to track. This is another example of following the C++ API in Perl that didn't work out too well, and it will change in the future. But for now please bear with it.

The index nesting is intuitively clear, but the details may take some time to get your head wrapped around. You can think of it as the inner index type creating miniature tables that hold the rows, and the outer index holding not individual rows but those miniature tables. So, to find the rows in the table you go through two levels of indexes: first through the outer index, and then through the inner one. The table takes care of these details and makes them transparent, unless you want to stop your search at an intermediate level. For example, to find all the transactions with a given symbol, you need to do a search in the outer index, and then from that point iterate through the inner index. In that case you have to tell the table where you want to stop.
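To make the "miniature tables" picture concrete, here is a minimal sketch in plain Perl. It is not the Triceps implementation, only a toy model under the assumption that a hash of arrays is a fair analogy: the hash plays the outer index, each array plays one inner FIFO group. The names %by_symbol, insert_row() and find_group() are made up for this illustration.

```perl
use strict;
use warnings;

# Outer level: symbol => group. Inner level: rows in insertion order.
my %by_symbol;

# Put a row into the group selected by its symbol.
sub insert_row {
    my ($row) = @_;
    push @{ $by_symbol{ $row->{symbol} } }, $row;
}

# Stop the search at the outer level: return the whole group for a symbol,
# or an empty list if there is no such group.
sub find_group {
    my ($symbol) = @_;
    return @{ $by_symbol{$symbol} // [] };
}

insert_row({ id => 1, symbol => "AAA", price => 10 });
insert_row({ id => 2, symbol => "BBB", price => 100 });
insert_row({ id => 3, symbol => "AAA", price => 20 });

my @ids = map { $_->{id} } find_group("AAA");
print "AAA group ids: @ids\n";    # prints: AAA group ids: 1 3
```

Searching by symbol touches only the outer level, and the iteration then stays inside one group, which is the same division of labor as between "bySymbol" and "last2".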

The outer index is the hash index that we've seen before, the inner index is a FIFO index. A FIFO index doesn't have any key, it just keeps the rows in the order they were inserted. You can search in a FIFO index but most of the time it's not the best idea: since it has no keys, it searches linearly through all its rows until it finds an exact match (or runs out of rows). It's a reasonable last-resort way but it's not fast and in many cases not what you want. Remember that the method deleteRow() and sending the OP_DELETE to the table's input label invoke find(), which would cause the linear search on the FIFO indexes. So when you use a FIFO index, it's usually better to find the row handle you want to delete in some other way and then call remove() on it, or use another approach that will be shown later. Or just keep inserting the rows and never delete them, like this example does.

Note that a FIFO index may contain multiple copies of an exact same row. It doesn't care, it just keeps whatever rows were given to it in whatever order they were given.
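The cost of searching a keyless index can be illustrated with a small plain-Perl model. This is only a sketch under the assumption that an array scanned from the oldest row is a reasonable stand-in for a FIFO index; @fifo, rows_equal() and fifo_find() are hypothetical names, and which of several identical copies gets found is a property of this model, not a statement about Triceps.

```perl
use strict;
use warnings;

# A keyless FIFO modeled as a plain array of rows, oldest first.
my @fifo = (
    { id => 1, symbol => "AAA", price => 10 },
    { id => 3, symbol => "AAA", price => 20 },
    { id => 3, symbol => "AAA", price => 20 },   # an exact duplicate is allowed
);

# Two rows match only if they have the same fields with the same values.
sub rows_equal {
    my ($x, $y) = @_;
    return 0 unless keys(%$x) == keys(%$y);
    for my $k (keys %$x) {
        return 0 unless exists $y->{$k} && $x->{$k} eq $y->{$k};
    }
    return 1;
}

# Scan from the oldest row; return (position, number of comparisons made).
# The position is undef if no row matches.
sub fifo_find {
    my ($want) = @_;
    my $compared = 0;
    for my $i (0 .. $#fifo) {
        $compared++;
        return ($i, $compared) if rows_equal($fifo[$i], $want);
    }
    return (undef, $compared);
}

my ($pos, $n) = fifo_find({ id => 3, symbol => "AAA", price => 20 });
print "found at position $pos after $n comparisons\n";
```

A row that is not present costs a comparison against every stored row, which is why deleting by content is the slow path on a FIFO.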

By default a FIFO index just keeps whatever rows come to it. However it may have a few options. Setting the option "limit" limits the number of rows stored in the index (not per the whole table but per one of those "miniature tables"). When you try to insert one more row, the oldest row gets thrown out, and the limit stays unbroken. That's what creates the window behavior: keep the most recent N rows.
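The effect of the limit can be sketched in plain Perl as follows. This is a toy model, not Triceps code: $LIMIT, %window and window_insert() are invented for the illustration, and the event strings merely imitate the shape of the log above. The assumption is that the oldest row is pushed out first and the new row is added afterwards.

```perl
use strict;
use warnings;

my $LIMIT = 2;    # plays the role of newFifo(limit => 2)
my %window;       # symbol => array ref of rows, oldest first

# Insert a row; return the change events in the order they happen.
sub window_insert {
    my ($row) = @_;
    my $group = $window{ $row->{symbol} } //= [];
    my @events;
    # Make room first, so that the group never exceeds the limit.
    while (@$group >= $LIMIT) {
        my $old = shift @$group;
        push @events, "OP_DELETE id=$old->{id}";
    }
    push @$group, $row;
    push @events, "OP_INSERT id=$row->{id}";
    return @events;
}

for my $line ("1,AAA,10,10", "3,AAA,20,20", "5,AAA,30,30") {
    my %row;
    @row{qw(id symbol price size)} = split /,/, $line;
    print "$_\n" for window_insert(\%row);
}
```

Feeding it the three AAA trades from the sample input produces two plain inserts, and then a delete of id 1 followed by the insert of id 5.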

If you look at the sample output, you can see that inserting the rows with ids 1-4 generates only the insert events on the table. But the rows 5 and 6 start overflowing their FIFO indexes, and cause the oldest row to be automatically deleted before completing the insert of the new one.

A FIFO index doesn't have to be nested inside a hash index. If you put a FIFO index at the top level, it will control the whole table. So it would keep not the last 2 records per key but the last 2 records inserted into the whole table.

Continuing the example, the table gets created, and then the index types get extracted back from the table type. Now, why not just write out the table type creation in separate calls as shown above and remember the index type references? At some point in the past this actually would have worked, but not any more. It has to do with the way the table type and its index types are connected.

It's occasionally convenient to create one index type and then reuse it in multiple table types. However, for the whole thing to work, the index type must be tied to its particular table type. This tying together happens when the table type is initialized. If the same index type object were put into two table types, then when the first table type is initialized, the index type would get tied to it. The second table type would then fail to initialize because an index type in it is already tied elsewhere. To get around this dilemma, addSubIndex() now doesn't add the original index type; instead it makes a copy of it. That copy then gets tied to the table type and is what findSubIndex() returns.

The table methods that take an index type argument absolutely require that the index type be tied to the table's type. If you try to pass a seemingly identical index type that has not been tied, or has been tied to a different table type, that is an error. There is no interdependency between the methods makeTable() and findSubIndex(); they can be done in either order.
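The copy-on-add behavior can be pictured with a toy model in plain Perl. None of this is the real Triceps code: the classes ToyIndexType and ToyTableType, their fields, and the error text are all hypothetical, written only to mirror the description above (addSubIndex() stores a copy, initialization ties the copy, findSubIndex() returns the tied copy).

```perl
use strict;
use warnings;

package ToyIndexType;
# An index type starts out not tied to any table type.
sub new  { my ($class, %opt) = @_; return bless { %opt, table_type => undef }, $class; }
# A copy keeps the options but is never tied.
sub copy { my ($self) = @_; return bless { %$self, table_type => undef }, ref $self; }

package ToyTableType;
sub new { my ($class) = @_; return bless { index => {}, initialized => 0 }, $class; }
sub addSubIndex {
    my ($self, $name, $it) = @_;
    $self->{index}{$name} = $it->copy();    # store a copy, not the original
    return $self;
}
sub initialize {
    my ($self) = @_;
    for my $it (values %{ $self->{index} }) {
        die "index type already tied elsewhere\n" if defined $it->{table_type};
        $it->{table_type} = $self;          # tie the copy to this table type
    }
    $self->{initialized} = 1;
    return 1;
}
sub findSubIndex { my ($self, $name) = @_; return $self->{index}{$name}; }

package main;

my $itFifo = ToyIndexType->new(limit => 2);
my $tt1 = ToyTableType->new()->addSubIndex("last2", $itFifo);
my $tt2 = ToyTableType->new()->addSubIndex("last2", $itFifo);
$tt1->initialize();
$tt2->initialize();    # succeeds: each table type ties its own copy

my $tied = $tt1->findSubIndex("last2");
print "original tied: ", (defined $itFifo->{table_type} ? "yes" : "no"), "\n";
print "copy tied to tt1: ", ($tied->{table_type} == $tt1 ? "yes" : "no"), "\n";
```

In this model the original $itFifo never becomes tied, which is why a reference remembered at construction time would be of no use to the table methods, while the object returned by findSubIndex() is.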

The label $lbWindowPrint is used to show the changes reported by the table's output label. That's where the output lines with OP_INSERT and OP_DELETE come from.

And then the main loop starts. It reads the trade records in the simple CSV format, and for simplicity acts directly on the table, bypassing the scheduler. After the row is inserted, the contents of its index group (that "miniature table") get printed. The insertion could just as well have been done by passing the row reference directly, without explicitly creating a handle. But that handle will be used to demonstrate an interesting point.

To print the contents of an index group, we need to find its boundaries. In Triceps these boundaries are expressed as the first row handle of the group, and as the row handle right after the group. There is an internal logic to that, and it will be explained later, but for now just take it on faith.

With the information we have, there are two ways to find the first row of the group:
  • With the table's method findIdx(). It's very much like find(), only it has an extra argument of a specific index type. If the index type given has no further nesting in it, findIdx() works exactly like find(). In fact, find() is exactly such a special case of findIdx() with a hardcoded index type. If you use an index type with further nesting under it, findIdx() will return the handle of the first row in the group under it (or, as usual, a NULL row handle if not found).
  • If we create the row handle explicitly before inserting it into the table, as was done in the example, that will be the exact row handle inserted into the table. Not a copy or anything but this particular row handle. After a row handle gets inserted into the table, it knows its position in the indexes. And we still have a reference to it. So then we can use this knowledge to jump to the first row handle in the group with firstOfGroupIdx(). It also takes an index type, but in this case it's the type that controls the group, the FIFO index in our case.
The example shows both ways. As a demonstration, it uses the first way if the symbol name is "AAA" and the second way for all the other rows.

The end boundary is found by calling nextGroupIdx() on the first row's handle. The handle of the newly inserted row could have been used for nextGroupIdx() just as well. Since both belong to the same group, the result is exactly the same.

And finally a loop runs the iteration on the group. Note that the end condition comparison is done with same(), to compare the real row handle references and not just their Perl-level wrappers. The stepping is done with nextIdx(), which is exactly like next() but according to a particular index, the FIFO one. This has actually been done purely to show off this method. In this particular case the result produced by next(), nextIdx() on the FIFO index type and nextIdx() on the index type with nesting is exactly the same. We'll come to the reasons for that later.
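Why the wrappers can't simply be compared directly is easier to see on a toy model. The sketch below is plain Perl and makes an assumption about the general shape of the problem only: two separate Perl objects may wrap one and the same underlying handle. The class ToyRowHandle and its field are hypothetical and say nothing about how Triceps stores its handles.

```perl
use strict;
use warnings;

package ToyRowHandle;
# Each wrapper object holds a reference to the underlying handle.
sub new  { my ($class, $inner) = @_; return bless { inner => $inner }, $class; }
# same() looks through the wrappers and compares the underlying handles.
sub same {
    my ($self, $other) = @_;
    return $self->{inner} == $other->{inner} ? 1 : 0;
}

package main;

my $underlying = { id => 5 };                # stands in for the real handle
my $w1 = ToyRowHandle->new($underlying);
my $w2 = ToyRowHandle->new($underlying);     # second wrapper, same handle

print "wrappers identical: ", ($w1 == $w2 ? "yes" : "no"), "\n";       # no
print "same():             ", ($w1->same($w2) ? "yes" : "no"), "\n";   # yes
```

Comparing the wrappers with == compares the wrapper objects themselves, so a loop that ended on that condition would never see the end boundary.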

As you iterate through the group, you could do some manual aggregation along the way. For example, find the average price of the last two trades, and then do something useful with it.
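Here is a minimal sketch of such a manual aggregation, written in plain Perl over a toy model rather than over a real table. The assumptions: the rows of each group sit next to each other in one array, and a group is delimited by the index of its first row and the index right after its last row, mimicking the first/end boundaries described above. The names @rows, group_bounds() and average_price() are made up; the data is the final state from the sample log.

```perl
use strict;
use warnings;

# All rows of the toy table, with each symbol's group stored contiguously.
my @rows = (
    { id => 3, symbol => "AAA", price => 20 },
    { id => 5, symbol => "AAA", price => 30 },
    { id => 4, symbol => "BBB", price => 200 },
    { id => 6, symbol => "BBB", price => 300 },
);

# Return ($first, $end): the index of the group's first row and the index
# right after its last row. Returns an empty list if there is no such group.
sub group_bounds {
    my ($symbol) = @_;
    my ($first, $end);
    for my $i (0 .. $#rows) {
        next unless $rows[$i]{symbol} eq $symbol;
        $first = $i unless defined $first;
        $end = $i + 1;
    }
    return defined $first ? ($first, $end) : ();
}

# Walk the group from its first row up to, but not including, the end
# boundary, accumulating the sum and the count along the way.
sub average_price {
    my ($symbol) = @_;
    my ($first, $end) = group_bounds($symbol);
    return unless defined $first;
    my ($sum, $count) = (0, 0);
    for (my $i = $first; $i != $end; $i++) {
        $sum += $rows[$i]{price};
        $count++;
    }
    return $sum / $count;
}

printf "AAA average: %g\n", average_price("AAA");    # prints: AAA average: 25
printf "BBB average: %g\n", average_price("BBB");    # prints: BBB average: 250
```

The loop has the same shape as the printing loop in the example: start at the first row of the group, stop when the end boundary is reached, and do the arithmetic in the body.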

P.S. The RowHandle methods firstOfGroupIdx(), nextGroupIdx(), nextIdx() as shown here are available in version 1.0. In 0.99 they are the methods on the table, like $tWindow->firstOfGroupIdx($itLast2, $rhTrade).
