Sunday, September 30, 2012

Another example of streaming functions

Now I want to show an example that is fundamentally kind of dumb. The same thing is easier to do in Triceps with templates. And the whole premise is not exactly great either. But it provides an opportunity to show more of the streaming functions, in a set-up that is closer to the SQL-based systems.

The background is as follows: There happen to be multiple ways to identify the securities (stock shares etc.). "RIC" is the identifier used by Reuters (and quite often by the other data suppliers), consisting of the ticker symbol on an exchange and the code of the exchange. ISIN is the international standard identifier. A security (and some of its creative derivatives) might happen to be listed on multiple exchanges, each having its own RIC, but all translating to the same ISIN (there might be multiple ISINs too but that's another story). A large financial company would want to track a security all around the world. To aggregate the data on the security worldwide, it has to identify it by ISIN, but the data feed might be coming in as RIC only. The translation of RIC to ISIN is then done by the table during processing. The RIC is not thrown away either, it shows the detail of what happened. But ISIN is added for the aggregation.

The data might be coming from multiple feeds, and there are multiple kinds of data: trades, quotes, lending quotes and so on, each with its own schema and its own aggregations. However the step of RIC-to-ISIN translation is the same for all of them, is done by the same table, and can be done in one place.

An extra complexity is that in the real world the translation table might be incomplete. However some feeds might provide both RICs and ISINs in their records, so the pairs that aren't in the reference table yet, can be inserted there and used for the following translations. This is actually not such a great idea, because it means that there might be previous records that have went through before the translation became available. A much better way would be to do the translation as a join, where the update to a reference table would update any previous records as well. But then there would not be much use for a streaming function in it. As I've said before, it's a rather dumb example.

The streaming function will work like this: It will get an argument pair of (RIC, ISIN) from an incoming record. Either component of this pair might be empty. Since the rest of the record is wildly different for different feeds, the rest of the record is left off at this point, and the uniform argument of (RIC, ISIN) is given to the function. The function will consult its table, see if it can add more information from there, or add more information from the argument into the table, and return the hopefully enriched pair (RIC, ISIN) with an empty ISIN field replaced by the right value, to the caller.

The function is defined like this:

my $rtIsin = Triceps::RowType->new(
    ric => "string",
    isin => "string",
) or confess "$!";

my $ttIsin = Triceps::TableType->new($rtIsin)
    ->addSubIndex("byRic", Triceps::IndexType->newHashed(key => [ "ric" ])
) or confess "$!";
$ttIsin->initialize() or confess "$!";

my $tIsin = $unit->makeTable($ttIsin, "EM_CALL", "tIsin") or confess "$!";

# the results will come from here
my $fretLookupIsin = Triceps::FnReturn->new(
    name => "fretLookupIsin",
    unit => $unit,
    labels => [
        result => $rtIsin,

# The function argument: the input data will be sent here.
my $lbLookupIsin = $unit->makeLabel($rtIsin, "lbLookupIsin", undef, sub {
    my $row = $_[1]->getRow();
    if ($row->get("ric")) {
        my $argrh = $tIsin->makeRowHandle($row);
        my $rh = $tIsin->find($argrh);
        if ($rh->isNull()) {
            if ($row->get("isin")) {
        } else {
            $row = $rh->getRow();
        ->makeRowop("OP_INSERT", $row));
}) or confess "$!";

The $fretLookupIsin is the function result, $lbLookupIsin is the function input. In this example the result label in FnReturn is defined differently than in the previous one: not by a source label but by a row type. This label doesn't get chained to anything, instead the procedural code finds it as $fretLookupIsin->getLabel("result") and sends the rowops directly to it.

Then the ISIN translation code for some trades feed would look as follows (remember, supposedly there would be many feeds, each with its own schema, but for the example I show only one):

my $rtTrade = Triceps::RowType->new(
    ric => "string",
    isin => "string",
    size => "float64",
    price => "float64",
) or confess "$!";

my $lbTradeEnriched = $unit->makeDummyLabel($rtTrade, "lbTradeEnriched");
my $lbTrade = $unit->makeLabel($rtTrade, "lbTrade", undef, sub {
    my $rowop = $_[1];
    my $row = $rowop->getRow();
        name => "callTradeLookupIsin",
        on => $fretLookupIsin,
        unit => $unit,
        rowop => $lbLookupIsin->makeRowopHash("OP_INSERT",
            ric => $row->get("ric"),
            isin => $row->get("isin"),
        labels => [
            result => sub { # a label will be created from this sub
                        isin => $_[1]->getRow()->get("isin")

The label $lbTrade receives the incoming trades, calls the streaming function to enrich them with the ISIN data, and forwards the enriched data to the label $lbTradeEnriched. The function call is done differently in this example. Rather than create a FnBinding object and then use it with a scoped AutoFnBind, it uses the convenience function FnBinding::call() that wraps all that logic. It's simpler to use, without all these extra objects, but the price is the efficiency: it ends up creating a new FnBinding object for every call. That's where a compiler would be very useful, it could take a call like this, translate it to the internal objects once, and then keep reusing them.

A FnBinding::call gets a name that is used for the error messages and also to give names to the temporary objects it creates. The option "on" tells, which streaming function is being called (by specifying its FnReturn). The option "rowop" gives the arguments of the streaming functions. There are multiple way to do that: option "rowop" for a single rowop, "rowops" for an array of rowops, "tray" for a tray, and "code" for a procedural function that would send the inputs. And "labels" as usual connects the results of the function, either to the labels, or by creating labels automatically from the snippets of code.

The result handling here demonstrates the technique that I call the "implicit join": The function gets a portion of data from an original row, does some transformation and returns the data back. This data is then joined with the original row. The code knows, what this original row was, it gets remembered in the variable $row. The semantics of the call guarantees that nothing else has happened during the function call, and that $row is still the current row. Then the function result gets joined with $row, and the produced data is sent further on its way. The variable $row could be either a global one, or as shown here a scoped variable that gets embedded into a closure function.

The rest of the example, the dispatcher part, is:

# print what is going on
my $lbPrintIsin = makePrintLabel("printIsin", $tIsin->getOutputLabel());
my $lbPrintTrade = makePrintLabel("printTrade", $lbTradeEnriched);

# the main loop
my %dispatch = (
    isin => $tIsin->getInputLabel(),
    trade => $lbTrade,

while(<STDIN>) {
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    my $lb = $dispatch{$type};
    my $rowop = $lb->makeRowopArray(@data);
    $unit->drainFrame(); # just in case, for completeness

And an example of running, with the input lines shown according to the new convention preceded by "> ":

> isin,OP_INSERT,ABC.L,US0000012345
tIsin.out OP_INSERT ric="ABC.L" isin="US0000012345"
> isin,OP_INSERT,ABC.N,US0000012345
tIsin.out OP_INSERT ric="ABC.N" isin="US0000012345"
> isin,OP_INSERT,DEF.N,US0000054321
tIsin.out OP_INSERT ric="DEF.N" isin="US0000054321"
> trade,OP_INSERT,ABC.L,,100,10.5
lbTradeEnriched OP_INSERT ric="ABC.L" isin="US0000012345" size="100" price="10.5"
> trade,OP_DELETE,ABC.N,,200,10.5
lbTradeEnriched OP_DELETE ric="ABC.N" isin="US0000012345" size="200" price="10.5"
> trade,OP_INSERT,GHI.N,,300,10.5
lbTradeEnriched OP_INSERT ric="GHI.N" isin="" size="300" price="10.5"
> trade,OP_INSERT,,XX0000012345,400,10.5
lbTradeEnriched OP_INSERT ric="" isin="XX0000012345" size="400" price="10.5"
> trade,OP_INSERT,GHI.N,XX0000012345,500,10.5
tIsin.out OP_INSERT ric="GHI.N" isin="XX0000012345"
lbTradeEnriched OP_INSERT ric="GHI.N" isin="XX0000012345" size="500" price="10.5"
> trade,OP_INSERT,GHI.N,,600,10.5
lbTradeEnriched OP_INSERT ric="GHI.N" isin="XX0000012345" size="600" price="10.5"

The table gets pre-populated with a few translations, and the first few trades use them. Then goes the example of a non-existing translation, which gets eventually added from the incoming data (see that the trade with GHI.N,XX0000012345 both updates the ISIN table and sends through the trade record), and the following trades can then use this newly added translation.

No comments:

Post a Comment