Showing posts with label streaming_function. Show all posts
Showing posts with label streaming_function. Show all posts

Friday, April 11, 2014

callBound

I've found that I've missed documenting yet another way to call a streaming function in Perl, the method Unit::callBound().

$unit->callBound($rowop_or_tray, $fnreturn => $fnbinding, ...);
$unit->callBound([@rowops], $fnreturn => $fnbinding, ...);

It's an encapsulation of a streaming function call, a great method if you have all the rowops for the call available upfront.. The first argument is a rowop or a tray or a reference to an array of rowops (but the trays are not allowed in the array). The rest are the pairs of FnReturns and FnBindings. The bindings are pushed onto the FnReturns, then the rowops are called, then the bindings are popped. It replaces a whole block that would contain an AutoFnBind and the calls:

{
  my $ab = Triceps::AutoFnBind->new(
    $fnreturn => $fnbinding, ...
  );
  $unit->call($rowop_or_tray);
}

Only callBound() does its work in C++, so it's more efficient than a Perl block, and it's shorter to write too.

Saturday, July 13, 2013

odds and ends

While working on threads support, I've added a few small features here and there. Some of them have been already described, some will be described now. I've also done a few more small clean-ups.

First, the historic methods setName() are now gone everywhere. This means Unit and Label classes, and in C++ also the Gadget. The names can now only be specified during the object construction.

FnReturn has the new method:

$res = fret->isFaceted();

bool isFaceted() const;

It returns true (or 1 in Perl) if this FnReturn object is a part of a Facet.

Unit has gained a couple of methods:

$res = $unit->isFrameEmpty();

bool isFrameEmpty() const;

Check whether the current frame is empty. This is different from the method empty() that checks whether the the whole unit is empty. This method is useful if you run multiple units in the same thread, with some potentially complicated cross-unit scheduling. It's what nextXtray() does with a multi-unit Triead, repeatedly calling drainFrame() for all the units that are found not empty. In this situation the simple empty() can not be used because the current inner frame might not be the outer frame, and draining the inner frame can be repeated forever while the outer frame will still contain rowops. The more precise check of isFrameEmpty() prevents the possibility of such endless loops.

$res = $unit->isInOuterFrame();

bool isInOuterFrame() const;

Check whether the unit's current inner frame is the same as its outer frame, which means that the unit is not in the middle of a call.


In Perl the method Rowop::printP() has gained an optional argument for the printed label name:

$text = $rop->printP();
$text = $rop->printP($lbname);

The reason for that is to make the printing of rowops in the chained labels more convenient. A chained label's execution handler receives the original unchanged rowop that refers to the first label in the chain. So when it gets printed, it will print the name of the first label in the chain, which might be very surprising. The explicit argument allows to override it to the name of the chained label (or to any other value).


 In C++ the Autoref has gained the method swap():

void swap(Autoref &other);

It swaps the values of two references without changing the reference counts in the referred values. This is a minor optimization for such a special situation. One or both references may contain NULL.


In C++ the Table has gained the support for sticky errors. The table internals contain a few places where the errors can't just throw an Exception because it will mess up the logic big time, most specifically the comparator functions for the indexes. The Triceps built-in indexes can't encounter any errors in the comparators but the user-defined ones, such as the Perl Sorted Index, can. Previously there was no way to report these errors other than print the error message and then either continue pretending that nothing happened or abort the program.

The sticky errors provide a way out of this sticky situation. When an index comparator encounters an error, it reports it as a sticky error in the table and then returns false. The table logic then unrolls like nothing happened for a while, but before returning from the user-initiated method it will find this sticky error and throw an Exception at a safe time. Obviously, the incorrect comparison means that the table enters some messed-up state, so all the further operations on the table will keep finding this sticky error and throw an Exception right away, before doing anything. The sticky error can't be unstuck. The only way out of it is to just discard the table and move on.

void setStickyError(Erref err);

Set the sticky error from a location where an exception can not be thrown, such as from the comparators in the indexes. Only the first error sticks, all the others are ignored since (a) the table will be dead and throwing this error in exceptions from this point on anyway and (b) the comparator is likely to report the same error repeatedly and there is no point in seeing multiple copies.

Errors *getStickyError() const;

Get the table's sticky error. Normally there is no point in doing this manually, but just in case.

void checkStickyError() const;

If the sticky error has been set, throw an Exception with it.

Friday, June 7, 2013

reordering the data from multiple threads, part 3

And finally the code of the example. I've written it with the simple approach of read stdin, print to stdout. It's easier this way, and anyway to demonstrate the rowop reordering, all the input has to be sent quickly in one chunk. The application starts as:

Triceps::Triead::startHere(
    app => "ForkJoin",
    thread => "main",
    main => \&mainT,
    workers => 2,
    delay => 0.02,
);

The main thread is:

sub mainT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("mainT", $opts, {@Triceps::Triead::opts,
        workers => [ 1, undef ], # number of worker threads
        delay => [ 0, undef ], # artificial delay for the 0th thread
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();


So far the pretty standard boilerplate with the argument parsing.

    my $rtRate = Triceps::RowType->new( # an exchange rate between two currencies
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        rate => "float64", # multiplier when exchanging ccy1 to ccy2
    ) or confess "$!";

    # the resulting trade recommendations
    my $rtResult = Triceps::RowType->new(
        triead => "int32", # id of the thread that produced it
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        ccy3 => "string", # currency code
        rate1 => "float64",
        rate2 => "float64",
        rate3 => "float64",
        looprate => "float64",
    ) or confess "$!";

The row types originate from the self-join example code, same as before.

    # each tray gets sequentially numbered and framed
    my $rtFrame = Triceps::RowType->new(
        seq => "int64", # sequence number
        triead => "int32", # id of the thread that produced it (optional)
    ) or confess "$!";

    # the plain-text output of the result
    my $rtPrint = Triceps::RowType->new(
        text => "string",
    ) or confess "$!";

These are the new additions. The frame row type is used to send the information about the sequence number in the _BEGIN_ label. The print row type is used to send the text for printing back to the main thread.

    # the input data
    my $faIn = $owner->makeNexus(
        name => "input",
        labels => [
            rate => $rtRate,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    # the raw result collected from the workers
    my $faRes = $owner->makeNexus(
        name => "result",
        labels => [
            result => $rtResult,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    my $faPrint = $owner->makeNexus(
        name => "print",
        labels => [
            raw => $rtPrint, # in raw order as received by collator
            cooked => $rtPrint, # after collation
        ],
        import => "reader",
    );

The processing will go in essentially a pipeline: read the input -> process in the worker threads -> collate -> print in the main thread. Only the worker thread stage spreads into multiple threads, that are then joining the data paths back together in the collator.

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "reader",
        main => \&readerT,
        to => $owner->getName() . "/input",
    );

    for (my $i = 0; $i < $opts->{workers}; $i++) {
        Triceps::Triead::start(
            app => $app->getName(),
            thread => "worker$i",
            main => \&workerT,
            from => $owner->getName() . "/input",
            to => $owner->getName() . "/result",
            delay => ($i == 0? $opts->{delay} : 0),
            workers => $opts->{workers},
            identity => $i,
        );
    }

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "collator",
        main => \&collatorT,
        from => $owner->getName() . "/result",
        to => $owner->getName() . "/print",
    );

    my @rawp; # the print in original order
    my @cookedp; # the print in collated order

    $faPrint->getLabel("raw")->makeChained("lbRawP", undef, sub {
        push @rawp, $_[1]->getRow()->get("text");
    });
    $faPrint->getLabel("cooked")->makeChained("lbCookedP", undef, sub {
        push @cookedp, $_[1]->getRow()->get("text");
    });

    $owner->readyReady();

    $owner->mainLoop();

    print("--- raw ---\n", join("\n", @rawp), "\n");
    print("--- cooked ---\n", join("\n", @cookedp), "\n");
}

The collator will send the data for printing twice: first time in the order it was received ("raw"), second time in the order after collation ("cooked").

sub readerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("readerT", $opts, {@Triceps::Triead::opts,
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faIn = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbRate = $faIn->getLabel("rate");
    my $lbBegin = $faIn->getLabel("_BEGIN_");
    # _END_ is always defined, even if not defined explicitly
    my $lbEnd = $faIn->getLabel("_END_");

This demonstrates that the labels _BEGIN_ and _END_ always get defined in each nexus, even if they are not defined explicitly. Well, here _BEGIN_ was defined explicitly but _END_ was not, and nevertheless it can be found and used.

    my $seq = 0; # the sequence

    $owner->readyReady();

    while(<STDIN>) {
        chomp;

        ++$seq; # starts with 1
        $unit->makeHashCall($lbBegin, "OP_INSERT", seq => $seq);
        my @data = split(/,/); # starts with a string opcode
        $unit->makeArrayCall($lbRate, @data);
        # calling _END_ is an equivalent of flushWriter()
        $unit->makeHashCall($lbEnd, "OP_INSERT");
    }

    {
        # drain the pipeline before shutting down
        my $ad = Triceps::AutoDrain::makeShared($owner);
        $owner->app()->shutdown();
    }
}

Each input row is sent through in a separate transaction, or in another word, a separate tray. The _BEGIN_ label carries the sequence number of the tray. The trays can as well be sent on with flushWriters() or flushWriter(), but I wanted to show that you can also flush it by calling the _END_ label.

sub workerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("workerT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
        delay => [ 0, undef ], # processing delay
        workers => [ undef, \&Triceps::Opt::ck_mandatory ], # how many workers
        identity => [ undef, \&Triceps::Opt::ck_mandatory ], # which one is us
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    my $delay = $opts->{delay};
    my $workers = $opts->{workers};
    my $identity = $opts->{identity};

    my $faIn = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faRes = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbInRate = $faIn->getLabel("rate");
    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

The worker thread starts with the pretty usual boilerplate.

    my $seq; # sequence from the frame labels
    my $compute; # the computation is to be done by this label
    $faIn->getLabel("_BEGIN_")->makeChained("lbInBegin", undef, sub {
        $seq = $_[1]->getRow()->get("seq");
    });

The processing of each transaction starts by remembering its sequence number from the _BEGIN_ label. It doesn't send a _BEGIN_ to the output yet because all the threads get the input, to be able to update its table, but then only one thread produces the output. And the thread doesn't know whether it will be the one producing the output until it knows, what is the primary key in the data. So it can start sending the output only after it had seen the data. This whole scheme works because there is exactly one data row per each transaction. A more general approach might be to have the reader thread decide, which worker will produce the result and put this information (as either a copy of the primary key or the computed thread id) into the _BEGIN_ rowop.

    # the table gets updated for every incoming rate
    $lbInRate->makeChained("lbIn", undef, sub {
        my $ccy1 = $_[1]->getRow()->get("ccy1");
        # decide, whether this thread is to perform the join
        $compute = ((ord(substr($ccy1, 0, 1)) - ord('A')) % $workers == $identity);

        # this relies on every Xtray containing only one rowop,
        # otherwise one Xtray will be split into multiple
        if ($compute) {
            $unit->makeHashCall($lbResBegin, "OP_INSERT", seq => $seq, triead => $identity);
            select(undef, undef, undef, $delay) if ($delay);
        }

        # even with $compute is set, this might produce some output or not,
        # but the frame still goes out every time $compute is set, because
        # _BEGIN_ forces it
        $unit->call($lbRateInput->adopt($_[1]));
    });

There the decision is made of whether this join is to be computed for this thread, remembered in the flag $compute, and used to generate the _BEGIN_ rowop for the output. Then the table gets updated in any case ($lbRateInput is the table's input label). I've skipped over the table creation, it's the same as in the self-join example, and you can always find the full tetx of the example in xForkJoinMt.t.

    $tRate->getOutputLabel()->makeChained("lbCompute", undef, sub {
        return if (!$compute); # not this thread's problem
        ...
                $unit->call($result);
        }
    });
    ##################################################

    $owner->readyReady();

    $owner->mainLoop();
}

Here again I've skipped over the way the result is computed. The important part is that if the $compute flag is not set, the whole self-joining computation is not performed. The _END_ label is not touched, the flushing of transactions is taken care of by the mainLoop(). Note that the _BEGIN_ label is always sent if the data is designated to this thread, even if no output as such is produced. This is done because the collator needs to get an uninterrupted sequence of transactions. Otherwise it would not be able to say if some transaction has been dropped or is only delayed.

 sub collatorT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("collatorT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faRes = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faPrint = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

    my $lbPrintRaw = $faPrint->getLabel("raw");
    my $lbPrintCooked = $faPrint->getLabel("cooked");

    my $seq = 1; # next expected sequence
    my @trays; # trays held for reordering: $trays[0] is the slot for sequence $seq
        # (only of course that slot will be always empty but the following ones may
        # contain the trays that arrived out of order)
    my $curseq; # the sequence of the current arriving tray

The collator thread starts very much as usual. It has its expectation of the next tray in order, which gets set correctly. The trays that arrive out of order will be buffered in the array @trays. Well, more exactly, for simplicity, all the trays get buffered there and then sent on if their turn has come.But it's possible to make an optimized version that would let the data flow through immediately if it's arriving in order.

    # The processing of data after it has been "cooked", i.e. reordered.
    my $bindRes = Triceps::FnBinding->new(
        name => "bindRes",
        on => $faRes->getFnReturn(),
        unit => $unit,
        withTray => 1,
        labels => [
            "_BEGIN_" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("BEGIN"));
            },
            "result" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("result"));
            }
        ],
    );
    $faRes->getFnReturn()->push($bindRes); # will stay permanently

The data gets collected into trays through a binding that gets permanently pushed onto the facet's FnReturn. Then when the tray's order comes, it wil lbe simply called and will produce the print calls for the cooked data order.

    # manipulation of the reordering,
    # and along the way reporting of the raw sequence
    $lbResBegin->makeChained("lbBegin", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("BEGIN"));
        $curseq = $_[1]->getRow()->get("seq");
    });
    $lbResult->makeChained("lbResult", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("result"));
    });
    $lbResEnd->makeChained("lbEnd", undef, sub {
        my $tray = $bindRes->swapTray();
        if ($curseq == $seq) {
            $unit->call($tray);
            shift @trays;
            $seq++;
            while ($#trays >= 0 && defined($trays[0])) {
                # flush the trays that arrived misordered
                $unit->call(shift @trays);
                $seq++;
            }
        } elsif ($curseq > $seq) {
            $trays[$curseq-$seq] = $tray; # remember for the future
        } else {
            # should never happen but just in case
            $unit->call($tray);
        }
    });

    $owner->readyReady();

    $owner->mainLoop();
};

The input rowops get not only collected in the binding's tray but also chained directly to the labels that print the raw order of arrival. The handling of _BEGIN_ also remembers its sequence number.

The handler of _END_ (which rowops get produced implicitly at the end of transaction) then does the heavy lifting. It looks at the sequence number remembered from _BEGIN_ and makes the decision. If the received sequence is the next expected one, the data collected in the tray gets sent on immediately, and then the contents of the @trays array gets sent on until it hits a blank spot of missing data. Or if the received sequence leaves a gap, the tray is placed into an appropriate spot in @trays for later processing.

This whole logic can be encapsulated in a class but I haven't decided yet on the best way to do it. Maybe somewhere in the future.

Thursday, June 6, 2013

reordering the data from multiple threads, part 2

Before going to the example itself, let's talk more about the general issues of the data partitioning.

If the data processing is stateless, it's easy: just partition by the primary key, and each thread can happily work on its own. If the data depends on the previous data with the same primary key, the partitioning is still easy: each thread keeps the state for its part of the keys and updates it with the new data.

But what if you need to join two tables with independent primary keys, where the matches of the keys between them are fairly arbitrary? Then you can partition by one table but you have to give a copy of the second table to each thread. Typically, if one table is larger than the other, you would partition by the key of the big table, and copy the small table everywhere. Since the data rows are referenced in Triceps, the actual data of the smaller table won't be copied, it would be just referenced from multiple threads, but each copy will still have the overhead of its own index.

With some luck, and having enough CPUs, you might be able to save a little overhead by doing a matrix: if you have one table partitioned into the parts A, B and C, and the other table into parts 1, 2 and 3, you can then do a matrix-combination into 9 threads processing the combinations A1, A2, A3, B1, B2, B3, C1, C2, C3. If both tables are of about the same size, this would create a total of 18 indexes, each keeping 1/3 of one original table, so the total size of indexes will b 6 times the size of one original table (or 3 times the combined sizes of both tables). On the other hand, if you were to copy the first table to each thread and split the second table into 9 parts, creating the same 9 threads, the total size of indexes will be 9 times the first table and 1 times the second table, resulting in the 10 times the size of an original table (or 5 times the combined sizes of both tables). There is a win to this, but the catch is that the results from this kind of 3x3 matrix will really have to be restored to the correct order afterwards.

The reason is that when a row in the first table changes, it might make it join, say, in the thread A2 instead of the thread A1. So the thread A1 would generate a DELETE for the join result, and the thread A2 would generate a following INSERT. With two separate threads, the resulting order will be unpredictable, and the INSERT coming before the DELETE would be bad. The post-reordering is really in order.

By contrast, if you just partition the first table and copy the second everywhere, you get 9 threads A, B, C, D E, F, G, H, I, and the change in a row will still keep it in the same thread, so the updates will come out of that thread strictly sequentially. If you don't care about the order changes between different primary keys, you can get away without the post-reordering. Of course, if a key field might change and you care about it being processed in order, you'd still need the post-reordering.

The example I'm going to show is a somewhat of s strange mix. It's the adaptation of the Forex arbitration example from the section 12.13. Self-join done manually. As you can see from the name of the section, it's doing a self-join, kind of like going through the same table 3 times.

The partitioning in this example works as follows:
  • All the data is sent to all the threads. All the threads keep a full copy of the table and update it according to the input. 
  • But then they compute the join only if the first currency name in the update falls into the thread's partition. 
  • The partitioning is done by the first letter of the symbol, with interleaving: the symbols starting with A are handled by the thread 0, with B by thread 1, and so on until the threads end, and then continuing again with the thread 0. A production-ready implementation would use a hash function instead. But the interleaving approach makes much easier to predict, which symbol goes to which thread for the example.
  • Naturally, all this means that the loop of 3 currencies might get created by a change in one pair and then very quickly disappear by a change to another pair.of currencies. So the post-reordering of the result is important to keep the things consistent.
I've also added a tweak allowing to artificially slow down the thread 0, making the incorrect order to show up reliably, and make the reordering code really work. For example, suppose the following input sent quickly:

OP_INSERT,AAA,BBB,1.30
OP_INSERT,BBB,AAA,0.74
OP_INSERT,AAA,CCC,1.98
OP_INSERT,CCC,AAA,0.49
OP_INSERT,BBB,CCC,1.28
OP_INSERT,CCC,BBB,0.78
OP_DELETE,BBB,AAA,0.74
OP_INSERT,BBB,AAA,0.64

With two threads, and thread 0 working slowly, it would produce the raw result:

BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"
BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"

Here the BEGIN lines are generated by the code and show the sequence number of the input row and the id of the thread that did the join. The result lines show the arbitration opportunities produced by the join. Obviously, not every update produces a new result, most of them don't. But the INSERT and DELETE in the result come in the wrong order: the update 7 had overtaken the update 6.

The post-reordering comes to the resque and restores the order:

BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"

As you can see, now the sequence numbers go in the sequential order.

reordering the data from multiple threads, part 1

The pipelined examples shown before had very conveniently preserved the order of the data while spreading the computational load among multiple threads. But it forced the computation to pass sequentially through every thread, increasing the latency and adding overhead. The other frequently used option is to farm out the work to a number of parallel threads and then collect the results back from them, as shown in the Fig. 1. This topology is colloquially known as "diamond" or "fork-join" (having nothing to do with the SQL joins, just that the arrows first fork from one box to multiple and then join back to one box).

Fig. 1. Diamond topology.

There are multiple way to decide, which thread gets which unit of data to process. One possibility that provides the natural load balancing is to keep a common queue of work items, and have the worker threads (B and C in Fig. 1) read the next work item when they become free after processing the last item. But this way has an important limitation: there is no way to tell in advance, which work item will go to which thread, so there is no way for the worker threads to keep any state. All the state would have to be encapsulated into the work item (that would be a tray in the Triceps terms). And at the moment Triceps provides no way to maintain such a shared queue.

The other way is to partition the data between the worker threads based on the primary key. Usually it's done by using a hash of the key, which would distribute the data randomly and hopefully evenly. Then a worker thread can keep the state for its subset of keys, forming a partition (also known as "shard") and process the sequential updates for this partition. The way to send a rowop only to the thread B or only to the thread C would be by having a separate designated nexus for each worker thread, and the thread A making the decision based on the hash of the primary key in the data. The processed data from all the worker threads can then be sent to a single nexus feeding the thread D.

But either way the data will arrive at D in an unpredictable order. The balance of load between the worker threads is not perfect, and there is no way to predict, how long will each tray take to process. A tray following one path may overtake a tray following another path.

If the order of the data is important, D must collate the data back into the original order before processing it further. Obviously, for that it must know what the original order was, so A must tag the data with the sequential identifiers. Since the data rows themselves may get multiplied, the tagging is best done at the tray level (what happens if the trays split in the worker threads is a separate story for the future, for now the solution is simply "keep the tray together").

When the data goes through a nexus, Triceps always keeps a tray as in indivisible bundle. It always gets read from the reading facet together, without being interspersed with the data from the other trays. As the data is sent into a writer facet, it's collected in a tray, and sent on only when the facet gets flushed, with the TrieadOwner::flushWriters() or Facet::flushWriter().

There also are two special labels defined on every nexus, that tell the tray boundaries to the reading thread:

  • _BEGIN_ is called at the start of the tray.
  • _END_ is called at the end of the tray.

These labels can be defined explicitly, or otherwise they become defined implicitly anyway. If they get defined implicitly, they get the empty row type (one with no fields). If you define them explicitly, you can use any row type you please, and this is a good place for the tray sequence id.

And in a writer facet you can send data to these labels. When you want to put a sequence id on a tray, you define the _BEGIN_ label with that sequence id field, and then call the _BEGIN_ label with the appropriate id values. Even if you don't call the _BEGIN_ and _END_ labels, they do get called (not quite called but placed on the tray) automatically anyway, with the opcode of OP_INSERT and all the fields NULL. The direct calling of these labels will also cause the facet to be flushed: _BEGIN_ flushes any data previously collected on the tray and then adds itself; _END_ adds itself and then flushes the tray.

The exact rules of how the _BEGIN_ and _END_ get called are actually somewhat complicated, having to deal with the optimizations for the case when nobody is interested in them, but they do what makes sense intuitively.

The case when these labels get a call with OP_INSERT and no data gets optimized out by not placing it into the actual Xtray, even if it was called explicitly. Then in the reader facet these implicit rowops are re-created but only if there is a chaining for their labels from the facet's FnReturn or if they are defined in the currently pushed FnBinding. So if you do a trace on the reading thread's unit, you will see these implicit rowops to be called only if they are actually used.

Tuesday, May 7, 2013

Label chaining at the front, and label method confessions.

It has been bothering me, how the threaded pipeline example was sensitive to the order of chaining the output facet and the internal logic to the input facet. To get the input data pass through first and only then have the processed data come out, the output facet had to be connected (and thus defined) first, and only then the internal logic could be connected to the input facet.  Things would be much easier if the output facet could be connected at the end, but still put at the front of the chain of the input facet's label. And the FnReturn in general suffers from this problem as well.

So I've added this feature: a way to chain a label, placing it at the front of the chain.

Along the way I've also changed all the Label methods to use the new way of error reporting, confessing on errors. No more need to add "or confess" after that manually (and I've been forgetting to do that properly all over the place).

The new method is:

$label->chainFront($otherLabel);

In C++ this is done slightly differently, by adding an extra argument to chain:

err = label->chain(otherLabel, true);

The second argument has the default value of false, so the method is still backwards-compatible, and you can call either way

err = label->chain(otherLabel);
err = label->chain(otherLabel, false);


to chain a label normally, at the end of the chain. The return value in C++ is still the Erref (though hm, maybe it could use an exception as well).

Having done this, I went and changed the TrieadOwner::makeNexus() and FnReturn::new to chain their labels at the front by default. This can be switched to the old behavior by using a new option:

  chainFront => 0

The default value of this option is 1.

In C++ this is expressed also by an extra argument to FnReturn::addFrontLabel(), that also has a default value, and the default value is true, matching the Perl code. Now when you call

ret = initialize(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX)
);

or

ret = initialize(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX, true)
);


you add the FnReturn's label to the front of the lbX's chain. To get the old behavior, use:

ret = initialize(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX, false)
);


I've changed the default behavior because there would not be many uses for the old one.

I haven't described yet, how the nexuses are created in C++, but they are created from an FnReturn, and thus this change to FnReturn covers them both.

Thursday, January 24, 2013

FnBinding in C++

FnBinding is defined in sched/FnBinding.h, and substantially matches the Perl version. It inherits from Starget, and can be used in only one thread.

Like many other classes, it has the constructor and the static make() function:

FnBinding(const string &name, FnReturn *fn);
static FnBinding *make(const string &name, FnReturn *fn);

The binding is constructed on a specific FnReturn and gets (references) the RowSetType from it. The FnReturn must be initialized before it can be used to create the bindings. It can be used with any matching FnReturn, not just the one it was constructed with.

It's generally constructed in a chain fashion:

Autoref<FnBinding> bind = FnBinding::make(fn)
    ->addLabel("lb1", lb1, true)
    ->addLabel("lb2", lb2, false);

Each method in the chain returns the same FnBinding object. The method addLabel() adds one concrete label that gets connected to the FnReturn's label by name. The other chainable method is withTray() which switches the mode of collecting the resulting rowops in a tray rather than calling them immediately.

The errors encountered during the chained construction are remembered and can be read later with the method:

Erref getErrors() const;

You must check the bindings for errors before using it. A binding with errors may not be used.

Or you can use the checkOrThrow() wrapper from common/Initialize.h to automatically convert any detected errors to an Exception:

Autoref<FnBinding> bind = checkOrThrow(FnBinding::make(fn)
    ->addLabel("lb1", lb1, true)
    ->addLabel("lb2", lb2, false)
    ->withTray(true)
);

FnBinding *addLabel(const string &name, Autoref<Label> lb, bool autoclear);

Adds a label to the binding. The name must match a name from the FnReturn, and there may be only one label bound to a name (some names from the return may be left unbound). The label must have a type matching the named FnReturn's label. The autoclear flag enables the automatic clearing of the label (and also forgetting it in the Unit) when the binding gets destroyed. This allows to create and destroy the bindings dynamically as needed. So, basically, if you've created a label just for the binding, use autoclear==true. If you do a binding to a label that exists in the model by itself and can be used without the binding, use autoclear==false.

In principle, nothing stops you from adding more labels later (though you can't remove nor replace the labels that are already added). Just make sure that their types match the expected ones.

Not all the available names have to get the labels added. Some (or all) may be left without labels. Any rowops coming to the undefined ones will be simply ignored.

The labels in the FnBinding may belong to a different Unit than the FnReturn. This allows to use the FnReturn/FnBinding coupling to connect the units.

FnBinding *withTray(bool on);

Changes the tray collection mode, the true argument enables it, false disables. Can be done at any time, not just at construction. Disabling the tray mode discards the current tray. If the tray mode is enabled, whenever the binding is pushed onto a return and the rowops come into it, the labels in this binding won't be called immediately but they would adopt the incoming rowops, and the result will be queued into a tray, to be executed later.

Onceref<Tray> swapTray();

Used with the tray collection mode, normally after some rowops have been collected in the tray. Returns the current tray and replaces it in the binding with a new clean tray. You can call the returned tray afterwards. If the tray mode is not enabled, will return NULL, and won't create a new tray.

Tray *getTray() const;

Get the current tray. You can use and modify the tray contents in any usual way.  If the tray mode is not enabled, will return NULL.

void callTray();

A convenience combination method that swaps the tray and calls it. This method is smart about the labels belonging to different units. Each rowop in the tray is called with its proper unit, that is found from the rowop's label. Mixing the labels of multiple units in one binding is probably still not such a great idea, but it works anyway.

const string &getName() const;

Get back the binding's name.

RowSetType *getType() const;

Get the type of the binding. It will be the same row set type object as created in the FnReturn that was used to construct this FnBinding.

int size() const;

Get the number of labels in the row set type (of all available labels, not just the ones that have been added).

const RowSetType::NameVec &getLabelNames() const;
const RowSetType::RowTypeVec &getRowTypes() const;
const string *getLabelName(int idx) const;
RowType *getRowType(const string &name) const;
RowType *getRowType(int idx) const;

The convenience wrappers that translate to the same methods in the RowSetType.

Label *getLabel(const string &name) const;
int findLabel(const string &name) const;
Label *getLabel(int idx) const;

Methods similar to FnReturn that allow to translate the names to indexes and get the labels by name or index. The same return values, the index -1 is returned for an unknown name, and a NULL label pointer is returned for an unknown name, incorrect index and a undefined label at a correct name or index.

typedef vector<Autoref<Label> > LabelVec;
const LabelVec &getLabels() const;

Return all the labels as a vector. This is an internal vector of the class, so only a const reference is returned. The elements for undefined labels will contain NULLs.

typedef vector<bool> BoolVec;
const BoolVec &getAutoclear() const;

Return the vector of the autoclear flags for the labels.

bool isAutoclear(const string &name) const;

Get the autoclear flag for a label by name. If the name is unknown, will quietly return false.

bool equals(const FnReturn *t) const;
bool match(const FnReturn *t) const;
bool equals(const FnBinding *t) const;
bool match(const FnBinding *t) const;

Similarly to the FnReturn, the convenience methods that compare the types between the FnReturns and FnBindings. They really translate to the same methods on the types of the returns or bindings.

Tuesday, January 1, 2013

Streaming function helper classes

A couple more of helper classes are defined in sched/FnReturn.h.

ScopeFnBind does a scoped pushing and popping of a binding on an FnReturn. Its only method is the constructor:

ScopeFnBind(Onceref<FnReturn> ret, Onceref<FnBinding> binding);

It's used as:

{
    ScopeFnBind autobind(ret, binding);
    ...
}

It will pop the binding at the end of the block. An unpleasant feature is that if the return stack get messed up, it will throw an Exception from a destructor, which is a big no-no in C++. However since normally in the C++ code the Triceps Exception is essentially an abort, this works good enough. If you make the Exception catchable, such as when calling the C++ code from an interpreter, you better make very sure that the stack can not get corrupted, or do not use ScopeFnBind.

AutoFnBind is a further extension of the scoped binding. It does three additional things: It allows to push multiple bindings on multiple returns as a group, popping them all on destruction. It's a reference-counted Starget object, which allows the scope to be more than one block. It also has a more controllable way of dealing with the exceptions. This last two properties allow to use it from the Perl code, making the scope of a Perl block, not C++ block, and to pass the exceptions properly back to Perl.

AutoFnBind();
AutoFnBind *make();

The constructor just creates an empty object which then gets filled with bindings.

AutoFnBind *add(Onceref<FnReturn> ret, Autoref<FnBinding> binding);

Add a binding, in a chainable fashion. The simple-minded of using the AutoFnBind is:

{
    Autoref<AutoFnBind> bind = AutoFnBind::make()
        ->add(ret1, binding1)
        ->add(ret2, binding2);
    ...
}

However if any of these add()s throw an Exception, this will leave an orphaned AutoFnBind object, since the throwing would happen before it has a chance to do the reference-counting. So the safer way to use it is:

{
    Autoref<AutoFnBind> bind = new AutoFnBind;
    bind
        ->add(ret1, binding1)
        ->add(ret2, binding2);
    ...
}

Then the AutoFnBind will be reference-counted first, and if an add() throws later, this will cause a controlled destruction of the Autoref and of AutoFnBind.

But it's not the end of the story yet. The throws on destruction are still a possibility. To catch them, use an explicit clearing:

void clear();

Pops all the bindings. If any Exceptions get thrown, they can get caught nicely. It tries to be real smart, going through all the bindings in the backwards order and popping each one of them. If a pop() throws an exception, its information will be collected but clear() will then continue going through the whole list. At the end of the run it will make sure that it doesn't have any references to anything any more, and then will re-throw any collected errors as a single Exception. This cleans up the things as much as possible and as much as can be handled, but the end result will still not be particularly clean: the returns that got their stacks corrupted will still have their stacks corrupted, and some very serious application-level cleaning will be needed to continue. Probably a better choice would be to destroy everything and restart from scratch. But at least it allows to get safely to this point of restarting from scratch.

So, the full correct sequence will be:

{
    Autoref<AutoFnBind> bind = new AutoFnBind;
    bind
        ->add(ret1, binding1)
        ->add(ret2, binding2);
    ...
    bind->clear() ;
}


Or if any code in "..." can throw anything, then something like (not tested, so use with caution):

{
    Autoref<AutoFnBind> bind = new AutoFnBind;
    bind
        ->add(ret1, binding1)
        ->add(ret2, binding2);
    try {
    ...
    } catch (Triceps::Exception e) {
        try {
            bind->clear() ;
        } catch (Triceps::Exception ee) {
            e->getErrors()->append("Unbinding errors triggered by the last error:", ee->getErrors());
        }
        throw;
    } catch (exception e) {
        bind->clear() ;
        throw;


    }
}


It tries to be nice if the exception thrown from "..." was a Triceps one, and add nicely any errors from the binding clearing to it.

Finally, a little about how the Perl AutoFnBind translates to the C++ AutoFnBind:

The Perl constructor creates the C++-level object and adds the bindings to it. If any of them throw, it destroys everything nicely and translates the Exception to Perl. Otherwise it saves a reference to the AutoFnBind in a wrapper object that gets returned to Perl.

The Perl destructor then first clears the AutoFnBind and catches if there is any Exception. However there is just no way to return a Perl exception from a Perl destructor, so it juts prints the error on stderr and calls exit(1). If no exception was thrown, the AutoFnBind gets destroyed nicely by removing the last reference.

For the nicer handling, there is a Perl-level method clear() that does the clearing and translates the exception to Perl.

FnReturn in C++

FnReturn, defined in sched/FnReturn.h, is generally constructed similarly to the RowSetType:

ret = initializeOrThrow(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX)
);

Or of course piece-meal. As it gets built, it actually builds a RowSetType inseide itself.

FnReturn(Unit *unit, const string &name);
static FnReturn *make(Unit *unit, const string &name);

The constructor and convenience wrapper. The unit will be remembered only as a pointer, not reference, to avoid the reference loops. However this pointer will be used to construct the internal labels. So until the FnReturn is fully initialized, you better make sure that the Unit object has a reference and doesn't get freed. FnReturn is an Starget, and must be used in only one thread.

 const string &getName() const;
 Unit *getUnitPtr() const;
 const string &getUnitName() const;

Get back the information from the constructor. Just like for the label, it reminds you that the Unit is only available as a pointer, not reference, here.  The FnReturn also have a concept of clearing: it has the special labels inside, and once any of these labels gets cleared, the FnReturn is also cleared by setting the unit pointer to NULL and forgetting the FnContext (more on that one later). So after the FnReturn is cleared, getUnitPtr() will return NULL. And again similar to the Label, there is a convenience function to get the unit name for informational printouts. When FnReturn is cleared, it returns the same "[fn return cleared]".

FnReturn *addFromLabel(const string &lname, Autoref<Label>from);

Add a label to the return by chaining it off another label. lname is the name within the return. The full name of the label will be return_name.label_name. The label names within a return must be unique and not empty, or it will be returned as an initialization error. The label type will be copied (actually, referenced) from the from label, and the new label will be automatically chained off it. The labels can be added only until the return is initialized, or it will throw an Exception.

FnReturn *addLabel(const string &lname, const_Autoref<RowType>rtype);

Add a new independen label to the return. Works very similar to addFromLabel, only uses the explicit row type and doesn't chain to anything. The label can be later found with getLabel() and either chained off something or used to send the rows to it explicitly. The labels can be added only until the return is initialized.

class FnContext: public Starget
{
public:
    virtual ~FnContext();
    virtual void onPush(const FnReturn *fret) = 0;
     virtual void onPop(const FnReturn *fret) = 0;
};
FnReturn *setContext(Onceref<FnContext> ctx);

Set the context with handlers for the pushing and popping of the bindings in the FnReturn. Triceps generally tries to follow the C++ tradition of using the virtual methods for the callbacks, with the user then subclassing the base class and replacing the callback methods. However subclassing FnReturn is extremely inconvenient, because it gets connected to the other objects in a quite complicated way. So the solution is to make a separate context class for the callbacks, and then connect it. By the way, FnContext is not a subclass of FnReturn but a separate top-level class. I.e. NOT Triceps::FnReturn::FnContext but Triceps::FnContext. The callbacks will be called just before the binding is pushed or popped, but after the check for the correctness of the push or pop. They can be used to adjust the state of the streaming function by pushing or popping its stack of local variables, like was shown in the Perl examples.The context can be set only until the return is initialized.

template<class C>  C *contextIn() const;

Get back the context. Since the context will be a subclass of FnContext, this also handles the correct type casting. Use it like:

Autoref<MyFnCtx> ctx = fret1->contextIn<MyFnCtx>();

The type is converted using the static_cast, and you need to know the correct type in advance, or your program will break in some horrible ways. If the context has not been set, it will return a NULL.

void initialize();

Initialize the FnReturn. Very similar to the Type classes, it will collect the errors in an Errors object that has to be checked afterwards, and an FnReturn with errors must not be used. The initialization can be called repeatedly with no ill effects. After initialization the structure of the return (labels and context) can not be changed any more.

Erref getErrors() const;

Get the errors detected. Normally called after initialization but can also be called at any stage, as the errors are collected all the way through the object construction.

bool isInitialized() const;

Check whether the return is initialized.

RowSetType *getType() const;

Get the type of the return, which gets built internally by the return. The names of the row types in the set will be the same as the names of labels in the return, and their order will also be the same. This call can be made only after initialization, or it will throw an Exception.

int size() const;

Get the number of labels in the return. Can be called at any time.

const RowSetType::NameVec &getLabelNames() const;
const RowSetType::RowTypeVec &getRowTypes() const;
const string *getLabelName(int idx) const;
RowType *getRowType(const string &name) const;
RowType *getRowType(int idx) const;

Get the piecemeal information about the label names and types. These are really the convenience wrappers around the RowSetType. Note that they return pointers to be able to return NULL on the argument that is out of range. A somewhat special feature is that even though the row set type can be read only after initialization (after it becomes frozen and can not be messed with any more), these wrappers work at any time, even when the return is being built.

bool equals(const FnReturn *t) const;
bool match(const FnReturn *t) const;
bool equals(const FnBinding *t) const;
bool match(const FnBinding *t) const;

Convenience wrappers that compare the equality or match of the underlying row set types.

Label *getLabel(const string &name) const;
int findLabel(const string &name) const;
Label *getLabel(int idx) const;

Get the label by name or index, or the index of the label by name. Return a NULL pointer or -1 index on an invalid argument.

typedef vector<Autoref<RetLabel> > ReturnVec;
const ReturnVec &getLabels() const;

Get the whole set of labels. FnReturn::RetLabel is a special private label type with undisclosed internals. You need to treat these labels as being a plain Label.

void push(Onceref<FnBinding> bind);

Push a binding on the return stack. The return must be initialized, and the binding must be of a matching type, or an Exception will be thrown. The reference to the binding will be kept until it's popped.

void pushUnchecked(Onceref<FnBinding> bind);

Similar to push(), only the type of the binding is not checked. This is an optimization for the automatically generated code that does all the type checks up front at the generation time. The manually written code probably should not be using it.

void pop(Onceref<FnBinding> bind);

Pop a binding from the return stack. The binding argument specifies, which binding is expected to be popped. It's not strictly necessary but allows to catch any mess-ups with the return stack early. If the stack is empty or the top binding is not the same as the argument, throws an Exception.

void pop();

The unchecked version. It still checks and throws if the stack is empty. This method may come handy occasionally, but in general the checked version should be preferred. Pretty much the only reason to use it would be if you try to restore after a major error and want to pop everything from all your FnReturns untill their stacks become empty. But there is much trouble with this kind of restoration.

int bindingStackSize() const;

Get the size of the return stack (AKA the stack of bindings). Useful for debugging.

typedef vector<Autoref<FnBinding> > BindingVec;
const BindingVec &bindingStack() const;

Get the current return stack. Useful for debugging.

Sunday, December 30, 2012

RowSetType

RowSetType, defined in types/RowSetType.h, is another item that is not visible in Perl. Maybe it will be in the future but at the moment things look good enough without it. It has been added for 1.1.0 and expresses the type ("return type" if you want to be precise) of a streaming function (FnReturn and FnBinding classes). Naturally, it's a sequence of the row types, and despite the word "set", the order matters.

A RowSetType is one of these objects that gets assembled from many parts and then initialized, like this:

Autoref<RowSetType> rst = initializeOr  Throw(RowSetType::make()
    ->addRow("name1", rt1)
    ->addRow("name2", rt2)
);

The function, or actually template,  initializeOrThrow() itself is also a new addition, that I'll describe in detail later.

Of course, nothing stops you from adding the row types one by one, in a loop or in some other way, and then calling initialize() manually. And yes, of course you can keep a reference to a row set type as soon as it has been constructed, not waiting for initialization. You could do instead:

Autoref<RowSetType> rst = new RowSetType();
rst->addRow("name1", rt1);
rst->addRow("name2", rt2);
rst->initialize();
if (rst->getErrors()->hasError()) {
  ...
}

You could use the initializeOrThrow() template here as well, just I also wanted to show the way for the manual handling of the errors. And you can use the new or make() interchangeably too.


All that the initialization does is fixate the row set, forbid the addition of the further row types to it. Which kind of makes sense at the moment but I'm not so sure about the future, in the future the dynamically expandable row sets might come useful. We'll see when we get there.
RowSetType();
static RowSetType *make();

Construct a row set type. The method make() is just a wrapper around the constructor that is more convenient to use with the following ->addRow(), because of the way the operator priorities work in C++. Like any other type, RowSetType is unnamed by itself, and takes no constructor arguments. Like any other type, RowSetType is an Mtarget and can be shared between multiple threads after it has been initialized.

RowSetType *addRow(const string &rname, const_Autoref<RowType>rtype);

Add a row type to the set. All the row types are named, and all the names must be unique within the set. The order of the addition matters too. See the further explanation of why it does in the description of the FnReturn. If this method detects an error (such as duplicate names), it will append the error to the internal Errors object, that can be read later by getErrors(). A type with errors must not be used.

The row types may not be added after the row set type has been initialized.

void initialize();

Initialize the type. Any detected errors can be read afterwards with getErrors(). The repeated calls of initialize() are ignored.

bool isInitialized() const;

Check whether the type has been initialized.

typedef vector<string> NameVec;
const NameVec &getRowNames() const;
typedef vector<Autoref<RowType> > RowTypeVec;
const RowTypeVec &getRowTypes() const;

Read back the contents of the type. The elements will go in the order they were added.

int size() const;

Read the number of row types in the set.

int findName(const string &name) const;

Translate the row type name to index (i.e. the order in which it was added, starting from 0).Returns -1 on an invalid name.

RowType *getRowType(const string &name) const;

Find the type by name. Returns NULL on an invalid name.

const string *getRowTypeName(int idx) const;
RowType *getRowType(int idx) const;

Read the data by index. These methods check that the index is in the valid range, and otherwise return NULL.

The usual methods inherited from Type also work: getErrors(), equals(), match(), printTo().

The row set types are considered equal if they contain the equal row types with equal names going in the same order. They are considered matching if they contain matching row types going in the same order, with any names. If the match condition seems surprising to you, think of it as "nothing will break if one type is substituted for another at execution time".

void addError(const string &msg);
Erref appendErrors();

The ways to add extra errors to the type's errors. It's for convenience of the users of this type, the thinking being that since we already have one Errors object, can as well use it for everything, and also keep all the errors reported in the order of the fields, rather than first all the errors from the type then all the errors from its user. The FnReturn and FnBinding use it.

Monday, November 26, 2012

Streaming functions and unit boundaries (and TQL guts)

Now let's take a look at the insides of the Tql module. I'll be skipping over the code that is less interesting, you can find the full version in the source code of perl/Triceps/lib/Triceps/X/Tql.pm as always. The constructor is one of these things to be skipped. The initialization part is more interesting:

sub initialize # ($self)
{
    my $myname = "Triceps::X::Tql::initialize";
    my $self = shift;

    return if ($self->{initialized});

    my %dispatch;
    my @labels;
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
        my $name = $self->{tableNames}[$i];
        my $table = $self->{tables}[$i];

        confess "$myname: found a duplicate table name '$name', all names are: "
                . join(", ", @{$self->{tableNames}})
            if (exists $dispatch{$name});

        $dispatch{$name} = $table;
        push @labels, $name, $table->getDumpLabel();
    }

    $self->{dispatch} = \%dispatch;
    $self->{fret} = Triceps::FnReturn->new(
        name => $self->{name} . ".fret",
        labels => \@labels,
    );

    $self->{initialized} = 1;
}

It creates a dispatch table of name-to-table and also an FnReturn that contains the dump labels of all the tables.

Each query will be created as its own unit. It will run, and then get cleared and disposed of, very convenient. By the way, that is the answer to the question of why would someone want to use multiple units in the same thread: for modular disposal.

But the labels in the main unit and the query unit can't be directly connected. A direct connection would create the stable references, and the disposal won't work. That's where the streaming function interface comes to the rescue: it provides a temporary connection. Build the query unit, build a binding for it, push the binding onto the FnReturn of the main unit, run the query, pop the binding, dispose of the query unit.

And the special capacity (or if you will, superpower) of the streaming functions that allows all that is that the FnReturn and FnBinding don't have to be of the same unit. They may be of the different units and will still work together fine.

The query() method then handles the creation of the unit and stuff:

sub query # ($self, $argline)
{
    my $myname = "Triceps::X::Tql::query";

    my $self = shift;
    my $argline = shift;

    confess "$myname: may be used only on an initialized object"
        unless ($self->{initialized});

    $argline =~ s/^([^,]*)(,|$)//; # skip the name of the label
    my $q = $1; # the name of the query itself
    #&Triceps::X::SimpleServer::outCurBuf("+DEBUGquery: $argline\n");
    my @cmds = split_braced($argline);
    if ($argline ne '') {
        # Presumably, the argument line should contain no line feeds, so it should be safe to send back.
        &Triceps::X::SimpleServer::outCurBuf("+ERROR,OP_INSERT,$q: mismatched braces in the trailing $argline\n");
        return
    }

    # The context for the commands to build up an execution of a query.
    # Unlike $self, the context is created afresh for every query.
    my $ctx = {};
    # The query will be built in a separate unit
    $ctx->{tables} = $self->{dispatch};
    $ctx->{fretDumps} = $self->{fret};
    $ctx->{u} = Triceps::Unit->new("${q}.unit");
    $ctx->{prev} = undef; # will contain the output of the previous command in the pipeline
    $ctx->{actions} = []; # code that will run the pipeline
    $ctx->{id} = 0; # a unique id for auto-generated objects

    # It's important to place the clearing trigger outside eval {}. Otherwise the
    # clearing will erase any errors in $@ returned from eval.
    my $cleaner = $ctx->{u}->makeClearingTrigger();
    if (! eval {
        foreach my $cmd (@cmds) {
            #&Triceps::X::SimpleServer::outCurBuf("+DEBUGcmd, $cmd\n");
            my @args = split_braced($cmd);
            my $argv0 = bunquote(shift @args);
            # The rest of @args do not get unquoted here!
            die "No such TQL command '$argv0'\n" unless exists $tqlDispatch{$argv0};
            $ctx->{id}++;
            &{$tqlDispatch{$argv0}}($ctx, @args);
            # Each command must set its result label (even if an undef) into
            # $ctx->{next}.
            die "Internal error in the command $argv0: missing result definition\n"
                unless (exists $ctx->{next});
            $ctx->{prev} = $ctx->{next};
            delete $ctx->{next};
        }
        if (defined $ctx->{prev}) {
            # implicitly print the result of the pipeline, no options
            &{$tqlDispatch{"print"}}($ctx);
        }

        # Now run the pipeline
        foreach my $code (@{$ctx->{actions}}) {
            &$code;
        }

        # Now run the pipeline
        1; # means that everything went OK
    }) {
        # XXX this won't work well with the multi-line errors
        &Triceps::X::SimpleServer::outCurBuf("+ERROR,OP_INSERT,$q: error: $@\n");
        return
    }
}

Each TQL command is defined as its own method, all of them collected in the %tqlDispatch. query() splits the pipeline and then lets each command build its part of the query, connecting them through $ctx. A command may also register an action to be run later. After everything is built, the actions run and produce the result.

The functions split_braced() and bunquote() are imported from the package Triceps::X::Braced that handles the parsing of the braced nested lists.

Another interesting part is the error reporting, done as a special label "+ERROR". It's actually one of the sticky points of why the code is not of production quality: because the errors may be multi-line, and the SimpleServer protocol really expects everything to be single-line. Properly, some quoting would have to be done.

Moving on, here is the "read" command handler:

sub _tqlRead # ($ctx, @args)
{
    my $ctx = shift;
    die "The read command may not be used in the middle of a pipeline.\n"
        if (defined($ctx->{prev}));
    my $opts = {};
    &Triceps::Opt::parse("read", $opts, {
        table => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);

    my $fret = $ctx->{fretDumps};
    my $tabname = bunquote($opts->{table});

    die ("Read found no such table '$tabname'\n")
        unless (exists $ctx->{tables}{$tabname});
    my $unit = $ctx->{u};
    my $table = $ctx->{tables}{$tabname};
    my $lab = $unit->makeDummyLabel($table->getRowType(), "lb" . $ctx->{id} . "read");
    $ctx->{next} = $lab;

    my $code = sub {
        Triceps::FnBinding::call(
            name => "bind" . $ctx->{id} . "read",
            unit => $unit,
            on => $fret,
            labels => [
                $tabname => $lab,
            ],
            code => sub {
                $table->dumpAll();
            },
        );
    };
    push @{$ctx->{actions}}, $code;
}

It's the only command that registers an action, which sends data into the query unit. The rest of commands just add more handlers to the pipeline in the unit, and get the data that flows from "read". The action sets up a binding and calls the table dump, to send the data into that binding.

The reading of the tables could have also been done without the bindings, and without the need to bind the units at all: just iterate through the table procedurally in the action. But this whole example has been built largely to showcase that the bindings can be used in this way, so naturally it uses bindings.

The bindings come more useful when the query logic has to react to the normal logic of the main unit, such as in the subscriptions: set up the query, read its initial state, and then keep reading as the state gets updated. But guess what, the subscriptions can't be done with the FnReturns as shown because the FnReturn only sends its data to the last binding pushed onto it. This means, if multiple subscriptions get set up, only the last one will be getting the data. There will be a separate mechanism for that.

Tuesday, November 20, 2012

Table dump

Another intermediate step for the example I'm working on is the table dumping. It allows to iterate on a table in a functional manner.

A new label "dump" is added to the table and its FnReturn. Whenever the method dumpAll() is called, it sends the whole contents of the table to that label. Then you can set a binding on the table's FnReturn, call dumpAll(), and the binding will iterate through the whole table's contents.

The grand plan is also to add the dumping by a a condition that selects a sub-index, but it's not implemented yet.

It's also possible to dump in an alternative order: dumpAllIdx() can send the rows in the order of any index, rather than the default first leaf index.

If you want to get the dump label explicitly, you can do it with

my $dlab = $table->getDumpLabel();

Normally the only reason to do that would be to add it to another FnReturn (besides the table's FnReturn). Chaining anything else directly to this label would not make much sense, because the dump of the table can be called from many places, and the directly chained label will receive data every time the dump is called.

The typical usage looks like this:

    Triceps::FnBinding::call(
        name => "iterate",
        on => $table->fnReturn(),
        unit => $unit,
        labels => [
            dump => sub { ... },        ],
        code => sub {
            $table->dumpAll();
        },
    );

It's less efficient than the normal iteration but sometimes comes handy.

Normally the rowops are sent with the opcode OP_INSERT. But the opcode can also be specified explicitly:

$table->dumpAll($opcode);

The alternative order can be achieved with:

$table->dumpAllIdx($indexType);
$table->dumpAllIdx($indexType, $opcode);

As usual, the index type must belong to the exact type of this table. For example:

$table->dumpAllIdx($table->getType()->findIndexPath("cb"), "OP_NOP");

And some more interesting examples will be forthcoming later.

Saturday, November 10, 2012

Streaming functions and template results

The same way as the FnReturns can be used to get back the direct results of the operations on the tables, can be also used on the templates in general. Indeed, it's a good idea to have a method that would create an FnReturn in all the templates. So I went ahead and added it to the LookupJoin, JoinTwo and Collapse.

For the joins, the resulting FnReturn has one label "out". It's created similarly to the table's:

my $fret = $join->fnReturn();

And then it can be used as usual. The implementation of this method is fairly simple:

sub fnReturn # (self)
{
    my $self = shift;
    if (!defined $self->{fret}) {
        $self->{fret} = Triceps::FnReturn->new(
            name => $self->{name} . ".fret",
            labels => [
                out => $self->{outputLabel},
            ],
        );
    }
    return $self->{fret};
}

All this kind of makes the method lookup() of LookupJoin redundant, since now pretty much all the same can be done with the streaming function API, and even better, because it provides the opcodes on rowops, can handle the full processing, and calls the rowops one by one without necessarily creating an array. But it could happen yet that the lookup() has some more convenient uses too, so I didn't remove it yet.

For Collapse the interface is a little more complicated: the FnReturn contains a label for each data set, named the same as the data set. The order of labels follows the order of the data set definitions (though right now it's kind of moot, because only one data set is supported). The implementation is:

sub fnReturn # (self)
{
    my $self = shift;
    if (!defined $self->{fret}) {
        my @labels;
        for my $n (@{$self->{dsetnames}}) {
            push @labels, $n, $self->{datasets}{$n}{lbOut};
        }
        $self->{fret} = Triceps::FnReturn->new(
            name => $self->{name} . ".fret",
            labels => \@labels,
        );
    }  
    return $self->{fret};
}   

It uses the new element $self->{dsetnames} that wasn't present in the code shown before. I've added it now to keep the array of data set names in the order they were defined.

Use these examples to write the fnReturn() in your templates.

Wednesday, November 7, 2012

Streaming functions and tables

The Copy Tray used in the tables in the version 1.0 was really a precursor to the streaming functions. Now when the full-blown streaming functions became worked out, there is no sense in keeping the copy trays any more, so I've removed them.

Instead, I've added a Table method that gets the FnReturn for that table:

$fret = $table->fnReturn();

The return contains the labels "pre", "out", and the named labels for each aggregators. The FnReturn object is created on the first call of this method and is kept in the table. All the following calls return the same object. This has some interesting consequences for the "pre" label: the rowop for the "pre" label doesn't get created at all if there is nothing chained from that label. But when the FnReturn gets created, one of its labels gets chained from the "pre" label. Which means that once, you call $table->fnReturn() for the first time, you will see that table's "pre" label called in all the traces. It's not a huge extra overhead, but still something to keep in mind and not be surprised when calling fnReturn() changes all your traces.

The produced FnReturn then gets used as any other one. If you use it with an FnBinding that has withTrace => 1, you get an improved equivalent of the Copy Tray. For example:

$fret2 = $t2->fnReturn();
$fbind2 = Triceps::FnBinding->new(
    unit => $u1,
    name => "fbind2",
    on => $fret2,
    withTray => 1,
    labels => [
        out => sub { }, # another way to make a dummy
    ],
);

$fret2->push($fbind2);
$t2->insert($r2);
$fret2->pop($fbind2);

# $ctr is the Copy Tray analog
$ctr = $fbind2->swapTray(); # get the updates on an insert

Of course, most of the time you would not want to make a dummy label and then iterate manually through the copy tray. You would want to create bindings to the actual next logical labels and simply execute them, immediately or delayed with a tray.

Wednesday, October 24, 2012

Streaming functions and recursion, part 6

The combination of the two previous examples (the one with the trays and the one with the forks) doesn't work. They could be combined but the combination just doesn't work right.

The problem is that the example with trays relies on the recursive function being executed before the tray gets called. But if both of them are forked, things break.

Well, if there is only one recursive call, it still works because the execution frame looks like this:

arg1
pop1

The rowop arg1 executes, places the result into the tray (provided that it calls the FnReturn label, not forks to it!). Then the rowop pop1 executes and calls the tray. So far so good.

Now let's do the recursion with depth two. The first level starts the same:

arg1
pop1

Then arg1 executes and forks the second level of recursion:

pop1
arg2
pop2

Do you see what went wrong? The unit execution frames are FIFO. So the second level of recursion got queued after the popping of the first level. That pop1 executes next, doesn't get any return values, and everything goes downhill from there.

Streaming functions and recursion, part 5

And there is also a way to run the recursive calls without even the need to increase the recursion depth limit. It can be left at the default 1, without setMaxRecursionDepth(). The secret is to fork the argument rowops to the functions instead of calling them.

###
# A streaming function that computes a Fibonacci number.

# Input:
#   $lbFibCompute: request to compute the number.
# Output (by FnReturn labels):
#   "result": the computed value.
# The opcode is preserved through the computation.

my @stackFib; # stack of the function states
my $stateFib; # The current state

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        result => $rtFibRes,
    ],
    onPush => sub { push @stackFib, $stateFib; $stateFib = { }; },
    onPop => sub { $stateFib = pop @stackFib; },
);

my $lbFibResult = $frFib->getLabel("result");

# Declare the label & binding variables in advance, to define them sequentially.
my ($lbFibCompute, $fbFibPrev1, $fbFibPrev2);
$lbFibCompute = $uFib->makeLabel($rtFibArg, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $op = $_[1]->getOpcode();
    my $idx = $row->get("idx");

    if ($idx <= 1) {
        $uFib->fork($frFib->getLabel("result")->makeRowopHash($op,
            idx => $idx,
            fib => $idx < 1 ? 0 : 1,
        ));
    } else {
        $stateFib->{op} = $op;
        $stateFib->{idx} = $idx;

        $frFib->push($fbFibPrev1);
        $uFib->fork($lbFibCompute->makeRowopHash($op,
            idx => $idx - 1,
        ));
    }
}) or confess "$!";
$fbFibPrev1 = Triceps::FnBinding->new(
    unit => $uFib,
    name => "FibPrev1",
    on => $frFib,
    labels => [
        result => sub {
            $frFib->pop($fbFibPrev1);

            $stateFib->{prev1} = $_[1]->getRow()->get("fib");

            # must prepare before pushing new state and with it new $stateFib
            my $rop = $lbFibCompute->makeRowopHash($stateFib->{op},
                idx => $stateFib->{idx} - 2,
            );

            $frFib->push($fbFibPrev2);
            $uFib->fork($rop);
        },
    ],
);
$fbFibPrev2 = Triceps::FnBinding->new(
    unit => $uFib,
    on => $frFib,
    name => "FibPrev2",
    labels => [
        result => sub {
            $frFib->pop($fbFibPrev2);

            $stateFib->{prev2} = $_[1]->getRow()->get("fib");
            $uFib->fork($frFib->getLabel("result")->makeRowopHash($stateFib->{op},
                idx => $stateFib->{idx},
                fib => $stateFib->{prev1} + $stateFib->{prev2},
            ));
        },
    ],
);

# End of streaming function
###

This is a variety of the pre-previous example, with the split push and pop. The split is required for the fork to work: when the forked rowop executes, the calling label has already returned, so obviously the scoped approach won't work.

In this version the unit stack depth required to compute the 6th (and any) Fibonacci number reduces to 2: it's really only one level on top of the outermost frame.