As I mentioned in the last post, Label::adopt() now accepts rowops from labels that belong to other units without complaining.
But there are more refinements. The next one was already shown in the examples, but I didn't draw attention to it:
$lb2 = $lb1->makeChained($name, \&subClear, \&subExec, @args);
It creates a new label by chaining it from another existing label. The arguments are very much the same as in Unit::makeLabel(), only there is no need to specify the row type for the new label (nor, obviously, the Unit); these are taken from the original label.
Another one lets you check whether the label has already been cleared:
$result = $lb->isCleared();
This method has been available in C++ all along, but now it's exported to Perl too.
This started as my thoughts on the field of Complex Event Processing, mostly about my OpenSource project Triceps. But now it's about all kinds of software-related things.
Monday, May 27, 2013
snapshot 1.0.92 and the release plans
I've released the new snapshot, 1.0.92, that contains all the most recent multithreading code. There is one more big example to go, and then the docs to be collected from the blog into the proper manual (and a bunch of them to be written yet), but other than that it's pretty much a preview of the next release.
Speaking of which, it has been clear for a while that the next release has outgrown the 1.1 designation. I should have done a couple of intermediate releases, but the multithreading looks like a very important feature, and I've been pushing towards that.
Hereby I officially proclaim that the next release will be 2.0, and in honor of that all the future blog posts about it will be tagged 2_0.
how to export a table, or the guts of TQL join exposed
Now to the point of why the multithreaded TQL example got written: the export of a table between two threads.
It all starts in the Tql initialization method. In the multithreaded mode it builds the nexuses for communication. I'll skip the input nexus and show the building of only the output and request-dump nexuses:
# row type for dump requests and responses
my $rtRequest = Triceps::RowType->new(
client => "string", #requesting client
id => "string", # request id
name => "string", # the table name, for convenience of requestor
cmd => "string", # for convenience of requestor, the command that it is executing
) or confess "$!";
The request row type is used by the client writer thread to request the table dumps from the core logic, and to get back the notifications about the dumps.
# build the output side
for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
my $name = $self->{tableNames}[$i];
my $table = $self->{tables}[$i];
push @tabtypes, $name, $table->getType()->copyFundamental();
push @labels, "t.out." . $name, $table->getOutputLabel();
push @labels, "t.dump." . $name, $table->getDumpLabel();
}
push @labels, "control", $rtControl; # pass-through from in to out
push @labels, "beginDump", $rtRequest; # framing for the table dumps
push @labels, "endDump", $rtRequest;
$self->{faOut} = $owner->makeNexus(
name => $self->{nxprefix} . "out",
labels => [ @labels ],
tableTypes => [ @tabtypes ],
import => "writer",
);
$self->{beginDump} = $self->{faOut}->getLabel("beginDump");
$self->{endDump} = $self->{faOut}->getLabel("endDump");
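On the output side each table is represented by 3 elements:
- its fundamental table type (stripped down to the primary key);
- its output label for normal updates;
- its dump label for the responses to the dump requests.
The row type $rtControl and label "control" are used to pass the commands from the client reader to the client writer, but their exact contents are not important here.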
The dump request nexus is built in a similar way:
# build the dump requests, will be coming from below
undef @labels;
for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
my $name = $self->{tableNames}[$i];
my $table = $self->{tables}[$i];
push @labels, "t.rqdump." . $name, $rtRequest;
}
$self->{faRqDump} = $owner->makeNexus(
name => $self->{nxprefix} . "rqdump",
labels => [ @labels ],
reverse => 1, # avoids making a loop, and gives priority
import => "reader",
);
# tie together the labels
for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
my $name = $self->{tableNames}[$i];
my $table = $self->{tables}[$i];
$self->{faRqDump}->getLabel("t.rqdump." . $name)->makeChained(
$self->{nxprefix} . "rqdump." . $name, undef,
\&_dumpTable, $self, $table
);
}
The dumps are executed in the function _dumpTable:
sub _dumpTable # ($label, $rowop, $self, $table)
{
my ($label, $rop, $self, $table) = @_;
my $unit = $label->getUnit();
# pass through the client id to the dump
$unit->call($self->{beginDump}->adopt($rop));
$table->dumpAll();
$unit->call($self->{endDump}->adopt($rop));
$self->{faOut}->flushWriter();
}
The data gets framed by the "beginDump" and "endDump" labels, which get copies of the original request. This helps the client writer thread keep track of its current spot. The flushing of the writer is not strictly needed: if multiple dump requests ever arrive in a single tray, it breaks up the responses into a separate tray for each dump, keeping the size of the trays lower. Not that this situation could actually happen yet.
This part taken care of, let's jump around and see how the client writer thread processes a "querysub" request:
} elsif ($cmd eq "querysub") {
if ($id eq "" || exists $queries{$id}) {
printOrShut($app, $fragment, $sock,
"error,$id,Duplicate id '$id': query ids must be unique,bad_id,$id\n");
next;
}
my $ctx = compileQuery(
qid => $id,
qname => $args[0],
text => $args[1],
subError => sub {
chomp $_[2];
$_[2] =~ s/\n/\\n/g; # no real newlines in the output
$_[2] =~ s/,/;/g; # no confusing commas in the output
printOrShut($app, $fragment, $sock, "error,", join(',', @_), "\n");
},
faOut => $faOut,
faRqDump => $faRqDump,
subPrint => sub {
printOrShut($app, $fragment, $sock, @_);
},
);
if ($ctx) { # otherwise the error is already reported
$queries{$id} = $ctx;
&$runNextRequest($ctx);
}
}
The query id is used to keep track of the outstanding queries, so the code makes sure that it's unique, and you can see an example of the query response. The bulk of the work is done in the method compileQuery(). The arguments to it give the details of the query and also provide the closures for the functionality that differs between the single-threaded and multi-threaded versions. The option "subError" is used to send the errors to the client, and "subPrint" is used to send the output to the client; the latter gets used for building the labels in the "print" command of the query.
compileQuery() returns the query context, which contains a compiled sub-model that executes the query and a set of requests that tell the writer how to connect the query to the incoming data. On error it reports the error using subError and returns undef. If the compilation succeeded, the writer remembers the query and starts the asynchronous execution of the requests. More about the requests later; now let's look at the query compilation and context.
The context is created in compileQuery() thusly:
my $ctx = {};
$ctx->{qid} = $opts->{qid};
$ctx->{qname} = $opts->{qname};
# .. skipped the parts related to single-threaded TQL
$ctx->{faOut} = $opts->{faOut};
$ctx->{faRqDump} = $opts->{faRqDump};
$ctx->{subPrint} = $opts->{subPrint};
$ctx->{requests} = []; # dump and subscribe requests that will run the pipeline
$ctx->{copyTables} = []; # the tables created in this query
# (have to keep references to the tables or they will disappear)
# The query will be built in a separate unit
$ctx->{u} = Triceps::Unit->new($opts->{nxprefix} . "${q}.unit");
$ctx->{prev} = undef; # will contain the output of the previous command in the pipeline
$ctx->{id} = 0; # a unique id for auto-generated objects
# deletion of the context will cause the unit in it to clean
$ctx->{cleaner} = $ctx->{u}->makeClearingTrigger();
It has some parts in common and some parts that differ between the single- and multi-threaded varieties; here I've skipped over the single-threaded parts.
One element that is left undefined here is $ctx->{prev}. It's the label created as the output of the previous stage of the query pipeline. As each command in the pipeline builds its piece of processing, it chains its logic from $ctx->{prev} and leaves its result label in $ctx->{next}. Then compileQuery() moves "next" to "prev" and calls the compilation of the next command in the pipeline. The only command that accepts an undefined "prev" (and it must be undefined for it) is "read", that reads the table at the start of the pipeline.
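The hand-off between "prev" and "next" can be sketched in plain Perl without Triceps. This is only an illustration under stated assumptions: the %commands table, the compile_pipeline() function and the strings standing in for labels are all hypothetical, and the real compileQuery() does much more.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the query commands: each one takes the context,
# builds its stage on top of $ctx->{prev} and leaves the result in $ctx->{next}.
# Plain strings play the role of labels here.
my %commands = (
    read => sub {
        my ($ctx, %opts) = @_;
        die "The read command may only be used at the start of a pipeline.\n"
            if defined $ctx->{prev};
        $ctx->{next} = "read($opts{table})";
    },
    join => sub {
        my ($ctx, %opts) = @_;
        die "The join command may not be used at the start of a pipeline.\n"
            unless defined $ctx->{prev};
        $ctx->{next} = "join($ctx->{prev}, $opts{table})";
    },
);

# Compile the commands in order, moving "next" to "prev" after each one.
sub compile_pipeline {
    my @pipeline = @_;
    my $ctx = { prev => undef };
    for my $cmd (@pipeline) {
        my ($name, @args) = @$cmd;
        my $handler = $commands{$name} or die "Unknown command '$name'\n";
        undef $ctx->{next};
        $handler->($ctx, @args);
        $ctx->{prev} = $ctx->{next};
    }
    return $ctx->{prev};
}

my $result = compile_pipeline(
    [ read => (table => "trades") ],
    [ join => (table => "symbols") ],
);
print "$result\n";   # prints "join(read(trades), symbols)"
```

A pipeline that starts with "join" dies in the handler, which is the same check that _tqlJoin() does on an undefined "prev".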
$ctx->{copyTables} also has an important point behind it. When you create a label, it's OK to discard the original reference after you chain the label into the logic: that chaining will keep a reference and the label will stay alive. Not so with a table: if you create a table, chain its input label and then drop the reference to the table, the table will be discarded. Then when the input label tries to send any data to the table, it will die (and until very recently it outright crashed). So it's important to keep the table reference alive, and that's what this array is for.
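The same effect can be imitated in plain Perl with a weak reference. This is an analogy, not a description of how Triceps holds its tables internally: the make_table_with_label() function and the hash standing in for a table are hypothetical.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken);

# Hypothetical stand-in: the "input label" holds only a weak reference to its
# "table", so the label by itself does not keep the table alive.
sub make_table_with_label {
    my $table = { rows => [] };
    my $weak = $table;
    weaken($weak);
    my $label = sub {
        my $row = shift;
        die "the table is gone\n" unless defined $weak;
        push @{$weak->{rows}}, $row;
        return scalar @{$weak->{rows}};
    };
    return ($table, $label);
}

# Case 1: the table reference is remembered in an array, like copyTables.
my @copyTables;
my ($kept, $lbKept) = make_table_with_label();
push @copyTables, $kept;
undef $kept;                      # the array still holds a strong reference
my $count = $lbKept->("row1");    # the table is alive, the row goes in

# Case 2: the only strong reference is dropped, so the table disappears.
my ($dropped, $lbDropped) = make_table_with_label();
undef $dropped;
my $ok = eval { $lbDropped->("row1"); 1 };
my $err = $@;

print "kept table has $count row(s)\n";
print "dropped table: $err";
```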
$ctx->{id} is used to generate the unique names for the objects built in a query.
Each query is built in its own unit. This is convenient: after the query is done or the compilation encounters an error, the unit with its whole contents gets easily discarded. The clearing trigger placed in the context makes sure that the unit gets properly cleared and discarded.
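The idea of a clearing trigger is that of a scope guard: an object whose destruction clears the unit. Here is a minimal plain-Perl sketch of that idea, assuming hypothetical SketchUnit and SketchTrigger classes; it only imitates the behavior and says nothing about how makeClearingTrigger() is implemented.

```perl
use strict;
use warnings;

package SketchUnit;      # hypothetical stand-in for a unit
sub new   { my ($class, $name) = @_; return bless { name => $name, cleared => 0 }, $class; }
sub clear { $_[0]{cleared} = 1; }

package SketchTrigger;   # hypothetical stand-in for the clearing trigger
sub new     { my ($class, $unit) = @_; return bless { unit => $unit }, $class; }
sub DESTROY { $_[0]{unit}->clear(); }   # clears the unit when the trigger dies

package main;

my $unit = SketchUnit->new("q1.unit");
{
    my $ctx = { u => $unit };
    $ctx->{cleaner} = SketchTrigger->new($ctx->{u});
    # ... the query would be built and run here ...
}   # the context goes away here, and the trigger with it

print "cleared: $unit->{cleared}\n";   # prints "cleared: 1"
```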
Next goes the compilation of the join query command; I'll go through it in chunks.
sub _tqlJoin # ($ctx, @args)
{
my $ctx = shift;
die "The join command may not be used at the start of a pipeline.\n"
unless (defined($ctx->{prev}));
my $opts = {};
&Triceps::Opt::parse("join", $opts, {
table => [ undef, \&Triceps::Opt::ck_mandatory ],
rightIdxPath => [ undef, undef ],
by => [ undef, undef ],
byLeft => [ undef, undef ],
leftFields => [ undef, undef ],
rightFields => [ undef, undef ],
type => [ "inner", undef ],
}, @_);
my $tabname = bunquote($opts->{table});
my $unit = $ctx->{u};
my $table;
&Triceps::Opt::checkMutuallyExclusive("join", 1, "by", $opts->{by}, "byLeft", $opts->{byLeft});
my $by = split_braced_final($opts->{by});
my $byLeft = split_braced_final($opts->{byLeft});
my $rightIdxPath;
if (defined $opts->{rightIdxPath}) { # propagate the undef
$rightIdxPath = split_braced_final($opts->{rightIdxPath});
}
It starts by parsing the options and converting them to the internal representation, removing the braced quotes.
if ($ctx->{faOut}) {
# Potentially, the tables might be reused between multiple joins
# in the query if the required keys match. But for now keep things
# simpler by creating a new table from scratch each time.
my $tt = eval {
# copy to avoid adding an index to the original type
$ctx->{faOut}->impTableType($tabname)->copy();
};
die ("Join found no such table '$tabname'\n") unless ($tt);
if (!defined $rightIdxPath) {
# determine or add the index automatically
my @workby;
if (defined $byLeft) { # need to translate
my @leftfld = $ctx->{prev}->getRowType()->getFieldNames();
@workby = &Triceps::Fields::filterToPairs("Join option 'byLeft'",
\@leftfld, [ @$byLeft, "!.*" ]);
} else {
@workby = @$by;
}
my @idxkeys; # extract the keys for the right side table
for (my $i = 1; $i <= $#workby; $i+= 2) {
push @idxkeys, $workby[$i];
}
$rightIdxPath = [ $tt->findOrAddIndex(@idxkeys) ];
}
# build the table from the type
$tt->initialize() or confess "$!";
$table = $ctx->{u}->makeTable($tt, "EM_CALL", "tab" . $ctx->{id} . $tabname);
push @{$ctx->{copyTables}}, $table;
# build the request that fills the table with data and then
# keeps it up to date;
# the table has to be filled before the query's main flow starts,
# so put the request at the front
&_makeQdumpsub($ctx, $tabname, 1, $table->getInputLabel());
} else {
die ("Join found no such table '$tabname'\n")
unless (exists $ctx->{tables}{$tabname});
$table = $ctx->{tables}{$tabname};
}
The presence of $ctx->{faOut} means that the query is compiled in the multithreaded context.
The command handlers may freely die, and the error messages will be caught by compileQuery() and nicely (at least, sort-of) reported back to the user.
If an explicit rightIdxPath was not requested, it gets found or added automatically. On the way there the index fields need to be determined. They can be specified either as explicit pairs in the option "by" or in the name translation syntax in the option "byLeft". If we've got a "byLeft", first it gets translated to the same format as "by", and then the right-side fields are extracted from the format of "by". After that $tt->findOrAddIndex() takes care of all the heavy lifting. It either finds a matching index type in the table type or creates a new one from the specified fields, and either way returns the index path. (An invalid field will make it confess.)
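The extraction of the right-side fields from the "by" format is simple enough to try on its own. In this sketch the field names are made up for illustration; the loop has the same shape as the one in _tqlJoin().

```perl
use strict;
use warnings;

# Hypothetical "by" pairs in the order (left field, right field, ...).
my @workby = ("symbol", "sym", "exchange", "exch");

# Same loop shape as in _tqlJoin(): pick every second element, starting at 1.
my @idxkeys;
for (my $i = 1; $i <= $#workby; $i += 2) {
    push @idxkeys, $workby[$i];
}

# An equivalent array slice, shown only for comparison.
my @sliced = @workby[ grep { $_ % 2 } 0 .. $#workby ];

print join(",", @idxkeys), "\n";   # prints "sym,exch"
```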
It looks a bit anti-climactic, but the three lines of exporting with copyFundamental(), impTableType() and findOrAddIndex() are what this large example is all about.
You might wonder, how come the explicit rightIdxPath is not checked in any way? It will be checked later by LookupJoin(), so not much point in doing the check twice.
After that the table is created in a straightforward way, and remembered in copyTables. And the requests list gets prepended with a request to dump and subscribe to this table. I'll get back to that; for now let's finish up with _tqlJoin().
my $isLeft = 0; # default for inner join
my $type = $opts->{type};
if ($type eq "inner") {
# already default
} elsif ($type eq "left") {
$isLeft = 1;
} else {
die "Unsupported value '$type' of option 'type'.\n"
}
my $leftFields = split_braced_final($opts->{leftFields});
my $rightFields = split_braced_final($opts->{rightFields});
my $join = Triceps::LookupJoin->new(
name => "join" . $ctx->{id},
unit => $unit,
leftFromLabel => $ctx->{prev},
rightTable => $table,
rightIdxPath => $rightIdxPath,
leftFields => $leftFields,
rightFields => $rightFields,
by => $by,
byLeft => $byLeft,
isLeft => $isLeft,
fieldsDropRightKey => 1,
);
$ctx->{next} = $join->getOutputLabel();
}
The rest of the options get parsed, and then all the collected data gets forwarded to the LookupJoin constructor. Finally the "next" label is assigned from the join's result.
Now jumping to _makeQdumpsub(). It's used by both the "read" and "join" query commands to initiate the dumps and subscriptions.
sub _makeQdumpsub # ($ctx, $tabname, [$front, $lbNext])
{
my $ctx = shift;
my $tabname = shift;
my $front = shift;
my $lbNext = shift;
my $unit = $ctx->{u};
my $lbrq = eval {
$ctx->{faRqDump}->getLabel("t.rqdump.$tabname");
};
my $lbsrc = eval {
$ctx->{faOut}->getLabel("t.out.$tabname");
};
die ("Found no such table '$tabname'\n") unless ($lbrq && $lbsrc);
# compute the binding for the data dumps, that would be a cross-unit
# binding to the original faOut but it's OK
my $fretOut = $ctx->{faOut}->getFnReturn();
my $dumpname = "t.dump.$tabname";
# the dump and following subscription data will merge on this label
if (!defined $lbNext) {
$lbNext = $unit->makeDummyLabel(
$lbsrc->getRowType(), "lb" . $ctx->{id} . "out_$tabname");
}
my $bindDump = Triceps::FnBinding->new(
on => $fretOut,
name => "bind" . $ctx->{id} . "dump",
labels => [ $dumpname => $lbNext ],
);
First it finds all the proper labels. The label $lbNext will accept the merged dump contents and the following subscription, and it might be either auto-generated or received as an argument. A join passes it as an argument, $table->getInputLabel(), so all the data goes to the copied table.
The binding is used to receive the dump. It's a bit of an optimization. Remember, the dump labels are shared between all the clients. Whenever any client requests a dump, all the clients will get the response. A client finds that the incoming dump is destined for it by processing the "beginDump" label. If it contains this client's name, the dump is destined here, and the client reacts by pushing the appropriate binding onto the facet's FnReturn, and the data flows. The matching "endDump" label then pops the binding and the data stops flowing. The binding avoids checking every rowop for whether it's supposed to be accepted and, if yes, where exactly it should go (remember, the same table may be dumped independently multiple times by multiple queries). Just check once at the start of the bundle and then let the data flow in bulk.
# qdumpsub:
# * label where to send the dump request to
# * source output label, from which a subscription will be set up
# at the end of the dump
# * target label in the query that will be tied to the source label
# * binding to be used during the dump, which also directs the data
# to the same target label
my $request = [ "qdumpsub", $lbrq, $lbsrc, $lbNext, $bindDump ];
if ($front) {
unshift @{$ctx->{requests}}, $request;
} else {
push @{$ctx->{requests}}, $request;
}
return $lbNext;
}
Finally, the created bits and pieces get packaged into a request and added to the list of requests in the query context. The last tricky part is that the request can be added at the back or the front of the list. The "normal" way is to add to the back; however, the dimension tables for the joins have to be populated before the main data flow of the query starts. So for them the argument $front is set to 1, and they get added in the front.
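The effect of $front on the order of the requests is easy to see in a plain-Perl sketch. The add_request() helper and the strings standing in for requests are hypothetical; the push/unshift logic is the same as in _makeQdumpsub().

```perl
use strict;
use warnings;

# Hypothetical helper with the same push/unshift logic as _makeQdumpsub().
sub add_request {
    my ($ctx, $request, $front) = @_;
    if ($front) {
        unshift @{$ctx->{requests}}, $request;
    } else {
        push @{$ctx->{requests}}, $request;
    }
}

my $ctx = { requests => [] };
add_request($ctx, "read trades");          # main flow, goes to the back
add_request($ctx, "join symbols", 1);      # dimension table, goes to the front
add_request($ctx, "join accounts", 1);     # another one, also to the front

print join("; ", @{$ctx->{requests}}), "\n";
# prints "join accounts; join symbols; read trades"
```

Note that with unshift a later join ends up ahead of an earlier one. That doesn't matter here, since all that is required is that every dimension table gets filled before the main flow starts.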
Now jumping back to the writer thread logic: after it has called compileQuery(), it starts the query execution by calling &$runNextRequest(). That is a closure defined inside the client writer function, and it knows how to process the "qdumpsub"s we've just seen created.
my $runNextRequest = sub { # ($ctx)
my $ctx = shift;
my $requests = $ctx->{requests};
undef $ctx->{curRequest}; # clear the info of the previous request
my $r = shift @$requests;
if (!defined $r) {
# all done, now just need to pump the data through
printOrShut($app, $fragment, $sock,
"querysub,$ctx->{qid},$ctx->{qname}\n");
return;
}
First it clears the information about the previous request, if any. This function will be called after each request, to send on the next one, so on all its calls except the first one for a query it will have something to clear.
Then it checks if all the requests are already done. If so, it sends the query confirmation to the client and returns. The subscription part of the query will continue running on its own.
$ctx->{curRequest} = $r; # remember until completed
my $cmd = $$r[0];
if ($cmd eq "qdumpsub") {
# qdumpsub:
# * label where to send the dump request to
# * source output label, from which a subscription will be set up
# at the end of the dump
# * target label in the query that will be tied to the source label
# * binding to be used during the dump, which also directs the data
# to the same target label
my $lbrq = $$r[1];
$unit->makeHashCall($lbrq, "OP_INSERT",
client => $fragment, id => $ctx->{qid}, name => $ctx->{qname}, cmd => $cmd);
The "qdumpsub" gets forwarded to the core logic. The responses will be processed in the handlers of "beginDump" and "endDump". One of the great pains of this "actor" architecture is that the linear logic gets broken up into many disjointed pieces in the separate handlers.
} else {
printOrShut($app, $fragment, $sock,
"error,", $ctx->{qid}, ",Internal error: unknown request '$cmd',internal,", $cmd, "\n");
$ctx->{requests} = [];
undef $ctx->{curRequest};
# and this will leave the query partially initialized,
# but it should never happen
return;
}
};
And a catch-all, just in case the query compiler ever decides to produce an invalid request.
Next goes the handling of the dump labels (again, this gets set up during the build of the client reader threads, and then nature is left to run its course, reacting to the rowops as they come in).
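The way the requests get chained one after another can be sketched in plain Perl. Everything here is a stand-in: the @pending array imitates the responses that arrive later from the core logic, and the strings imitate the requests. Only the shape of the closure follows $runNextRequest.

```perl
use strict;
use warnings;

my @log;       # what happened, in order
my @pending;   # stand-in for the responses that will arrive later

my $runNextRequest;
$runNextRequest = sub {
    my $ctx = shift;
    undef $ctx->{curRequest};            # clear the info of the previous request
    my $r = shift @{$ctx->{requests}};
    if (!defined $r) {
        push @log, "query $ctx->{qid} confirmed";
        return;
    }
    $ctx->{curRequest} = $r;             # remember until completed
    push @log, "sent $r";
    # The response comes later; its handler starts the next request.
    push @pending, sub {
        push @log, "done $r";
        $runNextRequest->($ctx);
    };
};

my $ctx = { qid => "q1", requests => [ "dump symbols", "dump trades" ] };
$runNextRequest->($ctx);

# Deliver the responses one by one, as the event loop would.
while (my $event = shift @pending) {
    $event->();
}

print "$_\n" for @log;
```

Each request is sent only after the previous one has completed, and the confirmation goes out when the list runs dry.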
$faOut->getLabel("beginDump")->makeChained("lbBeginDump", undef, sub {
my $row = $_[1]->getRow();
my ($client, $id, $name, $cmd) = $row->toArray();
return unless ($client eq $fragment);
if ($cmd eq "qdumpsub") {
return unless(exists $queries{$id});
my $ctx = $queries{$id};
$fretOut->push($ctx->{curRequest}[4]); # the binding for the dump
} else {
# .. skipped the handling of dump/dumpsub
}
});
All it does is check whether this is the destination client and whether there is an active request with this id; if so, it pushes the appropriate binding.
$faOut->getLabel("endDump")->makeChained("lbEndDump", undef, sub {
my $row = $_[1]->getRow();
my ($client, $id, $name, $cmd) = $row->toArray();
return unless ($client eq $fragment);
if ($cmd eq "qdumpsub") {
return unless(exists $queries{$id});
my $ctx = $queries{$id};
$fretOut->pop($ctx->{curRequest}[4]); # the binding for the dump
# and chain together all the following updates
$ctx->{curRequest}[2]->makeChained(
"qsub$id." . $ctx->{curRequest}[3]->getName(), undef,
sub {
# a cross-unit call
$_[2]->call($_[3]->adopt($_[1]));
},
$ctx->{u}, $ctx->{curRequest}[3]
);
&$runNextRequest($ctx);
} else {
# .. skipped the handling of dump/dumpsub
}
});
Same as with "beginDump": it checks whether this is the right client and whether it has an outstanding dump request, and if so pops the binding. After the dump is completed, the subscription has to be set up, so it sets up a label that forwards the normal output of this table to the label specified in the request. Since each query is defined in its own unit, this forwarding is done as a cross-unit call.
And then the next request of this query can be started.
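The push and pop of the binding around a dump can be imitated in plain Perl. This is a sketch under stated assumptions: the @bindStack array stands in for the FnReturn's stack of bindings, the client names and rows are made up, and the stream is just a list instead of rowops coming through a nexus.

```perl
use strict;
use warnings;

my $me = "client1";   # hypothetical name of this client
my @bindStack;        # stand-in for the FnReturn's stack of bindings
my @received;         # rows accepted by this client

# Stand-in for the binding: directs the dumped rows to the target.
my $binding = sub { push @received, $_[0] };

# Stand-ins for the handlers of the three kinds of rowops in the stream.
my %handlers = (
    beginDump => sub { push @bindStack, $binding if $_[0] eq $me },
    endDump   => sub { pop @bindStack if $_[0] eq $me },
    dump      => sub { $bindStack[-1]->($_[0]) if @bindStack },
);

# The shared stream: every client sees the dumps requested by all the clients.
my @stream = (
    [ beginDump => "client2" ], [ dump => "a" ], [ endDump => "client2" ],
    [ beginDump => "client1" ], [ dump => "b" ], [ dump => "c" ],
    [ endDump => "client1" ],
);
for my $op (@stream) {
    my ($kind, $payload) = @$op;
    $handlers{$kind}->($payload);
}

print join(",", @received), "\n";   # prints "b,c"
```

The client name is checked only twice per dump, at the begin and at the end; the rows in between go through without any per-row check.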
By the way, the cross-unit adopt() didn't work in Perl until I wrote this example. There was a check against it (the C++ API never bothered with this check). But the adoption between the units has turned out to be quite convenient, so I've removed that check.
And that's it. Long and winding but finally completed. It's kind of about only three lines of code, but I think the rest of it also shows the useful techniques of the work with threads.
It all starts in the Tql initialization method. In the multithreaded mode it builds the nexuses for communication. I'll skip the input nexus and show the building of only the output and request-dump nexuses:
# row type for dump requests and responses
my $rtRequest = Triceps::RowType->new(
client => "string", #requesting client
id => "string", # request id
name => "string", # the table name, for convenience of requestor
cmd => "string", # for convenience of requestor, the command that it is executing
) or confess "$!";
The request row type is used by the client writer thread to request the table dumps from the core logic, and to get back the notifications about the dumps.
# build the output side
for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
my $name = $self->{tableNames}[$i];
my $table = $self->{tables}[$i];
push @tabtypes, $name, $table->getType()->copyFundamental();
push @labels, "t.out." . $name, $table->getOutputLabel();
push @labels, "t.dump." . $name, $table->getDumpLabel();
}
push @labels, "control", $rtControl; # pass-through from in to out
push @labels, "beginDump", $rtRequest; # framing for the table dumps
push @labels, "endDump", $rtRequest;
$self->{faOut} = $owner->makeNexus(
name => $self->{nxprefix} . "out",
labels => [ @labels ],
tableTypes => [ @tabtypes ],
import => "writer",
);
$self->{beginDump} = $self->{faOut}->getLabel("beginDump");
$self->{endDump} = $self->{faOut}->getLabel("endDump");
On the output side each table is represented by 3 elements:
- its fundamental table type (stripped down to the primary key);
- its output label for normal updates;
- its dump label for the responses to the dump requests.
The row type $rtControl and label "control" is used to pass the commands from the client reader to client writer, but it's exact contents is not important here.
The dump request nexus is built in a similar way:
# build the dump requests, will be coming from below
undef @labels;
for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
my $name = $self->{tableNames}[$i];
my $table = $self->{tables}[$i];
push @labels, "t.rqdump." . $name, $rtRequest;
}
$self->{faRqDump} = $owner->makeNexus(
name => $self->{nxprefix} . "rqdump",
labels => [ @labels ],
reverse => 1, # avoids making a loop, and gives priority
import => "reader",
);
# tie together the labels
for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
my $name = $self->{tableNames}[$i];
my $table = $self->{tables}[$i];
$self->{faRqDump}->getLabel("t.rqdump." . $name)->makeChained(
$self->{nxprefix} . "rqdump." . $name, undef,
\&_dumpTable, $self, $table
);
}
The dumps are executed in the function _dumpTable:
sub _dumpTable # ($label, $rowop, $self, $table)
{
my ($label, $rop, $self, $table) = @_;
my $unit = $label->getUnit();
# pass through the client id to the dump
$unit->call($self->{beginDump}->adopt($rop));
$table->dumpAll();
$unit->call($self->{endDump}->adopt($rop));
$self->{faOut}->flushWriter();
}
The data gets framed around by the "beginDump" and "endDump" labels getting the copies of the original request. This helps the client writer thread keep track of its current spot. The flushing of the writer is not strictly needed. Just in case if multiple dump requests are received in a single tray, it breaks up the responses into a separate tray for each dump, keeping the size of the trays lower. Not that this situation could actually happen yet.
This part taken care of, let's jump around and see how the client writer thread processes a "querysub" request:
} elsif ($cmd eq "querysub") {
if ($id eq "" || exists $queries{$id}) {
printOrShut($app, $fragment, $sock,
"error,$id,Duplicate id '$id': query ids must be unique,bad_id,$id\n");
next;
}
my $ctx = compileQuery(
qid => $id,
qname => $args[0],
text => $args[1],
subError => sub {
chomp $_[2];
$_[2] =~ s/\n/\\n/g; # no real newlines in the output
$_[2] =~ s/,/;/g; # no confusing commas in the output
printOrShut($app, $fragment, $sock, "error,", join(',', @_), "\n");
},
faOut => $faOut,
faRqDump => $faRqDump,
subPrint => sub {
printOrShut($app, $fragment, $sock, @_);
},
);
if ($ctx) { # otherwise the error is already reported
$queries{$id} = $ctx;
&$runNextRequest($ctx);
}
}
The query id is used to keep track of the outstanding queries, so the code makes sure that it's unique, and you can see an example of the query response. The bulk of the work is done in the method compileQuery(). The arguments to it give the details of the query and also provide the closures for the functionality that differs between the single-threaded and multi-threaded versions. The option "subError" is used to send the errors to the client, and "subPrint" is used to send the output to the client, it gets used for building the labels in the "print" command of the query.
compileQuery() returns the query context, which contains a compiled sub-model that executes the query and a set of requests that tell the writer how to connect the query to the incoming data. Or on error it reports the error using subError and returns an undef. If the compilation succeeded, the writer remembers the query and starts the asynchronous execution of the requests. More about the requests later, now let's look at the query compilation and context.
The context is created in compileQuery() thusly:
my $ctx = {};
$ctx->{qid} = $opts->{qid};
$ctx->{qname} = $opts->{qname};
# .. skipped the parts related to single-threadde TQL
$ctx->{faOut} = $opts->{faOut};
$ctx->{faRqDump} = $opts->{faRqDump};
$ctx->{subPrint} = $opts->{subPrint};
$ctx->{requests} = []; # dump and subscribe requests that will run the pipeline
$ctx->{copyTables} = []; # the tables created in this query
# (have to keep references to the tables or they will disappear)
# The query will be built in a separate unit
$ctx->{u} = Triceps::Unit->new($opts->{nxprefix} . "${q}.unit");
$ctx->{prev} = undef; # will contain the output of the previous command in the pipeline
$ctx->{id} = 0; # a unique id for auto-generated objects
# deletion of the context will cause the unit in it to clean
$ctx->{cleaner} = $ctx->{u}->makeClearingTrigger();
It has some parts common and some parts differing for the single- and multi-threaded varieties, here I've skipped over the single-threaded parts.
One element that is left undefined here is $ctx->{prev}. It's the label created as the output of the previous stage of the query pipeline. As each command in the pipeline builds its piece of processing, it chains its logic from $ctx->{prev} and leaves its result label in $ctx->{next}. Then compileQuery() moves "next" to "prev" and calls the compilation of the next command in the pipeline. The only command that accepts an undefined "prev" (and it must be undefined for it) is "read", that reads the table at the start of the pipeline.
$ctx->{copyTables} also has an important point behind it. When you create a label, it's OK to discard the original reference after you chain the label into the logic, that chaining will keep a reference and the label will stay alive. Not so with a table: if you create a table, chain its input label and then drop the reference to a table, the table will be discarded. Then when the input label will try to send any data to the table, it will die (and unless very recently it outright crashed). So it's important to keep the table reference alive, and that's what this array is for.
$ctx->{id} is used to generate the unique names for the objects build in a query.
Each query is built in its own unit. This is convenient, after the query is done or the compilation encounters an error, the unit with its whole contents gets easily discarded. The clearing trigger placed in the context makes sure that the unit gets properly cleared and discarded.
Next goes the compilation of the join query command, I'll go through it in chunks.
sub _tqlJoin # ($ctx, @args)
{
my $ctx = shift;
die "The join command may not be used at the start of a pipeline.\n"
unless (defined($ctx->{prev}));
my $opts = {};
&Triceps::Opt::parse("join", $opts, {
table => [ undef, \&Triceps::Opt::ck_mandatory ],
rightIdxPath => [ undef, undef ],
by => [ undef, undef ],
byLeft => [ undef, undef ],
leftFields => [ undef, undef ],
rightFields => [ undef, undef ],
type => [ "inner", undef ],
}, @_);
my $tabname = bunquote($opts->{table});
my $unit = $ctx->{u};
my $table;
&Triceps::Opt::checkMutuallyExclusive("join", 1, "by", $opts->{by}, "byLeft", $opts->{byLeft});
my $by = split_braced_final($opts->{by});
my $byLeft = split_braced_final($opts->{byLeft});
my $rightIdxPath;
if (defined $opts->{rightIdxPath}) { # propagate the undef
$rightIdxPath = split_braced_final($opts->{rightIdxPath});
}
It starts by parsing the options and converting them to the internal representation, removing the braced quotes.
    if ($ctx->{faOut}) {
        # Potentially, the tables might be reused between multiple joins
        # in the query if the required keys match. But for now keep things
        # simpler by creating a new table from scratch each time.
        my $tt = eval {
            # copy to avoid adding an index to the original type
            $ctx->{faOut}->impTableType($tabname)->copy();
        };
        die ("Join found no such table '$tabname'\n") unless ($tt);
        if (!defined $rightIdxPath) {
            # determine or add the index automatically
            my @workby;
            if (defined $byLeft) { # need to translate
                my @leftfld = $ctx->{prev}->getRowType()->getFieldNames();
                @workby = &Triceps::Fields::filterToPairs("Join option 'byLeft'",
                    \@leftfld, [ @$byLeft, "!.*" ]);
            } else {
                @workby = @$by;
            }
            my @idxkeys; # extract the keys for the right side table
            for (my $i = 1; $i <= $#workby; $i+= 2) {
                push @idxkeys, $workby[$i];
            }
            $rightIdxPath = [ $tt->findOrAddIndex(@idxkeys) ];
        }
        # build the table from the type
        $tt->initialize() or confess "$!";
        $table = $ctx->{u}->makeTable($tt, "EM_CALL", "tab" . $ctx->{id} . $tabname);
        push @{$ctx->{copyTables}}, $table;
        # build the request that fills the table with data and then
        # keeps it up to date;
        # the table has to be filled before the query's main flow starts,
        # so put the request at the front
        &_makeQdumpsub($ctx, $tabname, 1, $table->getInputLabel());
    } else {
        die ("Join found no such table '$tabname'\n")
            unless (exists $ctx->{tables}{$tabname});
        $table = $ctx->{tables}{$tabname};
    }
The presence of $ctx->{faOut} means that the query is compiled in the multithreaded context.
The command handlers may freely die, and the error messages will be caught by compileQuery() and nicely (at least, sort-of) reported back to the user.
If an explicit rightIdxPath was not requested, it gets found or added automatically. On the way there the index fields need to be determined. They can be specified either as explicit pairs in the option "by" or in the name translation syntax in the option "byLeft". If we've got a "byLeft", first it gets translated to the same format as "by", and then the right-side fields are extracted from the format of "by". After that $tt->findOrAddIndex() takes care of all the heavy lifting. It either finds a matching index type in the table type or creates a new one from the specified fields, and either way returns the index path. (An invalid field will make it confess.)
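The extraction of the right-side keys can be tried out in isolation. Here is a minimal sketch in plain Perl, with no Triceps calls in it. The helper name rightKeys() is made up for the illustration, and it assumes that the "by" list is already in the flat form of (leftField, rightField) pairs, as in the code above.

```perl
use strict;
use warnings;

# Hypothetical helper, not a part of Triceps: takes the flat list of
# (leftField, rightField) pairs and returns only the right-side fields,
# in the same order.
sub rightKeys # (@by)
{
    my @by = @_;
    die "The 'by' list must contain an even number of elements.\n"
        if (@by % 2);
    my @keys;
    for (my $i = 1; $i <= $#by; $i += 2) {
        push @keys, $by[$i];
    }
    return @keys;
}

# the left field "sym" matches the right field "symbol", and so on
my @keys = rightKeys("sym", "symbol", "ccy", "currency");
print join(",", @keys), "\n"; # prints "symbol,currency"
```

The resulting list is what would go as the argument to findOrAddIndex().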
It looks a bit anti-climactic, but the three lines of exporting with copyFundamental(), impTableType() and findOrAddIndex() are what this large example is all about.
You might wonder, how come the explicit rightIdxPath is not checked in any way? It will be checked later by LookupJoin(), so there is not much point in doing the check twice.
After that the table is created in a straightforward way, and remembered in copyTables. And the requests list gets prepended with a request to dump and subscribe to this table. I'll get back to that, for now let's finish up with _tqlJoin().
    my $isLeft = 0; # default for inner join
    my $type = $opts->{type};
    if ($type eq "inner") {
        # already default
    } elsif ($type eq "left") {
        $isLeft = 1;
    } else {
        die "Unsupported value '$type' of option 'type'.\n"
    }
    my $leftFields = split_braced_final($opts->{leftFields});
    my $rightFields = split_braced_final($opts->{rightFields});
    my $join = Triceps::LookupJoin->new(
        name => "join" . $ctx->{id},
        unit => $unit,
        leftFromLabel => $ctx->{prev},
        rightTable => $table,
        rightIdxPath => $rightIdxPath,
        leftFields => $leftFields,
        rightFields => $rightFields,
        by => $by,
        byLeft => $byLeft,
        isLeft => $isLeft,
        fieldsDropRightKey => 1,
    );
    $ctx->{next} = $join->getOutputLabel();
}
The rest of the options get parsed, and then all the collected data gets forwarded to the LookupJoin constructor. Finally the "next" label is assigned from the join's result.
Now jumping to the _makeQdumpsub(). It's used by both the "read" and "join" query commands to initiate the dumps and subscriptions.
sub _makeQdumpsub # ($ctx, $tabname, [$front, $lbNext])
{
    my $ctx = shift;
    my $tabname = shift;
    my $front = shift;
    my $lbNext = shift;
    my $unit = $ctx->{u};
    my $lbrq = eval {
        $ctx->{faRqDump}->getLabel("t.rqdump.$tabname");
    };
    my $lbsrc = eval {
        $ctx->{faOut}->getLabel("t.out.$tabname");
    };
    die ("Found no such table '$tabname'\n") unless ($lbrq && $lbsrc);
    # compute the binding for the data dumps, that would be a cross-unit
    # binding to the original faOut but it's OK
    my $fretOut = $ctx->{faOut}->getFnReturn();
    my $dumpname = "t.dump.$tabname";
    # the dump and following subscription data will merge on this label
    if (!defined $lbNext) {
        $lbNext = $unit->makeDummyLabel(
            $lbsrc->getRowType(), "lb" . $ctx->{id} . "out_$tabname");
    }
    my $bindDump = Triceps::FnBinding->new(
        on => $fretOut,
        name => "bind" . $ctx->{id} . "dump",
        labels => [ $dumpname => $lbNext ],
    );
First it finds all the proper labels. The label $lbNext will accept the merged dump contents and the following subscription, and it might be either auto-generated or received as an argument. A join passes it as an argument, $table->getInputLabel(), so all the data goes to the copied table.
The binding is used to receive the dump. It's a bit of an optimization. Remember, the dump labels are shared between all the clients. Whenever any client requests a dump, all the clients will get the response. A client finds that the incoming dump is destined for it by processing the "beginDump" label. If it contains this client's name, the dump is destined here, and the client reacts by pushing the appropriate binding onto the facet's FnReturn, and the data flows. The matching "endDump" label then pops the binding and the data stops flowing. The binding avoids checking every rowop for whether it's supposed to be accepted and, if yes, then where exactly (remember, the same table may be dumped independently multiple times by multiple queries). Just check once at the start of the bundle and then let the data flow in bulk.
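The gating idea can be modeled without any Triceps machinery at all. The following is only a sketch under simplifying assumptions: makeClient() and the shape of the stream are made up, the "beginDump" and "endDump" records carry just the client name instead of the full (client, id, name, cmd) row, and a flag plays the role of the pushed binding.

```perl
use strict;
use warnings;

# A plain-Perl model of the gating. Every client sees every rowop of the
# shared dump but accepts the data only between the "beginDump" and
# "endDump" that carry its own name.
sub makeClient # ($name)
{
    my $name = shift;
    my $open = 0; # the analog of the binding being pushed
    my @rows;
    return {
        rows => \@rows,
        handle => sub { # ($label, @data)
            my ($label, @data) = @_;
            if ($label eq "beginDump") {
                # the only place where the destination gets checked
                $open = 1 if ($data[0] eq $name);
            } elsif ($label eq "endDump") {
                $open = 0 if ($data[0] eq $name);
            } elsif ($open) {
                # no per-row checks of the destination, just accept
                push @rows, [ @data ];
            }
        },
    };
}

my @clients = map { makeClient($_) } ("c1", "c2");
my @stream = (
    [ "beginDump", "c2" ],
    [ "t.dump.symbol", "ABC", "ABC Corp", 1 ],
    [ "endDump", "c2" ],
    [ "t.dump.symbol", "DEF", "Defense Corp", 2 ], # outside of any dump
);
foreach my $rowop (@stream) {
    $_->{handle}->(@$rowop) for @clients;
}
printf "c1 got %d rows, c2 got %d rows\n",
    scalar(@{$clients[0]{rows}}), scalar(@{$clients[1]{rows}});
# prints "c1 got 0 rows, c2 got 1 rows"
```

The real binding does more than a flag: it also says where the data goes, which is why the same table can be dumped into different queries independently.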
    # qdumpsub:
    # * label where to send the dump request to
    # * source output label, from which a subscription will be set up
    #   at the end of the dump
    # * target label in the query that will be tied to the source label
    # * binding to be used during the dump, which also directs the data
    #   to the same target label
    my $request = [ "qdumpsub", $lbrq, $lbsrc, $lbNext, $bindDump ];
    if ($front) {
        unshift @{$ctx->{requests}}, $request;
    } else {
        push @{$ctx->{requests}}, $request;
    }
    return $lbNext;
}
Finally, the created bits and pieces get packaged into a request and added to the list of requests in the query context. The last tricky part is that the request can be added at the back or the front of the list. The "normal" way is to add to the back, however the dimension tables for the joins have to be populated before the main data flow of the query starts. So for them the argument $front is set to 1, and they get added in the front.
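The front-or-back logic is easy to play with on its own. This is a sketch with a made-up helper addRequest() and with plain strings standing in for the real "qdumpsub" request arrays.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the tail of _makeQdumpsub(): adds a request
# to the query context either at the front or at the back of the list.
sub addRequest # ($ctx, $request, $front)
{
    my ($ctx, $request, $front) = @_;
    if ($front) {
        unshift @{$ctx->{requests}}, $request;
    } else {
        push @{$ctx->{requests}}, $request;
    }
}

my $ctx = { requests => [] };
addRequest($ctx, "read window", 0);   # the main flow of the query
addRequest($ctx, "join symbol", 1);   # a dimension table, goes first
addRequest($ctx, "join currency", 1); # another one, goes before that
print join("; ", @{$ctx->{requests}}), "\n";
# prints "join currency; join symbol; read window"
```

Note that in this sketch the requests added at the front end up in the reverse order of their addition. All that matters is that every one of them stays ahead of the main flow.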
Now jumping back to the writer thread logic, after it called compileQuery, it starts the query execution by calling &$runNextRequest(). Which is a closure function defined inside the client writer function, and knows how to process the "qdumpsub"s we've just seen created.
my $runNextRequest = sub { # ($ctx)
    my $ctx = shift;
    my $requests = $ctx->{requests};
    undef $ctx->{curRequest}; # clear the info of the previous request
    my $r = shift @$requests;
    if (!defined $r) {
        # all done, now just need to pump the data through
        printOrShut($app, $fragment, $sock,
            "querysub,$ctx->{qid},$ctx->{qname}\n");
        return;
    }
First it clears the information about the previous request, if any. This function will be called after each request, to send on the next one, so on all its calls except the first one for a query it will have something to clear.
Then it checks if all the requests are already done. If so, it sends the query confirmation to the client and returns. The subscription part of the query will continue running on its own.
    $ctx->{curRequest} = $r; # remember until completed
    my $cmd = $$r[0];
    if ($cmd eq "qdumpsub") {
        # qdumpsub:
        # * label where to send the dump request to
        # * source output label, from which a subscription will be set up
        #   at the end of the dump
        # * target label in the query that will be tied to the source label
        # * binding to be used during the dump, which also directs the data
        #   to the same target label
        my $lbrq = $$r[1];
        $unit->makeHashCall($lbrq, "OP_INSERT",
            client => $fragment, id => $ctx->{qid}, name => $ctx->{qname}, cmd => $cmd);
The "qdumpsub" gets forwarded to the core logic. The responses will be processed in the handlers of "beginDump" and "endDump". One of the great pains of this "actor" architecture is that the linear logic gets broken up into many disjointed pieces in the separate handlers.
    } else {
        printOrShut($app, $fragment, $sock,
            "error,", $ctx->{qid}, ",Internal error: unknown request '$cmd',internal,", $cmd, "\n");
        $ctx->{requests} = [];
        undef $ctx->{curRequest};
        # and this will leave the query partially initialized,
        # but it should never happen
        return;
    }
};
And a catch-all, just in case the query compiler ever decides to produce an invalid request.
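To see the sequencing in one piece, here is a toy model of it in plain Perl. It's a sketch only: the requests are strings, the function onRequestDone() is a made-up analog of the "endDump" handler, and a log array stands in for everything that would be sent to the core logic or to the client.

```perl
use strict;
use warnings;

my @log; # what would be sent to the core logic or to the client

# Hypothetical model of the sequencing: one request is in flight at a
# time, and the completion of each request starts the next one.
my $runNext = sub { # ($ctx)
    my $ctx = shift;
    undef $ctx->{curRequest}; # clear the info of the previous request
    my $r = shift @{$ctx->{requests}};
    if (!defined $r) {
        push @log, "confirm $ctx->{qid}"; # all done
        return;
    }
    $ctx->{curRequest} = $r; # remember until completed
    push @log, "start $r";
};

# the analog of the "endDump" handler
sub onRequestDone # ($ctx)
{
    my $ctx = shift;
    push @log, "done $ctx->{curRequest}";
    $runNext->($ctx);
}

my $ctx = { qid => "q1", requests => [ "dump symbol", "dump window" ] };
$runNext->($ctx);
onRequestDone($ctx) while (defined $ctx->{curRequest});
print "$_\n" for @log;
# prints:
# start dump symbol
# done dump symbol
# start dump window
# done dump window
# confirm q1
```

In the real code the completions arrive asynchronously as rowops from the core logic, not from a loop like here, but the order of the events is the same.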
Next goes the handling of the dump labels (again, this gets set up during the build of the client reader threads, and then nature is left to run its course, reacting to the rowops as they come in).
$faOut->getLabel("beginDump")->makeChained("lbBeginDump", undef, sub {
    my $row = $_[1]->getRow();
    my ($client, $id, $name, $cmd) = $row->toArray();
    return unless ($client eq $fragment);
    if ($cmd eq "qdumpsub") {
        return unless(exists $queries{$id});
        my $ctx = $queries{$id};
        $fretOut->push($ctx->{curRequest}[4]); # the binding for the dump
    } else {
        # .. skipped the handling of dump/dumpsub
    }
});
All it does is check whether this is the destination client, and if there is an active request with this id, it pushes the appropriate binding.
$faOut->getLabel("endDump")->makeChained("lbEndDump", undef, sub {
    my $row = $_[1]->getRow();
    my ($client, $id, $name, $cmd) = $row->toArray();
    return unless ($client eq $fragment);
    if ($cmd eq "qdumpsub") {
        return unless(exists $queries{$id});
        my $ctx = $queries{$id};
        $fretOut->pop($ctx->{curRequest}[4]); # the binding for the dump
        # and chain together all the following updates
        $ctx->{curRequest}[2]->makeChained(
            "qsub$id." . $ctx->{curRequest}[3]->getName(), undef,
            sub {
                # a cross-unit call
                $_[2]->call($_[3]->adopt($_[1]));
            },
            $ctx->{u}, $ctx->{curRequest}[3]
        );
        &$runNextRequest($ctx);
    } else {
        # .. skipped the handling of dump/dumpsub
    }
});
Same as with "beginDump": it checks whether this is the right client, and if it has an outstanding dump request, pops the binding. After the dump is completed, the subscription has to be set up, so it sets up a label that forwards the normal output of this table to the label specified in the request. Since each query is defined in its own unit, this forwarding is done as a cross-unit call.
And then the next request of this query can be started.
By the way, the cross-unit adopt() didn't work in Perl until I wrote this example. There was a check against it (the C++ API never bothered with this check). But the adoption between the units has turned out to be quite convenient, so I've removed that check.
And that's it. Long and winding but finally completed. It's kind of about only three lines of code, but I think the rest of it also shows the useful techniques of the work with threads.
TQL server with multithreading
The next big example I've been talking about is finally ready. It's the adaptation of the TQL to work with the multithreaded server framework. The big reason for this example is the export of the table types through a nexus and creation of tables from them. And we'll get to that, but first let's look at the new abilities of the TQL.
TQL is still not of a production quality, in either single- or multi-threaded variety, and contains a large number of simplifying assumptions in its code. As the single-threaded version works symbiotically with the SimpleServer, the multithreaded version works with the ThreadedServer.
One thread created by the programmer contains the "core logic" of the model. It doesn't technically have to be all in a single thread: the data can be forwarded to the other threads and then the results forwarded back from them. But a single core logic thread is a convenient simplification. This thread has some input labels, to receive data from the outside, and some tables with the computed results that can be read by TQL. Of course, it's entirely realistic to have also just the output labels without tables, sending a stream of computed rowops, but again for simplicity I've left this out for now.
This core logic thread creates a TQL instance, which listens on a socket, accepts the connections, forwards the input data to the core logic, performs queries on the tables from the core logic and sends the results back to the client. To this end, the TQL instance creates a few nexuses in the core logic thread and uses them to communicate between all the fragments. The input labels and tables in the core thread also get properly connected to these nexuses. The following figure shows the thread architecture, I'll use it for the reference throughout the discussion:
The core logic thread then goes into its main loop and performs as its name says the core logic computations.
Here is a very simple example of a TQL application:
sub appCoreT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("appCoreT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    # build the core logic
    my $rtTrade = Triceps::RowType->new(
        id => "int32", # trade unique id
        symbol => "string", # symbol traded
        price => "float64",
        size => "float64", # number of shares traded
    ) or confess "$!";
    my $ttWindow = Triceps::TableType->new($rtTrade)
        ->addSubIndex("byId",
            Triceps::SimpleOrderedIndex->new(id => "ASC")
        )
        or confess "$!";
    $ttWindow->initialize() or confess "$!";
    # Represents the static information about a company.
    my $rtSymbol = Triceps::RowType->new(
        symbol => "string", # symbol name
        name => "string", # the official company name
        eps => "float64", # last quarter earnings per share
    ) or confess "$!";
    my $ttSymbol = Triceps::TableType->new($rtSymbol)
        ->addSubIndex("bySymbol",
            Triceps::SimpleOrderedIndex->new(symbol => "ASC")
        )
        or confess "$!";
    $ttSymbol->initialize() or confess "$!";
    my $tWindow = $unit->makeTable($ttWindow, "EM_CALL", "tWindow")
        or confess "$!";
    my $tSymbol = $unit->makeTable($ttSymbol, "EM_CALL", "tSymbol")
        or confess "$!";
    # export the endpoints for TQL (it starts the listener)
    my $tql = Triceps::X::Tql->new(
        name => "tql",
        trieadOwner => $owner,
        socketName => $opts->{socketName},
        tables => [
            $tWindow,
            $tSymbol,
        ],
        tableNames => [
            "window",
            "symbol",
        ],
        inputs => [
            $tWindow->getInputLabel(),
            $tSymbol->getInputLabel(),
        ],
        inputNames => [
            "window",
            "symbol",
        ],
    );
    $owner->readyReady();
    $owner->mainLoop();
}
{
    my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
        app => "appTql",
        main => \&appCoreT,
        port => 0,
        fork => -1, # create a thread, not a process
    );
}
This core logic is very simple: all it does is create two tables and then send the input data into them. The server gets started in a background thread (fork => -1) because this code is taken from a test that then goes and runs the expect with the SimpleClient.
The specification of inputs and tables for TQL is somewhat ugly but I kept it as it was historically (it was done this way to keep the parsing of the options simpler). The new options compared to the single-threaded TQL are the "trieadOwner", "inputs" and "inputNames". The "trieadOwner" is how TQL knows that it must run in the multithreaded mode, and it's used to create the nexuses for communication between the core logic and the rest of TQL. The inputs are needed because the multithreaded TQL parses and forwards the input data, unlike the single-threaded version that relies on the SimpleServer to do that according to the user-defined dispatch table.
The names options don't have to be used: if you name your labels and tables nicely and suitably for the external viewing, the renaming-for-export can be skipped.
Similar to the single-threaded version, if any of the options "tables" or "inputs" is used, the TQL object gets initialized automatically, otherwise the tables and inputs can be added piecemeal with addTable(), addNamedTable(), addInput(), addNamedInput(), and then the whole thing initialized manually.
Then the clients can establish the connections with the TQL server, send in the data and the queries. To jump in, here is a trace of a simple session that sends some data, then does some table dumps and subscribes, not touching the queries yet. I'll go through it fragment by fragment and explain the meaning. The dumps and subscribes were the warm-up exercises before writing the full queries, but they're useful in their own right, and here they serve as the warm-up exercises for the making of the queries!
> connect c1
c1|ready
The "connect" is not an actual command sent but just the indication in the trace that the connection was set up by the client "c1" (it's a trace from the SimpleClient, so it follows the usual conventions). The "ready" response is sent when the connection is opened, similar to the chat server shown before.
> c1|subscribe,s1,symbol
c1|subscribe,s1,symbol
This is a subscription request. It means "I'm not interested in the current state of a table but send me all the updates". The response is the mirror of the request, so that the client knows that the request has been processed. "s1" is the unique identifier of the request, so that the client can match together the responses it received to the requests it sent (and keeping the uniqueness is up to the client, the server may refuse the requests with duplicate identifiers). And "symbol" is the name of the table. Once a subscription is in place, there is no way to unsubscribe other than by disconnecting the client (it's doable but adds complications, and I wanted to skip over the nonessential parts). Subscribing multiple times to the same table will send a confirmation every time but the repeated confirmations will have no effect: only one copy of the data will be sent anyway.
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
This sends the data into the model. And since it propagates through the subscription, the data gets sent back too. The "symbol" here means two different things: on the input side it's the name of the label where the data is sent, on the output side it's the name of the table that has been subscribed to.
The data lines start with the command "d" (since the data is sent much more frequently than the commands, I've picked a short one-letter "command name" for it), then the label/table name, opcode and the row fields in CSV format.
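For a client written in Perl, taking such a line apart could look like the following. It's a sketch and not the actual TQL parser: the function name parseDataLine() is made up, and it assumes that the fields contain no commas and no quoting, as in the traces shown here.

```perl
use strict;
use warnings;

# Splits a data line into the label/table name, the opcode and the fields.
sub parseDataLine # ($line)
{
    my $line = shift;
    chomp $line;
    # the limit of -1 preserves the empty fields at the end of the row
    my ($cmd, $label, $opcode, @fields) = split(/,/, $line, -1);
    die "Not a data line: '$line'\n"
        unless (defined $cmd && $cmd eq "d" && defined $opcode);
    return { label => $label, opcode => $opcode, fields => \@fields };
}

my $d = parseDataLine("d,symbol,OP_INSERT,ABC,ABC Corp,1.0\n");
print "$d->{label} $d->{opcode} [", join("|", @{$d->{fields}}), "]\n";
# prints "symbol OP_INSERT [ABC|ABC Corp|1.0]"
```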
> c1|confirm,cf1
c1|confirm,cf1,,,
The "confirm" command provides a way for the client to check that the data it sent has propagated through the model. And it doesn't have to subscribe back to the data and read them. Send some data lines, then send the "confirm" command and wait for it to come back (again, the unique id allows to keep multiple confirmations in flight if you please). This command doesn't guarantee that all the clients have seen the results from that data. It only guarantees that the core logic has seen the data, and more weakly guarantees that the data has been processed by the core logic, and this particular client has already seen all the results from it.
Why weakly? It has to do with the way it works inside, and it depends on the core logic. If the core logic consists of one thread, the guarantee is quite strong. But if the core logic farms out the work from the main thread to the other threads and then collects the results back, the guarantee breaks.
In Fig. 1 you can see that unlike the chat server shown before, TQL doesn't have any private nexuses for communication between the reader and writer threads of a client. Instead it relies on the same input and output nexuses, adding a control label to them, to forward the commands from the reader to the writer. The TQL object in the core logic thread creates a short-circuit connection between the control labels in the input and output nexuses, forwarding the commands. And if the core logic all runs in one thread, this creates a natural pipeline: the data comes in, gets processed, comes out, the "confirm" command comes in, comes out after the data. But if the core logic farms out the work to more threads, the confirmation can "jump the line" because its path is a direct short circuit.
> c1|drain,dr1
c1|drain,dr1,,,
The "drain" is an analog of "confirm" but more reliable and slower: the reader thread drains the whole model before sending the command on. This guarantees that all the processing is done, and all the output from it has been sent to all the clients.
> c1|dump,d2,symbol
c1|startdump,d2,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dump,d2,symbol
The "dump" command dumps the current contents of a table. Its result starts with "startdump", and the same id and table name as in the request, then goes the data (all with OP_INSERT), finishing with the completion confirmation echoing the original command. The dump is atomic, the contents of the table don't change in the middle of the dump. However if a subscription on this table is active, the data rows from that subscription may come before and after the dump.
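On the client side, the rows of a particular dump can be picked out of the response stream by its id. Here is a sketch of that with a made-up function collectDump(). It assumes that all the lines are already collected in a list, and that no unrelated data lines get interleaved into the middle of the dump.

```perl
use strict;
use warnings;

# Picks the rows of the dump with the given id out of the response lines.
# The data lines seen before the "startdump" belong to the subscriptions
# and get skipped.
sub collectDump # ($id, @lines)
{
    my ($id, @lines) = @_;
    my $inDump = 0;
    my @rows;
    foreach my $line (@lines) {
        my ($cmd, $arg) = split(/,/, $line);
        $arg = "" unless (defined $arg);
        if ($cmd eq "startdump" && $arg eq $id) {
            $inDump = 1;
        } elsif ($cmd eq "dump" && $arg eq $id) {
            last; # the echo of the command marks the end
        } elsif ($cmd eq "d" && $inDump) {
            push @rows, $line;
        }
    }
    return @rows;
}

my @rows = collectDump("d2",
    "d,symbol,OP_INSERT,XYZ,XYZ Inc,5", # from a subscription, before the dump
    "startdump,d2,symbol",
    "d,symbol,OP_INSERT,ABC,ABC Corp,1",
    "dump,d2,symbol",
);
print "$_\n" for @rows; # prints "d,symbol,OP_INSERT,ABC,ABC Corp,1"
```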
I'm not going to describe the error reporting, but it's worth mentioning that if a command contains errors, its "confirmation" will be an error line with the same identifier.
> c1|dumpsub,ds3,symbol
c1|startdump,ds3,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dumpsub,ds3,symbol
The "dumpsub" command is a combination of a dump and subscribe: get the initial state and then get all the updates. The confirmation of "dumpsub" marks the boundary between the original dump and the following updates.
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,symbol,OP_INSERT,DEF,Defense Corp,2
Send some more data, and it comes back only once, even though the subscription was done twice: once in "subscribe" and once in "dumpsub". The repeated subscription requests simply get consumed into one subscription.
> c1|d,window,OP_INSERT,1,ABC,101,10
This sends a row to the other table but nothing comes back because there is no subscription to that table.
> c1|dumpsub,ds4,window
c1|startdump,ds4,window
c1|d,window,OP_INSERT,1,ABC,101,10
c1|dumpsub,ds4,window
> c1|d,window,OP_INSERT,2,ABC,102,12
c1|d,window,OP_INSERT,2,ABC,102,12
This demonstrates the pure dump-and-subscribe without any interventions.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown command works the same as in the chat server, draining and then shutting down the whole server.
Now on to the queries.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
Starts a client connection and sends some data.
> c1|querysub,q1,query1,{read table symbol}{print tokenized 0}
c1|d,query1,OP_INSERT,ABC,ABC Corp,1
c1|querysub,q1,query1
The "querysub" command does the "query-and-subscribe": reads the initial state of the table, processed through the query, and then subscribes to any future updates. The single-threaded variety of TQL doesn't do this, it does just the one-time queries. The multithreaded TQL could also do the one-time queries, and also just the subscribes without the initial state, but I've been cutting corners for this example and the only thing that's actually available is the combination of the two, the "querysub".
"q1" is the command identifier, same as in the other commands. The next field "query1" is the name for the query, it's the name that will be shown for the data lines coming out of the query. And then goes the query in the brace-quoted format, same as the single-threaded TQL (and there is no further splitting by commas, so the commas can be used freely in the query).
The identifier and the name for the query sound kind of redundant. But the client may generate them in different ways and need both. The name has the more symbolic character. The identifier can be generated as a sequence of numbers, so that the client can keep track of its progress more easily. And the error reports include the identifier but not the query name in them.
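The splitting of the command line without touching the commas inside the query can be done with the limit argument of split(). This is a sketch of the idea, not the actual TQL code; the function name parseQuerysub() and the query in the usage example are made up for the illustration.

```perl
use strict;
use warnings;

# Splits a querysub line into the identifier, the name and the query text.
# The limit of 4 in split() leaves the query in one piece, with all
# its commas.
sub parseQuerysub # ($line)
{
    my $line = shift;
    chomp $line;
    my ($cmd, $id, $name, $query) = split(/,/, $line, 4);
    die "Not a querysub line: '$line'\n"
        unless (defined $query && $cmd eq "querysub");
    return ($id, $name, $query);
}

my ($id, $name, $query) = parseQuerysub(
    'querysub,q9,query9,{read table symbol}{where istrue {$%symbol =~ /^[A,B]/}}');
print "$id\n$name\n$query\n";
# prints:
# q9
# query9
# {read table symbol}{where istrue {$%symbol =~ /^[A,B]/}}
```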
For the query, there is no special line coming out before the initial dump. Supposedly, there would not be more than one query in flight with the same name, so this could be easily told apart based on the name in the data lines. There is also an underlying consideration that when the query involves a join, in the future the initial dump might be happening in multiple chunks, requiring to either surround every chunk with the start-end lines or just let them go without the extra notifications, as they are now.
And the initial dump ends as usual with getting the echo of the command (without the query part) back.
This particular query is very simple and equivalent to a "dumpsub".
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,query1,OP_INSERT,DEF,Defense Corp,2
Send more data and it will come out of the query.
> c1|querysub,q2,query2,{read table symbol}{where istrue {$%symbol =~ /^A/}}{project fields {symbol eps}}
c1|t,query2,query2 OP_INSERT symbol="ABC" eps="1"
c1|querysub,q2,query2
This query is more complicated, doing a selection (the "where" query command) and projection. It also prints the results in the tokenized format (the "print" command gets added automatically if it wasn't used explicitly, and the default options for it enable the tokenized format).
The tokenized lines come out with the command "t", query name and then the contents of the row. The query name happens to be sent twice, and I'm not sure yet if it's a feature or a bug.
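A client can take a tokenized line apart with a couple of splits and a regexp. The following is a sketch with a made-up function parseTokenized(). It assumes that the field values contain no embedded double quotes, which holds for all the traces here but is not a statement about how TQL escapes them.

```perl
use strict;
use warnings;

# Parses a line like
#   t,query2,query2 OP_INSERT symbol="ABC" eps="1"
# into the query name, label name, opcode and the hash of fields.
sub parseTokenized # ($line)
{
    my $line = shift;
    chomp $line;
    my ($cmd, $query, $rest) = split(/,/, $line, 3);
    die "Not a tokenized line: '$line'\n"
        unless (defined $rest && $cmd eq "t");
    my ($label, $opcode, $fields) = split(/ /, $rest, 3);
    my %row;
    if (defined $fields) {
        while ($fields =~ /(\w+)="([^"]*)"/g) {
            $row{$1} = $2;
        }
    }
    return { query => $query, label => $label, opcode => $opcode, row => \%row };
}

my $r = parseTokenized('t,query2,query2 OP_INSERT symbol="ABC" eps="1"');
print "$r->{query} $r->{opcode} symbol=$r->{row}{symbol} eps=$r->{row}{eps}\n";
# prints "query2 OP_INSERT symbol=ABC eps=1"
```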
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
c1|d,query1,OP_INSERT,AAA,Absolute Auto Analytics Inc,3
c1|t,query2,query2 OP_INSERT symbol="AAA" eps="3"
> c1|d,symbol,OP_DELETE,DEF,Defense Corp,2.0
c1|d,query1,OP_DELETE,DEF,Defense Corp,2
More examples of the data sent, getting processed by both queries. In the second case the "where" filters out the row from query2, so only query1 produces the result.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown as usual.
Now the "pièce de résistance": queries with joins.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,2.0
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,window,OP_INSERT,1,AAA,12,100
Connect and send some starting data.
> c1|querysub,q1,query1,{read table window}{join table symbol byLeft {symbol} type left}
c1|t,query1,query1 OP_INSERT id="1" symbol="AAA" price="12" size="100" name="Absolute Auto Analytics Inc" eps="3"
c1|querysub,q1,query1
A left join of the tables "window" and "symbol", by the field "symbol" as join condition.
Note that unlike the previous single-threaded TQL examples, the index type path for the table "symbol" is not explicitly specified. It's the result of the new method TableType::findIndexPathForKeys() described before, now the index gets found automatically. And the single-threaded TQL now has this feature too. If you really want, you can still specify the index path but usually there is no need to.
The TQL joins, even in the multithreaded mode, are still implemented internally as LookupJoin, driven only by the main flow of the query. So the changes to the joined dimension tables will not update the query results, and will be visible only when a change on the main flow picks them up, potentially creating inconsistencies in the output. This is wrong, but fixing it presents complexities that I've left alone until some later time.
> c1|d,window,OP_INSERT,2,ABC,13,100
c1|t,query1,query1 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2"
> c1|d,window,OP_INSERT,3,AAA,11,200
c1|t,query1,query1 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Absolute Auto Analytics Inc" eps="3"
Sending data updates the results of the query.
> c1|d,symbol,OP_DELETE,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,symbol,OP_INSERT,AAA,Alcoholic Abstract Aliens,3.0
As described above, the modifications of the dimension table are not visible in the query directly.
> c1|d,window,OP_DELETE,1
c1|t,query1,query1 OP_DELETE id="1" symbol="AAA" price="12" size="100" name="Alcoholic Abstract Aliens" eps="3"
But an update on the main flow brings them up (and in this case inconsistently, the row getting deleted is not exactly the same as the row inserted before).
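The root of this inconsistency is easy to reproduce in a toy model. The sketch below has no Triceps calls in it: a hash plays the dimension table, and the made-up function mainFlow() plays a lookup join that consults the hash only when a rowop passes on the main flow.

```perl
use strict;
use warnings;

# the dimension "table": symbol => company name
my %symbol = (AAA => "Absolute Auto Analytics Inc");
my @out; # the output of the join

# The lookup happens at the time when the rowop passes through.
sub mainFlow # ($opcode, $id, $sym)
{
    my ($opcode, $id, $sym) = @_;
    push @out, "$opcode id=$id symbol=$sym name=$symbol{$sym}";
}

mainFlow("OP_INSERT", 1, "AAA");
# the dimension changes, and nothing comes out of the join
$symbol{AAA} = "Alcoholic Abstract Aliens";
# the deletion picks up the new name, not matching the insert above
mainFlow("OP_DELETE", 1, "AAA");
print "$_\n" for @out;
# prints:
# OP_INSERT id=1 symbol=AAA name=Absolute Auto Analytics Inc
# OP_DELETE id=1 symbol=AAA name=Alcoholic Abstract Aliens
```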
> c1|querysub,q2,query2,{read table window}{join table symbol byLeft {symbol} type left}{join table symbol byLeft {eps} type left rightFields {symbol/symbol2}}
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="ABC"
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="DEF"
c1|t,query2,query2 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Alcoholic Abstract Aliens" eps="3" symbol2="AAA"
c1|querysub,q2,query2
This is a more complicated query, involving two joins, with the same dimension table "symbol". The second join by "eps" makes no real-world sense whatsoever but it's interesting from the technical perspective: if you check the table type of this table at the start of the post, you'll find that it has no index on the field "eps". The join adds this index on demand!
The way it works, all the dimension tables are copied into the client's writer thread, created from the table types exported by the core logic through the output nexus. (And if a table is used in the same query twice, it's currently also copied twice.) This provides a nice opportunity to amend the table type by adding any necessary secondary index before creating the table, and TQL makes good use of it.
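As a toy illustration of why the private copy makes this safe, here is a plain-Perl model with made-up names: the "table" is a list of rows, and an "index" is a hash from the key value to the list of the matching rows. The real thing is done on the table types with copy() and findOrAddIndex(), as will be shown in the next post; this sketch only mirrors the idea.

```perl
use strict;
use warnings;

# Builds an "index" on one field of the rows.
sub buildIndex # ($rows, $field)
{
    my ($rows, $field) = @_;
    my %idx;
    push @{$idx{$_->{$field}}}, $_ for @$rows;
    return \%idx;
}

# the table in the core logic, imagined as indexed by symbol only
my @original = (
    { symbol => "ABC", eps => 2 },
    { symbol => "DEF", eps => 2 },
    { symbol => "AAA", eps => 3 },
);
# the private copy for the query; the original stays untouched
my @copy = map { +{ %$_ } } @original;
# the index added on demand, on the copy only
my $byEps = buildIndex(\@copy, "eps");
print join(",", map { $_->{symbol} } @{$byEps->{2}}), "\n";
# prints "ABC,DEF"
```

The two rows found for eps of 2 correspond to the two result rows with symbol2 "ABC" and "DEF" in the trace of query2 above.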
The details are forthcoming in the next post.
Fig. 1. TQL application.
The core logic thread then goes into its main loop and performs as its name says the core logic computations.
Here is a very simple example of a TQL application:
sub appCoreT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("appCoreT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();

    # build the core logic
    my $rtTrade = Triceps::RowType->new(
        id => "int32", # trade unique id
        symbol => "string", # symbol traded
        price => "float64",
        size => "float64", # number of shares traded
    ) or confess "$!";
    my $ttWindow = Triceps::TableType->new($rtTrade)
        ->addSubIndex("byId",
            Triceps::SimpleOrderedIndex->new(id => "ASC")
        )
        or confess "$!";
    $ttWindow->initialize() or confess "$!";

    # Represents the static information about a company.
    my $rtSymbol = Triceps::RowType->new(
        symbol => "string", # symbol name
        name => "string", # the official company name
        eps => "float64", # last quarter earnings per share
    ) or confess "$!";
    my $ttSymbol = Triceps::TableType->new($rtSymbol)
        ->addSubIndex("bySymbol",
            Triceps::SimpleOrderedIndex->new(symbol => "ASC")
        )
        or confess "$!";
    $ttSymbol->initialize() or confess "$!";

    my $tWindow = $unit->makeTable($ttWindow, "EM_CALL", "tWindow")
        or confess "$!";
    my $tSymbol = $unit->makeTable($ttSymbol, "EM_CALL", "tSymbol")
        or confess "$!";

    # export the endpoints for TQL (it starts the listener)
    my $tql = Triceps::X::Tql->new(
        name => "tql",
        trieadOwner => $owner,
        socketName => $opts->{socketName},
        tables => [
            $tWindow,
            $tSymbol,
        ],
        tableNames => [
            "window",
            "symbol",
        ],
        inputs => [
            $tWindow->getInputLabel(),
            $tSymbol->getInputLabel(),
        ],
        inputNames => [
            "window",
            "symbol",
        ],
    );

    $owner->readyReady();
    $owner->mainLoop();
}

{
    my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
        app => "appTql",
        main => \&appCoreT,
        port => 0,
        fork => -1, # create a thread, not a process
    );
}
This core logic is very simple: all it does is create two tables and then send the input data into them. The server gets started in a background thread (fork => -1) because this code is taken from a test that then goes and runs the expect with the SimpleClient.
The specification of inputs and tables for TQL is somewhat ugly, but I kept it as it was historically (it was done this way to keep the parsing of the options simpler). The new options compared to the single-threaded TQL are "trieadOwner", "inputs" and "inputNames". The "trieadOwner" is how TQL knows that it must run in the multithreaded mode, and it's used to create the nexuses for communication between the core logic and the rest of TQL. The inputs are needed because the multithreaded TQL parses and forwards the input data, unlike the single-threaded version that relies on the SimpleServer to do that according to the user-defined dispatch table.
The name options don't have to be used: if you name your labels and tables nicely and suitably for external viewing, the renaming-for-export can be skipped.
Similar to the single-threaded version, if any of the options "tables" or "inputs" is used, the TQL object gets initialized automatically, otherwise the tables and inputs can be added piecemeal with addTable(), addNamedTable(), addInput(), addNamedInput(), and then the whole thing initialized manually.
Then the clients can establish the connections with the TQL server, send in the data and the queries. To jump in, here is a trace of a simple session that sends some data, then does some table dumps and subscribes, not touching the queries yet. I'll go through it fragment by fragment and explain the meaning. The dumps and subscribes were the warm-up exercises before writing the full queries, but they're useful in their own right, and here they serve as the warm-up exercises for the making of the queries!
> connect c1
c1|ready
The "connect" is not an actual command sent but just the indication in the trace that the connection was set up by the client "c1" (it's a trace from the SimpleClient, so it follows the usual conventions). The "ready" response is sent when the connection is opened, similar to the chat server shown before.
> c1|subscribe,s1,symbol
c1|subscribe,s1,symbol
This is a subscription request. It means "I'm not interested in the current state of a table but send me all the updates". The response is the mirror of the request, so that the client knows that the request has been processed. "s1" is the unique identifier of the request, so that the client can match together the responses it received to the requests it sent (and keeping the uniqueness is up to the client, the server may refuse the requests with duplicate identifiers). And "symbol" is the name of the table. Once a subscription is in place, there is no way to unsubscribe other than by disconnecting the client (it's doable but adds complications, and I wanted to skip over the nonessential parts). Subscribing multiple times to the same table will send a confirmation every time but the repeated confirmations will have no effect: only one copy of the data will be sent anyway.
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
This sends the data into the model. And since it propagates through the subscription, the data gets sent back too. The "symbol" here means two different things: on the input side it's the name of the label where the data is sent, on the output side it's the name of the table that has been subscribed to.
The data lines start with the command "d" (since the data is sent much more frequently than the commands, I've picked a short one-letter "command name" for it), then the label/table name, opcode and the row fields in CSV format.
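The layout of a data line can be illustrated with a few lines of plain Perl. This is only a sketch of the format described above, not the parser from Triceps::X::Tql: the function name parseDataLine() is hypothetical, and it assumes that the fields contain no embedded commas or quotes and that the client-name prefix of the trace ("c1|") is not part of the line itself.

```perl
use strict;
use warnings;

# Split a TQL data line into its parts (hypothetical helper).
# Assumes plain comma-separated fields without quoting.
sub parseDataLine # ($line)
{
    my $line = shift;
    chomp $line;
    # the limit of -1 keeps the trailing empty fields
    my ($cmd, $name, $opcode, @fields) = split(/,/, $line, -1);
    return unless defined $cmd && $cmd eq "d" && defined $opcode;
    return { name => $name, opcode => $opcode, fields => \@fields };
}

my $d = parseDataLine("d,symbol,OP_INSERT,ABC,ABC Corp,1.0");
print "$d->{name} $d->{opcode} [", join("|", @{$d->{fields}}), "]\n";
```

The same function works for both directions, since the lines sent by the client and the lines coming back from a subscription use the same layout.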
> c1|confirm,cf1
c1|confirm,cf1,,,
The "confirm" command provides a way for the client to check that the data it sent has propagated through the model, without having to subscribe back to the data and read it. Send some data lines, then send the "confirm" command and wait for it to come back (again, the unique id allows you to keep multiple confirmations in flight if you please). This command doesn't guarantee that all the clients have seen the results from that data. It only guarantees that the core logic has seen the data, and more weakly guarantees that the data has been processed by the core logic and that this particular client has already seen all the results from it.
Why weakly? It has to do with the way it works inside, and it depends on the core logic. If the core logic consists of one thread, the guarantee is quite strong. But if the core logic farms out the work from the main thread to the other threads and then collects the results back, the guarantee breaks.
In Fig. 1 you can see that unlike the chat server shown before, TQL doesn't have any private nexuses for communication between the reader and writer threads of a client. Instead it relies on the same input and output nexuses, adding a control label to them, to forward the commands from the reader to the writer. The TQL object in the core logic thread creates a short-circuit connection between the control labels in the input and output nexuses, forwarding the commands. And if the core logic all runs in one thread, this creates a natural pipeline: the data comes in, gets processed, comes out, the "confirm" command comes in, comes out after the data. But if the core logic farms out the work to more threads, the confirmation can "jump the line" because its path is a direct short circuit.
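On the client side, waiting for a confirmation boils down to reading the responses until the echo with the right identifier shows up. Here is a minimal sketch of that loop in plain Perl. The helper waitConfirm() is hypothetical, and an in-memory file handle stands in for the socket so that the example can run by itself.

```perl
use strict;
use warnings;

# Read the response lines from $fh until the echo of "confirm,$id" arrives.
# Returns a reference to the lines seen before the echo,
# or undef if the input ends without the echo.
sub waitConfirm # ($fh, $id)
{
    my ($fh, $id) = @_;
    my @seen;
    while (my $line = <$fh>) {
        chomp $line;
        my ($cmd, $rid) = split(/,/, $line);
        if (defined $cmd && $cmd eq "confirm" && defined $rid && $rid eq $id) {
            return \@seen;
        }
        push @seen, $line;
    }
    return;
}

# Simulate the server's responses with an in-memory file handle.
my $responses = "d,symbol,OP_INSERT,ABC,ABC Corp,1\nconfirm,cf1,,,\n";
open(my $fh, "<", \$responses) or die "$!";
my $seen = waitConfirm($fh, "cf1");
print "lines before the confirmation: ", scalar(@$seen), "\n";
```

Matching on the identifier rather than on the command alone is what lets a client keep more than one confirmation in flight.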
> c1|drain,dr1
c1|drain,dr1,,,
The "drain" is an analog of "confirm" but more reliable and slower: the reader thread drains the whole model before sending the command on. This guarantees that all the processing is done, and all the output from it has been sent to all the clients.
> c1|dump,d2,symbol
c1|startdump,d2,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dump,d2,symbol
The "dump" command dumps the current contents of a table. Its result starts with "startdump" and the same id and table name as in the request, then goes the data (all with OP_INSERT), finishing with the completion confirmation echoing the original command. The dump is atomic: the contents of the table don't change in the middle of the dump. However, if a subscription on this table is active, the data rows from that subscription may come before and after the dump.
I'm not going to describe the error reporting, but it's worth mentioning that if a command contains errors, its "confirmation" will be an error line with the same identifier.
> c1|dumpsub,ds3,symbol
c1|startdump,ds3,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dumpsub,ds3,symbol
The "dumpsub" command is a combination of a dump and subscribe: get the initial state and then get all the updates. The confirmation of "dumpsub" marks the boundary between the original dump and the following updates.
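A client can use that boundary to separate the initial state from the updates. The following is a sketch in plain Perl under a few assumptions: the helper splitDumpsub() is hypothetical, the lines are given without the client-name prefix of the trace, and anything that arrives before the "startdump" line is simply skipped.

```perl
use strict;
use warnings;

# Split the responses to "dumpsub,$id,..." into the initial dump
# and the updates that follow it (hypothetical helper).
sub splitDumpsub # ($id, @lines)
{
    my ($id, @lines) = @_;
    my (@initial, @updates);
    my $state = "before"; # before -> dump -> after
    foreach my $line (@lines) {
        if ($state eq "before") {
            $state = "dump" if $line =~ /^startdump,\Q$id\E,/;
        } elsif ($state eq "dump") {
            if ($line =~ /^dumpsub,\Q$id\E,/) {
                $state = "after"; # the echo marks the boundary
            } else {
                push @initial, $line;
            }
        } else {
            push @updates, $line;
        }
    }
    return (\@initial, \@updates);
}

my ($initial, $updates) = splitDumpsub("ds3",
    "startdump,ds3,symbol",
    "d,symbol,OP_INSERT,ABC,ABC Corp,1",
    "dumpsub,ds3,symbol",
    "d,symbol,OP_INSERT,DEF,Defense Corp,2",
);
print "initial: @$initial\n";
print "updates: @$updates\n";
```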
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,symbol,OP_INSERT,DEF,Defense Corp,2
Send some more data, and it comes back only once, even though the subscription was done twice: once in "subscribe" and once in "dumpsub". The repeated subscription requests simply get consumed into one subscription.
> c1|d,window,OP_INSERT,1,ABC,101,10
This sends a row to the other table but nothing comes back because there is no subscription to that table.
> c1|dumpsub,ds4,window
c1|startdump,ds4,window
c1|d,window,OP_INSERT,1,ABC,101,10
c1|dumpsub,ds4,window
> c1|d,window,OP_INSERT,2,ABC,102,12
c1|d,window,OP_INSERT,2,ABC,102,12
This demonstrates the pure dump-and-subscribe without any interventions.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown command works the same as in the chat server, draining and then shutting down the whole server.
Now on to the queries.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
Starts a client connection and sends some data.
> c1|querysub,q1,query1,{read table symbol}{print tokenized 0}
c1|d,query1,OP_INSERT,ABC,ABC Corp,1
c1|querysub,q1,query1
The "querysub" command does the "query-and-subscribe": reads the initial state of the table, processed through the query, and then subscribes to any future updates. The single-threaded variety of TQL doesn't do this, it does just the one-time queries. The multithreaded TQL could also do the one-time queries, and also just the subscribes without the initial state, but I've been cutting corners for this example and the only thing that's actually available is the combination of two, the "querysub".
"q1", as in the other commands, is the command identifier. The next field "query1" is the name for the query; it's the name that will be shown for the data lines coming out of the query. And then goes the query in the brace-quoted format, same as the single-threaded TQL (and there is no further splitting by commas, so the commas can be used freely in the query).
The identifier and the name for the query sound kind of redundant. But the client may generate them in different ways and need both. The name has the more symbolic character. The identifier can be generated as a sequence of numbers, so that the client can keep track of its progress more easily. And the error reports include the identifier but not the query name in them.
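To make the split between the two concrete, here is a small sketch of how a client might build the command lines: the identifier comes from a counter, the name is chosen by the caller. The helper makeQuerysub() and the "q" prefix of the identifiers are assumptions made for this illustration, not a part of TQL.

```perl
use strict;
use warnings;

my $seq = 0; # source of the unique command identifiers

# Build a "querysub" line from the query name and the query stages,
# brace-quoting each stage (hypothetical helper).
sub makeQuerysub # ($name, @stages)
{
    my ($name, @stages) = @_;
    my $id = "q" . ++$seq;
    my $query = "";
    $query .= "{" . $_ . "}" foreach @stages;
    return ($id, "querysub,$id,$name,$query");
}

my ($id, $line) = makeQuerysub("query2",
    'read table symbol',
    'where istrue {$%symbol =~ /^A/}',
    'project fields {symbol eps}',
);
print "$line\n";
```

The stages are given in single quotes so that Perl doesn't try to interpolate the "$%symbol" of the query language.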
For the query, there is no special line coming out before the initial dump. Supposedly, there would not be more than one query in flight with the same name, so this could be easily told apart based on the name in the data lines. There is also an underlying consideration that when the query involves a join, in the future the initial dump might be happening in multiple chunks, requiring to either surround every chunk with the start-end lines or just let them go without the extra notifications, as they are now.
And the initial dump ends as usual with getting the echo of the command (without the query part) back.
This particular query is very simple and equivalent to a "dumpsub".
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,query1,OP_INSERT,DEF,Defense Corp,2
Send more data and it will come out of the query.
> c1|querysub,q2,query2,{read table symbol}{where istrue {$%symbol =~ /^A/}}{project fields {symbol eps}}
c1|t,query2,query2 OP_INSERT symbol="ABC" eps="1"
c1|querysub,q2,query2
This query is more complicated, doing a selection (the "where" query command) and projection. It also prints the results in the tokenized format (the "print" command gets added automatically if it wasn't used explicitly, and the default options for it enable the tokenized format).
The tokenized lines come out with the command "t", query name and then the contents of the row. The query name happens to be sent twice, and I'm not sure yet if it's a feature or a bug.
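For a client that wants the fields by name, a tokenized line can be picked apart with a regular expression. This is a sketch rather than a full parser: parseTokenized() is a hypothetical helper, and it assumes that the values contain no escaped double quotes, since the escaping rules are not covered here.

```perl
use strict;
use warnings;

# Parse a tokenized TQL result line into the query name, opcode
# and a hash of the fields (hypothetical helper).
sub parseTokenized # ($line)
{
    my $line = shift;
    chomp $line;
    # the limit of 3 protects the commas inside the field values
    my ($cmd, $query, $rest) = split(/,/, $line, 3);
    return unless defined $rest && $cmd eq "t";
    # the rest repeats the query name, then the opcode, then name="value" pairs
    my ($label, $opcode, $fields) = split(/ /, $rest, 3);
    return unless defined $opcode;
    my %row;
    while (defined $fields && $fields =~ /(\w+)="([^"]*)"/g) {
        $row{$1} = $2;
    }
    return { query => $query, opcode => $opcode, row => \%row };
}

my $t = parseTokenized('t,query2,query2 OP_INSERT symbol="ABC" eps="1"');
print "$t->{query} $t->{opcode} symbol=$t->{row}{symbol} eps=$t->{row}{eps}\n";
```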
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
c1|d,query1,OP_INSERT,AAA,Absolute Auto Analytics Inc,3
c1|t,query2,query2 OP_INSERT symbol="AAA" eps="3"
> c1|d,symbol,OP_DELETE,DEF,Defense Corp,2.0
c1|d,query1,OP_DELETE,DEF,Defense Corp,2
More examples of the data sent, getting processed by both queries. In the second case the "where" filters out the row from query2, so only query1 produces the result.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown as usual.
Now the "piece de resistance": queries with joins.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,2.0
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,window,OP_INSERT,1,AAA,12,100
Connect and send some starting data.
> c1|querysub,q1,query1,{read table window}{join table symbol byLeft {symbol} type left}
c1|t,query1,query1 OP_INSERT id="1" symbol="AAA" price="12" size="100" name="Absolute Auto Analytics Inc" eps="3"
c1|querysub,q1,query1
A left join of the tables "window" and "symbol", by the field "symbol" as join condition.
Note that unlike the previous single-threaded TQL examples, the index type path for the table "symbol" is not explicitly specified. It's the result of the new method TableType::findIndexPathForKeys() described before: now the index gets found automatically. And the single-threaded TQL now has this feature too. If you really want, you can still specify the index path but usually there is no need to.
The TQL joins, even in the multithreaded mode, are still implemented internally as LookupJoin, driven only by the main flow of the query. So the changes to the joined dimension tables will not update the query results, and will be visible only when a change on the main flow picks them up, potentially creating inconsistencies in the output. This is wrong, but fixing it presents complexities that I've left alone until some later time.
> c1|d,window,OP_INSERT,2,ABC,13,100
c1|t,query1,query1 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2"
> c1|d,window,OP_INSERT,3,AAA,11,200
c1|t,query1,query1 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Absolute Auto Analytics Inc" eps="3"
Sending data updates the results of the query.
> c1|d,symbol,OP_DELETE,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,symbol,OP_INSERT,AAA,Alcoholic Abstract Aliens,3.0
As described above, the modifications of the dimension table are not visible in the query directly.
> c1|d,window,OP_DELETE,1
c1|t,query1,query1 OP_DELETE id="1" symbol="AAA" price="12" size="100" name="Alcoholic Abstract Aliens" eps="3"
But an update on the main flow brings them up (and in this case inconsistently, the row getting deleted is not exactly the same as the row inserted before).
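The effect is easy to reproduce without any threads at all. The following toy model in plain Perl is not LookupJoin, just two hashes and a lookup driven only by the main flow, with hypothetical function names, but it shows the same mismatch between the inserted and the deleted row.

```perl
use strict;
use warnings;

my %symbol; # dimension table: symbol => company name
my %window; # main-flow table: id => symbol
my @out;    # what the query would send to the client

# The changes of the dimension table update it silently.
sub symbolInsert { my ($sym, $name) = @_; $symbol{$sym} = $name; }
sub symbolDelete { my ($sym) = @_; delete $symbol{$sym}; }

# A change on the main flow looks up the dimension table as it is now.
sub windowOp # ($opcode, $id, $sym)
{
    my ($opcode, $id, $sym) = @_;
    if ($opcode eq "OP_INSERT") {
        $window{$id} = $sym;
    } else {
        $sym = delete $window{$id};
        return unless defined $sym; # no such row, nothing to report
    }
    my $name = exists $symbol{$sym} ? $symbol{$sym} : "";
    push @out, "$opcode id=$id symbol=$sym name=\"$name\"";
}

symbolInsert("AAA", "Absolute Auto Analytics Inc");
windowOp("OP_INSERT", 1, "AAA");
symbolDelete("AAA");
symbolInsert("AAA", "Alcoholic Abstract Aliens");
windowOp("OP_DELETE", 1);
print "$_\n" foreach @out;
```

The two changes of the dimension table produce no output of their own, and the OP_DELETE then reports a company name that was never reported in an OP_INSERT.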
> c1|querysub,q2,query2,{read table window}{join table symbol byLeft {symbol} type left}{join table symbol byLeft {eps} type left rightFields {symbol/symbol2}}
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="ABC"
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="DEF"
c1|t,query2,query2 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Alcoholic Abstract Aliens" eps="3" symbol2="AAA"
c1|querysub,q2,query2
This is a more complicated query, involving two joins, with the same dimension table "symbol". The second join by "eps" makes no real-world sense whatsoever but it's interesting from the technical perspective: if you check the table type of this table at the start of the post, you'll find that it has no index on the field "eps". The join adds this index on demand!
The way it works, all the dimension tables are copied into the client's writer thread, created from the table types exported by the core logic through the output nexus. (And if a table is used in the same query twice, it's currently also copied twice.) This provides a nice opportunity to amend the table type by adding any necessary secondary index before creating the table, and TQL makes good use of it.
The details are forthcoming in the next post.
Friday, May 24, 2013
how to find an index
While working on the next example (still in progress but now getting closer to completion), I've solved one more nagging little problem: finding an index type in the table type by key fields. It makes the joins more convenient: if you know which fields you want to join by, it's nice to find the correct index automatically. My previous solution to this problem has been to go in the opposite direction: specify the indexes for the join, and find the list of fields automatically from there. But that created a problem with deciding which field on the left to pair with which on the right, and so either the indexes had to be carefully controlled to have the key fields in the same order, or a separate option still had to be used to specify the pairing of the fields.
Now this problem is solved with the method:
@idxPath = $tableType->findIndexPathForKeys(@keyFields);
It returns the array that represents the path to an index type that matches these key fields (the index type and all the types in the path still have to be of the Hashed variety). If the correct index can not be found, an empty array is returned. If you specify the fields that aren't present in the row type in the first place, this is simply treated the same as being unable to find an index for these fields.
If more than one index would match, the first one found in the direct order of the index tree walk is returned.
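The search order can be pictured with a toy model in plain Perl. This is not the Triceps implementation: the tree below is a hypothetical nested structure in which every index carries its own list of key fields, the comparison ignores the order of the fields (an assumption), and the restriction to the Hashed index types is left out.

```perl
use strict;
use warnings;

# A toy index tree (hypothetical, not the TableType internals).
my @indexes = (
    { name => "byId", key => [ "id" ], nested => [] },
    { name => "bySymbol", key => [ "symbol" ], nested => [
        { name => "byPrice", key => [ "symbol", "price" ], nested => [] },
    ] },
    { name => "bySymbolAgain", key => [ "symbol" ], nested => [] },
);

# Walk the tree depth-first in the direct order. Returns the path of names
# to the first index whose key matches, or an empty list if there is none.
sub findPath # ($want, $indexes, @path)
{
    my ($want, $indexes, @path) = @_;
    foreach my $idx (@$indexes) {
        my @here = (@path, $idx->{name});
        return @here if join(",", sort @{$idx->{key}}) eq $want;
        my @found = findPath($want, $idx->{nested}, @here);
        return @found if @found;
    }
    return ();
}

sub findIndexPathModel # (@keyFields)
{
    my $want = join(",", sort @_);
    return findPath($want, \@indexes);
}

print join("/", findIndexPathModel("symbol")), "\n";
print join("/", findIndexPathModel("price", "symbol")), "\n";
my @none = findIndexPathModel("eps");
print "paths found for eps: ", scalar(@none), "\n";
```

In this model "bySymbol" wins over "bySymbolAgain" simply because it comes first in the walk, and a field that is not in any key, or not in the row type at all, produces the same empty result.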
This allowed me to change LookupJoin to make the option "rightIdxPath" optional: if undefined, the index that matches the key fields specified in the option "by" or "byLeft" will be found automatically. Of course, if such an index doesn't exist, it's still an error; the join cannot create it for you yet.
The same way, in the JoinTwo now the options "leftIdxPath" and "rightIdxPath" have become optional if either "by" or "byLeft" is specified. The old way of specifying "leftIdxPath" and "rightIdxPath" without any of the "by" varieties still works too.
And this propagated into the TQL command "join": no more need for the option "rightIdxPath". You can still use it if you want but there is rarely any point to it, only if there are multiple matching indexes and you strongly prefer one of them.
One more indirect fall-out from these changes is the new option to LookupJoin:
fieldsDropRightKey => 0/1
The default is 0, and if you set it to 1, the logic will automatically exclude from the result row type the key fields from the right side. This is convenient because these fields contain the values that are duplicates of the key fields from the left side anyway. It's what the TQL join was doing all along, and the implementation of this logic becomes simpler when it gets moved directly into LookupJoin.
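The effect on the result row type can be shown with a toy model that works on the lists of field names only. The helper resultFields() is hypothetical and does none of the other field processing that LookupJoin does.

```perl
use strict;
use warnings;

# Field names of the join result (toy model, hypothetical helper).
# With $dropRightKey set, the key fields of the right side are left out.
sub resultFields # ($left, $right, $rightKey, $dropRightKey)
{
    my ($left, $right, $rightKey, $dropRightKey) = @_;
    my %isKey = map { $_ => 1 } @$rightKey;
    my @fromRight = grep { !($dropRightKey && $isKey{$_}) } @$right;
    return (@$left, @fromRight);
}

my @left = qw(id symbol price size);
my @right = qw(symbol name eps);
print join(" ", resultFields(\@left, \@right, [ "symbol" ], 1)), "\n";
print join(" ", resultFields(\@left, \@right, [ "symbol" ], 0)), "\n";
```

With the option set, the list of fields is the same as in the output of the TQL join of "window" and "symbol" shown in the traces above. Without it the name "symbol" shows up twice, and in a real join such a duplicate would have to be handled through the field selection options, such as "rightFields".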
Sunday, May 19, 2013
time and threads and ThreadedClient
I've been making the ThreadedClient more resilient, to handle the unexpected better. First, if it encounters a socket disconnect (__EOF__) while expecting data, it will immediately return and set the error message in $@. Second, there is now a way to specify a timeout for waiting. If the message doesn't get received in time, expect() with a timeout will also return immediately with a message in $@.
As an unrelated change, I've renamed the method protocol() to getTrace(). The word "protocol" creates too much confusion when used around the sockets, "trace" is a more distinct one.
And a mix thereof, the error messages get not only set in $@ but also included into the trace, so it can be detected all together. There is also a separate trace of the error messages only, that can be obtained with getErrorTrace().
There are three ways to specify the timeout. Two of them are in the new():
my $client = Triceps::X::ThreadedClient->new(
    owner => $owner,
    totalTimeout => $timeout,
);

my $client = Triceps::X::ThreadedClient->new(
    owner => $owner,
    timeout => $timeout,
);
The option "totalTimeout" gives a timeout for the whole run to complete. Once that timeout is reached, all future expect()s just fail immediately. The option "timeout" gives the default timeout for each expect(). It's possible to use both, and each call of expect() will be limited by the shorter time limit of the two (remember, "totalTimeout" starts counting from the call of new(), not from startClient(), while "timeout" starts counting from the call of expect()).
All the timeouts are specified in seconds with fractions, so for 0.1 seconds you just use 0.1.
The third way is to use an extra argument in expect():
$client->expect("c1", "expected text", $timeout);
This timeout completely overrides whatever was set in new(). The value of 0 disables the timeout for this call, and 0 overrides the timeout from new() too, so it can be used for the one-off calls without the timeout.
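The way the three sources combine into one deadline can be sketched as a small function. This is a reading of the rules above and not the code from ThreadedClient: computeLimit() and its arguments are hypothetical, 0 stands for "no limit", and the explicit argument is taken to override both options of new(), following the "completely overrides" wording.

```perl
use strict;
use warnings;

# Compute the absolute deadline for one expect() call (hypothetical helper).
#   $now         - the current time, seconds since epoch
#   $totalLimit  - the absolute deadline computed from "totalTimeout"
#                  at the time of new(), or 0 if not set
#   $timeout     - the default per-call timeout from new(), or 0 if not set
#   $callTimeout - the extra argument of expect(), or undef if not given
# Returns 0 if there is no limit.
sub computeLimit # ($now, $totalLimit, $timeout, $callTimeout)
{
    my ($now, $totalLimit, $timeout, $callTimeout) = @_;
    if (defined $callTimeout) {
        # the explicit argument overrides whatever was set in new()
        return $callTimeout > 0 ? $now + $callTimeout : 0;
    }
    my $limit = $timeout > 0 ? $now + $timeout : 0;
    if ($totalLimit > 0 && ($limit == 0 || $totalLimit < $limit)) {
        $limit = $totalLimit; # the total limit is the shorter one
    }
    return $limit;
}

my $now = 1000;
print computeLimit($now, 1010, 3, undef), "\n";  # the default timeout is shorter
print computeLimit($now, 1010, 30, undef), "\n"; # the total limit is shorter
print computeLimit($now, 1010, 3, 0), "\n";      # an explicit 0 disables the limit
```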
And now, how it works inside. First thing, the call Triceps::now() returns the current time in seconds since epoch as a floating-point value, including the fractions of the seconds. The Triceps Perl API deals with time in this format.
Then, how to do the timeouts. Remember, if you look for repeatability of computation, you should really use an external time source synchronized with your data. The interface described here is for quick-and-dirty things, like time-limiting the tests, so that the unexpected input would not get the test stuck.
The core part of expect(), after it computes the time limit from the three sources, is this:
$self->{error} = undef;
$self->{expectDone} = 0;

$owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
    cmd => "expect",
    client => $client,
    arg => $pattern,
);
$owner->flushWriters();

if ($limit > 0.) {
    while(!$self->{expectDone} && $owner->nextXtrayTimeLimit($limit)) { }
    # on timeout reset the expect and have that confirmed
    if (!$self->{expectDone}) {
        $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
            cmd => "cancel",
            client => $client,
        );
        $owner->flushWriters();
        # wait for confirmation
        while(!$self->{expectDone} && $owner->nextXtray()) { }
    }
} else {
    while(!$self->{expectDone} && $owner->nextXtray()) { }
}

$@ = $self->{error};
The expects and inputs get combined in the collector thread. The expect request gets forwarded to it and then the current thread waits for the response. The response gets processed with nextXtray(), which represents one step of the main loop. The main loop is literally implemented in C++ like this:
void TrieadOwner::mainLoop()
{
    while (nextXtray())
        { }
    markDead();
}
The nextXtray() takes the next tray from the Triead's read nexuses and processes it. "Xtray" is a special form of the trays used to pass the data across the nexus. It returns true until the thread is requested dead, and then it returns false.
The normal nextXtray() waits until there is more data to process or the thread is requested to die. But there are special forms of it:
nextXtrayNoWait()
Returns immediately if there is no data available at the moment.
nextXtrayTimeLimit($deadline)
Returns if no data becomes available before the absolute deadline.
nextXtrayTimeout($timeout)
Returns if no data becomes available before the timeout, starting at the time of the call. Just to reiterate, the difference is that nextXtrayTimeLimit() receives the absolute time since the epoch while nextXtrayTimeout() receives the length of the timeout starting from the time of the call, both as seconds in floating-point.
All the versions that may return early on no data return false if they have to do so.
Expect() uses the version with the absolute deadline. If the collector thread finds a match, it will send a rowop back to the expect thread, it will get processed in nextXtrayTimeLimit(), calling a label that sets the flag $self->{expectDone}, and then nextXtrayTimeLimit() will return true, and the loop will find the flag and exit.
If the collector thread doesn't find a match, nextXtrayTimeLimit() will return false, and the loop will again exit. But then it will find the "done" flag not set, so it knows that the timeout has expired and it has to tell the collector thread that the call is being cancelled. So it sends another rowop for the cancel, and then waits for the confirmation with another nextXtray(), this time with no limit on it since the confirmation must arrive back quickly in any case.
It's the confirmation rowop processing that sets $self->{error}. But there is always a possibility that the match will arrive just after the timeout has expired but just before the cancellation. It's one of these things that you have to deal with when multiple threads exchange messages. What then? Then the normal confirmation will arrive back to the expecting thread. And when the cancel message arrives at the collector thread, it will find that this client doesn't have an outstanding expect request any more, and will just ignore the cancel. Thus, the second nextXtray() will receive either a confirmation of the cancel and set the error message, or it will receive the last-moment success message. Either way it will fall through and return (setting $@ if the cancel confirmation came back).
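The collector's side of this race can be modeled in a few lines of single-threaded Perl. This is a simplified sketch with hypothetical names: the real collector works with rowops and buffered socket input, while here the messages are plain function calls, and the point is only that every expect gets exactly one reply, whichever side wins the race.

```perl
use strict;
use warnings;

my %pending; # client => pattern of its outstanding expect
my @replies; # what gets sent back to the expecting thread

# A control message from the expecting thread.
sub collectorMsg # ($cmd, $client, $arg)
{
    my ($cmd, $client, $arg) = @_;
    if ($cmd eq "expect") {
        $pending{$client} = $arg;
    } elsif ($cmd eq "cancel") {
        # nothing outstanding: the match has won the race, ignore the cancel
        return unless exists $pending{$client};
        delete $pending{$client};
        push @replies, "error $client: timed out";
    }
}

# A line of input received from the socket of a client.
sub collectorInput # ($client, $text)
{
    my ($client, $text) = @_;
    return unless exists $pending{$client};
    my $pattern = $pending{$client};
    return unless $text =~ /$pattern/;
    delete $pending{$client};
    push @replies, "done $client";
}

# Scenario 1: the match arrives just before the late cancel.
collectorMsg("expect", "c1", "ready");
collectorInput("c1", "c1|ready");
collectorMsg("cancel", "c1");
# Scenario 2: nothing matches, so the cancel gets confirmed with an error.
collectorMsg("expect", "c2", "ready");
collectorMsg("cancel", "c2");
print "$_\n" foreach @replies;
```

Since exactly one reply comes back per expect in either scenario, the unlimited wait in the second nextXtray() loop of expect() always has something to wake it up.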
By the way, if the collector thread finds the socket closed, it will immediately return an error rowop, very similar to the confirmation of the cancel. And it will set both $self->{expectDone} and $self->{error} in the expect thread.
Saturday, May 11, 2013
on the export of table types
I've made an example for the export of the table types between threads but it didn't come out well. It has turned out to not particularly need this feature, and came out contrived and ugly. I'm working on a better example now, so in the meantime I want to tell more about the subject matter.
As I've said before, the limitation of exporting the table types between the threads is in keeping the involved Perl code snippets as source code. Their support for the sorted and ordered indexes has been already described, and I've also mentioned the aggregators. I've done this support for the basic aggregators too, with a side effect that the fatal errors in both the index and aggregator code snippets are now propagated much more nicely and come out as the table operations confessing. However when it came to the SimpleAggregator, I've found that I can't just do it as-is, the missing piece of the puzzle is the aggregator initialization routine that would run at the table type initialization time. It's a nice feature to have overall but I'm trying to cut a release here and push everything non-essential past it.
Fortunately, some thinking showed that this feature is not really needed. There usually just isn't any need to export a table type with aggregators. Moreover, there is a need to export the table types with many elements stripped. What is to be stripped and why?
The most central part of the table type is its primary index. It defines how the data gets organized. And then the secondary indexes and aggregators perform the computations from the data in the table. The tables can not be shared between threads, and thus the way to copy a table between the threads is to export the table type and send the data, and let the other thread construct a copy of the table from that. But the table created in another thread really needs only the base data organization. If it does any computations on that data, that would be its own computations, different than the ones in the exporting thread. So all it needs to get is the basic table type with the primary index, very rarely some secondary indexes, and pretty much never the aggregators.
The way to get such a stripped table type with only the fundamentally important parts is:
$tabtype_fundamental = $tabtype->copyFundamental();
That copies the row type and the primary index (the whole path to the first leaf index type) and leaves out the rest. All the aggregators on all the indexes, even on the primary one, are not included in the copy. When making a full nexus, it can look like this:
$facet = $owner->makeNexus(
name => "data",
labels => [ @labels ],
tableTypes => [
mytable => $mytable->getType()->copyFundamental(),
],
import => "writer",
);
If more index types need to be included, they can be specified by path in the arguments of copyFundamental():
$tabtype_fundamental = $tabtype->copyFundamental(
[ "byDate", "byAddress", "fifo" ],
[ "byDate", "byPriority", "fifo" ],
);
The paths may overlap, as shown here, and the matching subtrees will be copied correctly, still properly overlapping in the result. There is also a special syntax:
$tabtype_fundamental = $tabtype->copyFundamental(
[ "secondary", "+" ],
);
The "+" in the path means "do the path to the first leaf index of that subtree" and saves the necessity to write out the whole path.
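The meaning of "+" can be illustrated with a small self-contained C++ model. It is not the Triceps implementation: IndexNode and expandPath() are hypothetical names, and the tree holds only index names. The sketch follows the explicit names in the path, and on reaching "+" keeps descending through the first nested index at every level until it reaches a leaf.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// A hypothetical stand-in for a tree of index types: only names and nesting.
struct IndexNode {
    std::string name;
    std::vector<IndexNode> children;  // nested indexes, in definition order
};

// Expand a path that may end with "+" into the full list of index names.
std::vector<std::string> expandPath(const IndexNode &root,
                                    const std::vector<std::string> &path) {
    std::vector<std::string> result;
    const IndexNode *cur = &root;
    for (const std::string &step : path) {
        if (step == "+") {
            // Take the first nested index at every level down to a leaf.
            while (!cur->children.empty()) {
                cur = &cur->children.front();
                result.push_back(cur->name);
            }
            break;  // "+" is expected to be the last element
        }
        const IndexNode *next = nullptr;
        for (const IndexNode &child : cur->children) {
            if (child.name == step) {
                next = &child;
                break;
            }
        }
        if (next == nullptr)
            throw std::invalid_argument("no such index: " + step);
        cur = next;
        result.push_back(step);
    }
    return result;
}
```

In this model, a path of "byDate" followed by "+" on a tree where "byDate" contains "byAddress", which in turn contains "fifo", expands to all three names in order.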
Finally, what if you don't want to include the original primary index at all? You can use the string "NO_FIRST_LEAF" as the first argument. That would skip it. You can still include it by using its explicit path, possibly at another position.
For example, suppose that you have a table type with two top-level indexes, "first" as the primary index and "second" as the secondary one, and make a copy:
$tabtype_fundamental = $tabtype->copyFundamental(
"NO_FIRST_LEAF",
[ "second", "+" ],
[ "first", "+" ],
);
In the copied table type the index "second" becomes primary and "first" secondary.
Tuesday, May 7, 2013
Label chaining at the front, and label method confessions.
It has been bothering me, how the threaded pipeline example was sensitive to the order of chaining the output facet and the internal logic to the input facet. To get the input data pass through first and only then have the processed data come out, the output facet had to be connected (and thus defined) first, and only then the internal logic could be connected to the input facet. Things would be much easier if the output facet could be connected at the end, but still put at the front of the chain of the input facet's label. And the FnReturn in general suffers from this problem as well.
So I've added this feature: a way to chain a label, placing it at the front of the chain.
Along the way I've also changed all the Label methods to use the new way of error reporting, confessing on errors. No more need to add "or confess" after that manually (and I've been forgetting to do that properly all over the place).
The new method is:
$label->chainFront($otherLabel);
In C++ this is done slightly differently, by adding an extra argument to chain:
err = label->chain(otherLabel, true);
The second argument has the default value of false, so the method is still backwards-compatible, and you can call either way
err = label->chain(otherLabel);
err = label->chain(otherLabel, false);
to chain a label normally, at the end of the chain. The return value in C++ is still the Erref (though hm, maybe it could use an exception as well).
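The effect of the extra argument can be shown with a self-contained C++ model of a chain. ChainModel is a hypothetical class that only tracks names; it is not the Triceps Label class and returns no Erref. It demonstrates how a label chained later with the front flag still ends up ahead of the labels chained before it.

```cpp
#include <deque>
#include <string>
#include <vector>

// A hypothetical model of the list of labels chained from one label.
class ChainModel {
public:
    // Mirrors the shape of chain(otherLabel, front) with front = false
    // by default, so the old single-argument calls keep appending.
    void chain(const std::string &name, bool front = false) {
        if (front)
            chained_.push_front(name);
        else
            chained_.push_back(name);
    }

    // The order in which the chained labels would be called.
    std::vector<std::string> callOrder() const {
        return std::vector<std::string>(chained_.begin(), chained_.end());
    }

private:
    std::deque<std::string> chained_;
};
```

Chaining "logic" normally and then "output" with the front flag set gives the call order "output", "logic", which is what the pipeline example needs: the input data passes through before the processed data comes out, even though the output was connected last.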
Having done this, I went and changed the TrieadOwner::makeNexus() and FnReturn::new to chain their labels at the front by default. This can be switched to the old behavior by using a new option:
chainFront => 0
The default value of this option is 1.
In C++ this is also expressed by an extra argument to FnReturn::addFromLabel(), which has a default value of true, matching the Perl code. Now when you call
ret = initialize(FnReturn::make(unit, name)
->addLabel("lb1", rt1)
->addFromLabel("lb2", lbX)
);
or
ret = initialize(FnReturn::make(unit, name)
->addLabel("lb1", rt1)
->addFromLabel("lb2", lbX, true)
);
you add the FnReturn's label to the front of the lbX's chain. To get the old behavior, use:
ret = initialize(FnReturn::make(unit, name)
->addLabel("lb1", rt1)
->addFromLabel("lb2", lbX, false)
);
I've changed the default behavior because there would not be many uses for the old one.
I haven't described yet, how the nexuses are created in C++, but they are created from an FnReturn, and thus this change to FnReturn covers them both.
Wednesday, May 1, 2013
a little more of ThreadedClient
Forgot to mention one more method of ThreadedClient, it's used like this:
$client->sendClose("c4", "WR");
This closes the client socket (in socket terms, shuts it down). The first argument is the client name. The second argument determines which side of the socket gets closed: "WR" for the writing side, "RD" for the reading side, and "RDWR" for both. These are the same names as for the system call shutdown().