Wednesday, October 24, 2012

Streaming functions and recursion, part 6

The two previous examples (the one with the trays and the one with the forks) could be combined, but the combination just doesn't work right.

The problem is that the example with trays relies on the recursive function being executed before the tray gets called. But if both of them are forked, things break.

Well, if there is only one recursive call, it still works because the execution frame looks like this:

arg1
pop1

The rowop arg1 executes, places the result into the tray (provided that it calls the FnReturn label, not forks to it!). Then the rowop pop1 executes and calls the tray. So far so good.

Now let's do the recursion with depth two. The first level starts the same:

arg1
pop1

Then arg1 executes and forks the second level of recursion:

pop1
arg2
pop2

Do you see what went wrong? The unit execution frames are FIFO. So the second level of recursion got queued after the popping of the first level. That pop1 executes next, doesn't get any return values, and everything goes downhill from there.
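The ordering problem can be reproduced with a toy model in plain Perl, with no Triceps involved. This is only a sketch, assuming that a frame behaves as a simple FIFO queue of named rowops; the function run_frame() and the rowop names are made up for the illustration.

```perl
use strict;
use warnings;

# Toy model of one unit frame as a FIFO queue of rowop names.
# Forking appends to the end of the queue. This is an illustration,
# not the real Triceps scheduler.
sub run_frame {
    my ($depth) = @_;
    my @queue = ("arg1", "pop1");   # the first level forks both rowops
    my @order;                      # the order in which the rowops execute
    while (defined(my $rop = shift @queue)) {
        push @order, $rop;
        # argN of a non-final level forks the next level of recursion
        if ($rop =~ /^arg(\d+)$/ && $1 < $depth) {
            my $next = $1 + 1;
            push @queue, "arg$next", "pop$next";
        }
    }
    return @order;
}

print join(" ", run_frame(2)), "\n";
```

With the depth of 2 the model prints "arg1 pop1 arg2 pop2": pop1 runs before arg2 had any chance to produce a result for it.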

Streaming functions and recursion, part 5

And there is also a way to run the recursive calls without even the need to increase the recursion depth limit. It can be left at the default 1, without setMaxRecursionDepth(). The secret is to fork the argument rowops to the functions instead of calling them.

###
# A streaming function that computes a Fibonacci number.

# Input:
#   $lbFibCompute: request to compute the number.
# Output (by FnReturn labels):
#   "result": the computed value.
# The opcode is preserved through the computation.

my @stackFib; # stack of the function states
my $stateFib; # The current state

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        result => $rtFibRes,
    ],
    onPush => sub { push @stackFib, $stateFib; $stateFib = { }; },
    onPop => sub { $stateFib = pop @stackFib; },
);

my $lbFibResult = $frFib->getLabel("result");

# Declare the label & binding variables in advance, to define them sequentially.
my ($lbFibCompute, $fbFibPrev1, $fbFibPrev2);
$lbFibCompute = $uFib->makeLabel($rtFibArg, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $op = $_[1]->getOpcode();
    my $idx = $row->get("idx");

    if ($idx <= 1) {
        $uFib->fork($frFib->getLabel("result")->makeRowopHash($op,
            idx => $idx,
            fib => $idx < 1 ? 0 : 1,
        ));
    } else {
        $stateFib->{op} = $op;
        $stateFib->{idx} = $idx;

        $frFib->push($fbFibPrev1);
        $uFib->fork($lbFibCompute->makeRowopHash($op,
            idx => $idx - 1,
        ));
    }
}) or confess "$!";
$fbFibPrev1 = Triceps::FnBinding->new(
    unit => $uFib,
    name => "FibPrev1",
    on => $frFib,
    labels => [
        result => sub {
            $frFib->pop($fbFibPrev1);

            $stateFib->{prev1} = $_[1]->getRow()->get("fib");

            # must prepare before pushing new state and with it new $stateFib
            my $rop = $lbFibCompute->makeRowopHash($stateFib->{op},
                idx => $stateFib->{idx} - 2,
            );

            $frFib->push($fbFibPrev2);
            $uFib->fork($rop);
        },
    ],
);
$fbFibPrev2 = Triceps::FnBinding->new(
    unit => $uFib,
    on => $frFib,
    name => "FibPrev2",
    labels => [
        result => sub {
            $frFib->pop($fbFibPrev2);

            $stateFib->{prev2} = $_[1]->getRow()->get("fib");
            $uFib->fork($frFib->getLabel("result")->makeRowopHash($stateFib->{op},
                idx => $stateFib->{idx},
                fib => $stateFib->{prev1} + $stateFib->{prev2},
            ));
        },
    ],
);

# End of streaming function
###

This is a variation of the pre-previous example, with the split push and pop. The split is required for the fork to work: when the forked rowop executes, the calling label has already returned, so obviously the scoped approach won't work.

In this version the unit stack depth required to compute the 6th (and any) Fibonacci number reduces to 2: it's really only one level on top of the outermost frame.

Streaming functions and recursion, part 4

Following up on the previous installment, here is the example that uses the bindings with tray:

###
# A streaming function that computes a Fibonacci number.

# Input:
#   $lbFibCompute: request to compute the number.
# Output (by FnReturn labels):
#   "result": the computed value.
# The opcode is preserved through the computation.

my @stackFib; # stack of the function states
my $stateFib; # The current state

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        result => $rtFibRes,
    ],
    onPush => sub { push @stackFib, $stateFib; $stateFib = { }; },
    onPop => sub { $stateFib = pop @stackFib; },
);

my $lbFibResult = $frFib->getLabel("result");

# Declare the label & binding variables in advance, to define them sequentially.
my ($lbFibCompute, $fbFibPrev1, $fbFibPrev2);
$lbFibCompute = $uFib->makeLabel($rtFibArg, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $op = $_[1]->getOpcode();
    my $idx = $row->get("idx");

    if ($idx <= 1) {
        $uFib->makeHashCall($frFib->getLabel("result"), $op,
            idx => $idx,
            fib => $idx < 1 ? 0 : 1,
        );
    } else {
        $stateFib->{op} = $op;
        $stateFib->{idx} = $idx;

        {
            my $ab = Triceps::AutoFnBind->new(
                $frFib => $fbFibPrev1
            );
            $uFib->makeHashCall($lbFibCompute, $op,
                idx => $idx - 1,
            );
        }
        $fbFibPrev1->callTray();
    }
}) or confess "$!";
$fbFibPrev1 = Triceps::FnBinding->new(
    unit => $uFib,
    name => "FibPrev1",
    on => $frFib,
    withTray => 1,
    labels => [
        result => sub {
            $stateFib->{prev1} = $_[1]->getRow()->get("fib");

            # must prepare before pushing new state and with it new $stateFib
            my $rop = $lbFibCompute->makeRowopHash($stateFib->{op},
                idx => $stateFib->{idx} - 2,
            );

            {
                my $ab = Triceps::AutoFnBind->new(
                    $frFib => $fbFibPrev2
                );
                $uFib->call($rop);
            }
            $fbFibPrev2->callTray();
        },
    ],
);
$fbFibPrev2 = Triceps::FnBinding->new(
    unit => $uFib,
    on => $frFib,
    name => "FibPrev2",
    withTray => 1,
    labels => [
        result => sub {
            $stateFib->{prev2} = $_[1]->getRow()->get("fib");
            $uFib->makeHashCall($frFib->getLabel("result"), $stateFib->{op},
                idx => $stateFib->{idx},
                fib => $stateFib->{prev1} + $stateFib->{prev2},
            );
        },
    ],
);

# End of streaming function
###

The stack depth is now greatly reduced because the unit stack pops the frames before pushing more of them. For the 2nd Fibonacci number the trace is:

unit 'uFib' before label 'FibCompute' op OP_DELETE {
unit 'uFib' before label 'FibCompute' op OP_DELETE {
unit 'uFib' before label 'Fib.result' op OP_DELETE {
unit 'uFib' after label 'Fib.result' op OP_DELETE }
unit 'uFib' after label 'FibCompute' op OP_DELETE }
unit 'uFib' before label 'FibPrev1.result' op OP_DELETE {
unit 'uFib' before label 'FibCompute' op OP_DELETE {
unit 'uFib' before label 'Fib.result' op OP_DELETE {
unit 'uFib' after label 'Fib.result' op OP_DELETE }
unit 'uFib' after label 'FibCompute' op OP_DELETE }
unit 'uFib' before label 'FibPrev2.result' op OP_DELETE {
unit 'uFib' before label 'Fib.result' op OP_DELETE {
unit 'uFib' before label 'FibCall.result' (chain 'Fib.result') op OP_DELETE {
unit 'uFib' after label 'FibCall.result' (chain 'Fib.result') op OP_DELETE }
unit 'uFib' after label 'Fib.result' op OP_DELETE }
unit 'uFib' after label 'FibPrev2.result' op OP_DELETE }
unit 'uFib' after label 'FibPrev1.result' op OP_DELETE }
unit 'uFib' after label 'FibCompute' op OP_DELETE }

For the 6th number the maximal required stack depth now gets reduced to only 9 instead of 51.

Streaming functions and recursion, part 3

FnBinding::call() with closures is easy to use but it creates a closure and an FnBinding object on each run. Can things be rearranged to reuse the same objects? With some effort, they can:

###
# A streaming function that computes a Fibonacci number.

# Input:
#   $lbFibCompute: request to compute the number.
# Output (by FnReturn labels):
#   "result": the computed value.
# The opcode is preserved through the computation.

my @stackFib; # stack of the function states
my $stateFib; # The current state

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        result => $rtFibRes,
    ],
    onPush => sub { push @stackFib, $stateFib; $stateFib = { }; },
    onPop => sub { $stateFib = pop @stackFib; },
);

my $lbFibResult = $frFib->getLabel("result");

# Declare the label & binding variables in advance, to define them sequentially.
my ($lbFibCompute, $fbFibPrev1, $fbFibPrev2);
$lbFibCompute = $uFib->makeLabel($rtFibArg, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $op = $_[1]->getOpcode();
    my $idx = $row->get("idx");

    if ($idx <= 1) {
        $uFib->makeHashCall($frFib->getLabel("result"), $op,
            idx => $idx,
            fib => $idx < 1 ? 0 : 1,
        );
    } else {
        $stateFib->{op} = $op;
        $stateFib->{idx} = $idx;

        $frFib->push($fbFibPrev1);
        $uFib->makeHashCall($lbFibCompute, $op,
            idx => $idx - 1,
        );
    }
}) or confess "$!";
$fbFibPrev1 = Triceps::FnBinding->new(
    unit => $uFib,
    name => "FibPrev1",
    on => $frFib,
    labels => [
        result => sub {
            $frFib->pop($fbFibPrev1);

            $stateFib->{prev1} = $_[1]->getRow()->get("fib");

            # must prepare before pushing new state and with it new $stateFib
            my $rop = $lbFibCompute->makeRowopHash($stateFib->{op},
                idx => $stateFib->{idx} - 2,
            );

            $frFib->push($fbFibPrev2);
            $uFib->call($rop);
        },
    ],
);
$fbFibPrev2 = Triceps::FnBinding->new(
    unit => $uFib,
    on => $frFib,
    name => "FibPrev2",
    labels => [
        result => sub {
            $frFib->pop($fbFibPrev2);

            $stateFib->{prev2} = $_[1]->getRow()->get("fib");
            $uFib->makeHashCall($frFib->getLabel("result"), $stateFib->{op},
                idx => $stateFib->{idx},
                fib => $stateFib->{prev1} + $stateFib->{prev2},
            );
        },
    ],
);

# End of streaming function
###

The rest of the code stays the same, so I won't copy it here.

The computation still needs to keep the intermediate results of two recursive calls. With no closures, these results have to be kept in a global object $stateFib (which is a hash that keeps multiple values).

But it can't just be a single object! The recursive calls would overwrite it. So it has to be built into a stack of objects, a new one pushed for each call and popped after it. This pushing and popping can be tied to the pushing and popping of the bindings on an FnReturn. When the FnReturn is defined, the options onPush and onPop define the custom Perl code to execute, which is used here for the management of the state stack.
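The state stack idea can be tried out in plain Perl, separately from Triceps. In this sketch on_push() and on_pop() are hypothetical stand-ins for the onPush and onPop handlers, and demo() plays the part of one nested call; none of these names come from the Triceps API.

```perl
use strict;
use warnings;

my @stack;   # saved states of the outer calls
my $state;   # state of the call in progress

# Stand-ins for the onPush and onPop handlers of the FnReturn.
sub on_push { push @stack, $state; $state = { }; }
sub on_pop  { $state = pop @stack; }

sub demo {
    $state = { idx => 6 };      # the outer call keeps its argument
    on_push();                  # entering the nested call
    $state->{idx} = 5;          # the nested call writes into a fresh hash
    my $inner = $state->{idx};
    on_pop();                   # nested call done, the outer state is back
    return ($inner, $state->{idx});
}

print join(",", demo()), "\n";
```

It prints "5,6": the nested call has worked with its own hash and the outer call's idx has survived it.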

The whole logic is then split into the sections around the calls:
  • before the first call
  • between the first and second call
  • after the second call
The first section goes as a normal label and the rest are done as bindings.

A tricky moment is that a simple scoped AutoFnBind can't be used here. The pushing of the binding happens in the calling label (such as FibCompute) but then the result is processed in another label (such as FibPrev1.result). The procedural control won't return to FibCompute until after FibPrev1.result has been completed. But FibPrev1.result needs the state popped before it can do its work! So the pushing and popping of the binding is done explicitly in two split steps: push() called in FibCompute() and pop() called in FibPrev1.result. And of course then after FibPrev1.result saves the result, it pushes the binding again, which then gets popped in FibPrev2.result.

The popping can also be done without arguments, as pop(), but if it's given an argument, it will check that the binding popped is the same as its argument. This is helpful for detecting the call stack corruptions.
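The value of the checked pop is easy to see on a toy binding stack. This is a sketch in plain Perl under my own assumptions: push_binding() and pop_binding() are hypothetical helpers, and the real FnReturn may order the check and the pop differently.

```perl
use strict;
use warnings;
use Carp;

my @bindings;   # toy binding stack of one return

sub push_binding { my ($binding) = @_; push @bindings, $binding; }

# With an argument, verify that the expected binding is on top.
sub pop_binding {
    my ($expected) = @_;
    confess "the binding stack is empty" unless @bindings;
    confess "expected '$expected' on top, found '$bindings[-1]'"
        if defined $expected && $bindings[-1] ne $expected;
    return pop @bindings;
}

push_binding("FibPrev1");
push_binding("FibPrev2");
print pop_binding("FibPrev2"), "\n";   # matches, pops quietly

# Now FibPrev1 is on top, so asking for FibPrev2 again is an error.
my $status = eval { pop_binding("FibPrev2"); 1 } ? "ok" : "mismatch detected";
print "$status\n";
```

The second pop reports the mismatch instead of silently removing the wrong binding, and the stack stays as it was.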

Now, can you guess, what depth of the unit call stack is required to compute and print the 2nd Fibonacci number? It's 7. If the tracing is enabled, it will produce this trace:

unit 'uFib' before label 'FibCompute' op OP_DELETE {
unit 'uFib' before label 'FibCompute' op OP_DELETE {
unit 'uFib' before label 'Fib.result' op OP_DELETE {
unit 'uFib' before label 'FibPrev1.result' (chain 'Fib.result') op OP_DELETE {
unit 'uFib' before label 'FibCompute' op OP_DELETE {
unit 'uFib' before label 'Fib.result' op OP_DELETE {
unit 'uFib' before label 'FibPrev2.result' (chain 'Fib.result') op OP_DELETE {
unit 'uFib' before label 'Fib.result' op OP_DELETE {
unit 'uFib' before label 'FibCall.result' (chain 'Fib.result') op OP_DELETE {
unit 'uFib' after label 'FibCall.result' (chain 'Fib.result') op OP_DELETE }
unit 'uFib' after label 'Fib.result' op OP_DELETE }
unit 'uFib' after label 'FibPrev2.result' (chain 'Fib.result') op OP_DELETE }
unit 'uFib' after label 'Fib.result' op OP_DELETE }
unit 'uFib' after label 'FibCompute' op OP_DELETE }
unit 'uFib' after label 'FibPrev1.result' (chain 'Fib.result') op OP_DELETE }
unit 'uFib' after label 'Fib.result' op OP_DELETE }
unit 'uFib' after label 'FibCompute' op OP_DELETE }
unit 'uFib' after label 'FibCompute' op OP_DELETE }

9 labels get called in a sequence, all the way from the initial call to the result printing. And only then the whole sequence unrolls back. 3 of them are chained through the bindings, so they don't push the stack frames onto the stack, and there is always the outermost stack frame, with the result of 9-3+1 = 7.  This number grows fast. For the 6th number the number of labels becomes 75 and the frame count 51.
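The same arithmetic can be spelled out for any index. The sketch below assumes that every FibCompute call triggers exactly one Fib.result and one chained binding label, as in the trace above; compute_calls() and depth_estimate() are names made up for this illustration.

```perl
use strict;
use warnings;

# Number of FibCompute invocations made by the dumb recursion.
sub compute_calls {
    my ($n) = @_;
    return 1 if $n <= 1;
    return 1 + compute_calls($n - 1) + compute_calls($n - 2);
}

# Returns (labels in the sequence, unit stack frames needed).
sub depth_estimate {
    my ($n) = @_;
    my $calls = compute_calls($n);
    # each call involves FibCompute, Fib.result and one binding label
    my $labels = 3 * $calls;
    # the chained binding labels reuse a frame; add the outermost frame
    my $frames = $labels - $calls + 1;
    return ($labels, $frames);
}

printf "n=%d: labels=%d frames=%d\n", $_, depth_estimate($_) for (2, 6);
```

It prints 9 labels and 7 frames for the 2nd number, 75 labels and 51 frames for the 6th.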

It happens because all the calls get unrolled into a single sequence, like what I've warned against in the section on the loops. The function return does unroll its FnReturn stack but doesn't unroll the unit call stack, it just goes even deeper by calling the label that processes it.

There are ways to improve it. The simplest one is to use the FnBinding with a tray, and call this tray after the function completely returns. This works out quite conveniently in two other ways too. First, AutoFnBind with its scoped approach can be used again. And second, it allows handling the situations where a function returns not just one row but multiple of them. That will be the next example.

Sunday, October 21, 2012

Streaming functions and recursion, part 2

Now that the recursion is permitted, let's look at a basic example: the same Fibonacci function as before, only computed in a dumb recursive way. In a real dumb recursive way, with two recursive calls, to show how they are done. The simplest and most straightforward way goes like this:

my $uFib = Triceps::Unit->new("uFib");
$uFib->setMaxRecursionDepth(100);

# Type the data going into the function
my $rtFibArg = Triceps::RowType->new(
    idx => "int32", # the index of Fibonacci number to generate
) or confess "$!";

# Type of the function result
my $rtFibRes = Triceps::RowType->new(
    idx => "int32", # the index of Fibonacci number
    fib => "int64", # the generated Fibonacci number
) or confess "$!";

###
# A streaming function that computes a Fibonacci number.

# Input:
#   $lbFibCompute: request to compute the number.
# Output (by FnReturn labels):
#   "result": the computed value.
# The opcode is preserved through the computation.

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        result => $rtFibRes,
    ],
);

my $lbFibResult = $frFib->getLabel("result");

my $lbFibCompute; # must be declared before the assignment, for recursion
$lbFibCompute = $uFib->makeLabel($rtFibArg, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $op = $_[1]->getOpcode();
    my $idx = $row->get("idx");
    my $res;

    if ($idx < 1) {
        $res = 0;
    } elsif($idx == 1) {
        $res = 1;
    } else {
        my ($prev1, $prev2);
        Triceps::FnBinding::call(
            name => "FibCompute.call1",
            on => $frFib,
            unit => $uFib,
            labels => [
                result => sub {
                    $prev1 = $_[1]->getRow()->get("fib");
                }
            ],
            rowop => $lbFibCompute->makeRowopHash($op,
                idx => $idx - 1,
            ),
        );
        Triceps::FnBinding::call(
            name => "FibCompute.call2",
            on => $frFib,
            unit => $uFib,
            labels => [
                result => sub {
                    $prev2 = $_[1]->getRow()->get("fib");
                }
            ],
            rowop => $lbFibCompute->makeRowopHash($op,
                idx => $idx - 2,
            ),
        );
        $res = $prev1 + $prev2;
    }
    $uFib->makeHashCall($frFib->getLabel("result"), $op,
        idx => $idx,
        fib => $res,
    );
}) or confess "$!";

# End of streaming function
###

# binding to call the Fibonacci function and print the result
my $fbFibCall = Triceps::FnBinding->new(
    name => "FibCall",
    on => $frFib,
    unit => $uFib,
    labels => [
        result => sub {
            my $row = $_[1]->getRow();
            print($row->get("fib"), " is Fibonacci number ", $row->get("idx"), "\n");
        }
    ],
);

while(<STDIN>) {
    chomp;
    my @data = split(/,/);
    {
        my $ab = Triceps::AutoFnBind->new(
            $frFib => $fbFibCall,
        );
        $uFib->makeArrayCall($lbFibCompute, @data);
    }
    $uFib->drainFrame(); # just in case, for completeness
}

The calling sequence has become different than in the looping version but the produced result is exactly the same. The streaming function now receives an argument row and produces a result row. The unit's recursion depth limit had to be adjusted to permit the recursion.

The recursive calls are done through the FnBinding::call(), with a closure for the result handling label. That closure can access the scope of its creator and place the result into its local variable. After both intermediate results are computed, the final result computation takes place and sends out the result row.
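Stripped of all the Triceps machinery, the closure trick looks like this. It's a plain Perl sketch where the function "returns" by invoking a handler, much like sending a row to an FnReturn; fib_compute() is a name I've made up here.

```perl
use strict;
use warnings;

# The result is delivered by calling the handler, not by a return value.
sub fib_compute {
    my ($idx, $on_result) = @_;
    my $res;
    if ($idx < 1) {
        $res = 0;
    } elsif ($idx == 1) {
        $res = 1;
    } else {
        my ($prev1, $prev2);
        # each closure places the intermediate result into a local variable
        fib_compute($idx - 1, sub { $prev1 = $_[0]; });
        fib_compute($idx - 2, sub { $prev2 = $_[0]; });
        $res = $prev1 + $prev2;
    }
    $on_result->($res);
}

fib_compute(6, sub { print "$_[0] is Fibonacci number 6\n"; });
```

The handlers are created anew on every call, which is the very cost that the later versions try to avoid.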

Tuesday, October 16, 2012

More of the fork reform

I've made one more change to the execution logic. I'm not sure if I've said anything explicit about this area before, or if it's going to be new information, but here we go anyway:

All the time before it was implicitly assumed that when a label in FnReturn passes the call to a label in its bound FnBinding, it does a proper call. And indeed it was so. But the problem with that is that the bound label can not easily fork a rowop to the frame of its parent label in FnReturn. This was breaking some of the examples that I've been playing with but I haven't shown yet.

So I've changed it to work like a chained label. All along, even in 1.0, the chained labels were executing their rowops reusing the frame of their parent label, from which they are chained. Now the binding labels do the same thing. And they even are shown in the traces as being chained from the labels in FnReturn.

The only exception (this is another subject that I haven't described yet) is when the FnReturn and FnBinding are in the different units. Then the bound label is properly called with its own frame in the unit where it belongs.

Saturday, October 13, 2012

Recursion is now permitted

I've started talking about recursion, and it's much more convenient to do when the recursion can actually be used. The recursive calls (when a label calls itself, directly or indirectly) have been forbidden in Triceps. Mind you, the recursive calling still can be done with the help of trays and forking, as the previous example has shown and more examples will show. And it's probably the best way too from the standpoint of correctness. However it's not the most straightforward way.

So I've decided to allow the recursion in its direct way. Especially since it doesn't have to be all-or-nothing, it can be done in a piecemeal and controlled fashion.

Now it's controlled per-unit. Each unit has two adjustable limits:

  • Maximal stack depth: Limits the total depth of the unit's call stack. That's the maximal length of the call chain, whether it goes straight or in loops.
  • Maximal recursion depth: Limits the number of times each particular label may appear on the call stack. So if you have a recursive code fragment (a streaming function, or now you can do it with a loop too), this is the limit on its recursive re-entrances.
Both these limits accept the 0 and negative values to mean "unlimited".

The default is as it has been before: unlimited stack depth, recursion depth of 1 (which means that each label may be called once but it may not call itself). But now you can change them with the calls:

$unit->setMaxStackDepth($n);
$unit->setMaxRecursionDepth($n);

You can change them at any time, even when the unit is running (but they will be enforced only on the next attempt to execute a rowop).

You can also read the current values:

$n = $unit->maxStackDepth();
$n = $unit->maxRecursionDepth();
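To get a feel for the recursion depth limit, here is a toy model of it in plain Perl. It's a sketch under my own assumptions, not the Triceps implementation: call_label(), nest() and try_nest() are hypothetical helpers, and the limit is counted per label name.

```perl
use strict;
use warnings;

my $max_recursion = 1;   # 0 or negative would mean "unlimited"
my %on_stack;            # label name => how many times it is on the stack now

sub call_label {
    my ($name, $code) = @_;
    die "label '$name' exceeded the recursion depth limit $max_recursion\n"
        if $max_recursion > 0 && ($on_stack{$name} // 0) >= $max_recursion;
    $on_stack{$name}++;
    my $ok = eval { $code->(); 1 };
    my $err = $@;
    $on_stack{$name}--;   # leave the label even if the nested call failed
    die $err unless $ok;
}

# A label that calls itself $n more times.
sub nest {
    my ($n) = @_;
    call_label("loop", sub { nest($n - 1) if $n > 0; });
}

sub try_nest {
    my ($n) = @_;
    return eval { nest($n); 1 } ? "ok" : "refused";
}

print try_nest(0), "\n";   # a single call, fits the default limit
print try_nest(1), "\n";   # the label calls itself once
$max_recursion = 3;
print try_nest(2), "\n";   # three appearances on the stack
```

It prints "ok", "refused" and "ok": with the limit of 1 a label may run but may not call itself, and raising the limit to 3 permits two nested re-entrances.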

Another thing about the limits is that even if you set them to "unlimited" or to some very large values, there still are the system limits. The calls use the C++ process (or thread) stack, and if you make too many of them, the stack will overflow and the whole process will crash and possibly dump core. Keeping the call depths within reason is still a good idea.

Now you can do the direct recursion. However as with the procedural code, not all the labels are re-entrant. Some of them may work with the static data structures that can't be modified in a nested fashion. Think for example of a table: when you modify a table, it sends rowops to its "pre" and "out" labels. You can connect the other labels there, and react to the table modifications. However these labels can't attempt to modify the same table, because the table is already in the middle of a modification, and it's not re-entrant.

The table still has a separate logic to check for non-re-entrance, and no matter what the unit's general recursion depth limit is, for the table it always stays at 1. Moreover, the table enforces it across both the input label interface and the procedural interface.

If you make your own non-re-entrant labels, Triceps can make this check for you. Just mark the first label of the non-re-entrant sequence with

$label->setNonReentrant();

And it will have its own private recursion limit of 1. Any time an attempt is made to execute it recursively, it will confess. There is no way to unset this flag: when a label is known to be non-re-entrant, it can not suddenly become re-entrant until its code is rewritten.

You can read this flag with

$val = $label->isNonReentrant();


The next installment will show some more examples.

Thursday, October 11, 2012

Streaming functions and recursion, part 1

Let's look again at the pipeline example. Suppose we want to do the encryption twice (you know, maybe we have a secure channel to a semi-trusted intermediary who can read the envelopes and forward the encrypted messages he can't read to the final destination). The pipeline becomes

decrypt | decrypt | process | encrypt | encrypt

Or if you want to think about it in a more function-like notation, rather than a pipeline, the logic can also be expressed as:

encrypt(encrypt(process(decrypt(decrypt(data)))))


However it would not work directly: a decrypt function has only one output and it can not have two bindings at the same time, it would not know which one to use at any particular time.

Instead you can make decrypt into a template, instantiate it twice, and connect into a pipeline. It's very much like what the Unix shell does: it instantiates a new process for each part of its pipeline.

But there is also another possibility: instead of collecting the whole pipeline in advance, do it in steps.

Start by adding in every binding:

withTray => 1,

This will make all the bindings collect the result on a tray instead of sending it on immediately. Then modify the main loop:

while(<STDIN>) {
  chomp;

  # receive
  my $abReceive = Triceps::AutoFnBind->new(
    $retReceive => $bindDecrypt,
  );
  $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);

  # 1st decrypt
  my $abDecrypt1 = Triceps::AutoFnBind->new(
    $retDecrypt => $bindDecrypt,
  );
  $bindDecrypt->callTray();

  # 2nd decrypt
  my $abDecrypt2 = Triceps::AutoFnBind->new(
    $retDecrypt => $bindDispatch,
  );
  $bindDecrypt->callTray();

  # processing
  my $abProcess = Triceps::AutoFnBind->new(
    $retOutput => $bindEncrypt,
  );
  $bindDispatch->callTray();

  # 1st encrypt
  my $abEncrypt1 = Triceps::AutoFnBind->new(
    $retEncrypt => $bindEncrypt,
  );
  $bindEncrypt->callTray();

  # 2nd encrypt
  my $abEncrypt2 = Triceps::AutoFnBind->new(
    $retEncrypt => $bindSend,
  );
  $bindEncrypt->callTray();

  # send
  $bindSend->callTray();
}

Here I've dropped the encrypted-or-unencrypted choice to save space; the data is always encrypted twice. The drainFrame() call has been dropped because it has nothing to do anyway with the way the function calls work here. The rest of the code stays the same.

The bindings have been split in stages. In each stage the next binding is set, and the data from the previous binding gets sent into it. The binding method callTray() replaces the tray in the binding with an empty one, and then calls all the rowops collected on the old tray (and then if you wonder, what happens to the old tray, it gets discarded). Because of this the first decryption stage with binding



    $retDecrypt => $bindDecrypt,

doesn't send the data circling forever. It just does one pass through the decryption and prepares for the second pass.
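The swap-then-call behavior of callTray() can be modeled in a few lines of plain Perl. This is a sketch, assuming only what is described above: the tray is replaced with an empty one before the old rowops run. The functions handle() and call_tray() are made-up names, and the "rowops" are just strings.

```perl
use strict;
use warnings;

my @tray;         # rowops collected by the binding
my $passes = 0;   # how many rowops have been handled so far

# The handler feeds its result back into the same binding,
# like the decrypt function bound to $bindDecrypt.
sub handle {
    my ($rop) = @_;
    $passes++;
    push @tray, "decrypted($rop)";
}

# Model of callTray(): swap in an empty tray first, then run the old one.
sub call_tray {
    my @old = @tray;
    @tray = ();
    handle($_) for @old;
}

push @tray, "data";
call_tray();
print "passes=$passes tray=@tray\n";
call_tray();
print "passes=$passes tray=@tray\n";
```

Each call_tray() makes exactly one pass: the result of the pass lands on the new tray and waits there for the next explicit call.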

Every time AutoFnBind->new() runs, it doesn't replace the binding of the return but pushes a new binding onto the return's stack. Each FnReturn has its own stack of bindings (this way it's easier to manage than a single stack). When an AutoFnBind gets destroyed, it pops the binding from the return's stack. And yes, if you specify multiple bindings in one AutoFnBind, all of them get pushed on construction and popped on destruction. In this case all the auto-binds are in the same block, so they will all be destroyed at the end of block in the opposite order. Which means that in effect the code is equivalent to the nested blocks, and this version might be easier for you to think of:

while(<STDIN>) {
  chomp;

  # receive
  my $abReceive = Triceps::AutoFnBind->new(
    $retReceive => $bindDecrypt,
  );
  $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);

  {
    # 1st decrypt
    my $abDecrypt1 = Triceps::AutoFnBind->new(
      $retDecrypt => $bindDecrypt,
    );
    $bindDecrypt->callTray();

    {
      # 2nd decrypt
      my $abDecrypt2 = Triceps::AutoFnBind->new(
        $retDecrypt => $bindDispatch,
      );
      $bindDecrypt->callTray();

      {
        # processing
        my $abProcess = Triceps::AutoFnBind->new(
          $retOutput => $bindEncrypt,
        );
        $bindDispatch->callTray();

        {
          # 1st encrypt
          my $abEncrypt1 = Triceps::AutoFnBind->new(
            $retEncrypt => $bindEncrypt,
          );
          $bindEncrypt->callTray();

          {
            # 2nd encrypt
            my $abEncrypt2 = Triceps::AutoFnBind->new(
              $retEncrypt => $bindSend,
            );
            $bindEncrypt->callTray();

            # send
            $bindSend->callTray();
          }
        }
      }
    }
  }
}
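The scoped pushing and popping relies on the Perl object destruction at the end of a block. Here is a minimal sketch of the same mechanism in plain Perl; the class ScopedBind is a hypothetical stand-in for AutoFnBind, and the bindings are just strings on an array.

```perl
use strict;
use warnings;

package ScopedBind;   # hypothetical stand-in for Triceps::AutoFnBind

sub new {
    my ($class, $stack, $binding) = @_;
    push @$stack, $binding;                 # bind on construction
    return bless { stack => $stack }, $class;
}

sub DESTROY { pop @{ $_[0]{stack} }; }      # unbind on destruction

package main;

my @ret_stack;   # the binding stack of one return
my @log;
{
    my $ab1 = ScopedBind->new(\@ret_stack, "bindDecrypt");
    {
        my $ab2 = ScopedBind->new(\@ret_stack, "bindDispatch");
        push @log, "inner: @ret_stack";
    }
    push @log, "outer: @ret_stack";
}
push @log, "after: " . scalar(@ret_stack);
print "$_\n" for @log;
```

The inner block sees both bindings, the outer block only the first one, and after the outer block the stack is empty again.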

An interesting consequence of all this nesting, pushing and popping is that you can put the inner calls into the procedural loops if you wish. For example, if you want to process every input line thrice:

while(<STDIN>) {
  chomp;

  # receive
  my $abReceive = Triceps::AutoFnBind->new(
    $retReceive => $bindDecrypt,
  );

  for (my $i = 0; $i < 3; $i++) {
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);

    {
      # 1st decrypt
      my $abDecrypt1 = Triceps::AutoFnBind->new(
        $retDecrypt => $bindDecrypt,
      );
      $bindDecrypt->callTray();

      {
        # 2nd decrypt
        my $abDecrypt2 = Triceps::AutoFnBind->new(
          $retDecrypt => $bindDispatch,
        );
        $bindDecrypt->callTray();

        {
          # processing
          my $abProcess = Triceps::AutoFnBind->new(
            $retOutput => $bindEncrypt,
          );
          $bindDispatch->callTray();

          {
            # 1st encrypt
            my $abEncrypt1 = Triceps::AutoFnBind->new(
              $retEncrypt => $bindEncrypt,
            );
            $bindEncrypt->callTray();

            {
              # 2nd encrypt
              my $abEncrypt2 = Triceps::AutoFnBind->new(
                $retEncrypt => $bindSend,
              );
              $bindEncrypt->callTray();

              # send
              $bindSend->callTray();
            }
          }
        }
      }
    }
  }
}

This code will run the whole pipeline three times for each input line, and print out three output lines, such as:

>3639366536333263346635303566343934653533343535323534326336313632363332633332
37323635373337353663373432303466353035663439346535333435353235343230366536313664363533643232363136323633323232303633366637353665373433643232333332323230
37323635373337353663373432303466353035663439346535333435353235343230366536313664363533643232363136323633323232303633366637353665373433643232333332323230
37323635373337353663373432303466353035663439346535333435353235343230366536313664363533643232363136323633323232303633366637353665373433643232333332323230

If you wonder what the meaning of these lines is, they are the same as before. The input is:

inc,OP_INSERT,abc,2

And each line of output is:

result OP_INSERT name="abc" count="3"

I suppose, it would be more entertaining if the processing weren't just incrementing a value in the input data but incrementing some static counter, then the three output lines would be different.

However this is not the only way to do the block nesting. The contents of the FnBinding's tray is not affected in any way by the binding being pushed or popped. It stays there throughout, until it's explicitly flushed by callTray(). So it could use the blocks formed in a more pipeline fashion (as opposed to the more function-call-like fashion shown before):

while(<STDIN>) {
  chomp;

  # receive
  {
    my $abReceive = Triceps::AutoFnBind->new(
      $retReceive => $bindDecrypt,
    );
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);
  }

  # 1st decrypt
  {
    my $abDecrypt1 = Triceps::AutoFnBind->new(
      $retDecrypt => $bindDecrypt,
    );
    $bindDecrypt->callTray();
  }

  # 2nd decrypt
  {
    my $abDecrypt2 = Triceps::AutoFnBind->new(
      $retDecrypt => $bindDispatch,
    );
    $bindDecrypt->callTray();
  }

  # processing
  {
    my $abProcess = Triceps::AutoFnBind->new(
      $retOutput => $bindEncrypt,
    );
    $bindDispatch->callTray();
  }

  # 1st encrypt
  {
    my $abEncrypt1 = Triceps::AutoFnBind->new(
      $retEncrypt => $bindEncrypt,
    );
    $bindEncrypt->callTray();
  }

  # 2nd encrypt
  {
    my $abEncrypt2 = Triceps::AutoFnBind->new(
      $retEncrypt => $bindSend,
    );
    $bindEncrypt->callTray();
  }

  # send
  $bindSend->callTray();
}

After each stage its binding is popped, but the tray is carried through to the next stage.

Which way of blocking is better? I'd say they're pretty equivalent in functionality, and your preference would depend on what style you prefer to express.

Splitting the XS modules into multiple files

When I started writing Triceps, I soon encountered a problem: my XS file was growing to a disturbing size and was getting pretty slow to compile. It really needed to be split into multiple files. I didn't even need to split a single package; just separating each sub-package into its own file would be (and still is) quite enough.

Unfortunately, the documentation didn't say anything in this department. I found a solution on the Internet but it didn't work particularly well. It was able to handle two files, and after that it crashed. I had to look into what was going on and fix it.

So, how does it work? The building of the module itself is not a problem. It's the initialization that describes the available functions to Perl that presents the difficulty. For each module Modulename, XS defines a function boot_Modulename (with further XS decorations around the name) that contains the initialization of the module. When the module gets loaded, Perl finds and runs this function. If you want to have multiple modules loaded from the same shared library in one go, you have to designate one as the primary module, and then have it call the boot functions of the other modules (one module per XS file).

You can put extra code into the boot function using the section BOOT:

MODULE = ...        PACKAGE = ...
BOOT:
    // code to add to the boot function

So the example I've found on the Internet did this (with the Triceps module names used for an example):

#ifdef __cplusplus
extern "C" {
#endif
XS(boot_Triceps__Label); // for Triceps::Label
XS(boot_Triceps__Row); // for Triceps::Row
#ifdef __cplusplus
};
#endif

MODULE = Triceps        PACKAGE = Triceps

BOOT:
    boot_Triceps__Label(aTHX_ cv);
    boot_Triceps__Row(aTHX_ cv);

If you added the 3rd module, it crashed. What went wrong?

As it turns out, the boot function is not just a C function but a whole Perl function, called with the exact same conventions as used for the normal XS functions, with the proper Perl stack frame. It even gets two arguments that it never uses.

When you call a Perl function, you're supposed to put the correct things on the stack, including the stack mark. If you just call it in the C way as shown above, you're corrupting the Perl stack. So the limit of two calls and the two arguments received by boot_Triceps are not a coincidence. The first time it calls boot_Triceps__Label(), one argument gets abused because the called function takes it as a stack mark. The second time, when boot_Triceps__Row() is called, it abuses the second argument. The third time it runs out of arguments to abuse and crashes.

The fix is to do a proper Perl call sequence when you call the boot functions. Like this:

BOOT:
    PUSHMARK(SP); if (items >= 2) { XPUSHs(ST(0)); XPUSHs(ST(1)); } PUTBACK;
    boot_Triceps__Label(aTHX_ cv);
    SPAGAIN; POPs;
    //
    PUSHMARK(SP); if (items >= 2) { XPUSHs(ST(0)); XPUSHs(ST(1)); } PUTBACK;
    boot_Triceps__Row(aTHX_ cv);
    SPAGAIN; POPs;

This way you can include any number of modules. This code even passes through the original 2 arguments. By the way, note that you cannot leave empty lines in the BOOT section; put at least an empty comment in them instead.

Monday, October 8, 2012

Fork revisited

I've been working on the streaming functions, and that gave me an idea for a change in scheduling. Sorry that this description is a little dense; you'd need the context of the old ways from the manual for the description of the changes to make sense.

The old logic is described in the manual's section on Basic scheduling http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_sched_basic and the section on Loop scheduling http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_sched_loop. I've been saying that the loop logic could use some simplification, and that the forking of the rowops is getting deprecated. Now I've come up with a solution for them both.

The loops required a separate label at the beginning of the loop to put a mark on its queue frame. When the loop's body unwinds and the next iteration starts, it has to avoid pushing more frames with each iteration. So it has to put the rowop for the next iteration into that beginning frame (like fork but farther up the stack), and then unwind the whole body before the beginning label picks the next rowop from its frame and runs the loop body for the next iteration.

But now one little change in the execution of the forked rowops from the frame fixes things: rather than doing a proper call and pushing a new frame for each of them, just execute them using the parent's frame. This muddles up the precise forking sequence a little (where the rowops forked by a label were guaranteed to execute before any other rowops forked by its parent). But this precision doesn't matter much: first, forking is not used much anyway, and second, the forked labels can't have an expectation that the model won't change between them being forked and executed. However this little change is very convenient for the loops.

In a loop the first label of the loop can now put the mark directly on its frame. This mark will stay there until the loop completes, executing every iteration from that point.

If we review the example from the section on Loop scheduling, with the topology

X -> A -> B -> C -> Y
     ^         |
     +---------+

Then the sequence will look like this:

Rowop X1 scheduled on the outer frame:

[X1]

Rowop X1 executes:

[ ] ~X1
[ ]

Label X calls the first label of the loop, A, with rowop A1:

[ ] ~A1
[ ] ~X1
[ ]

The label A calls setMark() and puts the mark M on itself:

[ ] ~A1, mark M
[ ] ~X1
[ ]


The label A then calls the rowop B1, which calls the rowop C1:

[ ] ~C1
[ ] ~B1
[ ] ~A1, mark M
[ ] ~X1
[ ]

The label C loops the rowop A2 (for the second iteration of the loop) at mark M, thus placing A2 into A1's frame:

[ ] ~C1
[ ] ~B1
[A2] ~A1, mark M
[ ] ~X1
[ ]

Then the label C returns, label B returns, and label A returns. But A1's frame is not empty yet (* shows that A1 has completed and now it's a frame without a rowop as such):

[A2] *, mark M
[ ] ~X1
[ ]

Then A2 gets taken from the frame and executed with the context of the same frame:

[ ] ~A2, mark M
[ ] ~X1
[ ]

The label A again sets the mark M, which marks the same frame, so it's pretty much a no-op (so A doesn't really have to set the mark the second time, it's just easier this way). And then it proceeds to call B and C again:

[ ] ~C2
[ ] ~B2
[ ] ~A2, mark M
[ ] ~X1
[ ]

The label C loops again back to A:

[ ] ~C2
[ ] ~B2
[A3] ~A2, mark M
[ ] ~X1
[ ]

The stack then unrolls, finds the A2's frame not empty, takes A3 from it, and continues in the same way until C decides to not loop to A any more, calling Y instead.
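The walk-through above can be replayed with a toy scheduler in plain Perl. This is only a sketch of the idea, not the Triceps implementation: the names execute(), call_rowop() and run_loop_demo() are made up, a frame is modeled as an array of queued [label, number] pairs, and the loop is arbitrarily limited to three iterations. The point to observe is that the looped rowops are run on A's existing frame, so the nesting depth stays the same for every iteration.

```perl
use strict;
use warnings;

# A toy model of the new scheduling, not the Triceps implementation.
my (%handler, @trace, $mark);
my ($depth, $max_depth) = (0, 0);

# Run one rowop (label, n) in the context of the given frame.
sub execute {
    my ($label, $n, $frame) = @_;
    push @trace, "$label$n";
    $handler{$label}->($n, $frame);
}

# A proper call: push a new frame, run the rowop, then drain the frame
# at the very end, "semi-calling" each queued rowop on the same frame.
sub call_rowop {
    my ($label, $n) = @_;
    my $frame = [];
    $depth++;
    $max_depth = $depth if $depth > $max_depth;
    execute($label, $n, $frame);
    while (my $next = shift @$frame) {
        execute(@$next, $frame);
    }
    $depth--;
}

%handler = (
    X => sub { call_rowop("A", 1) },
    A => sub { my ($n, $frame) = @_; $mark = $frame; call_rowop("B", $n) },
    B => sub { call_rowop("C", $_[0]) },
    C => sub {
        my ($n) = @_;
        if ($n < 3) { push @$mark, ["A", $n + 1] }   # loop back at the mark
        else        { call_rowop("Y", $n) }           # leave the loop
    },
    Y => sub { },
);

# Returns the execution order and the deepest nesting of called frames.
sub run_loop_demo {
    @trace = ();
    ($depth, $max_depth) = (0, 0);
    call_rowop("X", 1);
    return (join(" ", @trace), $max_depth);
}

my ($order, $frames) = run_loop_demo();
print "$order\nmax nesting: $frames\n";
```

The model counts only the frames pushed by calls, so the unit's outermost frame from the diagrams is not included in the nesting number.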

This has pulled a few more changes with it. The first consequence is that the frame draining doesn't happen between executing the label itself and executing its chained labels. Now it has moved to the very end: the label runs, then calls whatever labels are chained from it, then the frame draining happens after all the other processing has completed. If the frame is found not empty, the first rowop from it gets removed from the frame and "semi-called" with the same frame. If the frame is not empty again (because either the original rowop had forked/looped rowops onto it, or because the "semi-called" one did), the next rowop gets removed and "semi-called", and so on.

The second consequence is that this has changed the traces of the unit tracers, and I've had to add one more TracerWhen constant. Remembering the difficulties with the nesting of the traces, this was a good time to fix that too, so I've added the second TracerWhen constant. Now all of them go nicely in pairs:

TW_BEFORE, // before calling the label's execution as such
TW_AFTER, // after all the execution is done
TW_BEFORE_CHAINED, // after execution, before calling the chained labels (if they are present)
TW_AFTER_CHAINED, // after calling the chained labels (if they were present)
TW_BEFORE_DRAIN, // before draining the label's frame if it's not empty
TW_AFTER_DRAIN, // after draining the label's frame if it was not empty

The TW_BEFORE/AFTER_CHAINED trace points now get called only if there actually were any chained labels to call, and the TW_BEFORE/AFTER_DRAIN trace points get called only if there was anything to drain. The DRAIN trace points always get called with the original rowop that first pushed this frame onto the stack (so that matching the "before" and "after" is easy).

The full sequence in the correct order now becomes:

TW_BEFORE
TW_BEFORE_CHAINED
TW_AFTER_CHAINED 
TW_AFTER
TW_BEFORE_DRAIN
TW_AFTER_DRAIN 

But since parts of it are optional, the minimal (and most typical) one is only:

TW_BEFORE
TW_AFTER

There also are new methods to check if a particular constant (in its integer form, not as a string) is a "before" or "after". Their typical usage in a trace function, to print an opening or closing brace, looks like:

    if (Triceps::tracerWhenIsBefore($when)) {
        $msg .= " {";
    } elsif (Triceps::tracerWhenIsAfter($when)) {
        $msg .= " }";
    }


More trace points that are neither "before" nor "after" could get added in the future, so a good practice is to use an elsif with both conditions rather than a simple if/else with one condition.
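To see how the pairs nest, here is a small sketch that runs without Triceps. The functions is_before() and is_after() are stand-ins invented for this illustration: unlike the real methods, which take the integer constants, they match on the constant names as strings. The function render_trace() is likewise hypothetical.

```perl
use strict;
use warnings;

# Stand-ins for Triceps::tracerWhenIsBefore()/tracerWhenIsAfter() that
# work on the constant names as strings; an assumption made only to keep
# the sketch independent of Triceps.
sub is_before { return $_[0] =~ /^TW_BEFORE/ ? 1 : 0 }
sub is_after  { return $_[0] =~ /^TW_AFTER/  ? 1 : 0 }

# Render a sequence of trace points with braces and indentation.
sub render_trace {
    my @when = @_;
    my $depth = 0;
    my @out;
    for my $w (@when) {
        if (is_before($w)) {
            push @out, ("  " x $depth) . "$w {";
            $depth++;
        } elsif (is_after($w)) {
            $depth-- if $depth > 0;
            push @out, ("  " x $depth) . "$w }";
        } else {
            push @out, ("  " x $depth) . $w;   # any future kind of trace point
        }
    }
    return @out;
}

print "$_\n" for render_trace(qw(
    TW_BEFORE TW_BEFORE_CHAINED TW_AFTER_CHAINED TW_AFTER
    TW_BEFORE_DRAIN TW_AFTER_DRAIN
));
```

The final else branch is the reason for the elsif: a trace point that is neither kind gets printed without disturbing the brace balance.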



The third consequence is that the methods Unit::makeLoopHead() and Unit::makeLoopAround() now return only a pair of values, not a triplet. The "begin" label is not needed any more, so it's not created and not returned.

Wednesday, October 3, 2012

Streaming functions and pipelines

The streaming functions can be arranged into a pipeline by binding the result of one function to the input of another one. Fundamentally, the pipelines in the world of streaming functions are analogs of the nested calls with the common functions. For example, a pipeline (written for shortness in the Unix way)

a | b | c

is an analog of the common function calls

c(b(a()))

Of course, if the pipeline is fixed, it can as well be connected directly with the label chaining and then stay like this. A more interesting case is when the pipeline needs to be reconfigured dynamically based on the user requests.

An interesting example of pipeline usage comes from the data security. A client may connect to a CEP model element in a clear-text or encrypted way. In the encrypted way the data received from the client needs to be decrypted, then processed, and then the results encrypted before sending them back:

receive | decrypt | process | encrypt | send

In the clear-text mode the pipeline becomes shorter:

receive | process | send

Let's make an example around this idea: To highlight the flexibility, the configuration will be selectable for each input line. If the input starts with a "+", it will be considered encrypted, otherwise clear-text. Since the actual security is not important here, it will be simulated by encoding the text in hex (each byte of data becomes two hexadecimal digits). The real encryption, such as SSL, would of course require the key negotiation, but this little example just skips over this part, since it has no key.
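Before going into the Triceps code, the idea itself can be sketched with ordinary Perl functions. Everything here is an assumption made for illustration: the names decrypt(), encrypt(), process(), run_pipeline() and handle_line() are invented, and process() uses a simplified placeholder logic (incrementing the last comma-separated field) rather than the real model shown later.

```perl
use strict;
use warnings;

# Stages of the hypothetical pipeline; each one maps a string to a string.
sub decrypt { return pack("H*", $_[0]) }             # hex text -> bytes
sub encrypt { return scalar unpack("H*", $_[0]) }    # bytes -> hex text
sub process {                                        # placeholder logic:
    my @f = split(/,/, $_[0]);                       # increment the last
    $f[-1]++ if @f;                                  # comma-separated field
    return join(",", @f);
}

# a | b | c is the same as c(b(a(x))): feed each result to the next stage.
sub run_pipeline {
    my ($data, @stages) = @_;
    $data = $_->($data) for @stages;
    return $data;
}

# Pick the pipeline per input line by the leading "+".
sub handle_line {
    my ($line) = @_;
    if ($line =~ s/^\+//) {
        return run_pipeline($line, \&decrypt, \&process, \&encrypt);
    }
    return run_pipeline($line, \&process);
}

print handle_line("inc,OP_INSERT,abc,2"), "\n";
print handle_line("+696e632c4f505f494e534552542c6162632c32"), "\n";
```

With plain functions the pipeline is just a list of code references. The streaming functions achieve the same reconfiguration by choosing which bindings get pushed onto which returns.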

First, define the input and output (receive and send) endpoints:

# All the input and output gets converted through an intermediate
# format of a row with one string field.
my $rtString = Triceps::RowType->new(
    s => "string"
) or confess "$!";

# All the input gets sent here.
my $lbReceive = $unit->makeDummyLabel($rtString, "lbReceive");
my $retReceive = Triceps::FnReturn->new(
    name => "retReceive",
    labels => [
        data => $lbReceive,
    ],
);

# The binding that actually prints the output.
my $bindSend = Triceps::FnBinding->new(
    name => "bindSend",
    on => $retReceive, # any matching return will do
    unit => $unit,
    labels => [
        data => sub {
            print($_[1]->getRow()->get("s"), "\n");
        },
    ],
);

The same row type $rtString will be used for the whole pipeline, sending arbitrary strings of text through it. The binding $bindSend is defined on $retReceive, so they can actually be short-circuited together. But they don't have to be: $bindSend can be bound to any matching return. A matching return is defined as having the same number of labels, with matching row types. The names of the labels don't matter but their order does. It's a bit tricky: when a binding is created, its labels get connected by name to the return on which it's defined. But at this point each of them gets assigned a number, in the order the labels went into that original return. After that only this number matters: if this binding gets connected to another matching return, it will get the data from the return's label with the same number, not the same name.
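The name-then-number matching can be modeled in a few lines of plain Perl. This is a hypothetical model and not how Triceps stores things internally: a return is reduced to an ordered list of label names, and make_binding() and deliver() are invented names.

```perl
use strict;
use warnings;

# Hypothetical model of a return: just an ordered list of label names.
my @ret_receive = ("data", "control");
my @ret_other   = ("payload", "status");   # same count, different names

# Creating a binding resolves the names against the return it is defined
# on and keeps only the positions.
sub make_binding {
    my ($ret_labels, %handlers) = @_;
    my @by_index;
    for my $i (0 .. $#$ret_labels) {
        $by_index[$i] = $handlers{ $ret_labels->[$i] };
    }
    return \@by_index;
}

# Delivering data from any return looks up the label's position in that
# return, then uses the handler stored at the same position.
sub deliver {
    my ($binding, $ret_labels, $label, $value) = @_;
    my ($i) = grep { $ret_labels->[$_] eq $label } 0 .. $#$ret_labels;
    die "no label '$label' in this return\n" unless defined $i;
    return $binding->[$i]->($value);
}

my $binding = make_binding(\@ret_receive,
    data    => sub { "data handler got $_[0]" },
    control => sub { "control handler got $_[0]" },
);
print deliver($binding, \@ret_other, "payload", "abc"), "\n";
print deliver($binding, \@ret_other, "status", "ok"), "\n";
```

The binding was defined with the names "data" and "control", yet it happily serves a return whose labels are called "payload" and "status", because only the positions are compared.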

Next step, define the endpoints for the processing: the dispatcher and the output label. All of them use the same row type and matching returns. The actual processing will eventually be hard-connected between these endpoints.

my %dispatch; # the dispatch table will be set here

# The binding that dispatches the input data
my $bindDispatch = Triceps::FnBinding->new(
    name => "bindDispatch",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my @data = split(/,/, $_[1]->getRow()->get("s")); # starts with a command, then string opcode
            my $type = shift @data;
            my $lb = $dispatch{$type};
            my $rowop = $lb->makeRowopArray(@data);
            $unit->call($rowop);
        },
    ],
);

# All the output gets converted to rtString and sent here.
my $lbOutput = $unit->makeDummyLabel($rtString, "lbOutput");
my $retOutput = Triceps::FnReturn->new(
    name => "retOutput",
    labels => [
        data => $lbOutput,
    ],
);

And the filters for encryption and decryption. Each of them has a binding for its input and a return for its output. The actual pseudo-encryption transformation is done with Perl functions unpack() and pack().

# The encryption pipeline element.
my $retEncrypt = Triceps::FnReturn->new(
    name => "retEncrypt",
    unit => $unit,
    labels => [
        data => $rtString,
    ],
);
my $lbEncrypt = $retEncrypt->getLabel("data") or confess "$!";
my $bindEncrypt = Triceps::FnBinding->new(
    name => "bindEncrypt",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my $s = $_[1]->getRow()->get("s");
            $unit->makeArrayCall($lbEncrypt, "OP_INSERT", unpack("H*", $s));
        },
    ],
);

# The decryption pipeline element.
my $retDecrypt = Triceps::FnReturn->new(
    name => "retDecrypt",
    unit => $unit,
    labels => [
        data => $rtString,
    ],
);
my $lbDecrypt = $retDecrypt->getLabel("data") or confess "$!";
my $bindDecrypt = Triceps::FnBinding->new(
    name => "bindDecrypt",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my $s = $_[1]->getRow()->get("s");
            $unit->makeArrayCall($lbDecrypt, "OP_INSERT", pack("H*", $s));
        },
    ],
);

Then goes the body of the model. It defines the actual row types for the data that gets parsed from strings and the business logic (which is pretty simple, increasing an integer field). The dispatch table connects the dispatcher with the business logic, and the conversion from the data rows to the plain text rows is done with the template makePipePrintLabel(). This template is very similar to the template makePrintLabel() that was shown in the section "Simple wrapper templates" http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_template_wrapper.

sub makePipePrintLabel($$$) # ($print_label_name, $parent_label, $out_label)
{
    my $name = shift;
    my $lbParent = shift;
    my $lbOutput = shift;
    my $unit = $lbOutput->getUnit();
    my $lb = $lbParent->getUnit()->makeLabel($lbParent->getType(), $name,
        undef, sub { # (label, rowop)
            $unit->makeArrayCall(
                $lbOutput, "OP_INSERT", $_[1]->printP());
        }) or confess "$!";
    $lbParent->chain($lb) or confess "$!";
    return $lb;
}

# The body of the model: pass through the name, increase the count.
my $rtData = Triceps::RowType->new(
    name => "string",
    count => "int32",
) or confess "$!";

my $lbIncResult = $unit->makeDummyLabel($rtData, "result");
my $lbInc = $unit->makeLabel($rtData, "inc", undef, sub {
    my $row = $_[1]->getRow();
    $unit->makeHashCall($lbIncResult, $_[1]->getOpcode(),
        name  => $row->get("name"),
        count => $row->get("count") + 1,
    );
}) or confess ("$!");
makePipePrintLabel("printResult", $lbIncResult, $lbOutput);

%dispatch = (
    inc => $lbInc,
);

Finally, the main loop. It will check the input lines for the leading "+" and construct one or the other pipeline for processing. Of course, the pipelines don't have to be constructed in the main loop. They could have been constructed in the handler of $lbReceive just as well (then it would need a separate label to send its result to, and to include into $retReceive).

while(&readLine) {
    my $ab;
    chomp;
    if (/^\+/) {
        $ab = Triceps::AutoFnBind->new(
            $retReceive => $bindDecrypt,
            $retDecrypt => $bindDispatch,
            $retOutput => $bindEncrypt,
            $retEncrypt => $bindSend,
        );
        $_ = substr($_, 1);
    } else {
        $ab = Triceps::AutoFnBind->new(
            $retReceive => $bindDispatch,
            $retOutput => $bindSend,
        );
    };
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);
    $unit->drainFrame();
}

The constructor of AutoFnBind (and also FnBinding::call()) can accept multiple return-binding pairs. It will bind them all, and unbind them back on its object destruction. It's the same thing as creating multiple AutoFnBind objects for one pair each, only more efficient.
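The scoped behavior relies on Perl destroying an object as soon as its last reference goes away. A minimal sketch of such a guard, assuming nothing about the real implementation: ScopedBind is a made-up class, a "return" is just a hash with a stack in it, and the bindings are plain strings.

```perl
use strict;
use warnings;

package ScopedBind;    # hypothetical stand-in for the idea, not Triceps::AutoFnBind

# Takes a flat list of (return, binding) pairs and pushes each binding
# onto the stack of its return.
sub new {
    my ($class, @pairs) = @_;
    die "expected return => binding pairs\n" if @pairs % 2;
    my @bound;
    while (my ($ret, $binding) = splice(@pairs, 0, 2)) {
        push @{ $ret->{stack} }, $binding;
        push @bound, $ret;
    }
    return bless { bound => \@bound }, $class;
}

# Runs when the last reference goes away, i.e. at the end of the scope.
sub DESTROY {
    my ($self) = @_;
    pop @{ $_->{stack} } for reverse @{ $self->{bound} };
}

package main;

# Records the stack depths before, inside and after the scope.
sub demo_scoped_bind {
    my $ret_a = { stack => [] };
    my $ret_b = { stack => [] };
    my $depths = sub { join(",", map { scalar @{ $_->{stack} } } $ret_a, $ret_b) };
    my @seen = ($depths->());
    {
        my $ab = ScopedBind->new($ret_a => "bindX", $ret_b => "bindY");
        push @seen, $depths->();
    }   # $ab is destroyed here, popping both bindings
    push @seen, $depths->();
    return @seen;
}

print join(" ", demo_scoped_bind()), "\n";
```

The unbinding goes in the reverse order of the binding, which keeps things consistent even if the same return is listed more than once.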

And here is an example of a run (as usual per the new tradition, "> " marks the input lines):

> inc,OP_INSERT,abc,1
result OP_INSERT name="abc" count="2"
> inc,OP_DELETE,def,100
result OP_DELETE name="def" count="101"
> +696e632c4f505f494e534552542c6162632c32
726573756c74204f505f494e53455254206e616d653d226162632220636f756e743d22332220
> +696e632c4f505f44454c4554452c6465662c313031
726573756c74204f505f44454c455445206e616d653d226465662220636f756e743d223130322220

What is in the encrypted data? The input lines have been produced by running a Perl expression manually:

$ perl -e 'print((unpack "H*", "inc,OP_INSERT,abc,2"), "\n");'
696e632c4f505f494e534552542c6162632c32
$ perl -e 'print((unpack "H*", "inc,OP_DELETE,def,101"), "\n");'
696e632c4f505f44454c4554452c6465662c313031

They and the results can be decoded by running another Perl expression:

$ perl -e 'print((pack "H*", "726573756c74204f505f494e53455254206e616d653d226162632220636f756e743d22332220"), "\n");'
result OP_INSERT name="abc" count="3"
$ perl -e 'print((pack "H*", "726573756c74204f505f44454c455445206e616d653d226465662220636f756e743d223130322220"), "\n");'
result OP_DELETE name="def" count="102"

Tuesday, October 2, 2012

Streaming functions and loops

The streaming functions can be used to replace the topological loops (where the connections between the labels go in circles) with the procedural ones. Just make the body of the loop into a streaming function and connect its output with its own input (and of course also to the loop results). Then call this function in a procedural while-loop until the data stops circulating.

The way the streaming functions have been described so far, there is a catch, even two of them: First, with such a connection, the output of the streaming function would immediately circulate to its input, and would try to keep circulating until the loop is done, with no need for a while-loop. Second, as soon as it attempts to circulate, the scheduler will detect a recursive call and die.

But there is also a solution that has not been described yet: an FnBinding can collect the incoming rowops in a tray instead of immediately forwarding them. This tray can be called later, after the call completes. This way the iteration has its data collected, the function completes, and then the next iteration of the while-loop starts, sending the data from the previous iteration. When there is nothing to send, the loop completes.

Using this logic, let's rewrite the Fibonacci example with the streaming function loops. Its original version and description of the logic can be found in the manual section "Example of a topological loop" http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_sched_loop_ex.

The new version is:

my $uFib = Triceps::Unit->new("uFib");

###
# A streaming function that computes one step of a
# Fibonacci number, will be called repeatedly.

# Type of its input and output.
my $rtFib = Triceps::RowType->new(
    iter => "int32", # number of iterations left to do
    cur => "int64", # current number
    prev => "int64", # previous number
) or confess "$!";

# Input:
#   $lbFibCompute: request to do a step. iter will be decremented,
#     cur moved to prev, new value of cur computed.
# Output (by FnReturn labels):
#   "next": data to send to the next step, if the iteration
#     is not finished yet (iter in the produced row is >0).
#   "result": the result data if the iteration is finished
#     (iter in the produced row is 0).
# The opcode is preserved through the computation.

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        next => $rtFib,
        result => $rtFib,
    ],
);

my $lbFibCompute = $uFib->makeLabel($rtFib, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $prev = $row->get("cur");
    my $cur = $prev + $row->get("prev");
    my $iter = $row->get("iter") - 1;
    $uFib->makeHashCall($frFib->getLabel($iter > 0? "next" : "result"), $_[1]->getOpcode(),
        iter => $iter,
        cur => $cur,
        prev => $prev,
    );
}) or confess "$!";

# End of streaming function
###

my $lbPrint = $uFib->makeLabel($rtFib, "Print", undef, sub {
    print($_[1]->getRow()->get("cur"));
});

# binding to run the Triceps steps in a loop
my $fbFibLoop = Triceps::FnBinding->new(
    name => "FibLoop",
    on => $frFib,
    withTray => 1,
    labels => [
        next => $lbFibCompute,
        result => $lbPrint,
    ],
);

my $lbMain = $uFib->makeLabel($rtFib, "Main", undef, sub {
    my $row = $_[1]->getRow();
    {
        my $ab = Triceps::AutoFnBind->new($frFib, $fbFibLoop);

        # send the request into the loop
        $uFib->makeHashCall($lbFibCompute, $_[1]->getOpcode(),
            iter => $row->get("iter"),
            cur => 0, # the "0-th" number
            prev => 1,
        );

        # now keep cycling the loop until it's all done
        while (!$fbFibLoop->trayEmpty()) {
            $fbFibLoop->callTray();
        }
    }
    print(" is Fibonacci number ", $row->get("iter"), "\n");
}) or confess "$!";

while(<STDIN>) {
    chomp;
    my @data = split(/,/);
    $uFib->makeArrayCall($lbMain, @data);
    $uFib->drainFrame(); # just in case, for completeness
}

It produces the same output as before (as usual in the new convention, the lines marked with "> " are the input lines):

> OP_INSERT,1
1 is Fibonacci number 1
> OP_DELETE,2
1 is Fibonacci number 2
> OP_INSERT,5
5 is Fibonacci number 5
> OP_INSERT,6
8 is Fibonacci number 6

The option "withTray" of FnBinding is what makes it collect the rowops in a tray. The rowops are not the original incoming ones but already translated to call the FnBinding's output labels. The method callTray() swaps the tray with a fresh one and then calls the original tray with the collected rowops. There are more methods for the tray control: swapTray() swaps the tray with a fresh one and returns the original one, which can then be read or called; traySize() returns not just the emptiness condition but the whole size of the tray.
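The tray mechanics can be imitated in plain Perl to see why the loop terminates. This is a sketch under stated assumptions: the array @tray plays the role of the binding's tray, the closure $step plays the role of the streaming function, and fib_with_tray() is a made-up name. No Triceps calls are involved.

```perl
use strict;
use warnings;

# Computes a Fibonacci number the same way as the loop above, with a
# plain array standing in for the tray of the binding.
sub fib_with_tray {
    my ($n) = @_;
    my @tray;       # rows collected for the next iteration
    my $result;
    my $step = sub {
        my ($row) = @_;
        my $prev = $row->{cur};
        my $cur  = $prev + $row->{prev};
        my $iter = $row->{iter} - 1;
        my $out  = { iter => $iter, cur => $cur, prev => $prev };
        if ($iter > 0) { push @tray, $out }     # the "next" label
        else           { $result = $cur }       # the "result" label
    };

    # the first row primes the loop
    $step->({ iter => $n, cur => 0, prev => 1 });

    # keep cycling until nothing is left to circulate
    while (@tray) {
        my @calling = @tray;    # swap the tray with a fresh one...
        @tray = ();
        $step->($_) for @calling;   # ...and call the collected rows
    }
    return $result;
}

print fib_with_tray($_), " is Fibonacci number $_\n" for (1, 2, 5, 6);
```

Swapping before calling matters: the rows produced during the call land in the fresh tray and wait for the next pass instead of being processed recursively.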

The whole loop runs in one binding scope, because it doesn't change with the iterations. The first row primes the loop, and then it continues while there is anything to circulate.

This example sends both the next iteration rows and the result rows through the binding. But for the result rows it doesn't have to. They can be sent directly out of the loop:

my $lbFibCompute = $uFib->makeLabel($rtFib, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $prev = $row->get("cur");
    my $cur = $prev + $row->get("prev");
    my $iter = $row->get("iter") - 1;
    $uFib->makeHashCall($iter > 0? $frFib->getLabel("next") : $lbPrint, $_[1]->getOpcode(),
        iter => $iter,
        cur => $cur,
        prev => $prev,
    );
}) or confess "$!";

The printed result is exactly the same.

Sunday, September 30, 2012

Another example of streaming functions

Now I want to show an example that is fundamentally kind of dumb. The same thing is easier to do in Triceps with templates. And the whole premise is not exactly great either. But it provides an opportunity to show more of the streaming functions, in a set-up that is closer to the SQL-based systems.

The background is as follows: There happen to be multiple ways to identify the securities (stock shares etc.). "RIC" is the identifier used by Reuters (and quite often by the other data suppliers), consisting of the ticker symbol on an exchange and the code of the exchange. ISIN is the international standard identifier. A security (and some of its creative derivatives) might happen to be listed on multiple exchanges, each having its own RIC, but all translating to the same ISIN (there might be multiple ISINs too but that's another story). A large financial company would want to track a security all around the world. To aggregate the data on the security worldwide, it has to identify it by ISIN, but the data feed might be coming in as RIC only. The translation of RIC to ISIN is then done by the table during processing. The RIC is not thrown away either, it shows the detail of what happened. But ISIN is added for the aggregation.

The data might be coming from multiple feeds, and there are multiple kinds of data: trades, quotes, lending quotes and so on, each with its own schema and its own aggregations. However the step of RIC-to-ISIN translation is the same for all of them, is done by the same table, and can be done in one place.

An extra complexity is that in the real world the translation table might be incomplete. However some feeds might provide both RICs and ISINs in their records, so the pairs that aren't in the reference table yet can be inserted there and used for the following translations. This is actually not such a great idea, because it means that there might be previous records that have gone through before the translation became available. A much better way would be to do the translation as a join, where the update to a reference table would update any previous records as well. But then there would not be much use for a streaming function in it. As I've said before, it's a rather dumb example.

The streaming function will work like this: It will get an argument pair of (RIC, ISIN) from an incoming record. Either component of this pair might be empty. Since the rest of the record is wildly different for different feeds, the rest of the record is left off at this point, and the uniform argument of (RIC, ISIN) is given to the function. The function will consult its table, see if it can add more information from there, or add more information from the argument into the table, and return the hopefully enriched pair (RIC, ISIN) with an empty ISIN field replaced by the right value, to the caller.

The function is defined like this:

my $rtIsin = Triceps::RowType->new(
    ric => "string",
    isin => "string",
) or confess "$!";

my $ttIsin = Triceps::TableType->new($rtIsin)
    ->addSubIndex("byRic", Triceps::IndexType->newHashed(key => [ "ric" ])
) or confess "$!";
$ttIsin->initialize() or confess "$!";

my $tIsin = $unit->makeTable($ttIsin, "EM_CALL", "tIsin") or confess "$!";

# the results will come from here
my $fretLookupIsin = Triceps::FnReturn->new(
    name => "fretLookupIsin",
    unit => $unit,
    labels => [
        result => $rtIsin,
    ],
);

# The function argument: the input data will be sent here.
my $lbLookupIsin = $unit->makeLabel($rtIsin, "lbLookupIsin", undef, sub {
    my $row = $_[1]->getRow();
    if ($row->get("ric")) {
        my $argrh = $tIsin->makeRowHandle($row);
        my $rh = $tIsin->find($argrh);
        if ($rh->isNull()) {
            if ($row->get("isin")) {
                $tIsin->insert($argrh);
            }
        } else {
            $row = $rh->getRow();
        }
    }
    $unit->call($fretLookupIsin->getLabel("result")
        ->makeRowop("OP_INSERT", $row));
}) or confess "$!";

The $fretLookupIsin is the function result, $lbLookupIsin is the function input. In this example the result label in FnReturn is defined differently than in the previous one: not by a source label but by a row type. This label doesn't get chained to anything, instead the procedural code finds it as $fretLookupIsin->getLabel("result") and sends the rowops directly to it.

Then the ISIN translation code for some trades feed would look as follows (remember, supposedly there would be many feeds, each with its own schema, but for the example I show only one):

my $rtTrade = Triceps::RowType->new(
    ric => "string",
    isin => "string",
    size => "float64",
    price => "float64",
) or confess "$!";

my $lbTradeEnriched = $unit->makeDummyLabel($rtTrade, "lbTradeEnriched");
my $lbTrade = $unit->makeLabel($rtTrade, "lbTrade", undef, sub {
    my $rowop = $_[1];
    my $row = $rowop->getRow();
    Triceps::FnBinding::call(
        name => "callTradeLookupIsin",
        on => $fretLookupIsin,
        unit => $unit,
        rowop => $lbLookupIsin->makeRowopHash("OP_INSERT",
            ric => $row->get("ric"),
            isin => $row->get("isin"),
        ),
        labels => [
            result => sub { # a label will be created from this sub
                $unit->call($lbTradeEnriched->makeRowop($rowop->getOpcode(),
                    $row->copymod(
                        isin => $_[1]->getRow()->get("isin")
                    )
                ));
            },
        ],
    );
});

The label $lbTrade receives the incoming trades, calls the streaming function to enrich them with the ISIN data, and forwards the enriched data to the label $lbTradeEnriched. The function call is done differently in this example. Rather than create a FnBinding object and then use it with a scoped AutoFnBind, it uses the convenience function FnBinding::call() that wraps all that logic. It's simpler to use, without all these extra objects, but the price is the efficiency: it ends up creating a new FnBinding object for every call. That's where a compiler would be very useful, it could take a call like this, translate it to the internal objects once, and then keep reusing them.

FnBinding::call() gets a name that is used for the error messages and also to give names to the temporary objects it creates. The option "on" tells which streaming function is being called (by specifying its FnReturn). The option "rowop" gives the arguments of the streaming function. There are multiple ways to do that: option "rowop" for a single rowop, "rowops" for an array of rowops, "tray" for a tray, and "code" for a procedural function that would send the inputs. And "labels" as usual connects the results of the function, either to the labels, or by creating labels automatically from the snippets of code.

The result handling here demonstrates the technique that I call the "implicit join": The function gets a portion of data from an original row, does some transformation and returns the data back. This data is then joined with the original row. The code knows what this original row was: it gets remembered in the variable $row. The semantics of the call guarantees that nothing else has happened during the function call, and that $row is still the current row. Then the function result gets joined with $row, and the produced data is sent further on its way. The variable $row could be either a global one, or as shown here a scoped variable that gets embedded into a closure function.

The rest of the example, the dispatcher part, is:

# print what is going on
my $lbPrintIsin = makePrintLabel("printIsin", $tIsin->getOutputLabel());
my $lbPrintTrade = makePrintLabel("printTrade", $lbTradeEnriched);

# the main loop
my %dispatch = (
    isin => $tIsin->getInputLabel(),
    trade => $lbTrade,
);

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    my $lb = $dispatch{$type};
    my $rowop = $lb->makeRowopArray(@data);
    $unit->call($rowop);
    $unit->drainFrame(); # just in case, for completeness
}

And an example of a run, with the input lines shown, according to the new convention, preceded by "> ":

> isin,OP_INSERT,ABC.L,US0000012345
tIsin.out OP_INSERT ric="ABC.L" isin="US0000012345"
> isin,OP_INSERT,ABC.N,US0000012345
tIsin.out OP_INSERT ric="ABC.N" isin="US0000012345"
> isin,OP_INSERT,DEF.N,US0000054321
tIsin.out OP_INSERT ric="DEF.N" isin="US0000054321"
> trade,OP_INSERT,ABC.L,,100,10.5
lbTradeEnriched OP_INSERT ric="ABC.L" isin="US0000012345" size="100" price="10.5"
> trade,OP_DELETE,ABC.N,,200,10.5
lbTradeEnriched OP_DELETE ric="ABC.N" isin="US0000012345" size="200" price="10.5"
> trade,OP_INSERT,GHI.N,,300,10.5
lbTradeEnriched OP_INSERT ric="GHI.N" isin="" size="300" price="10.5"
> trade,OP_INSERT,,XX0000012345,400,10.5
lbTradeEnriched OP_INSERT ric="" isin="XX0000012345" size="400" price="10.5"
> trade,OP_INSERT,GHI.N,XX0000012345,500,10.5
tIsin.out OP_INSERT ric="GHI.N" isin="XX0000012345"
lbTradeEnriched OP_INSERT ric="GHI.N" isin="XX0000012345" size="500" price="10.5"
> trade,OP_INSERT,GHI.N,,600,10.5
lbTradeEnriched OP_INSERT ric="GHI.N" isin="XX0000012345" size="600" price="10.5"

The table gets pre-populated with a few translations, and the first few trades use them. Then comes an example of a non-existent translation, which eventually gets added from the incoming data (see that the trade with GHI.N,XX0000012345 both updates the ISIN table and sends through the trade record), and the following trades can then use this newly added translation.
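The behavior visible in this run can be modeled in a few lines of plain Perl. This is only a sketch built on assumptions inferred from the output above, not the Triceps code itself: the hash %isin_by_ric stands in for the table $tIsin, and enrich() is a hypothetical name for the translation step.

```perl
use strict;
use warnings;

my %isin_by_ric;    # hypothetical stand-in for the table $tIsin

# Model of the enrichment step, as inferred from the run above:
# - both fields present: remember the translation;
# - ISIN missing: try to fill it in from the table by RIC.
sub enrich {
    my ($ric, $isin) = @_;
    if ($ric ne "" && $isin ne "") {
        $isin_by_ric{$ric} = $isin;
    } elsif ($ric ne "" && exists $isin_by_ric{$ric}) {
        $isin = $isin_by_ric{$ric};
    }
    return ($ric, $isin);
}

$isin_by_ric{"ABC.L"} = "US0000012345";    # a pre-populated translation

my @trades = (
    ["ABC.L", ""],                # known RIC, gets translated
    ["GHI.N", ""],                # unknown RIC, ISIN stays empty
    ["GHI.N", "XX0000012345"],    # adds the translation
    ["GHI.N", ""],                # now it can be translated
);
for my $t (@trades) {
    my ($ric, $isin) = enrich(@$t);
    print qq{ric="$ric" isin="$isin"\n};
}
```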

Saturday, September 29, 2012

More of Collapse with functions

The Collapse as shown before sends all the collected deletes before all the collected inserts. For example, if it has collected the updates for four rows, the output will be (assuming that the Collapse element is named "collapse" and the data set in it is named "idata"):

collapse.idata.out OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

What if you want the deletes followed directly by the matching inserts? Like this:

collapse.idata.out OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.out OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.out OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.out OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

With the procedural version this required doing a look-up in the insert table after processing each row in the delete table and handling it if found. So I've left it out to avoid complicating the example. But in the streaming function form it becomes easy; just change the binding a little bit:

        my $lbInsInput = $dataset->{tbInsert}->getInputLabel();

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->adopt($_[1]));
                        # If the INSERT is available after this DELETE, this
                        # will produce it.
                        $unit->call($lbInsInput->adopt($_[1]));
                    }
                },
                ins => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                    }
                },
            ],
        );

The "del" binding first sends the result out as usual and then forwards the DELETE rowop to the insert table's input, which then causes the insert rowop to be sent if a match is found. Mind you, the look-up and conditional processing still happen. But now it all happens inside the table machinery; all you need to do is add one more line to invoke it.

Let's look in a little more detail at what happens when the clearing of the Delete table deletes the row with (local_ip="3.3.3.3" remote_ip="7.7.7.7").

  • The Delete table sends a rowop with this row and OP_DELETE to its output label collapse.idata.tbDelete.out.
  • Which then gets forwarded to a chained label in the FnReturn, collapse.idata.retTbl.del.
  • FnReturn has a FnBinding pushed into it, so the rowop passes to the matching label in the binding, collapse.idata.bndTbl.del.
  • The Perl handler of that label gets called, first forwards the rowop to the Collapse output label collapse.idata.out, and then to the Insert table's input label collapse.idata.tbInsert.in.
  • The Insert table looks up the row by the key, finds it, removes it from the table, and sends an OP_DELETE rowop to its output label collapse.idata.tbInsert.out.
  • Which then gets forwarded to a chained label in the FnReturn, collapse.idata.retTbl.ins.
  • FnReturn has a FnBinding pushed into it, so the rowop passes to the matching label in the binding, collapse.idata.bndTbl.ins.
  • The Perl handler of that label gets called and sends the rowop with the opcode changed to OP_INSERT to the Collapse output label collapse.idata.out.

It's a fairly complicated sequence but all you needed to do was to add one line of code. The downside of course is that if something doesn't go the way you expected, you'd have to trace and understand the whole long sequence.

Since the INSERTs that are sent right after the DELETEs have their data removed from the Insert table too, the following clear() of the Insert table won't find them any more and won't send any duplicates; it will send only the inserts for which there were no matching deletes.
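For comparison, here is roughly what the procedural variant with the extra look-up would have to do. It's only a plain-Perl sketch: the two hashes are hypothetical stand-ins for the Delete and Insert tables, keyed by a string made of the two addresses, and flush_paired() is a made-up name. The keys are sorted here just to make the output predictable; the real tables send the rows in their own order.

```perl
use strict;
use warnings;

# Hypothetical plain-hash stand-ins for the Delete and Insert tables,
# keyed by "local_ip,remote_ip"; the values are the byte counts.
sub flush_paired {
    my ($del, $ins) = @_;
    my @out;
    for my $key (sort keys %$del) {
        push @out, "OP_DELETE $key bytes=" . delete($del->{$key});
        # The extra look-up: send the matching insert right away and
        # remove it, so that the final pass does not send it again.
        if (exists $ins->{$key}) {
            push @out, "OP_INSERT $key bytes=" . delete($ins->{$key});
        }
    }
    # Whatever is left are the inserts with no matching delete.
    for my $key (sort keys %$ins) {
        push @out, "OP_INSERT $key bytes=" . delete($ins->{$key});
    }
    return @out;
}

my %del = ("1.1.1.1,5.5.5.5" => 100, "2.2.2.2,6.6.6.6" => 100);
my %ins = ("1.1.1.1,5.5.5.5" => 300, "2.2.2.2,6.6.6.6" => 300,
           "9.9.9.9,8.8.8.8" => 50);
print "$_\n" for flush_paired(\%del, \%ins);
```

The row with 9.9.9.9 has no matching delete, so it comes out only in the final pass, after all the delete and insert pairs.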

You may notice that the code in the "del" handler only forwards the rows around, and that it can be replaced by chaining:

        my $lbDel = $unit->makeDummyLabel(
            $dataset->{tbDelete}->getOutputLabel()->getRowType(),
            $self->{name} . "." . $dataset->{name} . ".lbDel");
        $lbDel->chain($lbOut);
        $lbDel->chain($lbInsInput);

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => $lbDel,
                ins => sub {
                    $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                },
            ],
        );

This shows another way of label definition in FnBinding: an actual label is created first and then given to FnBinding, instead of letting it automatically create a label from the code. The "if ($_[1]->isDelete())" condition has been removed from the "ins", since it's really redundant, and the delete part with its chaining doesn't do the same check anyway.

This code works just as well and even more efficiently than the previous version, since no Perl code needs to be invoked for "del"; it all propagates internally through the chaining. However, the price is that the DELETE rowops coming out of the output label will have the head-of-the-chain label in them:

collapse.idata.lbDel OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

The "ins" side can't be handled just by chaining because it has to replace the opcode in the rowops. A potential different way to handle this would be to define various label types in C++ for many primitive operations, like replacing the opcode, and then build the logic by combining them.
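To give a feel for what such combining might look like, here is a plain-Perl analogue built from closures. None of it exists in Triceps: replace_opcode() and fan_out() are hypothetical names, and a "label" here is just a code reference that takes an opcode and a row.

```perl
use strict;
use warnings;

# Each "label" is a code reference taking ($opcode, $row).
# These are hypothetical plain-Perl analogues, not Triceps classes.

# Forward to $next with the opcode replaced.
sub replace_opcode {
    my ($new_op, $next) = @_;
    return sub {
        my ($op, $row) = @_;
        $next->($new_op, $row);
    };
}

# Forward the same rowop to each target in order, like chaining.
sub fan_out {
    my @targets = @_;
    return sub {
        my @args = @_;
        $_->(@args) for @targets;
    };
}

my @log;
my $out = sub { my ($op, $row) = @_; push @log, "$op $row->{local_ip}" };

# The "ins" side: no custom handler code, only a combination of primitives.
my $ins = replace_opcode("OP_INSERT", $out);
$ins->("OP_DELETE", { local_ip => "3.3.3.3" });
print "$_\n" for @log;
```

With primitives like these, both sides of the binding would be assembled from ready-made pieces instead of hand-written handlers.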

One final note: the code shown in this post involved a recursive call of the streaming function. Its output from the "del" label got fed back to the function, producing more output on the "ins" label. This worked because it invoked a different code path in the streaming function than the one that produced the "del" data. If it were to form a topological loop back to the same path with the same labels, that would have been an error. Recursion will be discussed in more detail later.