Monday, November 26, 2012

Streaming functions and unit boundaries (and TQL guts)

Now let's take a look at the insides of the Tql module. I'll skip the less interesting code; as always, the full version is in the source code at perl/Triceps/lib/Triceps/X/Tql.pm. The constructor is one of the parts that get skipped. The initialization part is more interesting:

sub initialize # ($self)
{
    my $myname = "Triceps::X::Tql::initialize";
    my $self = shift;

    return if ($self->{initialized});

    my %dispatch;
    my @labels;
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
        my $name = $self->{tableNames}[$i];
        my $table = $self->{tables}[$i];

        confess "$myname: found a duplicate table name '$name', all names are: "
                . join(", ", @{$self->{tableNames}})
            if (exists $dispatch{$name});

        $dispatch{$name} = $table;
        push @labels, $name, $table->getDumpLabel();
    }

    $self->{dispatch} = \%dispatch;
    $self->{fret} = Triceps::FnReturn->new(
        name => $self->{name} . ".fret",
        labels => \@labels,
    );

    $self->{initialized} = 1;
}

It creates a dispatch table of name-to-table and also an FnReturn that contains the dump labels of all the tables.

Each query will be created as its own unit. It will run, and then get cleared and disposed of, which is very convenient. By the way, that is the answer to the question of why someone would want to use multiple units in the same thread: for modular disposal.

But the labels in the main unit and the query unit can't be connected directly. A direct connection would create persistent references between the units, and the disposal wouldn't work. That's where the streaming function interface comes to the rescue: it provides a temporary connection. Build the query unit, build a binding for it, push the binding onto the FnReturn of the main unit, run the query, pop the binding, dispose of the query unit.

And the special capacity (or if you will, superpower) of the streaming functions that allows all that is that the FnReturn and FnBinding don't have to belong to the same unit. They may belong to different units and will still work together fine.

The query() method then handles the creation of the unit, the building of the pipeline in it, and the running of the query:

sub query # ($self, $argline)
{
    my $myname = "Triceps::X::Tql::query";

    my $self = shift;
    my $argline = shift;

    confess "$myname: may be used only on an initialized object"
        unless ($self->{initialized});

    $argline =~ s/^([^,]*)(,|$)//; # skip the name of the label
    my $q = $1; # the name of the query itself
    #&Triceps::X::SimpleServer::outCurBuf("+DEBUGquery: $argline\n");
    my @cmds = split_braced($argline);
    if ($argline ne '') {
        # Presumably, the argument line should contain no line feeds, so it should be safe to send back.
        &Triceps::X::SimpleServer::outCurBuf("+ERROR,OP_INSERT,$q: mismatched braces in the trailing $argline\n");
        return
    }

    # The context for the commands to build up an execution of a query.
    # Unlike $self, the context is created afresh for every query.
    my $ctx = {};
    # The query will be built in a separate unit
    $ctx->{tables} = $self->{dispatch};
    $ctx->{fretDumps} = $self->{fret};
    $ctx->{u} = Triceps::Unit->new("${q}.unit");
    $ctx->{prev} = undef; # will contain the output of the previous command in the pipeline
    $ctx->{actions} = []; # code that will run the pipeline
    $ctx->{id} = 0; # a unique id for auto-generated objects

    # It's important to place the clearing trigger outside eval {}. Otherwise the
    # clearing will erase any errors in $@ returned from eval.
    my $cleaner = $ctx->{u}->makeClearingTrigger();
    if (! eval {
        foreach my $cmd (@cmds) {
            #&Triceps::X::SimpleServer::outCurBuf("+DEBUGcmd, $cmd\n");
            my @args = split_braced($cmd);
            my $argv0 = bunquote(shift @args);
            # The rest of @args do not get unquoted here!
            die "No such TQL command '$argv0'\n" unless exists $tqlDispatch{$argv0};
            $ctx->{id}++;
            &{$tqlDispatch{$argv0}}($ctx, @args);
            # Each command must set its result label (even if an undef) into
            # $ctx->{next}.
            die "Internal error in the command $argv0: missing result definition\n"
                unless (exists $ctx->{next});
            $ctx->{prev} = $ctx->{next};
            delete $ctx->{next};
        }
        if (defined $ctx->{prev}) {
            # implicitly print the result of the pipeline, no options
            &{$tqlDispatch{"print"}}($ctx);
        }

        # Now run the pipeline
        foreach my $code (@{$ctx->{actions}}) {
            &$code;
        }

        1; # means that everything went OK
    }) {
        # XXX this won't work well with the multi-line errors
        &Triceps::X::SimpleServer::outCurBuf("+ERROR,OP_INSERT,$q: error: $@\n");
        return
    }
}

Each TQL command is defined as its own function, all of them collected in %tqlDispatch. query() splits the pipeline and then lets each command build its part of the query, connecting them through $ctx: each command finds the result label of the previous command in $ctx->{prev} and leaves its own in $ctx->{next}. A command may also register an action to be run later. After everything is built, the actions run and produce the result.
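
To make the build-then-run structure easier to see, here is a minimal sketch of the same pattern in plain Perl, without Triceps. Everything in it is made up for the illustration: the commands "source", "double" and "collect" are not TQL commands, and the toy "labels" are just hashes holding a list of chained handlers.

```perl
use strict;
use warnings;

# A toy "label": a hash with a list of handlers chained from it.
sub make_label { return { chained => [] }; }

# Send a value to a toy label: call every chained handler.
sub send_to {
    my ($lab, $val) = @_;
    foreach my $handler (@{$lab->{chained}}) {
        $handler->($val);
    }
}

# Hypothetical commands, invented for this sketch (not real TQL commands).
my %dispatch = (
    # Starts the pipeline and registers an action that feeds the data later.
    source => sub {
        my ($ctx, @values) = @_;
        die "The source command may not be used in the middle of a pipeline.\n"
            if (defined $ctx->{prev});
        my $lab = make_label();
        $ctx->{next} = $lab;
        push @{$ctx->{actions}}, sub {
            foreach my $v (@values) { send_to($lab, $v); }
        };
    },
    # A middle stage: only adds a handler, registers no action.
    double => sub {
        my ($ctx) = @_;
        die "The double command may not start a pipeline.\n"
            unless (defined $ctx->{prev});
        my $lab = make_label();
        push @{$ctx->{prev}{chained}}, sub { send_to($lab, 2 * $_[0]); };
        $ctx->{next} = $lab;
    },
    # The end of the pipeline: its result is undef, like a printing command.
    collect => sub {
        my ($ctx) = @_;
        die "The collect command may not start a pipeline.\n"
            unless (defined $ctx->{prev});
        push @{$ctx->{prev}{chained}}, sub { push @{$ctx->{result}}, $_[0]; };
        $ctx->{next} = undef;
    },
);

# Build the pipeline command by command, then run the registered actions.
sub run_query {
    my (@cmds) = @_; # each command is [ name, args... ]
    my $ctx = { prev => undef, actions => [], result => [] };
    foreach my $cmd (@cmds) {
        my ($name, @args) = @$cmd;
        die "No such command '$name'\n" unless exists $dispatch{$name};
        $dispatch{$name}->($ctx, @args);
        die "Internal error in the command $name: missing result definition\n"
            unless (exists $ctx->{next});
        $ctx->{prev} = delete $ctx->{next};
    }
    foreach my $code (@{$ctx->{actions}}) {
        $code->();
    }
    return $ctx->{result};
}

my $res = run_query([ "source", 1, 2, 3 ], [ "double" ], [ "collect" ]);
print join(",", @$res), "\n"; # prints 2,4,6
```

Note that nothing flows while the commands are being dispatched; the data starts moving only when the action registered by "source" runs at the end.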

The functions split_braced() and bunquote() are imported from the package Triceps::X::Braced that handles the parsing of the braced nested lists.
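
The way query() checks for leftovers in $argline shows that split_braced() consumes its argument in place. Here is a simplified stand-in that behaves this way, just to illustrate the idea. The name toy_split_braced() is made up, and the sketch is an assumption about the general approach, not the code of Triceps::X::Braced: in particular it knows nothing about the backslash quoting.

```perl
use strict;
use warnings;

# A simplified stand-in for a braced list splitter (hypothetical, not the
# real Triceps::X::Braced code). It modifies its argument in place: the
# parsed part is removed, and anything left over means a syntax error.
sub toy_split_braced {
    my @res;
    while (1) {
        $_[0] =~ s/^\s+//;
        last if ($_[0] eq '');
        if ($_[0] =~ /^\{/) {
            # Find the brace that matches the opening one.
            my $depth = 0;
            my $end;
            for (my $i = 0; $i < length($_[0]); $i++) {
                my $c = substr($_[0], $i, 1);
                $depth++ if ($c eq '{');
                if ($c eq '}' && --$depth == 0) {
                    $end = $i;
                    last;
                }
            }
            # Mismatched braces: stop and leave the remainder in place.
            return @res unless (defined $end);
            push @res, substr($_[0], 1, $end - 1); # strip the outer braces
            substr($_[0], 0, $end + 1, '');
        } else {
            # A plain word; a stray closing brace stops the parsing.
            $_[0] =~ s/^([^\s{}]+)// or return @res;
            push @res, $1;
        }
    }
    return @res;
}

my $line = "{read table t} {project fields {a b}}";
my @cmds = toy_split_braced($line);
# @cmds is ("read table t", "project fields {a b}"), $line is now empty

my $bad = "{a {b}";
my @partial = toy_split_braced($bad);
# @partial is empty, $bad still contains the unparsed "{a {b}"
```

Only one level of braces gets stripped per call, which is why query() calls split_braced() again on each command to get its arguments.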

Another interesting part is the error reporting, done as a special label "+ERROR". It's actually one of the sticking points that keep the code from being of production quality: the errors may be multi-line, while the SimpleServer protocol really expects everything to be single-line. To do it properly, the messages would have to be quoted.
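
One possible way to do such quoting is sketched below. This is not something Tql.pm or SimpleServer does: the helpers quote_line() and unquote_line() are hypothetical, and the backslash escaping scheme is just one choice that the client side would have to agree on.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Triceps: escape a message so that it
# fits on one protocol line. The backslashes are doubled first, to keep
# the escaping reversible.
sub quote_line {
    my $msg = shift;
    $msg =~ s/\\/\\\\/g;
    $msg =~ s/\n/\\n/g;
    $msg =~ s/\r/\\r/g;
    return $msg;
}

# The matching decoder for the receiving side.
sub unquote_line {
    my $msg = shift;
    $msg =~ s/\\(.)/$1 eq 'n' ? "\n" : $1 eq 'r' ? "\r" : $1/ge;
    return $msg;
}

my $err = "first line of the error\nsecond line\n";
# The only line feed left in the message is the terminating one.
my $wire = "+ERROR,OP_INSERT,q1: error: " . quote_line($err) . "\n";
print $wire;
```

The query name "q1" here is only a placeholder for $q.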

Moving on, here is the "read" command handler:

sub _tqlRead # ($ctx, @args)
{
    my $ctx = shift;
    die "The read command may not be used in the middle of a pipeline.\n"
        if (defined($ctx->{prev}));
    my $opts = {};
    &Triceps::Opt::parse("read", $opts, {
        table => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);

    my $fret = $ctx->{fretDumps};
    my $tabname = bunquote($opts->{table});

    die ("Read found no such table '$tabname'\n")
        unless (exists $ctx->{tables}{$tabname});
    my $unit = $ctx->{u};
    my $table = $ctx->{tables}{$tabname};
    my $lab = $unit->makeDummyLabel($table->getRowType(), "lb" . $ctx->{id} . "read");
    $ctx->{next} = $lab;

    my $code = sub {
        Triceps::FnBinding::call(
            name => "bind" . $ctx->{id} . "read",
            unit => $unit,
            on => $fret,
            labels => [
                $tabname => $lab,
            ],
            code => sub {
                $table->dumpAll();
            },
        );
    };
    push @{$ctx->{actions}}, $code;
}

It's the only command that registers an action, and that action sends the data into the query unit. The rest of the commands just add more handlers to the pipeline in the unit and get the data that flows from "read". The action sets up a binding and calls the table dump, to send the data into that binding.

The reading of the tables could have also been done without the bindings, and without the need to bind the units at all: just iterate through the table procedurally in the action. But this whole example has been built largely to showcase that the bindings can be used in this way, so naturally it uses bindings.

The bindings become more useful when the query logic has to react to the normal logic of the main unit, such as in the subscriptions: set up the query, read its initial state, and then keep reading as the state gets updated. But guess what, the subscriptions can't be done with the FnReturns as shown, because an FnReturn sends its data only to the last binding pushed onto it. This means that if multiple subscriptions get set up, only the last one will be getting the data. There will be a separate mechanism for that.
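
The stack behavior is easy to see in a toy model. The ToyReturn class below is made up for this illustration and is not the Triceps API; it models only the one property discussed here, that the data goes to the binding on the top of the stack.

```perl
use strict;
use warnings;

# A toy model of the push/pop behavior, NOT the Triceps API: the "return"
# keeps a stack of bindings and delivers each row only to the top one.
package ToyReturn;

sub new {
    my $class = shift;
    return bless({ stack => [] }, $class);
}

sub push_binding {
    my ($self, $binding) = @_;
    push @{$self->{stack}}, $binding;
}

sub pop_binding {
    my ($self) = @_;
    return pop @{$self->{stack}};
}

sub send_row {
    my ($self, $row) = @_;
    my $top = $self->{stack}[-1];
    # In this toy model a row sent with no binding pushed is just dropped.
    $top->($row) if (defined $top);
}

package main;

my $fret = ToyReturn->new();
my (@first, @second);

$fret->push_binding(sub { push @first, $_[0]; });
$fret->send_row("a"); # goes to the first binding

$fret->push_binding(sub { push @second, $_[0]; });
$fret->send_row("b"); # only the last pushed binding sees this row

$fret->pop_binding();
$fret->send_row("c"); # the first binding is on top again

print "first: @first\n";   # prints first: a c
print "second: @second\n"; # prints second: b
```

With two subscriptions pushed at the same time, the first one misses the row "b", which is exactly the problem described above.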
