Saturday, September 29, 2012

Hello streaming functions, or a functional Collapse

Coming up with good examples of streaming function usage in Triceps is surprisingly difficult. Ironically, the flexibility of Triceps is the problem. If all you have is SQL, streaming functions become pretty much a must. But if you can write procedural code, most things are easier that way. For a streaming function to be beneficial, it has to be built from SQL-like primitives (such as tables and joins) and not be easily reducible to procedural code.

The most distilled example I've come up with is in the implementation of Collapse. The original implementation of Collapse is described in the manual section "Collapsed updates". The flush() there goes in a loop, deleting all the rows from the state tables and sending them as rowops to the output.

Deleting all the rows can nowadays be done more easily with the Table method clear(). However, by itself it doesn't solve the problem of sending the output. It sends the deleted rows to the table's output label, but we can't just connect the output of the state tables to the Collapse output: then it would also pick up all the intermediate changes! The data needs to be picked up from the tables' output selectively, only in flush().

This makes it a good streaming function: the body of the function consists of running clear() on the state tables, and its result is whatever comes on the output labels of the tables.

Since most of the logic remains unchanged, I've implemented this new version of Collapse as a subclass that extends and replaces some of the code with its own:

package FnCollapse;

our @ISA=qw(Triceps::Collapse);

sub new # ($class, $optName => $optValue, ...)
{
    my $class = shift;
    my $self = $class->SUPER::new(@_);
    # Now add an FnReturn to the output of the dataset's tables.
    # One return is enough for both.
    # Also create the bindings for sending the data.
    foreach my $dataset (values %{$self->{datasets}}) {
        my $fret = Triceps::FnReturn->new(
            name => $self->{name} . "." . $dataset->{name} . ".retTbl",
            labels => [
                del => $dataset->{tbDelete}->getOutputLabel(),
                ins => $dataset->{tbInsert}->getOutputLabel(),
            ],
        );
        $dataset->{fret} = $fret;

        # these variables will be compiled into the binding snippets
        my $lbOut = $dataset->{lbOut};
        my $unit = $self->{unit};
        my $OP_INSERT = &Triceps::OP_INSERT;
        my $OP_DELETE = &Triceps::OP_DELETE;

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => sub {
                    if ($_[1]->isDelete()) {
                        # forward the deleted row with its opcode unchanged
                        $unit->call($lbOut->makeRowop($OP_DELETE, $_[1]->getRow()));
                    }
                },
                ins => sub {
                    if ($_[1]->isDelete()) {
                        # a row cleared from the insert table goes out as an insert
                        $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                    }
                },
            ],
        );
        $dataset->{fbind} = $fbind;
    }
    bless $self, $class;
    return $self;
}

# Override the base-class flush with a different implementation.
sub flush # ($self)
{
    my $self = shift;
    foreach my $dataset (values %{$self->{datasets}}) {
        # The binding takes care of producing and directing
        # the output. AutoFnBind will unbind when the block ends.
        my $ab = Triceps::AutoFnBind->new(
            $dataset->{fret} => $dataset->{fbind}
        );
        # Clearing the tables is what runs the streaming function.
        $dataset->{tbDelete}->clear();
        $dataset->{tbInsert}->clear();
    }
}

new() adds the streaming function elements in each data set. They consist of two parts: FnReturn defines the return value of a streaming function (there is no formal definition of the body or the entry point since they are quite flexible), and FnBinding defines a call of the streaming function. In this case the function is called in only one place, so one FnBinding is defined. If called from multiple places, there would be multiple FnBindings.

When a normal procedural function is called, the return address provides the connection to get the result back from it to the caller. In a streaming function, the FnBinding connects the result labels to the caller's further processing of the returned data. Unlike the procedural functions, the data is not returned in one step: run the function, compute the value, return it. Instead the return value of a streaming function is a stream of rowops. As each of them is sent to a return label, it goes through the binding and to the caller's further processing. Then the streaming function continues, producing the next rowop, and so on.
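The interleaving described above can be seen in a toy model written in plain Perl. This is only an analogy and uses nothing from Triceps: stream_rows(), the handler and the @trace log are hypothetical names made up for the illustration, with a code reference standing in for the binding and strings standing in for rowops.

```perl
use strict;
use warnings;

my @trace;  # records the order in which things happen

# Toy "streaming function": instead of returning a list in one step,
# it hands each result to the caller's handler as soon as it is produced.
sub stream_rows
{
    my ($rows, $on_row) = @_;
    for my $row (@$rows) {
        push @trace, "function produces $row";
        $on_row->($row);  # the caller's processing runs right here
    }
}

# The code reference plays the role of the binding.
stream_rows(["a", "b"], sub { push @trace, "caller handles $_[0]" });

print "$_\n" for @trace;
```

The trace shows the production and the handling alternating row by row, rather than all the rows being produced first and handled afterwards.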

If this sounds complicated, please realize that here we're dealing with the assembly language equivalent for streaming functions. I expect that over time it will become easier.

The second source of complexity is that the arguments of a streaming function are not computed in one step either. You don't normally have a full set of rows to send to a streaming function in one go. Instead you set up the streaming call to bind the result, then you pump the rowops to the function's input, creating them in whatever way is convenient.

Getting back to the definition of a streaming function, FnReturn defines a set of labels, each with a logical name. In this case the names are "del" and "ins". The labels inside FnReturn are a special variety of dummy labels, but they are chained to some real labels that send the result of the function. The snippet

   del => $dataset->{tbDelete}->getOutputLabel(),

says "create a return label named 'del' and chain it from the tbDelete's output label". There are more details to the naming and label creation, but let's not get bogged down in them now.

The FnBinding defines a matching set of labels, with the same logical names. It's like a receptacle and plug: you put the plug into the receptacle and get the data flowing, you unplug it and the data flow stops. The Perl version of FnBinding provides a convenience: when it gets a code reference instead of a label, it automatically creates a label with that code for its handler.

In this case both binding labels forward the data to the Collapse's output label. Only the one for the insert table has to change the opcodes to OP_INSERT. The check

if ($_[1]->isDelete()) ...

is really redundant and is there only to be on the safe side: we know that when the data is flowing, all of it comes from the table clearing and has the opcode OP_DELETE.

The actual call happens in flush(): Triceps::AutoFnBind does the "plug into receptacle" thing, with automatic unplugging when leaving the block scope. If you want to do things manually, FnReturn has the methods push() and pop(), but the scoped binding is safer and easier. Once the binding is done, the data is sent through the function by calling clear() on both tables. And then the block ends, AutoFnBind undoes the binding, and life goes on.
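To show why the scoped binding is convenient, here is a toy model of the plug-and-unplug mechanics in plain Perl. It does not use Triceps and does not reproduce its API: ToyReturn, ToyAutoBind and the methods plug(), unplug() and emit() are hypothetical stand-ins, and the "rows" are just strings. The sketch only assumes that a return point drops the data when nothing is plugged in and that the guard object unplugs when it goes out of scope.

```perl
use strict;
use warnings;

# Toy stand-in for a function return: named return points
# plus a stack of the bindings currently plugged in.
package ToyReturn;
sub new
{
    my ($class, @names) = @_;
    return bless { names => { map { $_ => 1 } @names }, stack => [] }, $class;
}
sub plug   { my ($self, $binding) = @_; push @{$self->{stack}}, $binding; }
sub unplug { my ($self) = @_; pop @{$self->{stack}}; }
sub emit
{
    my ($self, $name, $row) = @_;
    die "unknown return label '$name'" unless $self->{names}{$name};
    my $binding = $self->{stack}[-1] or return;  # nothing plugged in: drop the data
    $binding->{$name}->($row);
}

# Toy stand-in for the scoped binding: plugs in on creation,
# unplugs when the object is destroyed at the end of the scope.
package ToyAutoBind;
sub new
{
    my ($class, $ret, $binding) = @_;
    $ret->plug($binding);
    return bless { ret => $ret }, $class;
}
sub DESTROY { $_[0]{ret}->unplug(); }

package main;

my @state = ("row1", "row2");  # toy state table
my @out;                       # what the caller collects
my $ret = ToyReturn->new("del");

# The "function body": remove every row, reporting each one.
sub clear_state { $ret->emit(del => shift @state) while @state; }

$ret->emit(del => "intermediate");  # no binding yet: ignored
{
    my $guard = ToyAutoBind->new($ret, { del => sub { push @out, "DELETE $_[0]" } });
    clear_state();
}  # $guard is destroyed here and the binding is unplugged
$ret->emit(del => "late");          # unplugged again: ignored

print "$_\n" for @out;
```

Only the rows removed while the binding was plugged in reach @out. With explicit plug() and unplug() calls the same thing works, but an early return or a die between them would leave the binding plugged in, which is the risk the scoped form avoids.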

The result produced by this version of Collapse is exactly the same as that of the original version. And even when we get down to the nitty-gritty, it's produced in the exact same logical sequence: the rows are sent out as they are deleted from the state tables. But it's structured differently: instead of the procedural deletion and sending of the rows, the internal machinery of the tables gets invoked, and the results of that machinery are then converted to the form suitable for the collapse results and propagated to the output.

Philosophically, it could be argued: what is the body of this function? Is it just the internal logic of the table deletion that gets triggered by clear() in the caller? Or are the clear() calls also a part of the function body? But in practice it just doesn't matter.
