Wednesday, October 3, 2012

Streaming functions and pipelines

The streaming functions can be arranged into a pipeline by binding the result of one function to the input of another one. Fundamentally, the pipelines in the world of streaming functions are analogs of the nested calls with the common functions. For example, a pipeline (written for shortness in the Unix way)

a | b | c

is an analog of the common function calls

c(b(a()))

Of course, if the pipeline is fixed, it can as well be connected directly with the label chaining and then stay like this. A more interesting case is when the pipeline needs to be reconfigured dynamically based on the user requests.

An interesting example of pipeline usage comes from the data security. A client may connect to a CEP model element in a clear-text or encrypted way. In the encrypted way the data received from the client needs to be decrypted, then processed, and then the results encrypted before sending them back:

receive | decrypt | process | encrypt | send

In the clear-text mode the pipeline becomes shorter:

receive | process | send

Let's make an example around this idea: To highlight the flexibility, the configuration will be selectable for each input line. If the input starts with a "+", it will be considered encrypted, otherwise clear-text. Since the actual security is not important here, it will be simulated by encoding the text in hex (each byte of data becomes two hexadecimal digits). The real encryption, such as SSL, would of course require the key negotiation, but this little example just skips over this part, since it has no key.

First, define the input and output (receive and send) endpoints:

# All the input and output gets converted through an intermediate
# format of a row with one string field.
my $rtString = Triceps::RowType->new(
    s => "string"
) or confess "$!";

# All the input gets sent here.
my $lbReceive = $unit->makeDummyLabel($rtString, "lbReceive");
my $retReceive = Triceps::FnReturn->new(
    name => "retReceive",
    labels => [
        data => $lbReceive,
    ],
);

# The binding that actually prints the output.
my $bindSend = Triceps::FnBinding->new(
    name => "bindSend",
    on => $retReceive, # any matching return will do
    unit => $unit,
    labels => [
        data => sub {
            print($_[1]->getRow()->get("s"), "\n");
        },
    ],
);

The same row type $rtString will be used for the whole pipeline, sending through the arbitrary strings of text. The binding $bindSend is defined on $retReceive, so they can actually be short-circuited together. But they don't have to. $bindSend can be bound to any matching return. The matching return is defined as having the same number of labels in it, with matching row types. The names of the labels don't matter but their order does. It's a bit tricky: when a binding is created, the labels in it get connected to the return on which it's defined by name. But at this point each of them gets assigned a number, in order the labels went in that original return. After that only this number matters: if this binding gets connected to another matching return, it will get the data from the return's label with the same number, not the same name.

Next step, define the endpoints for the processing: the dispatcher and the output label. All of them use the same row type and matching returns. The actual processing will eventually be hard-connected between these endpoints.

my %dispatch; # the dispatch table will be set here

# The binding that dispatches the input data
my $bindDispatch = Triceps::FnBinding->new(
    name => "bindDispatch",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my @data = split(/,/, $_[1]->getRow()->get("s")); # starts with a command, then string opcode
            my $type = shift @data;
            my $lb = $dispatch{$type};
            my $rowop = $lb->makeRowopArray(@data);
            $unit->call($rowop);
        },
    ],
);

# All the output gets converted to rtString and sent here.
my $lbOutput = $unit->makeDummyLabel($rtString, "lbOutput");
my $retOutput = Triceps::FnReturn->new(
    name => "retOutput",
    labels => [
        data => $lbOutput,
    ],
);

And the filters for encryption and decryption. Each of them has a binding for its input and a return for its output. The actual pseudo-encryption transformation is done with Perl functions unpack() and pack().

# The encryption pipeline element.
my $retEncrypt = Triceps::FnReturn->new(
    name => "retEncrypt",
    unit => $unit,
    labels => [
        data => $rtString,
    ],
);
my $lbEncrypt = $retEncrypt->getLabel("data") or confess "$!";
my $bindEncrypt = Triceps::FnBinding->new(
    name => "bindEncrypt",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my $s = $_[1]->getRow()->get("s");
            $unit->makeArrayCall($lbEncrypt, "OP_INSERT", unpack("H*", $s));
        },
    ],
);

# The decryption pipeline element.
my $retDecrypt = Triceps::FnReturn->new(
    name => "retDecrypt",
    unit => $unit,
    labels => [
        data => $rtString,
    ],
);
my $lbDecrypt = $retDecrypt->getLabel("data") or confess "$!";
my $bindDecrypt = Triceps::FnBinding->new(
    name => "bindDecrypt",
    on => $retReceive,
    unit => $unit,
    labels => [
        data => sub {
            my $s = $_[1]->getRow()->get("s");
            $unit->makeArrayCall($lbDecrypt, "OP_INSERT", pack("H*", $s));
        },
    ],
);

Then goes the body of the model. It defines the actual row types for the data that gets parsed from strings and the business logic (which is pretty simple, increasing an integer field). The dispatch table connects the dispatcher with the business logic, and the conversion from the data rows to the plain text rows is done with template makePipePrintLabel(). This template is very similar to the tempate makePrintLabel() that was shown in the section "Simple wrapper templates" http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_template_wrapper.

sub makePipePrintLabel($$$) # ($print_label_name, $parent_label, $out_label)
{
    my $name = shift;
    my $lbParent = shift;
    my $lbOutput = shift;
    my $unit = $lbOutput->getUnit();
    my $lb = $lbParent->getUnit()->makeLabel($lbParent->getType(), $name,
        undef, sub { # (label, rowop)
            $unit->makeArrayCall(
                $lbOutput, "OP_INSERT", $_[1]->printP());
        }) or confess "$!";
    $lbParent->chain($lb) or confess "$!";
    return $lb;
}

# The body of the model: pass through the name, increase the count.
my $rtData = Triceps::RowType->new(
    name => "string",
    count => "int32",
) or confess "$!";

my $lbIncResult = $unit->makeDummyLabel($rtData, "result");
my $lbInc = $unit->makeLabel($rtData, "inc", undef, sub {
    my $row = $_[1]->getRow();
    $unit->makeHashCall($lbIncResult, $_[1]->getOpcode(),
        name  => $row->get("name"),
        count => $row->get("count") + 1,
    );
}) or confess ("$!");
makePipePrintLabel("printResult", $lbIncResult, $lbOutput);

%dispatch = (
    inc => $lbInc,
);

Finally, the main loop. It will check the input lines for the leading "+" and construct one or the other pipeline for processing. Of course, the pipelines don't have to be constructed in the main loop. They could have been constructed in the handler of $lbReceive just as well (then it would need a separate label to send its result to, and to include into $retReceive).

while(&readLine) {
    my $ab;
    chomp;
    if (/^\+/) {
        $ab = Triceps::AutoFnBind->new(
            $retReceive => $bindDecrypt,
            $retDecrypt => $bindDispatch,
            $retOutput => $bindEncrypt,
            $retEncrypt => $bindSend,
        );
        $_ = substr($_, 1);
    } else {
        $ab = Triceps::AutoFnBind->new(
            $retReceive => $bindDispatch,
            $retOutput => $bindSend,
        );
    };
    $unit->makeArrayCall($lbReceive, "OP_INSERT", $_);
    $unit->drainFrame();
}

The constructor of AutoFnBind (and also FnBinding::call()) can accept multiple return-binding pairs. It will bind them all, and unbind them back on its object destruction. It's the same thing as creating multiple AutoFnBind objects for one pair each, only more efficient.

And here is an example of a run ("> " as usual per new tradition shows the input lines):

> inc,OP_INSERT,abc,1
result OP_INSERT name="abc" count="2"
> inc,OP_DELETE,def,100
result OP_DELETE name="def" count="101"
> +696e632c4f505f494e534552542c6162632c32
726573756c74204f505f494e53455254206e616d653d226162632220636f756e743d22332220
> +696e632c4f505f44454c4554452c6465662c313031
726573756c74204f505f44454c455445206e616d653d226465662220636f756e743d223130322220

What is in the encrypted data? The input lines have been produced by running a Perl expression manually:

$ perl -e 'print((unpack "H*", "inc,OP_INSERT,abc,2"), "\n");'
696e632c4f505f494e534552542c6162632c32
$ perl -e 'print((unpack "H*", "inc,OP_DELETE,def,101"), "\n");'
696e632c4f505f44454c4554452c6465662c313031

They and the results can be decoded by running another Perl expression:

$ perl -e 'print((pack "H*", "726573756c74204f505f494e53455254206e616d653d226162632220636f756e743d22332220"), "\n");'
result OP_INSERT name="abc" count="3"
$ perl -e 'print((pack "H*", "726573756c74204f505f44454c455445206e616d653d226465662220636f756e743d223130322220"), "\n");'
result OP_DELETE name="def" count="102"

No comments:

Post a Comment