## Tuesday, October 2, 2012

### Streaming functions and loops

Streaming functions can be used to replace topological loops (where the connections between the labels go in circles) with procedural ones. Just make the body of the loop into a streaming function and connect its output to its own input (and, of course, also to the loop results). Then call this function in a procedural while-loop until the data stops circulating.

With the streaming functions as described so far, there is a catch, or rather two of them. First, with such a connection, the output of the streaming function would immediately circulate to its input and would keep trying to circulate until the loop is done, with no need for a while-loop. Second, as soon as it attempts to circulate, the scheduler will detect a recursive call and die.

But there is also a solution that has not been described yet: an FnBinding can collect the incoming rowops in a tray instead of immediately forwarding them. This tray can be called later, after the function call completes. This way each iteration has its output collected, the function completes, and then the next iteration of the while-loop starts by sending the data collected in the previous iteration. When there is nothing left to send, the loop completes.
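The collect-then-call idea can be sketched in plain Perl, without Triceps. This is only an illustration under stated assumptions: a plain array stands in for the binding's tray, hash references stand in for rows, and the names `fib_step` and `fib_loop` are hypothetical and not part of any Triceps API.

```perl
use strict;
use warnings;

# One step of the computation. Instead of calling itself again, it only
# deposits the next request into the tray and returns.
sub fib_step {
    my ($row, $tray, $results) = @_;
    my $prev = $row->{cur};
    my $cur  = $prev + $row->{prev};
    my $iter = $row->{iter} - 1;
    if ($iter > 0) {
        # "next": collected for the following iteration, not forwarded now
        push @$tray, { iter => $iter, cur => $cur, prev => $prev };
    } else {
        # "result": leaves the loop
        push @$results, $cur;
    }
}

sub fib_loop {
    my ($n) = @_;
    my (@tray, @results);
    # the first request primes the loop
    fib_step({ iter => $n, cur => 0, prev => 1 }, \@tray, \@results);
    while (@tray) {
        my @batch = @tray;  # take the rows collected so far...
        @tray = ();         # ...and leave an empty tray for the new ones
        fib_step($_, \@tray, \@results) for @batch;
    }
    return $results[0];
}

print fib_loop(6), "\n";  # prints 8
```

Because each step returns before the next one starts, there is never a nested call of the step from within itself, which is the property that keeps a scheduler's recursion check satisfied.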

Using this logic, let's rewrite the Fibonacci example with a streaming function loop. The original version and a description of its logic can be found in the manual section "Example of a topological loop": http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_sched_loop_ex.

The new version is:

use Carp;
use Triceps;

my $uFib = Triceps::Unit->new("uFib");

###
# A streaming function that computes one step of a
# Fibonacci number, will be called repeatedly.

# Type of its input and output.
my $rtFib = Triceps::RowType->new(
    iter => "int32", # number of iterations left to do
    cur => "int64", # current number
    prev => "int64", # previous number
) or confess "$!";

# Input:
#   $lbFibCompute: request to do a step. iter will be decremented,
#     cur moved to prev, new value of cur computed.
# Output (by FnReturn labels):
#   "next": data to send to the next step, if the iteration
#     is not finished yet (iter in the produced row is >0).
#   "result": the result data if the iteration is finished
#     (iter in the produced row is 0).
# The opcode is preserved through the computation.

my $frFib = Triceps::FnReturn->new(
    name => "Fib",
    unit => $uFib,
    labels => [
        next => $rtFib,
        result => $rtFib,
    ],
);

my $lbFibCompute = $uFib->makeLabel($rtFib, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $prev = $row->get("cur");
    my $cur = $prev + $row->get("prev");
    my $iter = $row->get("iter") - 1;
    $uFib->makeHashCall($frFib->getLabel($iter > 0 ? "next" : "result"), $_[1]->getOpcode(),
        iter => $iter,
        cur => $cur,
        prev => $prev,
    );
}) or confess "$!";

# End of streaming function
###

my $lbPrint = $uFib->makeLabel($rtFib, "Print", undef, sub {
    print($_[1]->getRow()->get("cur"));
});

# binding to run the Triceps steps in a loop
my $fbFibLoop = Triceps::FnBinding->new(
    name => "FibLoop",
    on => $frFib,
    withTray => 1,
    labels => [
        next => $lbFibCompute,
        result => $lbPrint,
    ],
);

my $lbMain = $uFib->makeLabel($rtFib, "Main", undef, sub {
    my $row = $_[1]->getRow();
    {
        my $ab = Triceps::AutoFnBind->new($frFib, $fbFibLoop);

        # send the request into the loop
        $uFib->makeHashCall($lbFibCompute, $_[1]->getOpcode(),
            iter => $row->get("iter"),
            cur => 0, # the "0-th" number
            prev => 1,
        );

        # now keep cycling the loop until it's all done
        while (!$fbFibLoop->trayEmpty()) {
            $fbFibLoop->callTray();
        }
    }
    print(" is Fibonacci number ", $row->get("iter"), "\n");
}) or confess "$!";

while(<STDIN>) {
    chomp;
    my @data = split(/,/);
    $uFib->makeArrayCall($lbMain, @data);
    $uFib->drainFrame(); # just in case, for completeness
}

It produces the same output as before (as usual in the new convention, the lines marked with "> " are the input lines):

> OP_INSERT,1
1 is Fibonacci number 1
> OP_DELETE,2
1 is Fibonacci number 2
> OP_INSERT,5
5 is Fibonacci number 5
> OP_INSERT,6
8 is Fibonacci number 6

The option "withTray" of FnBinding is what makes it collect the rowops in a tray. The collected rowops are not the original incoming ones but are already translated to call the FnBinding's output labels. The method callTray() swaps the tray with a fresh one and then calls the original tray with the collected rowops. There are more methods for tray control: swapTray() swaps the tray with a fresh one and returns the original one, which can then be read or called; traySize() returns not just the emptiness condition but the whole size of the tray.
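The swap semantics of these methods can be seen in isolation with a toy model in plain Perl. This is not the Triceps implementation: the class `ToyBinding`, its `collect` helper, and the function `demo` are hypothetical names made up for this sketch, and only the method names trayEmpty(), traySize(), swapTray() and callTray() mirror the ones described above.

```perl
use strict;
use warnings;

# Toy model, NOT Triceps code: it only mimics the tray methods by name.
package ToyBinding;

sub new {
    my ($class, $handler) = @_;
    return bless { tray => [], handler => $handler }, $class;
}

# hypothetical helper: put a row into the current tray
sub collect   { my ($self, $row) = @_; push @{ $self->{tray} }, $row; }
sub trayEmpty { my ($self) = @_; return @{ $self->{tray} } == 0; }
sub traySize  { my ($self) = @_; return scalar @{ $self->{tray} }; }

# Replace the tray with a fresh one and hand back the old one.
sub swapTray {
    my ($self) = @_;
    my $old = $self->{tray};
    $self->{tray} = [];
    return $old;
}

# Swap first, then process the old tray: anything collected while
# processing lands in the fresh tray, not in the one being walked.
sub callTray {
    my ($self) = @_;
    my $old = $self->swapTray();
    $self->{handler}->($self, $_) for @$old;
}

package main;

# Each row is a countdown; the handler re-queues it until it reaches 1.
sub demo {
    my @sizes;
    my $binding = ToyBinding->new(sub {
        my ($self, $n) = @_;
        $self->collect($n - 1) if $n > 1;
    });
    $binding->collect(3);
    $binding->collect(1);
    while (!$binding->trayEmpty()) {
        push @sizes, $binding->traySize();  # tray size before each pass
        $binding->callTray();
    }
    return @sizes;
}

print join(",", demo()), "\n";  # prints 2,1,1
```

Swapping before processing is the design choice that matters here: the loop condition only ever looks at rows produced by the pass that just finished.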

The whole loop runs in one binding scope, because it doesn't change with the iterations. The first row primes the loop, and then it continues while there is anything to circulate.

This example sends both the next-iteration rows and the result rows through the binding, but the result rows don't have to go that way. They can be sent directly out of the loop:

my $lbFibCompute = $uFib->makeLabel($rtFib, "FibCompute", undef, sub {
    my $row = $_[1]->getRow();
    my $prev = $row->get("cur");
    my $cur = $prev + $row->get("prev");
    my $iter = $row->get("iter") - 1;
    $uFib->makeHashCall($iter > 0 ? $frFib->getLabel("next") : $lbPrint, $_[1]->getOpcode(),
        iter => $iter,
        cur => $cur,
        prev => $prev,
    );
}) or confess "$!";

The printed result is exactly the same.