Showing posts with label pipeline. Show all posts

Sunday, July 13, 2014

monads

I've recently been bullshitting onna internets with some fans of the functional programming, and the said fans are all ablaze about the monads. Yet they have a very hard time explaining what the benefit of the monads is, and even what these monads are. So I've got around and read up about this stuff, and bullshitted some more, and here it is: the short explanation of the monads in programming in all their glory. This is the third version of it that I've figured out, and even though it might happen that I'd have more, I think that I've got to the bottom of it this time. The first version of my understanding was gained from Wikipedia, and as it always happens with Wikipedia, it turned out to be crap. Not all crap, only half-crap, but still.

If you're interested in the benefits, let me tell it up front: they seem to be that the glorious community of functional programming can now honestly use the loops and imbibe all their goodness with break and continue statements, and even exceptions, without losing the cherry of the strict mathematical theory. For the rest of us, nothing new.

Now, to the details. What the monads are really about is the iteration on containers, and nesting these iterations. For use in a monad, a container of objects has to have a couple of properties.

First, if you have a container, you must be able to iterate over it and create another container with exactly the same structure (i.e. if you can find an element in the first container in some way, you must be able to find an element in the second container in the same way) but different data (the difference may be both in the value and the type of the data). The data is supposed to be the result of some operation on the data from the first container at the same position, but the concept of operation is quite wide-open.

This property means that if your container is a list, you need to know how to create another list of the same number of elements, if it's an associative array, you need to know how to create another associative array with the same keys, if your container is a tree, you need to know how to create another tree with the exact same structure.
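To make the first property concrete, here is a minimal Perl sketch. The variable names and the operations (multiplying by 10, building a string) are made up for the illustration; the point is only that the result has the same shape as the source, a list of the same length or a hash with the same keys.

```perl
use strict;
use warnings;

# List: same number of elements, each one transformed.
my @list = (1, 2, 3);
my @list2 = map { $_ * 10 } @list;

# Associative array: same keys, new values (here also of a new type, a string).
my %h = (a => 1, b => 2);
my %h2 = map { ($_ => "value $h{$_}") } keys %h;

print join(",", @list2), "\n";
print join(",", map { $_ . "=" . $h2{$_} } sort keys %h2), "\n";
```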

Second, if you have two containers, you need to have some rule on how to "combine" them into the third container of the same type. The structure of that third container can be any, as long as it follows some consistent rule, and the data elements represent the results of some operation on one element from the first container and one element from the second container. For example, for the lists you can pick the rule that concatenates them, or produces every possible pair of elements from the first list and the second list, or combines the elements at the same position in both lists. Just as in the first property, the data placed in the result would be the result of some operation of the data from the two incoming containers.
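The three example rules for lists can be sketched in Perl like this. The lists and the choice of addition as the "operation" are assumptions for the sake of the example, and the position-by-position rule here simply assumes that both lists have the same length.

```perl
use strict;
use warnings;

my @x = (1, 2);
my @y = (10, 20);

# Rule 1: concatenation.
my @concat = (@x, @y);

# Rule 2: every possible pair; the operation on each pair is addition.
my @pairs;
for my $a1 (@x) {
  for my $b1 (@y) {
    push @pairs, $a1 + $b1;
  }
}

# Rule 3: the elements at the same position in both lists.
my @zipped = map { $x[$_] + $y[$_] } 0 .. $#x;

print "@concat\n@pairs\n@zipped\n";
```

Whichever rule is picked, it has to be used consistently; the second one is the nested loop that the rest of this post keeps coming back to.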

What the monad does is apply that combination rule over a sequence of containers, combining it into one. The first two get combined together, then that one is combined with the third one, and so on. The monad is really a bunch of nested loops. In the texts about monads they like to discuss the example monads on the Haskell kinds of containers, like lists, and Maybe (a value that may be NULL), and IO (a strange container that performs input/output), but for all I can tell, the most entertaining and straightforward container would be an associative array, or as its variety, a database table.

For example, the following Perl loop represents a monad (remember, the data in the result may be produced by any operation on the data extracted from the argument container).

my %result;
my %a = ...; my %b = ...; my %c = ...;

while (my ($ka, $va) = each %a) {
  while (my ($kb, $vb) = each %b) {
    while (my ($kc, $vc) = each %c) {
      # sprintf() returns the formatted string; printf() would print it and return 1
      $result{"$ka,$kb,$kc"} = sprintf("%d", $vc ? $va : $vb);
    }
  }
}

And if A, B and C were tables, that would be an unconditional join. The loops don't have to go over the whole container, they may even pick only one or even zero elements, and if such is the rule, they may as well be ifs:

while (my ($ka, $va) = each %a) {
  if (exists $b{$ka}) {
    if (exists $c{$ka}) {
      $result{"$ka"} = [ $va, $b{$ka}, $c{$ka} ];
    }
  }
}

And it's also a monad! If these were tables, it would be an inner join on the key, since a row is produced only when the key is present in all three.

The first property of the container lets you do a loop without nesting, and the second property allows doing any number of nestings by applying it recursively.


The rules as stated above say that each nesting must follow the same rule of iteration on the same container type. But there is a weaseling way around it: we can always say that the container is a union that also contains the indication of what operation needs to be done with it, and then the rule that combines two containers would look at this indication in the argument containers and act accordingly. The type Maybe from Haskell does exactly this (and a monad on it provides the Haskell version of what you can think of as exceptions or break/continue). We can also weasel further and say that oh, we know this indication hardcoded at the compilation time, so instead of putting it into the container and then extracting it, we'll just hardcode the loop appropriately (and yes, Haskell does this). The end result is that ANY nested loops can form a monad. And since the if is a special case of a loop, and the unconditional execution is a special case of an if, any procedural code can be a monad.
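Here is a rough Perl sketch of the "if is a special case of a loop" idea, with a Maybe modeled as a list of zero or one elements. The helper names lookup() and find_both() and the sample hashes are made up for the illustration; this is not how Haskell implements Maybe, just the same shape expressed with loops.

```perl
use strict;
use warnings;

my %b = (k1 => "b1");
my %c = (k1 => "c1");

# A "Maybe" as a list: one element if the key is found, none otherwise.
sub lookup {
  my ($hash, $key) = @_;
  return exists $hash->{$key} ? ($hash->{$key}) : ();
}

sub find_both {
  my ($key) = @_;
  my @result;
  for my $vb (lookup(\%b, $key)) {    # runs 0 or 1 times, i.e. it's an if
    for my $vc (lookup(\%c, $key)) {  # never reached if the outer list is empty
      push @result, "$vb,$vc";
    }
  }
  return @result;                     # again zero or one elements
}

print scalar(find_both("k1")), " ", scalar(() = find_both("k2")), "\n";
```

The empty list at any level quietly cuts off everything nested under it, which is the break/continue/exception flavor mentioned above.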


Not everything would be a monad, but guess what, you can nest monads in monads, and that would give everything. And the returned container doesn't have to be anything like any of the containers involved in the loop, it's the same weasely thing.


The only thing that is left separating the monads from the usual procedural code is that all the data defined outside the loops is read-only (the local variables in the loop blocks can be read-write, this also can be explained in the weasely ways, however the variables outside the current block also can't be changed), and the only result produced is the returned container. However the result can contain a description of how the outside data needs to be modified, and the caller of the monad is then free to apply that description. It's just the ass-backwards ways of the functional programming. So with enough nested and recursive monads (not just nested loops but nested monads) and this apply-description business, all this monkeying can properly simulate the procedural programming.

That description thing can also be thought of as the redo log of a transaction: first the redo log is collected and then the whole transaction is applied. Obviously, for the operations inside the transaction to see the state as modified by the previous operations in the transaction, they would have to dig through the redo log.
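The redo log idea can be sketched in Perl as follows. Everything here (the state hash, the transaction() function, the log format of [key, value] pairs) is a made-up minimal example: the "monad" only reads the outside state and returns a log, the lookups inside it dig through the log first, and the caller applies the log afterwards.

```perl
use strict;
use warnings;

my %state = (count => 1, name => "old");

# The "transaction" only reads the state and returns a log of changes.
sub transaction {
  my ($state) = @_;
  my @log;
  # Read a key as the transaction sees it: newest log entry first,
  # then the unmodified outside state.
  my $get = sub {
    my ($key) = @_;
    for my $entry (reverse @log) {
      return $entry->[1] if $entry->[0] eq $key;
    }
    return $state->{$key};
  };
  push @log, [ count => $get->("count") + 1 ];
  push @log, [ count => $get->("count") * 10 ]; # sees the previous update
  push @log, [ name => "new" ];
  return @log;
}

# The caller is the one that applies the description to the outside data.
my @log = transaction(\%state);
$state{$_->[0]} = $_->[1] for @log;

print "$state{count} $state{name}\n";
```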


The definition of the container is also pretty wide. Containers may among other things be infinite - "for (;;)" or "for (i=0;; i++)"  are examples of iteration over an infinite container.


This brings us to the point of how the monads are related to CEP.  Each nesting level in the monad gets an item of key and data from the previous level, uses it somehow to select zero or more values for the container it has, and sends to the next nesting level a newly combined item of key and data information for every value selected. It's a pipeline! The pipeline approach was actually the second version of my understanding of the monads.


What kind of a pipeline is it? It's a straight one, no branching, nor, god forbid, joining. And it's a read-only one: each stage of the pipeline contains no modifiable state, it's all read-only. All the updates have to be carried in the data traveling through the pipeline. And each stage of the pipeline essentially sends out a copy of its incoming data with its own section added to it.
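A minimal Perl sketch of such a pipeline, with made-up stage functions and field names: each stage keeps no state of its own, never modifies what it receives, and returns a fresh copy of the incoming item with its own section added.

```perl
use strict;
use warnings;

# Each stage gets a hash reference and returns a NEW one with its section added.
my @stages = (
  sub { my ($in) = @_; return +{ %$in, doubled => $in->{value} * 2 }; },
  sub { my ($in) = @_; return +{ %$in, label => "v$in->{doubled}" }; },
);

sub run_pipeline {
  my ($item) = @_;
  $item = $_->($item) for @stages;
  return $item;
}

my $input = { value => 21 };
my $output = run_pipeline($input);

print join(",", map { $_ . "=" . $output->{$_} } sort keys %$output), "\n";
```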


Quite boring and useless. By the way, note that the join as described before would work only on the static tables, not on streams.

Now we get a bit off the monad subject as such and get into the depths of Haskell's dealing with pipelines.

Of course there are weaseling ways around the boring limitations. First, each stage of the pipeline may contain the nested pipelines (i.e. nested monads), and may decide which of the nested pipelines to take. That gives the fork-join topology (but, as you may notice, no "fork-and-no-join"). Second, these nested pipelines can be nested further to infinity. Third, the input point itself is a data object that can be passed through the pipeline.


This third item is something that takes some time getting the mind around. Basically, the first stage in the pipeline says "I'm not going to get the input from the outside world any more, instead I'm passing this functionality to the second stage". Obviously, after that the first stage becomes dead and can as well be discarded. Well, not always: there may be MULTIPLE data input points, either within one stage (remember, they are just data, and can be held together), or sprinkled throughout the pipeline. The first stage will become dead only after it gets rid of all its inputs. But let's just say for the sake of an example that there is one input point.


So, with all this weaseling a Haskell program simulates the changing states with two pipelines (i.e. monads):


The pipeline A implements the computation of the next application state.


The pipeline B consists of two stages:
1. Read one item of data, call pipeline A, collect the result from it, and send its result along with the input point to the stage 2.
2. Instantiate a nested copy of the pipeline B, then send the input point and the state to that pipeline, and let it do its work.


It's an infinite number of pipelines, created as long as there is more data coming. And being the tail recursion, Haskell optimizes it out by replacing the second stage of pipeline B with a new copy of pipeline B, instead of nesting it. It discards the old stage 1 after it becomes dead. This really becomes a dynamic pipeline, endlessly growing at the back and dying out at the front. Of course, when things get to the actual running, that gets optimized out too. Instead of creating a new pipeline they just turn back to the old pipeline, say "now it's the new pipeline", give it the new state and let it run along.
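The same structure can be sketched in Perl, under a few assumptions of mine: the state is a plain number, pipeline A is a made-up next_state() that adds the input item to it, and the "input point" is an array reference that gets consumed with shift. Perl itself doesn't optimize the tail call, so the recursive and the loop versions are written out separately to show what the optimization amounts to.

```perl
use strict;
use warnings;

# Pipeline A: compute the next state from the current state and one input item.
sub next_state {
  my ($state, $item) = @_;
  return $state + $item;
}

# Pipeline B, recursive form: stage 1 reads one item and calls pipeline A,
# stage 2 hands the input point and the new state to a fresh copy of B.
sub run_recursive {
  my ($input, $state) = @_;
  return $state unless @$input;
  my $item = shift @$input;
  return run_recursive($input, next_state($state, $item));
}

# The same thing after the tail call is optimized out:
# one pipeline that is reused with the new state on every turn.
sub run_loop {
  my ($input, $state) = @_;
  while (@$input) {
    $state = next_state($state, shift @$input);
  }
  return $state;
}

print run_recursive([1, 2, 3], 0), " ", run_loop([1, 2, 3], 0), "\n";
```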


As far as the parallelism is concerned, the stages of the pipeline A can work in parallel, but then when the time comes to apply the result and get the next input, it hiccups and becomes single-threaded.


Is there an important lesson in this? I don't see any, at least yet. Other than an interesting mental exercise in the ass-backwardness. And from all the asking of the monad enthusiasts on the internets, what exactly is the benefit they get from the monads, I didn't get any answer other than the hand-waving about the Mathematical Foundations. For all I can tell, there isn't one.

If you're interested in learning more about Haskell and its monads, I can recommend http://learnyouahaskell.com/

Thursday, April 11, 2013

Multithreaded pipeline, part 4

Let's look at the aggregation by the hour. First, the short version that skips over the actual logic and concentrates on how the nexuses are connected.

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # ... create the table and aggregation ...

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  # ... connect the input nexus to the table ...
  # ... create the table dump logic ...

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

This function inherits the options from Triead::start() as usual and adds the option from of its own. This option's value is then used as the name of the nexus to import for reading. The row types of the labels from that imported facet are then used to create the table and aggregation. But they aren't connected to the input labels yet.

First, the output nexus is created. The creation passes through all the incoming data, short-circuiting the input and output, and adds the extra label for the aggregated output. After that the rest of the logic can be connected to the inputs (and to the outputs too).

The reason why this connection order is important is that the labels get called in the order they are chained from the input label. And when this thread reacts to some event, we want the original event to pass through to the output first and then send the reaction to it.
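The effect of the chaining order can be shown with a toy model in plain Perl. This is not the Triceps API: chain() and call_label() are hypothetical stand-ins for a label with handlers chained from it, and the strings stand in for the rowops.

```perl
use strict;
use warnings;

my @output;

# Hypothetical stand-in for a label: a list of handlers called in chaining order.
my @chain;
sub chain { push @chain, $_[0]; }
sub call_label {
  my ($row) = @_;
  $_->($row) for @chain;
}

# Chained first: the pass-through of the original event to the output.
chain(sub { push @output, "packet $_[0]"; });
# Chained second: the reaction to the event (here a made-up summary).
chain(sub { push @output, "hourly summary after $_[0]"; });

call_label("p1");
print join("\n", @output), "\n";
```

Swapping the two chain() calls would put the summary ahead of the packet that caused it, which is exactly what the connection order in RawToHourlyMain() avoids.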

And after that it's all usual readyReady() and mainLoop().

The full text of the function follows. The logic is based on the previous example from chapter 13, and the only big change is the use of SimpleAggregator instead of a manually-built one. The HourlyToDailyMain() is very similar, so I won't even show it, you can find the full text in SVN.

# compute an hour-rounded timestamp (in microseconds)
sub hourStamp # (time)
{
  return $_[0]  - ($_[0] % (1000*1000*3600));
}

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  # The current hour stamp that keeps being updated;
  # any aggregated data will be propagated when it is in the
  # current hour (to avoid the propagation of the aggregator clearing).
  my $currentHour;

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # the full stats for the recent time
  my $ttPackets = Triceps::TableType->new($faIn->getLabel("packet")->getRowType())
    ->addSubIndex("byHour",
      Triceps::IndexType->newPerlSorted("byHour", undef, sub {
        return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
      })
      ->addSubIndex("byIP",
        Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
        ->addSubIndex("group",
          Triceps::IndexType->newFifo()
        )
      )
    )
  or confess "$!";

  # type for a periodic summary, used for hourly, daily etc. updates
  my $rtSummary;
  Triceps::SimpleAggregator::make(
    tabType => $ttPackets,
    name => "hourly",
    idxPath => [ "byHour", "byIP", "group" ],
    result => [
      # time period's (here hour's) start timestamp, microseconds
      time => "int64", "last", sub {&hourStamp($_[0]->get("time"));},
      local_ip => "string", "last", sub {$_[0]->get("local_ip");},
      remote_ip => "string", "last", sub {$_[0]->get("remote_ip");},
      # bytes sent in a time period, here an hour
      bytes => "int64", "sum", sub {$_[0]->get("bytes");},
    ],
    saveRowTypeTo => \$rtSummary,
  );

  $ttPackets->initialize() or confess "$!";
  my $tPackets = $unit->makeTable($ttPackets,
    &Triceps::EM_CALL, "tPackets") or confess "$!";

  # Filter the aggregator output to match the current hour.
  my $lbHourlyFiltered = $unit->makeDummyLabel($rtSummary, "hourlyFiltered");
  $tPackets->getAggregatorLabel("hourly")->makeChained("hourlyFilter", undef, sub {
    if ($_[1]->getRow()->get("time") == $currentHour) {
      $unit->call($lbHourlyFiltered->adopt($_[1]));
    }
  });

  # It's important to connect the pass-through data first,
  # before chaining anything to the labels of the faIn, to
  # make sure that any requests and raw inputs get through before
  # our reactions to them.
  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  my $lbPrint = $faOut->getLabel("print");

  # update the notion of the current hour before the table
  $faIn->getLabel("packet")->makeChained("processPackets", undef, sub {
    my $row = $_[1]->getRow();
    $currentHour = &hourStamp($row->get("time"));
    # skip the timestamp updates without data
    if (defined $row->get("bytes")) {
      $unit->call($tPackets->getInputLabel()->adopt($_[1]));
    }
  });

  # the dump request processing
  $tPackets->getDumpLabel()->makeChained("printDump", undef, sub {
    $unit->makeArrayCall($lbPrint, "OP_INSERT", $_[1]->getRow()->printP() . "\n");
  });
  $faIn->getLabel("dumprq")->makeChained("dump", undef, sub {
    if ($_[1]->getRow()->get("what") eq "packets") {
      $tPackets->dumpAll();
    }
  });

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

Multithreaded pipeline, part 3

The rest of this example might be easier to understand by looking at an example of a run first. The lines starting with a "!" are the copies of the input lines that ReaderMain() sends and PrintMain() faithfully prints.

input.packet are the rows that reach the PrintMain() on the "packet" label (remember, "input" is the name with which it imports its input nexus). input.hourly is the data aggregated by the hour intervals (and also by the IP addresses, dropping the port information), and input.daily further aggregates it per day (and again per the IP addresses). The timestamps in the hourly and daily rows are rounded down to the start of the hour or day.
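The rounding can be checked by hand with a couple of lines of Perl. hourStamp() is the same arithmetic as in RawToHourlyMain(); dayStamp() is an assumed name, built by analogy for the daily stage. For the first packet of the run, with the time 1330886011000000, they should produce the timestamps seen in the first hourly and daily rows.

```perl
use strict;
use warnings;

my $USEC_PER_HOUR = 1000 * 1000 * 3600;
my $USEC_PER_DAY = $USEC_PER_HOUR * 24;

# Round a timestamp in microseconds down to the start of its hour.
sub hourStamp { return $_[0] - ($_[0] % $USEC_PER_HOUR); }
# dayStamp() is an assumed name, built by analogy with hourStamp().
sub dayStamp { return $_[0] - ($_[0] % $USEC_PER_DAY); }

printf("%d %d\n", hourStamp(1330886011000000), dayStamp(1330886011000000));
```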

And the lines without any prefixes are the dumps of the table contents that again reach the PrintMain() through the "print" label:

! new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
input.packet OP_INSERT time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
! new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
input.packet OP_INSERT time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
input.hourly OP_DELETE time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
! new,OP_INSERT,1330889612000000,1.2.3.4,5.6.7.8,2000,80,150
input.packet OP_INSERT time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
! new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
input.packet OP_INSERT time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
input.hourly OP_DELETE time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
! new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
input.packet OP_INSERT time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
input.hourly OP_INSERT time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
input.daily OP_INSERT time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! new,OP_INSERT,1331058811000000
input.packet OP_INSERT time="1331058811000000"
! new,OP_INSERT,1331145211000000
input.packet OP_INSERT time="1331145211000000"
! dump,packets
time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
! dump,hourly
time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! dump,daily
time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"

Note that the order of the lines is completely nice and predictable, nothing goes out of order. Each nexus preserves the order of the rows put into it, and the fact that there is only one writer per nexus and that every thread is fed from only one nexus avoids the races.

Multithreaded pipeline, part 1

The Perl API for the threads is pretty much done now, time for the examples.

The first one re-does a variation of an already shown example, the traffic data aggregation from Chapter 13. The short recap is that it gets the data for each network packet going through and keeps it for some time, aggregates the data by the hour and keeps it for a longer time, and aggregates it by the day and keeps it for a longer time yet. This multi-stage computation naturally matches the pipeline approach.

Since this new example highlights different features than the one in chapter 13, I've changed its logic a little too: it updates both the hourly and daily summaries on every packet received. And I didn't bother to implement the part with the automatic cleaning of the old data, it doesn't add anything to how the pipeline works.

The pipeline topologies are quite convenient for working with the threads. The parallel computations create a possibility of things happening in an unpredictable order and producing unpredictable results. The pipeline topology allows the parallelism and at the same time also keeps the data in the same predictable order, with no possibility of rows overtaking each other.

The computation can be split into the following threads:

  • read the input, convert and send the data into the model
  • store the recent data and aggregate it by the hour
  • store the hourly data and aggregate it by the day
  • store the daily data
  • get the data at the end of the pipeline and print it

Technically, each next stage only needs the data from the previous stage, but to get the updates to the printing stage, they all go all the way through.

Dumping the contents of the tables also requires some special support. Each table is local to its thread and can't be accessed from the other threads. To dump its contents, the dump request needs to be sent to its thread, which would extract the data and send it through. There are multiple ways to deal with the dump results. One is to have a special label for each table's dump and propagate it to the last stage to print. If all that is needed is text, one label that allows sending strings is good enough: all the dumps can send the data converted to text into it, and it would go all the way through the pipeline. For this example I've picked the last approach.

And now it's time to show some code. The main part goes like this:

Triceps::Triead::startHere(
  app => "traffic",
  thread => "print",
  main => \&PrintMain,
);


The startHere() creates an App and starts a Triead in the current OS thread. "traffic" is the app name, "print" the thread name. This thread will be the end of the pipeline, and it will create the rest of the threads. This is a convenient pattern when the results of the model need to be fed back to the current thread, and it works out very conveniently for the unit tests. PrintMain() is the body function of this printing thread:

sub PrintMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  Triceps::Triead::start(
    app => $opts->{app},
    thread => "read",
    main => \&ReaderMain,
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "raw_hour",
    main => \&RawToHourlyMain,
    from => "read/data",
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "hour_day",
    main => \&HourlyToDailyMain,
    from => "raw_hour/data",
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "day",
    main => \&StoreDailyMain,
    from => "hour_day/data",
  );

  my $faIn = $owner->importNexus(
    from => "day/data",
    as => "input",
    import => "reader",
  );

  $faIn->getLabel("print")->makeChained("print", undef, sub {
    print($_[1]->getRow()->get("text"));
  });
  for my $tag ("packet", "hourly", "daily") {
    makePrintLabel($tag, $faIn->getLabel($tag));
  }

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

startHere() accepts a number of fixed options plus arbitrary options that it itself doesn't care about but passes to the thread's main function, which then has the responsibility to parse them. To reiterate, the main function gets all the options from the call of startHere(), both those that startHere() parses and those that it simply passes through. startHere() also adds one more option of its own: owner, containing the TrieadOwner object that the thread uses to communicate with the rest of the App.
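The option passing can be imitated in plain Perl to show the idea. This is only a toy model, not the Triceps implementation: start_here_sketch() and my_main() are made-up names, and the owner here is just a string rather than a TrieadOwner object.

```perl
use strict;
use warnings;

# Hypothetical stand-in for startHere(): it uses the options it knows about,
# adds "owner", and passes everything on to the main function.
sub start_here_sketch {
  my (%opts) = @_;
  my $main = $opts{main};
  return $main->(%opts, owner => "owner-of-$opts{thread}");
}

# The main function sees all the options: the standard ones,
# the pass-through ones (here "from"), and the added "owner".
sub my_main {
  my (%opts) = @_;
  return join(",",
    map { $_ . "=" . $opts{$_} } grep { $_ ne "main" } sort keys %opts);
}

my $seen = start_here_sketch(
  app => "traffic",
  thread => "print",
  main => \&my_main,
  from => "x/data",
);
print "$seen\n";
```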

In this case PrintMain() doesn't have any extra options on its own, it's just happy to get startHere()'s standard set that it takes all together from @Triceps::Triead::opts.

It gets the TrieadOwner object $owner from the option appended by startHere(). Each TrieadOwner is created with its own Unit, so the unit is obtained from it to create the thread's model in it. Incidentally, the TrieadOwner  also acts as a clearing trigger object for the Unit, so when the TrieadOwner is destroyed, it properly clears the Unit.

Then it goes and creates all the threads of the pipeline. The start() works very much like startHere(), only it actually creates a new thread and starts the main function in it. The main function can be the same whether it runs through start() or startHere(). The special catch is that the options to start() must contain only the plain Perl values, not Triceps objects. It has to do with how Perl works with threads: it makes a copy of every value for the new thread, and it can't copy the XS objects, so they simply become undefined in the new thread.

All but the first thread in the pipeline have the extra option from: it names the input nexus for this thread, and each thread creates an output nexus "data". As mentioned before, the nexus namespaces are per the thread that created them, so when the option from says "read/data", it's the nexus "data" created by the thread "read".

So, the pipeline gets all connected sequentially until eventually PrintMain() imports the nexus at its tail. importNexus() returns a facet, which is the thread's API to the nexus. A facet looks very much like an FnReturn for most purposes, with a few additions. It even has a real FnReturn in it, and you work with the labels of that FnReturn. The option as of importNexus gives the name to the facet and to its same-named FnReturn (without it the facet would be named the same as the short name of the nexus, in this case "data"). The option import tells whether this thread will be reading or writing to the nexus, and in this case it's reading.

By the time the pipeline gets to the last stage, it connects a few labels:

  • print - carries the direct text lines to print in its field "text", and its contents get printed
  • dumprq - carries the dump requests to the tables, so the printing thread doesn't care about it
  • packet - carries the raw data about the packets
  • hourly - carries the hourly summaries
  • daily - carries the daily summaries

The last three get also printed but this time as whole rows.

And after everything is connected, the thread both tells that it's ready and waits for all the other threads to be ready by calling readyReady(). Then it's the run time, and mainLoop() takes care of it: until it's told to shut down, it keeps reading data from the nexus and processes it. The shutdown will be controlled by the file reading thread at the start of the pipeline. The processing is done by getting the rowops from the nexus and calling them on the appropriate label in the facet, which then calls the labels chained from it, and that gets all the rest of the thread's model running.

Sunday, November 25, 2012

TQL: the Trivial Query Language

In the Developer's Guide section 7.8. "Main loop with a socket" I've been showing the execution of the simple queries. I've wanted to use the queries to demonstrate a feature of the streaming functions, so I've substantially extended that example.

Now the query example has grown to have its own language, TQL. You can think of it as a Trivial Query Language or Triceps Query Language. It's trivial, and so far it's of only an example quality, but it's extensible and it already can do some interesting things.

Why not SQL, after all, there are multiple parser building tools available in Perl? Partially, because I wanted to keep it trivial and to avoid introducing extra dependencies, especially just for the examples. Partially, because I don't like SQL. I think that the queries can be expressed much more naturally in the form of shell-like pipelines. Back at DB when I wrote a simple toolkit for querying and comparison of the CSV files (yeah, I didn't find the DBD::CSV module), I've used a pipeline semantics and it worked pretty well. It also did things that are quite difficult with SQL, like mass renaming and reordering of fields, and diffing. Although TQL is not a descendant of the language I've used in that query tool, it is a further development of the pipeline idea.

Syntactically, TQL is very simple: its query is represented as a nested list, similar to Tcl (or if you like Lisp better, you can think that it's similar to Lisp but with different parentheses). A list is surrounded by curly braces "{}". The elements of a list are either other lists or words, consisting of non-space characters.

{word1 {word21 word22} word3}

Unlike Tcl, there are no quotes in the TQL syntax, the quote characters are just the normal word characters. If you want to include spaces into a word, you use the curly braces instead of the quotes.

{   this is a {brace-enquoted} string with spaces and nested braces  }

Note that the spaces inside a list are used as delimiters and thrown away but within a brace-quoted word-string they are significant. How do you know, which way they will be treated in a particular case? It all depends on what is expected in this case. If the command expects a string as an argument, it will treat it as a string. If the command expects a list as an argument, it will treat it as a list.

What if you need to include an unbalanced brace character inside a string? Escape it with a backslash, "\{". The other usual Perl backslash sequences work too (though in the future TQL may get separated from Perl and then only the C sequences will work, that is to be seen). Any non-alphanumeric characters (including spaces) can be prepended with a backslash too. An important point is that when you build the lists, unlike shell, and like Tcl, you do the backslash escaping only once, when accepting a raw string. After that you can include it into the lists of any depth without any extra escapes (and you must not add any extra escapes in the lists).

Unlike shell, you can't combine a single string out of the quoted and unquoted parts. Instead the quoting braces work as implicit separators. For example, if you specify a list as {a{b}c d}, you don't get two strings "abc" and "d", you get four strings "a", "b", "c", "d".
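
To make the splitting rules more concrete, here is a minimal sketch of a one-level splitter written in plain Perl. It is not the parser that TQL actually uses; the function name split_braced() is made up for this illustration, and the backslash sequences are just copied through instead of being interpreted. It only shows how the braces act as implicit separators and how the nested braces stay inside an element.

```perl
use strict;
use warnings;

# Hypothetical helper, not the Triceps parser: split the contents of one
# brace-list level into its elements. Nested braces are kept verbatim
# inside an element; backslash escapes are copied through untouched.
sub split_braced {
    my ($s) = @_;
    my @out;
    my $cur;        # undef means "no element in progress"
    my $depth = 0;  # current brace nesting depth
    my @chars = split //, $s;
    while (@chars) {
        my $c = shift @chars;
        if ($c eq "\\" && @chars) {
            $cur = '' unless defined $cur;
            $cur .= $c . shift(@chars);   # keep the escaped pair together
        } elsif ($c eq '{') {
            if ($depth == 0) {
                # an opening brace ends any unquoted word in progress
                push @out, $cur if defined $cur;
                $cur = '';
            } else {
                $cur .= $c;
            }
            $depth++;
        } elsif ($c eq '}') {
            $depth--;
            die "unbalanced braces" if $depth < 0;
            if ($depth == 0) {
                push @out, $cur;          # the closing brace ends the element
                undef $cur;
            } else {
                $cur .= $c;
            }
        } elsif ($depth == 0 && $c =~ /\s/) {
            # spaces outside the braces are delimiters and get thrown away
            push @out, $cur if defined $cur;
            undef $cur;
        } else {
            $cur = '' unless defined $cur;
            $cur .= $c;
        }
    }
    die "unbalanced braces" if $depth != 0;
    push @out, $cur if defined $cur;
    return @out;
}

print join("|", split_braced('a{b}c d')), "\n";
print join("|", split_braced('word1 {word21 word22} word3')), "\n";
```

The first call prints the four separate elements a, b, c, d, and the second one keeps "word21 word22" as one element with the space preserved. A command that expects a list in that position would call the splitter again on the element; a command that expects a string would use it as is.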

A TQL query is a list that represents a pipeline. Each element of the list is a command. The first command reads the data from a table, and the following commands perform transformations on that data. For example:

{read table tWindow} {project fields {symbol price}} {print tokenized 0}

If the print command is missing at the end of the pipeline, it will be added implicitly, with the default arguments: {print}.

The arguments of each TQL command are always in the option name-value format, very much like the Perl constructors of many Triceps objects. There aren't any arguments in TQL that go by themselves without an option name.

So for example the command "read" above has the option "table" with value "tWindow". The command "project" has an option "fields" with a list value of two elements. In this case the elements are simple words and don't need the further quoting. But the extra quoting won't hurt. Say, if you wanted to rename the field "price" to "trade_price", you use the Triceps::Fields::filter() syntax for it, and even though the format doesn't contain any spaces and can be still used just as a word, it looks nicer with the extra braces:

{project fields {symbol {price/trade_price} }}

I'm sure that the list of commands and their options will expand and change over time. So far the supported commands are:

read
Defines a table to read from and starts the command pipeline.
Options:
table - name of the table to read from.

project
Projects (and possibly renames) a subset of fields in the current pipeline.
Options:
fields - an array of field definitions in the syntax of Triceps::Fields::filter() (same as in the joins).

print
The last command of the pipeline, which prints the results. If not used explicitly, the query adds this command implicitly at the end of the pipeline, with the default options.
Options:
tokenized (optional) - Flag: print in the name-value format, as in Row::printP(). Otherwise prints only the values in the CSV format. (default: 1)

join
Joins the current pipeline with another table. This is functionally similar to LookupJoin, although the options are closer to JoinTwo.
Options:
table - name of the table to join with. The current pipeline is considered the "left side", the table the "right side". The duplicate key fields on the right side are always excluded from the result, like JoinTwo option (fieldsUniqKey => "left").
rightIdxPath - path name of the table's index on which to join. At the moment there is no way to join without knowing the name of the index. (As usual, the path is an array of nested names).
by (semi-optional) - the join equality condition specified as pairs of fields. Similarly to JoinTwo, it's a single-level array with the fields logically paired: {leftFld1 rightFld1 leftFld2 rightFld2 ... }. Options "by" and "byLeft" are mutually exclusive, and one of them must be present.
byLeft (semi-optional) - the join equality condition specified as a transformation on the left-side field set in the syntax of Triceps::Fields::filter(), with an implicit element {!.*} added at the end. Options "by" and "byLeft" are mutually exclusive, and one of them must be present.
leftFields (optional) - the list of patterns for the left-side fields to pass through and possibly rename, in the syntax of  Triceps::Fields::filter(). (default: pass all, with the same name)
rightFields (optional) - the list of patterns for the right-side fields to pass through and possibly rename, in the syntax of Triceps::Fields::filter(). The key fields get implicitly removed before. (default: pass all, with the same name)
type (optional) - type of the join, "inner" or "left". (default: "inner")

where
Filters/selects the rows.
Options:
istrue - a Perl expression, the condition for the rows to pass through. The particularly dangerous constructions are not allowed in the expression, including the loops and the general function calls. The fields of the row are referred to as $%field, these references get translated before the expression is compiled.
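
The translation of the $%field references in "istrue" can be pictured with the following sketch. It is only an illustration under my own assumptions: the function compile_where() is hypothetical, a plain hash stands in for a row, and the checks that reject the dangerous constructions are left out entirely, so don't feed it any untrusted input.

```perl
use strict;
use warnings;

# Hypothetical sketch: rewrite the $%field references into hash lookups and
# compile the result into a closure. No safety checks are done here.
sub compile_where {
    my ($expr) = @_;
    # $%price becomes $_[0]->{price}
    (my $code = $expr) =~ s/\$%(\w+)/\$_[0]->{$1}/g;
    my $sub = eval "sub { $code }";
    die "bad expression '$expr': $@" unless $sub;
    return $sub;
}

my $is20 = compile_where('$%price == 20');
my @rows = (
    { id => 3, symbol => "AAA", price => 20, size => 20 },
    { id => 5, symbol => "AAA", price => 30, size => 30 },
);
for my $row (grep { $is20->($_) } @rows) {
    print "id=$row->{id} price=$row->{price}\n";
}
```

Only the row with the price of 20 passes through the filter. The point of doing the substitution before the compilation is that the expression gets compiled once and then the resulting closure is called for each row.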

Here are some examples of the TQL queries, with results produced from the output of the code examples I'll show in a moment.

> query,{read table tSymbol}
lb1read OP_INSERT symbol="AAA" name="Absolute Auto Analytics Inc" eps="0.5"
+EOD,OP_NOP,lb1read

Reads the stock symbol information table and prints it in the default tokenized format. The result format is a bit messy for now, a mix of tokenized and CSV data. In the previous examples in the chapter 7 I've been marking the end-of-data either by a row with opcode OP_NOP or not marking it at all. For the TQL queries I've decided to try out a different approach: send a CSV row on the pseudo-label "+EOD" with the value equal to the name of the label that has been completed. The labels with names starting with "+" are special in this convention, they represent some kind of metadata.
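
On the client side this convention might be consumed like in the following sketch. The helper read_until_eod() is a made-up name, and the response is read from an in-memory file instead of a socket, just to keep the example self-contained.

```perl
use strict;
use warnings;

# Hypothetical client-side helper: collect the result lines until the "+EOD"
# pseudo-label arrives, and return the data lines plus the name of the label
# that has been completed.
sub read_until_eod {
    my ($fh) = @_;
    my @data;
    while (my $line = <$fh>) {
        chomp $line;
        if ($line =~ /^\+EOD,/) {
            # CSV row: pseudo-label, opcode, name of the completed label
            my (undef, undef, $label) = split /,/, $line;
            return (\@data, $label);
        }
        push @data, $line;
    }
    return (\@data, undef);   # the stream ended without the marker
}

my $response = <<'END';
lb1read OP_INSERT symbol="AAA" name="Absolute Auto Analytics Inc" eps="0.5"
+EOD,OP_NOP,lb1read
END
open(my $fh, '<', \$response) or die "can't open the in-memory file: $!";
my ($rows, $label) = read_until_eod($fh);
print scalar(@$rows), " row(s), completed label: $label\n";
```

Since the marker carries the label name, a client that has a few queries in flight could tell which one of them has completed.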

The name "lb1read" in the result rows is coming from an auto-generated label name in TQL. It will probably become less random-looking in the future, but for now I haven't yet figured out the best way to do it.

> query,{read table tWindow} {project fields {symbol price}}
lb2project OP_INSERT symbol="AAA" price="20"
lb2project OP_INSERT symbol="AAA" price="30"
+EOD,OP_NOP,lb2project

Reads the trade window rows and projects the fields "symbol" and "price" from them.

> query,{read table tWindow} {project fields {symbol price}} {print tokenized 0}
lb2project,OP_INSERT,AAA,20
lb2project,OP_INSERT,AAA,30
+EOD,OP_NOP,lb2project

The same, only explicitly prints the data in the CSV format.

> query,{read table tWindow} {where istrue {$%price == 20}}
lb2where OP_INSERT id="3" symbol="AAA" price="20" size="20"
+EOD,OP_NOP,lb2where

Selects the trade window row with price equal to 20.

> query,{read table tWindow} {join table tSymbol rightIdxPath bySymbol byLeft {symbol}}
join2.out OP_INSERT id="3" symbol="AAA" price="20" size="20" name="Absolute Auto Analytics Inc" eps="0.5"
join2.out OP_INSERT id="5" symbol="AAA" price="30" size="30" name="Absolute Auto Analytics Inc" eps="0.5"
+EOD,OP_NOP,join2.out

Reads the trade window and enriches it by joining with the symbol information.

A nice feature of TQL is that it allows combining the operations in the pipeline in any order, repeated any number of times. For example, you can read a table, filter it, join with another table, filter again, join with the third table, filter again and so on. SQL in the same situation has to resort to specially named clauses, for example WHERE filters before grouping and HAVING filters after grouping.

Of course, a typical smart SQL compiler would determine the earliest application point for each WHERE sub-expression and build a similar pipeline. But TQL allows keeping the compiler trivial, following the explicit pipelining in the query. And nothing really prevents a smart TQL compiler either: it could as well analyze, split and reorder the pipeline stages.
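
The idea of a trivial compiler that just follows the explicit pipelining can be sketched in a few lines of plain Perl. This is not how TQL is implemented (TQL builds Triceps labels, not lists of hashes); the functions where_stage(), project_stage() and run_pipeline() are invented for this illustration. It only shows that when every stage has the same shape, the stages can be repeated and mixed in any order.

```perl
use strict;
use warnings;

# Each stage is a closure from a list of rows (hash refs) to a list of rows.
sub where_stage {
    my ($cond) = @_;
    return sub { grep { $cond->($_) } @_ };
}

sub project_stage {
    my (@fields) = @_;
    return sub {
        map { my $r = $_; +{ map { $_ => $r->{$_} } @fields } } @_;
    };
}

# The "compiler": feed the rows through the stages in the order given.
sub run_pipeline {
    my ($rows, @stages) = @_;
    my @cur = @$rows;
    @cur = $_->(@cur) for @stages;
    return @cur;
}

my @trades = (
    { id => 3, symbol => "AAA", price => 20, size => 20 },
    { id => 5, symbol => "AAA", price => 30, size => 30 },
);
my @out = run_pipeline(\@trades,
    where_stage(sub { $_[0]{price} >= 20 }),
    project_stage(qw(symbol price size)),
    where_stage(sub { $_[0]{size} > 20 }),   # filter again after projecting
    project_stage(qw(symbol price)),
);
print "$_->{symbol} $_->{price}\n" for @out;
```

Here the filtering happens twice, before and after a projection, with no need for a specially named second clause.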

Thursday, October 11, 2012

Streaming functions and recursion, part 1

Let's look again at the pipeline example. Suppose we want to do the encryption twice (you know, maybe we have a secure channel to a semi-trusted intermediary who can read the envelopes and forward the encrypted messages he can't read to the final destination). The pipeline becomes

decrypt | decrypt | process | encrypt | encrypt

Or if you want to think about it in a more function-like notation, rather than a pipeline, the logic can also be expressed as:

encrypt(encrypt(process(decrypt(decrypt(data)))))


However it would not work directly: a decrypt function has only one output and it can not have two bindings at the same time, it would not know which one to use at any particular time.

Instead you can make decrypt into a template, instantiate it twice, and connect into a pipeline. It's very much like what the Unix shell does: it instantiates a new process for each part of its pipeline.

But there is also another possibility: instead of collecting the whole pipeline in advance, do it in steps.

Start by adding in every binding:

withTray => 1,

This will make all the bindings collect the result on a tray instead of sending it on immediately. Then modify the main loop:

while(<STDIN>) {
  chomp;

  # receive
  my $abReceive = Triceps::AutoFnBind->new(
    $retReceive => $bindDecrypt,
  );
  $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);

  # 1st decrypt
  my $abDecrypt1 = Triceps::AutoFnBind->new(
    $retDecrypt => $bindDecrypt,
  );
  $bindDecrypt->callTray();

  # 2nd decrypt
  my $abDecrypt2 = Triceps::AutoFnBind->new(
    $retDecrypt => $bindDispatch,
  );
  $bindDecrypt->callTray();

  # processing
  my $abProcess = Triceps::AutoFnBind->new(
    $retOutput => $bindEncrypt,
  );
  $bindDispatch->callTray();

  # 1st encrypt
  my $abEncrypt1 = Triceps::AutoFnBind->new(
    $retEncrypt => $bindEncrypt,
  );
  $bindEncrypt->callTray();

  # 2nd encrypt
  my $abEncrypt2 = Triceps::AutoFnBind->new(
    $retEncrypt => $bindSend,
  );
  $bindEncrypt->callTray();

  # send
  $bindSend->callTray();
}

Here I've dropped the encrypted-or-unencrypted choice to save space; the data is always encrypted twice. The drainFrame() call has been dropped too: with the way the function calls work here, it has nothing to do anyway. The rest of the code stays the same.

The bindings have been split in stages. In each stage the next binding is set, and the data from the previous binding gets sent into it. The binding method callTray() replaces the tray in the binding with an empty one, and then calls all the rowops collected on the old tray (and then if you wonder, what happens to the old tray, it gets discarded). Because of this the first decryption stage with binding

    $retDecrypt => $bindDecrypt,

doesn't send the data circling forever. It just does one pass through the decryption and prepares for the second pass.
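
The swap-then-call behavior is easy to model in plain Perl. The class TrayBinding below is a hypothetical stand-in, not the Triceps FnBinding: it has a tray and a handler, and its handler sends the output back into the same object, like $retDecrypt bound to $bindDecrypt.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a binding with a tray; not the Triceps class.
package TrayBinding;

sub new {
    my ($class, $handler) = @_;
    return bless { handler => $handler, tray => [] }, $class;
}

# Collect the data on the tray instead of processing it immediately.
sub collect {
    my $self = shift;
    push @{ $self->{tray} }, @_;
}

# Replace the tray with an empty one, then call what was collected on the old one.
sub callTray {
    my ($self) = @_;
    my $old = $self->{tray};
    $self->{tray} = [];
    $self->{handler}->($self, $_) for @$old;
}

package main;

# The handler decodes one layer of hex and sends the result back to the
# same binding, so it lands on the new tray.
my $bind = TrayBinding->new(sub {
    my ($self, $hex) = @_;
    $self->collect(pack("H*", $hex));
});

$bind->collect(unpack("H*", unpack("H*", "abc")));   # encoded twice
$bind->callTray();                                   # first pass
print $bind->{tray}[0], "\n";
$bind->callTray();                                   # second pass
print $bind->{tray}[0], "\n";
```

Each callTray() does exactly one pass: whatever the handler produces during the pass is collected on the fresh tray and waits there for the next call.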

Every time AutoFnBind->new() runs, it doesn't replace the binding of the return but pushes a new binding onto the return's stack. Each FnReturn has its own stack of bindings (this way it's easier to manage than a single stack). When an AutoFnBind gets destroyed, it pops the binding from the return's stack. And yes, if you specify multiple bindings in one AutoFnBind, all of them get pushed on construction and popped on destruction. In this case all the auto-binds are in the same block, so they will all be destroyed at the end of block in the opposite order. Which means that in effect the code is equivalent to the nested blocks, and this version might be easier for you to think of:

while(<STDIN>) {
  chomp;

  # receive
  my $abReceive = Triceps::AutoFnBind->new(
    $retReceive => $bindDecrypt,
  );
  $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);

  {
    # 1st decrypt
    my $abDecrypt1 = Triceps::AutoFnBind->new(
      $retDecrypt => $bindDecrypt,
    );
    $bindDecrypt->callTray();

    {
      # 2nd decrypt
      my $abDecrypt2 = Triceps::AutoFnBind->new(
        $retDecrypt => $bindDispatch,
      );
      $bindDecrypt->callTray();

      {
        # processing
        my $abProcess = Triceps::AutoFnBind->new(
          $retOutput => $bindEncrypt,
        );
        $bindDispatch->callTray();

        {
          # 1st encrypt
          my $abEncrypt1 = Triceps::AutoFnBind->new(
            $retEncrypt => $bindEncrypt,
          );
          $bindEncrypt->callTray();

          {
            # 2nd encrypt
            my $abEncrypt2 = Triceps::AutoFnBind->new(
              $retEncrypt => $bindSend,
            );
            $bindEncrypt->callTray();

            # send
            $bindSend->callTray();
          }
        }
      }
    }
  }
}
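
The push-on-construction, pop-on-destruction behavior can be imitated in plain Perl with a guard object. The following is only a sketch of the idea, not the AutoFnBind implementation: the class BindGuard is made up, an ordinary array plays the role of the return's stack of bindings, and the bindings are represented by their names.

```perl
use strict;
use warnings;

# Hypothetical sketch: a guard that pushes the bindings on construction
# and pops them on destruction.
package BindGuard;

sub new {
    my ($class, @pairs) = @_;   # (stack_ref => binding, ...)
    my @stacks;
    while (my ($stack, $binding) = splice(@pairs, 0, 2)) {
        push @$stack, $binding;
        push @stacks, $stack;
    }
    return bless { stacks => \@stacks }, $class;
}

sub DESTROY {
    my ($self) = @_;
    pop @$_ for reverse @{ $self->{stacks} };
}

package main;

my @retDecrypt;   # the stack of bindings of one return
my @log;
{
    my $g1 = BindGuard->new(\@retDecrypt => "bindDecrypt");
    push @log, "top: $retDecrypt[-1]";
    {
        my $g2 = BindGuard->new(\@retDecrypt => "bindDispatch");
        push @log, "top: $retDecrypt[-1]";
    }   # $g2 is destroyed here, bindDispatch gets popped
    push @log, "top: $retDecrypt[-1]";
}       # $g1 is destroyed here, the stack is empty again
push @log, "depth: " . scalar(@retDecrypt);
print "$_\n" for @log;
```

The sketch uses the explicitly nested blocks, so the order of destruction is determined by the block structure and the previous binding becomes active again as soon as the inner block ends.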

An interesting consequence of all this nesting, pushing and popping is that you can put the inner calls into the procedural loops if you wish. For example, if you want to process every input line thrice:

while(<STDIN>) {
  chomp;

  # receive
  my $abReceive = Triceps::AutoFnBind->new(
    $retReceive => $bindDecrypt,
  );

  for (my $i = 0; $i < 3; $i++) {
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);

    {
      # 1st decrypt
      my $abDecrypt1 = Triceps::AutoFnBind->new(
        $retDecrypt => $bindDecrypt,
      );
      $bindDecrypt->callTray();

      {
        # 2nd decrypt
        my $abDecrypt2 = Triceps::AutoFnBind->new(
          $retDecrypt => $bindDispatch,
        );
        $bindDecrypt->callTray();

        {
          # processing
          my $abProcess = Triceps::AutoFnBind->new(
            $retOutput => $bindEncrypt,
          );
          $bindDispatch->callTray();

          {
            # 1st encrypt
            my $abEncrypt1 = Triceps::AutoFnBind->new(
              $retEncrypt => $bindEncrypt,
            );
            $bindEncrypt->callTray();

            {
              # 2nd encrypt
              my $abEncrypt2 = Triceps::AutoFnBind->new(
                $retEncrypt => $bindSend,
              );
              $bindEncrypt->callTray();

              # send
              $bindSend->callTray();
            }
          }
        }
      }
    }
  }
}

This code will run the whole pipeline three times for each input line, and print out three output lines, such as:

> 3639366536333263346635303566343934653533343535323534326336313632363332633332
37323635373337353663373432303466353035663439346535333435353235343230366536313664363533643232363136323633323232303633366637353665373433643232333332323230
37323635373337353663373432303466353035663439346535333435353235343230366536313664363533643232363136323633323232303633366637353665373433643232333332323230
37323635373337353663373432303466353035663439346535333435353235343230366536313664363533643232363136323633323232303633366637353665373433643232333332323230

If you wonder what the meaning of these lines is, they are the same as before. The input is:

inc,OP_INSERT,abc,2

And each line of output is:

result OP_INSERT name="abc" count="3"

I suppose, it would be more entertaining if the processing weren't just incrementing a value in the input data but incrementing some static counter, then the three output lines would be different.
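
The doubly encoded lines can be reproduced and decoded by hand with the same pack() and unpack() calls, applied twice. A small sketch (the variable names are arbitrary):

```perl
use strict;
use warnings;

my $clear = "inc,OP_INSERT,abc,2";

# Encode twice: each pass turns every byte into two hexadecimal digits.
my $twice = unpack("H*", unpack("H*", $clear));
print "$twice\n";

# Decode twice to get the original text back.
my $back = pack("H*", pack("H*", $twice));
print "$back\n";
```

The first printed line should match the input line shown above, and the second one is the clear-text input. The same double pack() applied to an output line produces the "result OP_INSERT ..." text.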

However this is not the only way to do the block nesting. The contents of the FnBinding's tray are not affected in any way by the binding being pushed or popped. They stay there throughout, until they get explicitly flushed by callTray(). So the code could use the blocks formed in a more pipeline fashion (as opposed to the more function-call-like fashion shown before):

while(<STDIN>) {
  chomp;

  # receive
  {
    my $abReceive = Triceps::AutoFnBind->new(
      $retReceive => $bindDecrypt,
    );
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);
  }

  # 1st decrypt
  {
    my $abDecrypt1 = Triceps::AutoFnBind->new(
      $retDecrypt => $bindDecrypt,
    );
    $bindDecrypt->callTray();
  }

  # 2nd decrypt
  {
    my $abDecrypt2 = Triceps::AutoFnBind->new(
      $retDecrypt => $bindDispatch,
    );
    $bindDecrypt->callTray();
  }

  # processing
  {
    my $abProcess = Triceps::AutoFnBind->new(
      $retOutput => $bindEncrypt,
    );
    $bindDispatch->callTray();
  }

  # 1st encrypt
  {
    my $abEncrypt1 = Triceps::AutoFnBind->new(
      $retEncrypt => $bindEncrypt,
    );
    $bindEncrypt->callTray();
  }

  # 2nd encrypt
  {
    my $abEncrypt2 = Triceps::AutoFnBind->new(
      $retEncrypt => $bindSend,
    );
    $bindEncrypt->callTray();
  }

  # send
  $bindSend->callTray();
}

After each stage its binding is popped, but the tray is carried through to the next stage.

Which way of blocking is better? I'd say they're pretty equivalent in functionality, and your preference would depend on what style you prefer to express.

Wednesday, October 3, 2012

Streaming functions and pipelines

The streaming functions can be arranged into a pipeline by binding the result of one function to the input of another one. Fundamentally, the pipelines in the world of streaming functions are analogs of the nested calls with the common functions. For example, a pipeline (written for shortness in the Unix way)

a | b | c

is an analog of the common function calls

c(b(a()))

Of course, if the pipeline is fixed, it can as well be connected directly with the label chaining and then stay like this. A more interesting case is when the pipeline needs to be reconfigured dynamically based on the user requests.

An interesting example of pipeline usage comes from the data security. A client may connect to a CEP model element in a clear-text or encrypted way. In the encrypted way the data received from the client needs to be decrypted, then processed, and then the results encrypted before sending them back:

receive | decrypt | process | encrypt | send

In the clear-text mode the pipeline becomes shorter:

receive | process | send

Let's make an example around this idea. To highlight the flexibility, the configuration will be selectable for each input line. If the input starts with a "+", it will be considered encrypted, otherwise clear-text. Since the actual security is not important here, it will be simulated by encoding the text in hex (each byte of data becomes two hexadecimal digits). The real encryption, such as SSL, would of course require the key negotiation, but this little example just skips over this part, since it has no key.

First, define the input and output (receive and send) endpoints:

# All the input and output gets converted through an intermediate
# format of a row with one string field.
my $rtString = Triceps::RowType->new(
    s => "string"
) or confess "$!";

# All the input gets sent here.
my $lbReceive = $unit->makeDummyLabel($rtString, "lbReceive");
my $retReceive = Triceps::FnReturn->new(
    name => "retReceive",
    labels => [
        data => $lbReceive,
    ],
);

# The binding that actually prints the output.
my $bindSend = Triceps::FnBinding->new(
    name => "bindSend",
    on => $retReceive, # any matching return will do
    unit => $unit,
    labels => [
        data => sub {
            print($_[1]->getRow()->get("s"), "\n");
        },
    ],
);

The same row type $rtString will be used for the whole pipeline, sending through the arbitrary strings of text. The binding $bindSend is defined on $retReceive, so they can actually be short-circuited together. But they don't have to. $bindSend can be bound to any matching return. The matching return is defined as having the same number of labels in it, with matching row types. The names of the labels don't matter but their order does. It's a bit tricky: when a binding is created, the labels in it get connected to the return on which it's defined by name. But at this point each of them gets assigned a number, in the order in which the labels went into that original return. After that only this number matters: if this binding gets connected to another matching return, it will get the data from the return's label with the same number, not the same name.
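
The matching by number can be modeled with a little sketch. It is a simplified picture under my own assumptions, not the Triceps code: a "return" here is just an ordered list of label names, the row types are ignored, and the functions make_binding() and deliver() are invented for the illustration.

```perl
use strict;
use warnings;

# Hypothetical model: a binding resolves its handlers to positions once,
# by name, against the return on which it is defined.
sub make_binding {
    my ($on_return, %handlers) = @_;
    my @by_index;
    for my $i (0 .. $#$on_return) {
        my $name = $on_return->[$i];
        $by_index[$i] = $handlers{$name} if exists $handlers{$name};
    }
    return \@by_index;
}

# Deliver the data from the label $name of some (possibly different) return:
# only the position of the label in that return matters.
sub deliver {
    my ($binding, $return, $name, $data) = @_;
    my ($idx) = grep { $return->[$_] eq $name } 0 .. $#$return;
    die "no label '$name' in the return" unless defined $idx;
    return $binding->[$idx] ? $binding->[$idx]->($data) : undef;
}

my @retA = qw(data control);
my @retB = qw(payload status);   # same shape, different names
my $bind = make_binding(\@retA,
    data    => sub { "data handler got $_[0]" },
    control => sub { "control handler got $_[0]" },
);
print deliver($bind, \@retB, "status", "x"), "\n";
```

The label "status" is the second one in @retB, so its data goes to the handler that was defined for "control", the second label of the original return.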

Next step, define the endpoints for the processing: the dispatcher and the output label. All of them use the same row type and matching returns. The actual processing will eventually be hard-connected between these endpoints.

my %dispatch; # the dispatch table will be set here

# The binding that dispatches the input data
my $bindDispatch = Triceps::FnBinding->new(
    name => "bindDispatch",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my @data = split(/,/, $_[1]->getRow()->get("s")); # starts with a command, then string opcode
            my $type = shift @data;
            my $lb = $dispatch{$type};
            my $rowop = $lb->makeRowopArray(@data);
            $unit->call($rowop);
        },
    ],
);

# All the output gets converted to rtString and sent here.
my $lbOutput = $unit->makeDummyLabel($rtString, "lbOutput");
my $retOutput = Triceps::FnReturn->new(
    name => "retOutput",
    labels => [
        data => $lbOutput,
    ],
);

And the filters for encryption and decryption. Each of them has a binding for its input and a return for its output. The actual pseudo-encryption transformation is done with Perl functions unpack() and pack().

# The encryption pipeline element.
my $retEncrypt = Triceps::FnReturn->new(
    name => "retEncrypt",
    unit => $unit,
    labels => [
        data => $rtString,
    ],
);
my $lbEncrypt = $retEncrypt->getLabel("data") or confess "$!";
my $bindEncrypt = Triceps::FnBinding->new(
    name => "bindEncrypt",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my $s = $_[1]->getRow()->get("s");
            $unit->makeArrayCall($lbEncrypt, "OP_INSERT", unpack("H*", $s));
        },
    ],
);

# The decryption pipeline element.
my $retDecrypt = Triceps::FnReturn->new(
    name => "retDecrypt",
    unit => $unit,
    labels => [
        data => $rtString,
    ],
);
my $lbDecrypt = $retDecrypt->getLabel("data") or confess "$!";
my $bindDecrypt = Triceps::FnBinding->new(
    name => "bindDecrypt",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my $s = $_[1]->getRow()->get("s");
            $unit->makeArrayCall($lbDecrypt, "OP_INSERT", pack("H*", $s));
        },
    ],
);

Then goes the body of the model. It defines the actual row types for the data that gets parsed from strings and the business logic (which is pretty simple, increasing an integer field). The dispatch table connects the dispatcher with the business logic, and the conversion from the data rows to the plain text rows is done with template makePipePrintLabel(). This template is very similar to the template makePrintLabel() that was shown in the section "Simple wrapper templates" (http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_template_wrapper).

sub makePipePrintLabel($$$) # ($print_label_name, $parent_label, $out_label)
{
    my $name = shift;
    my $lbParent = shift;
    my $lbOutput = shift;
    my $unit = $lbOutput->getUnit();
    my $lb = $lbParent->getUnit()->makeLabel($lbParent->getType(), $name,
        undef, sub { # (label, rowop)
            $unit->makeArrayCall(
                $lbOutput, "OP_INSERT", $_[1]->printP());
        }) or confess "$!";
    $lbParent->chain($lb) or confess "$!";
    return $lb;
}

# The body of the model: pass through the name, increase the count.
my $rtData = Triceps::RowType->new(
    name => "string",
    count => "int32",
) or confess "$!";

my $lbIncResult = $unit->makeDummyLabel($rtData, "result");
my $lbInc = $unit->makeLabel($rtData, "inc", undef, sub {
    my $row = $_[1]->getRow();
    $unit->makeHashCall($lbIncResult, $_[1]->getOpcode(),
        name  => $row->get("name"),
        count => $row->get("count") + 1,
    );
}) or confess ("$!");
makePipePrintLabel("printResult", $lbIncResult, $lbOutput);

%dispatch = (
    inc => $lbInc,
);
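
If you'd like to play with the dispatching logic without Triceps at hand, here is a stripped-down sketch of the same idea. It is not the model's code: plain closures take the place of the labels, the output format is only an approximation of the printed rows, and the function dispatch_line() is hypothetical. Unlike the binding above, which would fail on calling a method on an undefined label, the sketch checks for the unknown commands explicitly.

```perl
use strict;
use warnings;

# The dispatch table: command name => handler (closures instead of labels).
my %dispatch = (
    inc => sub {
        my ($opcode, $name, $count) = @_;
        return sprintf('result %s name="%s" count="%d"', $opcode, $name, $count + 1);
    },
);

sub dispatch_line {
    my ($line) = @_;
    my @data = split(/,/, $line);   # starts with a command, then string opcode
    my $type = shift @data;
    die "empty input line" unless defined $type;
    my $handler = $dispatch{$type}
        or die "unknown command '$type'";
    return $handler->(@data);
}

print dispatch_line("inc,OP_INSERT,abc,1"), "\n";
print dispatch_line("inc,OP_DELETE,def,100"), "\n";
```

Adding a new command then means just adding one more entry to the table.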

Finally, the main loop. It will check the input lines for the leading "+" and construct one or the other pipeline for processing. Of course, the pipelines don't have to be constructed in the main loop. They could have been constructed in the handler of $lbReceive just as well (then it would need a separate label to send its result to, and to include into $retReceive).

while(&readLine) {
    my $ab;
    chomp;
    if (/^\+/) {
        $ab = Triceps::AutoFnBind->new(
            $retReceive => $bindDecrypt,
            $retDecrypt => $bindDispatch,
            $retOutput => $bindEncrypt,
            $retEncrypt => $bindSend,
        );
        $_ = substr($_, 1);
    } else {
        $ab = Triceps::AutoFnBind->new(
            $retReceive => $bindDispatch,
            $retOutput => $bindSend,
        );
    };
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);
    $unit->drainFrame();
}

The constructor of AutoFnBind (and also FnBinding::call()) can accept multiple return-binding pairs. It will bind them all, and unbind them back on its object destruction. It's the same thing as creating multiple AutoFnBind objects for one pair each, only more efficient.

And here is an example of a run ("> " as usual per new tradition shows the input lines):

> inc,OP_INSERT,abc,1
result OP_INSERT name="abc" count="2"
> inc,OP_DELETE,def,100
result OP_DELETE name="def" count="101"
> +696e632c4f505f494e534552542c6162632c32
726573756c74204f505f494e53455254206e616d653d226162632220636f756e743d22332220
> +696e632c4f505f44454c4554452c6465662c313031
726573756c74204f505f44454c455445206e616d653d226465662220636f756e743d223130322220

What is in the encrypted data? The input lines have been produced by running a Perl expression manually:

$ perl -e 'print((unpack "H*", "inc,OP_INSERT,abc,2"), "\n");'
696e632c4f505f494e534552542c6162632c32
$ perl -e 'print((unpack "H*", "inc,OP_DELETE,def,101"), "\n");'
696e632c4f505f44454c4554452c6465662c313031

They and the results can be decoded by running another Perl expression:

$ perl -e 'print((pack "H*", "726573756c74204f505f494e53455254206e616d653d226162632220636f756e743d22332220"), "\n");'
result OP_INSERT name="abc" count="3"
$ perl -e 'print((pack "H*", "726573756c74204f505f44454c455445206e616d653d226465662220636f756e743d223130322220"), "\n");'
result OP_DELETE name="def" count="102"