Saturday, September 29, 2012

More of Collapse with functions

The Collapse as shown before sends all the collected deletes before all the collected inserts. For example, if it has collected the updates for four rows, the output will be (assuming that the Collapse element is named "collapse" and the data set in it is named "idata"):

collapse.idata.out OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

What if you want the deletes followed directly by the matching inserts? Like this:

collapse.idata.out OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.out OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.out OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.out OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

With the procedural version it required doing an look-up in the insert table after processing each row in the delete table and handling it if found. So I've left it out to avoid complicating the example. But in the streaming function form it becomes easy, just change the binding a little bit:

        my $lbInsInput = $dataset->{tbInsert}->getInputLabel();

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->adopt($_[1]));
                        # If the INSERT is available after this DELETE, this
                        # will produce it.
                        $unit->call($lbInsInput->adopt($_[1]));
                    }
                },
                ins => sub {
                    if ($_[1]->isDelete()) {
                        $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                    }
                },
            ],
        );

The "del" binding first sends the result out as usual and then forwards the DELETE rowop to the insert table's input. Which then causes the insert rowop to be sent of a match is found. Mind you, the look-up and conditional processing still happens. But now it all happens inside the table machinery, all you need to do is add one more line to invoke it.

Let's talk in a little more detail, what happens when the clearing of the Delete table deletes the row with (local_ip="3.3.3.3" remote_ip="7.7.7.7").

  • The Delete table sends a rowop with this row and OP_DELETE to its output label collapse.idata.tbDelete.out.
  • Which then gets forwarded to a chained label in the FnReturn, collapse.idata.retTbl.del.
  • FnReturn has a FnBinding pushed into it, so the rowop passes to the matching label in the binding, collapse.idata.bndTbl.del.
  • The Perl handler of that label gets called, first forwards the rowop to the Collapse output label collapse.idata.out, and then to the Insert table's input label collapse.idata.tbInsert.in.
  • The Insert table looks up the row by the key, finds it, removes it from the table, and sends an OP_DELETE rowop to its output label collapse.idata.tbInsert.out.
  • Which then gets forwarded to a chained label in the FnReturn, collapse.idata.retTbl.ins.
  • FnReturn has a FnBinding pushed into it, so the rowop passes to the matching label in the binding, collapse.idata.bndTbl.ins.
  • The Perl handler of that label gets called and sends the rowop with the opcode changed to OP_INSERT to the Collapse output label collapse.idata.out.
It's a fairly complicated sequence but all you needed to do was to add one line of code. The downside of course is that if something goes not the way you expected, you'd have to trace and understand the whole  long sequence.

Since when the INSERTs are send after DELETEs, their data is removed from the Insert table too, the following clear() of the Insert table won't find them any more and won't send any duplicates; it will send only the inserts for which there were no matching deletes.

You may notice that the code in the "del" handler only forwards the rows around, and that can be replaced by a chaining:

        my $lbDel = $unit->makeDummyLabel(
            $dataset->{tbDelete}->getOutputLabel()->getRowType(),
            $self->{name} . "." . $dataset->{name} . ".lbDel");
        $lbDel->chain($lbOut);
        $lbDel->chain($lbInsInput);

        my $fbind = Triceps::FnBinding->new(
            name => $self->{name} . "." . $dataset->{name} . ".bndTbl",
            on => $fret,
            unit => $unit,
            labels => [
                del => $lbDel,
                ins => sub {
                    $unit->call($lbOut->makeRowop($OP_INSERT, $_[1]->getRow()));
                },
            ],
        );

This shows another way of label definition in FnBinding: an actual label is created first and then given to FnBinding, instead of letting it automatically create a label from the code. The "if ($_[1]->isDelete())" condition has been removed from the "ins", since it's really redundant, and the delete part with its chaining doesn't do the same check anyway.

This code works just as well and even more efficiently than the previous version, since no Perl code needs to be invoked for "del", it all propagates internally through the chaining. However the price is that the DELETE rowops coming out of the output label will have the head-of-the-chain label in them:

collapse.idata.lbDel OP_DELETE local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="100"
collapse.idata.out OP_INSERT local_ip="3.3.3.3" remote_ip="7.7.7.7" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="100"
collapse.idata.out OP_INSERT local_ip="2.2.2.2" remote_ip="6.6.6.6" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="100"
collapse.idata.out OP_INSERT local_ip="4.4.4.4" remote_ip="8.8.8.8" bytes="300"
collapse.idata.lbDel OP_DELETE local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="100"
collapse.idata.out OP_INSERT local_ip="1.1.1.1" remote_ip="5.5.5.5" bytes="300"

The "ins" side can't be handled just by chaining because it has to replace the opcode in the rowops. A potential different way to handle this would be to define various label types in C++ for many primitive operations, like replacing the opcode, and then build by combining them.

The final item is that the code shown in this post involved a recursive call of the streaming function. Its output from the "del" label got fed back to the function, producing more output on the "ins" label. This worked because it invoked a different code path in the streaming function than the one that produced the "del" data. If it were to form a topological loop back to the same path with the same labels, that would have been an error. The recursion will be discussed in more detail later.

No comments:

Post a Comment