Showing posts with label facet. Show all posts
Showing posts with label facet. Show all posts

Saturday, July 13, 2013

odds and ends

While working on threads support, I've added a few small features here and there. Some of them have been already described, some will be described now. I've also done a few more small clean-ups.

First, the historic methods setName() are now gone everywhere. This means Unit and Label classes, and in C++ also the Gadget. The names can now only be specified during the object construction.

FnReturn has the new method:

$res = fret->isFaceted();

bool isFaceted() const;

It returns true (or 1 in Perl) if this FnReturn object is a part of a Facet.

Unit has gained a couple of methods:

$res = $unit->isFrameEmpty();

bool isFrameEmpty() const;

Check whether the current frame is empty. This is different from the method empty() that checks whether the the whole unit is empty. This method is useful if you run multiple units in the same thread, with some potentially complicated cross-unit scheduling. It's what nextXtray() does with a multi-unit Triead, repeatedly calling drainFrame() for all the units that are found not empty. In this situation the simple empty() can not be used because the current inner frame might not be the outer frame, and draining the inner frame can be repeated forever while the outer frame will still contain rowops. The more precise check of isFrameEmpty() prevents the possibility of such endless loops.

$res = $unit->isInOuterFrame();

bool isInOuterFrame() const;

Check whether the unit's current inner frame is the same as its outer frame, which means that the unit is not in the middle of a call.


In Perl the method Rowop::printP() has gained an optional argument for the printed label name:

$text = $rop->printP();
$text = $rop->printP($lbname);

The reason for that is to make the printing of rowops in the chained labels more convenient. A chained label's execution handler receives the original unchanged rowop that refers to the first label in the chain. So when it gets printed, it will print the name of the first label in the chain, which might be very surprising. The explicit argument allows to override it to the name of the chained label (or to any other value).


 In C++ the Autoref has gained the method swap():

void swap(Autoref &other);

It swaps the values of two references without changing the reference counts in the referred values. This is a minor optimization for such a special situation. One or both references may contain NULL.


In C++ the Table has gained the support for sticky errors. The table internals contain a few places where the errors can't just throw an Exception because it will mess up the logic big time, most specifically the comparator functions for the indexes. The Triceps built-in indexes can't encounter any errors in the comparators but the user-defined ones, such as the Perl Sorted Index, can. Previously there was no way to report these errors other than print the error message and then either continue pretending that nothing happened or abort the program.

The sticky errors provide a way out of this sticky situation. When an index comparator encounters an error, it reports it as a sticky error in the table and then returns false. The table logic then unrolls like nothing happened for a while, but before returning from the user-initiated method it will find this sticky error and throw an Exception at a safe time. Obviously, the incorrect comparison means that the table enters some messed-up state, so all the further operations on the table will keep finding this sticky error and throw an Exception right away, before doing anything. The sticky error can't be unstuck. The only way out of it is to just discard the table and move on.

void setStickyError(Erref err);

Set the sticky error from a location where an exception can not be thrown, such as from the comparators in the indexes. Only the first error sticks, all the others are ignored since (a) the table will be dead and throwing this error in exceptions from this point on anyway and (b) the comparator is likely to report the same error repeatedly and there is no point in seeing multiple copies.

Errors *getStickyError() const;

Get the table's sticky error. Normally there is no point in doing this manually, but just in case.

void checkStickyError() const;

If the sticky error has been set, throw an Exception with it.

Tuesday, July 2, 2013

Facet reference, C++

The general functioning of a facet is the same in C++ as in Perl, so please refer to the part 1 of the Perl description for this information.

However the construction of a Facet is different in C++. The import is still the same: you call TrieadOwner::importNexus() method or one of its varieties and it returns a Facet. However the export is different: first you construct a Facet from scratch and then give it to TrieadOwner::exportNexus() to create a Nexus from it.

In the C++ API the Facet has a notion of being imported, very much like say the FnReturn has a notion of being initialized. When a Facet is first constructed, it's not imported. Then exportNexus() creates the nexus from the facet, exports it into the App, and also imports the nexus information back into the facet, marking the facet as imported. It returns back a reference to the exact same Facet object, only now that object becomes imported. Obviously, you can use either the original or returned reference, they point to the same object. Once a Facet has been imported, it can not be modified any more. The Facet object returned by the importNexus() is also marked as imported, so it can not be modified either. And also an imported facet can not be exported again.

However exportNexus() has an exception. If the export is done with the argument import set to false, the facet object will be left unchanged and not marked as imported. A facet that is not marked as imported can not be used to send or receive data. Theoretically, it can be used to export another nexus, but practically this would not work because that would be an attempt to export another nexus with the same name from the same thread. In reality such a Facet can only be thrown away, and there is not much use for it. You can read its components and use them to construct another Facet but that's about it.
It might be more convenient to use the TrieadOwner::makeNexus*() methods to build a Facet object rather than building it directly. In either case, the methods are the same and accept the same arguments, just the Facet methods return a Facet pointer while the nexus maker methods return a NexusMaker pointer.

The Facet class is defined in app/Facet.h. It inherits from Mtarget for an obscure reason that has to do with App topology analysis but it's intended to be used from one thread only.

You don't have to keep your own references to all your Facets. The TrieadOwner will keep a reference to all the imported Facets, and they will not be destroyed while the TrieadOwner exists (and this applies to Perl as well).

    enum {
        DEFAULT_QUEUE_LIMIT = 500,
    };

The default value used for the nexus queue limit (as a count of Xtrays, not rowops). Since the reading from the nexus involves double-buffering, the real queue size might grow up to twice that amount.

static string buildFullName(const string &tname, const string &nxname);

Build the full nexus name from its components.

Facet(Onceref<FnReturn> fret, bool writer);

Create a Facet, initially non-imported (and non-exported). The FnReturn object fret defines the set of labels in the facet (and nexus), and the name of FnReturn also becomes the name of the Facet, and of the Nexus. The writer flag determines whether this facet will become a writer (if true) or a reader (if false) when a nexus is created from it. If the nexus gets created without importing the facet back, the writer flag doesn't matter and can be set either way.

The FnReturn should generally be not initialized yet. The Facet constructor will check if FnReturn already has the labels _BEGIN_ and _END_ defined, and if either is missing, will add it to the FnReturn, then initialize it. If both _BEGIN_ and _END_ are already present, then the FnReturn can be used even if it's already initialized. But if any of them is missing, FnReturn must be not initialized yet, otherwise the Facet constructor will fail to add these labels.

The same FnReturn object may be used to create only one Facet object. And no, you can not import a Facet, get an FnReturn from it, then use it to create another Facet.

If anything goes wrong, the constructor will not throw but will remember the error, and later the exportNexus() will find it and throw an Exception from it.

static Facet *make(Onceref<FnReturn> fret, bool writer);

Same as the constructor, used for the more convenient operator priority for the chained calls.

static Facet *makeReader(Onceref<FnReturn> fret);
static Facet *makeWriter(Onceref<FnReturn> fret);

Syntactic sugar around the constructor, hardcoding the writer flag.

Normally the facets are constructed and exported with the chained calls, like:

Autoref<Facet> myfacet = to->exportNexus(
   Facet::makeWriter(FnReturn::make("My")->...)
   ->setReverse()
   ->exportTableType(Table::make(...)->...)
);

Because of this, the methods that are used for post-construction return the pointer to the original Facet object. They also almost never throw the Exceptions, to prevent the memory leaks through the orphaned Facet objects. The only way an Exception might get thrown is on an attempt to use these methods on an already imported Facet. Any errors get collected, and eventually exportNexus() will find them and properly throw an Exception, making sure that the Facet object gets properly disposed of.

Facet *exportRowType(const string &name, Onceref<RowType> rtype);

Add a row type to the Facet. May throw an Exception if the the facet is already imported. On other errors remembers them to be thrown on an export attempt.

Facet *exportTableType(const string &name, Autoref<TableType> tt);

Add a table type to the Facet. May throw an Exception if the the facet is already imported. On other errors remembers them to be thrown on an export attempt. The table type must also be deep-copyable and contain no errors. Not sure if I described this before, but if the deep copy can not proceed (say, a table type involves a Perl sort condition with a direct reference to the compiled Perl code) the deepCopy() method must still return a newly created object but remember the error inside it. Later when the table type is initialized, that object's initialization must return this error. The exportTableType() does a deep copy then initializes the copied table type. If this detects any errors, they get remembered and cause an Exception later in exportNexus().


Facet *setReverse(bool on = true);

Set (or clear) the nexus reverse flag. May throw an Exception if the the facet is already imported.

Facet *setQueueLimit(int limit);

Set the nexus queue limit. May throw an Exception if the the facet is already imported.

Erref getErrors() const;

Get the collected errors, so that they can be found without an export attempt.

bool isImported() const;

Check whether this facet is imported.

The rest of the methods are the same as in Perl. They can be used even if the facet is not imported.

bool isWriter() const;

Check whether this is a writer facet (or if returns false, a reader facet).

bool isReverse() const;

Check whether the underlying nexus is reverse.

int queueLimit() const;

Get the queue size limit of the nexus. Until the facet is exported, this will always return the last value set by setQueueLimit(). However if the nexus is reverse, on import the value will be changed to a very large integer value, currently INT32_MAX, and on all the following calls this value will be returned. Technically speaking, the queue size of the reverse nexuses is not unlimited, it's just very large, but in practice it amounts to the same thing.

FnReturn *getFnReturn() const;

Get the FnReturn object. If you plan to destroy the Facet object soon after this method is called, make sure that you put the FnReturn pointer into an Autoref first.

const string &getShortName() const;

Get the short name, AKA "as-name", which is the same as the FnReturn's name.

const string &getFullName() const;

Get the full name of the nexus imported through this facet. If the facet is not imported, will return an empty string.

typedef map<string, Autoref<RowType> > RowTypeMap;const RowTypeMap &rowTypes() const;

Get the map of the defined row types. Returns the reference to the Facet's internal map object.

typedef map<string, Autoref<TableType> > TableTypeMap;
const TableTypeMap &tableTypes() const;

Get the map of defined table types. Returns the reference to the Facet's internal map object.

 RowType *impRowType(const string &name) const;

Find a single row type by name. If the name is not known, returns NULL.

TableType *impTableType(const string &name) const;

Find a single table type by name. If the name is not known, returns NULL.

Nexus *nexus() const;

Get the nexus of this facet. If the facet is not imported, returns NULL.

int beginIdx() const;
int endIdx() const;

Return the indexes (as in "integer offset") of the _BEGIN_ and _END_ labels in FnReturn.

bool flushWriter();

Flush the collected rowops into the nexus as a single Xtray. If there is no data collected, does nothing. Returns true on a successful flush (even if there was no data collected), false if the Triead was requested to die and thus all the data gets thrown away.

Facet reference, Perl, part 2

As mentioned before, a Facet object is returned from either a nexus creation or nexus import. Then the owner thread can work with it.

$result = $fa->same($fa2);

Check whether two references point to the same Facet object.

$name = $fa->getShortName();

Get the short name of the facet (AKA "as-name", with which it has been imported).

$name = $fa->getFullName();

Get the full name of the nexus represented by this facet. The name consists of two parts separated by a slash, "$thread/$nexus".

$result = $fa->isWriter();

Check whether this is a writer facet (i.e. writes to the nexus). Each facet is either a writer or a reader, so if this method returns 0, it means that this is a reader facet.

$result = $fa->isReverse();

Check whether this facet represents a reverse nexus.

$limit = $fa->queueLimit();

Get the queue size limit of the facet's nexus. I think I forgot to mention it in the Nexus reference, but for a reverse nexus the returned value will be a large integer (currently INT32_MAX but the exact value might change in the future). And if some different limit value was specified during the creation of the reverse nexus, it will be ignored.

$limit = &Triceps::Facet::DEFAULT_QUEUE_LIMIT();

The constant of the default queue size limit that is used for the nexus creation, unless explicitly overridden.

$fret = $fa->getFnReturn();

Get the FnReturn object of this facet. This FnReturn will have the same name as the facet's short name, and it has a special symbiotic relation with the Facet object. Its use depends on whether this is a reader or writer facet. For a writer facet, sending rowops to the labels in FnReturn (directly or by chaining them off the other labels) causes these rowops to be buffered for sending into the nexus. For a reader facet, you can either chain your logic directly off the FnReturn's labels, or push an FnBinding onto it as usual.

$nexus = $fa->nexus();

Get the facet's nexus. There is not a whole lot that can be done with the nexus object, just get the introspection information, and the same information can be obtained directly with the facet's methods.

$idx = $fa->beginIdx();

Index (as in "integer offset", not a table index) of the _BEGIN_ label in the FnReturn's set of labels. There probably isn't much use for this method, and its name is somewhat confusing.

$idx = $fa->endIdx();


Index (as in "integer offset", not a table index) of the _END_ label in the FnReturn's set of labels. There probably isn't much use for this method, and its name is somewhat confusing.

$label = $fa-> getLabel($labelName);

Get a label from FnReturn by name. This is a convenience method, equivalent to $fa->getFnReturn()->getLabel($labelName). Confesses if the label with this name is not found.

@rowTypes = $fa->impRowTypesHash();

Get ("import") the whole set of row types exported through the nexus. The result is an array containing the name-value pairs, values being the imported row types. This array can be assigned into a hash to populate it. As it happens, the pairs will be ordered by name in the lexicographical order but there are no future guarantees about it.

The actual import of the types is done only once, when the nexus is imported to create the facet, and the repeated calls of the imp* methods will return the same objects.

$rt = $fa->impRowType($rtName);

Get ("import") one row type by name. If the name is not known, will confess.

@tableTypes = $fa->impTableTypesHash();

Get ("import") the whole set of table types exported through the nexus. The result is an array containing the name-value pairs, values being the imported table types. This array can be assigned into a hash to populate it. As it happens, the pairs will be ordered by name in the lexicographical order but there are no future guarantees about it.


The actual import of the types is done only once, when the nexus is imported to create the facet, and the repeated calls of the imp* methods will return the same objects.


$tt = $fa->impTableType($ttName);


Get ("import") one table type by name. If the name is not known, will confess.

$result = $fa-> flushWriter();

Flush the collected buffered rowops to the nexus as a single Xtray. If there are no collected rowops, does nothing. Returns 1 if the flush succeeded (even if there was no data to send), 0 if this thread was requested to die and thus all the collected data gets thrown away, same as for the TrieadOwner::flushWriters(). The rules for when this method may be called is also the same: only after calling readyReady(), or it will confess.

If this facet is in an input-only Triead, this call may sleep if a drain is currently active, until the drain is released.

Saturday, June 29, 2013

Facet reference, Perl, part 1

A Facet represents a Nexus endpoint imported into a Triead. A facet is either a reader (reading from the nexus) or a writer (writing into the nexus).

In the Perl API the Facets are created by TrieadOwner::makeNexus() or TrieadOwner::importNexus(). After that the metadata in the Facet is fixed and is available for reading only. Of course, the rowops can then be read or written.

The reading of data from a Facet is done by TrieadOwner::nextXtray(). There is no way to read a tray from a particular facet, nextXtray() reads from all the Triead's imported reader facets, alternating in a fair fashion if more than one of them has data available.

Each Facet has an FnReturn connected to it. The reading from a reader facet happens by forwarding the incoming rowops to that FnReturn. To actually process the data, you can either chain your handler labels directly to the FnReturn labels, or push an FnBinding onto that FnReturn. An incoming Xtray is always processed as a unit, with no intermixing with the other Xtrays.

The writing to a writer facet happens by calling the labels of its FnReturn. Which then has the logic that collects all these rowops into a buffer. Then when the facet is flushed, that buffer becomes an indivisible Xtray that gets sent to the nexus as a unit, and then read by the reader facets as a unit.

The facet metadata consists of:
  • a set of labels, same as for an FnReturn, used to build the facet's internal FnReturn; these labels define the data that can be carried through the nexus;
  • a set of row types that gets exported through the nexus;
  • a set of table types that gets exported through the nexus.
The table types must not contain any references to the Perl functions, or the export will fail. The Perl code snippets in the text format can be used instead.

There are two special labels, named _BEGIN_ and _END_. They may be defined explicitly, but if they aren't, they will be always added implicitly, with an empty row type (i.e. a row type with no fields).

When reading an Xtray, the _BEGIN_ label will always be called first, and _END_ last, thus framing the rest of the data. There are optimizations that skip the calling if there is nothing chained to these labels in FnReturn nor to the top FnBinding, and the rowop as such carries no extra data. The optimization is actually a bit deeper: the _BEGIN_ and _END_ rowops that have no extra data in them aren't even carried in the Xtray through the nexus. They are generated on the fly if there is an interest in them, or otherwise the generation is skipped.

What is meant by the "extra data"? It's either the opcode is not OP_INSERT or there are some non-NULL fields (or both). If the _BEGIN_ and _END_ labels were auto-generated, their row type will contain no fields, so the only way to sent the non-default data in them will be the non-default opcode. But if you define them explicitly with a different row type, you can also send the data in them.

When sending the data into a writer Facet, you don't have to send the _BEGIN_ and _END_ rowops, if you don't, they will be generated automatically as needed, with the default contents (opcode OP_INSERT and NULLs in all the fields). Moreover, they will really be generated automatically on the reader side, thus saving the overhead of passing them through the nexus. Another consequence of this optimization is that it's impossible to create an Xtray consisting of only a default _BEGIN_, a default _END_ and no payload rowops between them. It would be an empty Xtray, that would never be sent through the nexus. Even if you create these _BEGIN_ and _END_ rowops manually (but with the default contents), they will be thrown away when they reach the writer facet. If you want an Xtray to get through, you've got to either send the payload or put something non-default into at least one of the _BEGIN_ or _END_ rowops, at the very minimum a different opcode.

Sending the _BEGIN_ and _END_ rowops into a writer facet also has the effect of flushing it. Even if these rowops have the default contents and become thrown away by the facet, the flushing effect still works. The _BEGIN_ rowop flushes any data that has been collected in the buffer before it. The _END_ rowop gets added to the buffer (or might get thrown away) and then flushes the buffer. If the buffer happens to contain anything at the flush time, that contents forms an Xtray and gets forwarded to the nexus.

It's a long and winding explanation, but really it just does what is intuitively expected.

A Facet has two names, the "full" one and the "short" one:
  • The full name is copied from the nexus and consists of the name of the thread that exported the nexus and the name of the nexus itself separated by a slash, such as "t1/nx1".
  • The short name is the name with which the facet was imported. By default it's taken from the short name of the nexus. But it can also be given a different explicit name during the import, which is known as the "as-name" (because it's similar to the SQL "AS" clause). So if the full name is "t1/nx1", the default short name will be "nx1", but it can be overridden. The facet's FnReturn is named with the facet's short name.

Friday, June 28, 2013

TrieadOwner reference, C++

The TrieadOwner is defined in app/TrieadOwner.h. Its constructor is protected, the normal way of constructing is by calling App::makeTriead(). This is also called "defining a Triead".

TrieadOwner is an Starget, and must be accessed only from the thread that owns it (though it's possible to create it in the parent thread and then pass to the actual owner thread, as long as the synchronization between the threads is done properly).

Triead *get() const;

Get the public side if the Triead. In C++, unlike Perl, the Triead methods are not duplicated in TrieadOwner. So they are accessed through get(), and for example to get the Triead name, call to->get()->getName(). The TrieadOwner holds a reference to Triead, so the Triead object will never get destroyed as long as the TrieadOwner is alive.

App *app() const;

Get the App where this Triead belongs. The TrieadOwner holds a reference to App, so the App object will never get destroyed as long as the TrieadOwner is alive.

bool isRqDead() const;

Check whether this triead has been requested to die.

void requestMyselfDead();

Request this thread itself to die (see the reasoning for the in the Perl reference).

Unit *unit() const;

Get the main unit of this Triead. It has the same name as the Triead itself.

void addUnit(Autoref<Unit> u);

Keep track of an additional unit. Adding a unit multiple times has no effect. See the other implications in the Perl reference.

bool forgetUnit(Unit *u);

Forget about an additional unit. If the unit is already unknown, has no effect. The main unit can not be forgotten.

typedef list<Autoref<Unit> > UnitList;
const UnitList &listUnits() const;

List the tracked units. The main unit is always included at the front of the list.

void markConstructed();

Advance the Triead state to Constructed. Repeated calls have no effect.

void markReady();

Advance the Triead state to Ready. Repeated calls have no effect. The advancement is cumulative: if the Triead was not constructed yet, it will be automatically advanced first to Constructed and then to Ready. If this is the last Triead to become ready, it will trigger the App topology check, and if the check fails, abort the App and throw an Exception.

void readyReady();

Advance the Triead to the Ready state, and wait for all the Trieads to become ready. The topology check applies in the same way as in markReady().

void markDead();

Advance the Triead to the Dead state. If this Triead was not Ready yet and it's the last one to become so, the topology check will run. If the topology check fails, the App will be aborted but markDead() will not throw an exception.

This method is automatically called from the TrieadOwner destructor, so most of the time there is no need to call it explicitly.

It also clears all the tracked units.

void abort(const string &msg) const;

Abort the App. The name of this thread will be forwarded to the App along with the error message. The error message may be multi-line.

Onceref<Triead> findTriead(const string &tname, bool immed = false);

Find a Triead in the App by name. The flag immed controls whether this method may wait. If immed is false, and the target thread is not constructed yet (but at least declared), the method will sleep until it becomes constructed, and then returns it. If the target thread is not declared, it will throw an Exception. If immed is true, the look-up is immediate: it will return the thread even if it's not constructed but is at least defined. If it's not defined (even if it's declared), an Exception will be thrown. The look-up of this Triead itself is always immediate, irrespective of the immed flag.

An Exception may also be thrown if a circular sequence of Trieads deadlocks waiting for each other.

Onceref<Facet> exportNexus(Autoref<Facet> facet, bool import = true);

Export a Nexus. The Nexus definition is constructed as a Facet object, which is then used by this method to construct and export the Nexus. The same argument Facet reference is then returned back as the result. The import flag tells, whether the Nexus is to be also imported back by connecting the same original Facet object to it. If import is false, the original Facet reference is still returned back but it can't be used for anything, and can only be thrown away. The direction of the import (reading or writing) is defined in the Facet, as well as the nexus name and all the other information.

Throws an Exception on any errors, in particular on the duplicate facet names within the Triead.

Onceref<Facet> exportNexusNoImport(Autoref<Facet> facet);

A convenience wrapper around exportNexus() with import=false.

Onceref<Facet> importNexus(const string &tname, const string &nexname, const string &asname, bool writer, bool immed = false);

Import a Nexus from another Triead. tname is the name of the other thread, nexname is the name of nexus exported from it, asname is the local name for the imported facet ("" means "same as nexname"), the writer flag determines if the import is for writing, and the immed flag has the same meaning as in findTriead(). The import of a nexus involves finding its exporting thread, and the immed flag controls, how this finding is done.

Throws an Exception if anything is not found, or the local import name conflicts with another imported facet.

Onceref<Facet> importNexusImmed(const string &tname, const string &nexname, const string &asname, bool writer);
Onceref<Facet> importReader(const string &tname, const string &nexname, const string &asname = "", bool immed=false);
Onceref<Facet> importWriter(const string &tname, const string &nexname, const string &asname = "", bool immed=false);
Onceref<Facet> importReaderImmed(const string &tname, const string &nexname, const string &asname = "");
Onceref<Facet> importWriterImmed(const string &tname, const string &nexname, const string &asname = "");

Convenience wrappers for importNexus(), providing the default arguments and the more mnemonic names.

typedef map<string, Autoref<Nexus> > NexusMap;
void exports(NexusMap &ret) const;

Get the nexuses exported here. The map argument will be cleared and refilled with the new values.

typedef map<string, Autoref<Facet> > FacetMap;
void imports(FacetMap &ret) const;

Get the facets imported here. The map argument will be cleared and refilled with the new values.


NexusMaker *makeNexusReader(const string &name);
NexusMaker *makeNexusWriter(const string &name);
NexusMaker *makeNexusNoImport(const string &name);


A convenient way to build the nexuses for export in a chained fashion. The name argument is the nexus name. The NexusMaker is an opaque class that has the same building methods as a Facet, plus the method complete() that finishes the export. This call sequence is more convenient than building a Facet and then exporting it. For example:


Autoref<Facet> myfacet = ow->makeNexusReader("my")
    ->addLabel("one", rt1)
    ->addFromLabel("two", lb2)
    ->setContext(new MyFnContext)
    ->setReverse()
    ->complete();


Only one nexus may be built like this at a time, since there is only one instance of NexusMaker per TrieadOwner that gets reused over and over. It keeps the Facet instance being built in it. If you don't complete the build, that Facet instance will be left sitting around until another makeNexus*() calls when it will get thrown away. But in general if you do the calling in the sequence as shown, you can't forget to call complete() at the end, since otherwise the return type would not match and the compiler will fail.


bool flushWriters();


Flush all the writer facets. Returns true on the successfull completion, false if the thread was requested to die, and thus all the output was thrown away.


bool nextXtray(bool wait = true, const struct timespec &abstime = *(const struct timespec *)NULL);


Read and process the next Xtray. Automatically calls the flushWriters() after processing. The rest works in the same way as described in Perl, however this is a combination of Perl's nextXtray(), nextXtrayNoWait() and nextXtrayTimeLimit(). If wait is false, this method will never wait. If wait is true and the abstime reference is not NULL, it might wait but not past that absolute time. Otherwise it will wait until the data becomes available of the thread is requested to die.

Returns true if an Xtray has been processed, false if it wasn't (for any reason, a timeout expiring or thread being requested to die).

 bool nextXtrayNoWait();

A convenience wrapper over nextXtray(false).

bool nextXtrayTimeout(int64_t sec, int32_t nsec);

Another convenience wrapper over nextXtray(): read and process an Xtray, with a timeout limit. The timeout value consists of the seconds and nanoseconds parts.

void mainLoop();

Run the main loop, calling nextXtray() repeatedly until the thread is requested to die.

bool isRqDrain();

Check if a drain is currently requested by any thread (and applies to this thread). In case if an exclusive drain is requested with the exclusion of this thread, this method will return false.

void requestDrainShared();
void requestDrainExclusive();
void waitDrain();
bool isDrained();
void drainShared();
void drainExclusive();
void undrain();

The drain control, same as in Perl. These methods are really wrappers over the corresponding App methods. And generally a better idea is to do the scoped drains with AutoDrain rather than to call these methods directly.

The C++ API provides no methods for the file descriptor tracking as such. Instead these methods are implemented in the class FileInterrupt. TrieadOwner has a public field

Autoref<FileInterrupt> fileInterrupt_;

to keep an instance of the interruptor. TrieadOwner itself has no use for it, nor does any other part of Triceps, it's just a location to keep this reference for the convenience of the application developer. The Perl API makes use of this location.

But how does the thread then get interrupted when it's requested to die? The answer is that the TrieadJoin object also has a reference to the FileInterrupt object, and even creates that object in its own constructor. So when a joiner method is defined for a thread, that supplies the App with the access to its interruptor as well. And to put the file descriptors into the FileInterrupt object, you can either keep a direct reference to it somewhere in your code, or copy that reference into the TrieadOwner object.

Here is for example how the Perl thread joiner is created for the TrieadOwner inside the Perl implementation:

            string tn(tname);
            Autoref<TrieadOwner> to = appv->makeTriead(tn, fragname);
            PerlTrieadJoin *tj = new PerlTrieadJoin(appv->getName(), tname,
                SvIOK(tid)? SvIV(tid): -1,
                SvIOK(handle)? SvIV(handle): 0,
                testfail);
            to->fileInterrupt_ = tj->fileInterrupt();
            appv->defineJoin(tn, tj);

This is somewhat cumbersome, but the native C++ programs can create their threads using the class BasicPthread that takes care of this and more.

Tuesday, June 25, 2013

TrieadOwner reference, Perl, part 2

I've described it before but forgot to mention in the part 1 of the reference, the mark* methods bring the Triead to the target state through all the intermediate states. So if a Triead was not constructed, markReady() will mark it first constructed and then ready.

The same applies to markDead() but with an even more interesting twist. Suppose there is an application with an incorrect topology, and all the Trieads in it but one are ready. That last Triead then experiences an error, and proceeds directly to call markDead() and then exit. This markDead() will involve an intermediate step marking the Triead as ready. Since it's the last Triead to be ready, it will trigger the topology check, and since the topology is incorrect, it will fail. If it happened in markReady(), the method would confess. But in markDead() confessing doesn't make a whole lot of sense: after all, the thread is about to exit anyway. So markDead() will catch all these confessions and throw them away, it will never fail. However the failed check will still abort the App, and the rest of the threads will wake up and fail as usual.

The other thing about markDead() is that it clears and unreferences all the TrieadOwner's registered units, not waiting for the TrieadOwner object to be destroyed. This unravels any potential cyclic references where the code in a label might be referring back to the TrieadOwner.

And now continuing with more methods.

$facet = $to-> makeNexus(@options);

Create, export and (optionally) import a nexus. The result is an imported Facet of this Nexus, except when the options specify the no-import mode (then the result will be undef). Confesses on errors.

The options are:

name => $name
Name of the nexus, it will be used both as the export name and the local imported "as-name" of the facet.

labels => [
  name => $rowType,
  name => $fromLabel,
]
Defines the labels similarly to FnReturn in a referenced array. The array contains the pairs of (label_name, label_definition). The definition may be either a RowType, and then a label of this row type will be created, or a Label, and then a label of the same row type will be created and chained from that original label. The created label objects can be later found from Facets, and used like normal labels, by chaining them or sending rowops to them (but chaining from them is probably not the best idea, although it works anyway).

Optional, or may be an empty array; the implicit labels _BEGIN_ and _END_ will allways be added automatically if not explicitly defined.

The labels are used to construct an implicit FnReturn in the current Triead's main unit, and this is the FnReturn that will be visible in the Facet that gets imported back. If the import mode is "none", the FnReturn will still be  constructed and then abandoned (and freed by the reference count going to 0, as usual). The labels used as $fromLabel must always belong to the Triead's main unit.

rowTypes => [
  name => $rowType,
]
Defines the row types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.

tableTypes => [
  name => $tableType,
]
Defines the table types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.

reverse => 0/1
Flag: this Nexus goes in the reverse direction. The reverse nexuses are used to break up the topological loops, to prevent the deadlocks on the queueing. They have no limit on the queue size, and the data is read from them at a higher priority than from the direct nexuses. Default: 0.

chainFront => 0/1
Flag: when the labels are specified as $fromLabel, chain them at the front of the original labels. Default: 1. The default is this way because chaining at the front is what is typically needed. The reasons are described at length in the pipelined example, but the short gist is that you might want to send the rows from both the inputs, intermediate points, and the end of processing into an output nexus. It's most convenient to create the nexus in one go, after the whole thread's computation is defined. But the rowops from the earlier stages of computations have to come to the nexus before the rowops from the later stage. Chaining at the front ensures that each such label will send the rowop into the nexus first, and only then to the next stage of the computation.

queueLimit => $number
Defines the size limit after which the writes to the queue of this Nexus block. In reality because of the double-buffering the queue may contain up to twice that many trays before the future writes block. This option has no effect on the  reverse nexuses. Default: &Facet::DEFAULT_QUEUE_LIMIT, 500 or so.

import => $importType
A string value, essentially an enum, determining how this Nexus gets immediately imported back into this Triead. The supported values are:
  • reader (or anything starting from "read") - import for reading
  • writer (or anything starting from "write") - import for writing
  • none (or anything starting from "no") - do not import
The upper/lowercase doesn't matter. The use of the canonical strings is recommended.

$facet = $to->importNexus(@options);

Import a nexus into this Triead. Returns the imported Facet. The repeated attempts to import the same Nexus will return references to the same Facet object. Confesses on errors. An attempt to import the same nexus for both reading and writing is an error.

The options are:

from => "$t/$n"
Identifier of the nexus to import, consisting of two parts separated by a slash:
  •   thread name
  •   nexus name
The nexus name will also be used as the name of the local facet, unless overridden by the option "as". The reason for slash separator is that normally both the thread name and the nexus name parts may contain further components separated by dots, and a different separator allows to find the boundary between them. If a dot were used, in "a.b.c" it would be impossible to say, does it mean the thread "a" and nexus "b.c" in it, or thread "a.b" and nexus "c"? However "a/b.c" or "a.b/c" have no such ambiguity. Mutually exclusive with options "fromTriead" and "fromNexus".

fromTriead => $t
fromNexus => $n
The alternative way to specify the source thread and nexus as separate options. Both options must be present or absent at the same time. Mutually exclusive with "from".

as => $name
Specifies an override name for the local facet (and thus also to the FnReturn created in the facet). Logically similar to the SQL clause AS. Default is to reuse the nexus name.

import => $importType
A string value, essentially an enum, determining how this Nexus gets imported. The supported values are the same as for makeNexus (except "none", since there is no point in a no-op import):
  • reader (or anything starting from "read") - import for reading
  • writer (or anything starting from "write") - import for writing
The upper/lowercase doesn't matter.

immed => 0/1
Flag: do not wait for the thread that exported the nexus to be fully constructed. Waiting synchronizes with the exporter and prevents a race of an import attempt trying to find a nexus before it is made and failing. However if two threads are waiting for each other, it becomes a deadlock that gets caught and aborts the App. The immediate import allows to avoid such deadlocks for the circular topologies with helper threads.


The helper threads are the "blind alleys" in the topology: the "main thread" outsources some computation to a "helper thread", sending it the arguments, then later receiving the results and continuing with its logic.

For example, consider the following sequence:
  • thread A creates the nexus O;
  • thread A creates the helper thread B and tells it to import the nexus A/O for its input immediately and create the reverse nexus R for result;
  • thread A requests a (normal) import of the nexus B/R and falls asleep because B is not constructed yet;
  •     thread B starts running;
  •     thread B imports the nexus A/O immediately and succeeds;
  •     thread B defines its result nexus R;
  •     thread B marks itself as constructed and ready;
  • thread A wakes up after B is constructed, finds the nexus B/R and completes its import;
  • then thread A can complete its initialization, export other nexuses etc.
Default: 0, except if importing a nexus that has been exported from the same Triead. Importing from the same Triead is not used often, since the export also imports the nexus back right away, and there is rarely any use in importing separately. But it's possible, and importing back from the same Triead is always treated as immediate to avoid deadlocks.

Friday, June 7, 2013

reordering the data from multiple threads, part 3

And finally the code of the example. I've written it with the simple approach of read stdin, print to stdout. It's easier this way, and anyway to demonstrate the rowop reordering, all the input has to be sent quickly in one chunk. The application starts as:

Triceps::Triead::startHere(
    app => "ForkJoin",
    thread => "main",
    main => \&mainT,
    workers => 2,
    delay => 0.02,
);

The main thread is:

sub mainT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("mainT", $opts, {@Triceps::Triead::opts,
        workers => [ 1, undef ], # number of worker threads
        delay => [ 0, undef ], # artificial delay for the 0th thread
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();


So far the pretty standard boilerplate with the argument parsing.

    my $rtRate = Triceps::RowType->new( # an exchange rate between two currencies
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        rate => "float64", # multiplier when exchanging ccy1 to ccy2
    ) or confess "$!";

    # the resulting trade recommendations
    my $rtResult = Triceps::RowType->new(
        triead => "int32", # id of the thread that produced it
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        ccy3 => "string", # currency code
        rate1 => "float64",
        rate2 => "float64",
        rate3 => "float64",
        looprate => "float64",
    ) or confess "$!";

The row types originate from the self-join example code, same as before.

    # each tray gets sequentially numbered and framed
    my $rtFrame = Triceps::RowType->new(
        seq => "int64", # sequence number
        triead => "int32", # id of the thread that produced it (optional)
    ) or confess "$!";

    # the plain-text output of the result
    my $rtPrint = Triceps::RowType->new(
        text => "string",
    ) or confess "$!";

These are the new additions. The frame row type is used to send the information about the sequence number in the _BEGIN_ label. The print row type is used to send the text for printing back to the main thread.

    # the input data
    my $faIn = $owner->makeNexus(
        name => "input",
        labels => [
            rate => $rtRate,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    # the raw result collected from the workers
    my $faRes = $owner->makeNexus(
        name => "result",
        labels => [
            result => $rtResult,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    my $faPrint = $owner->makeNexus(
        name => "print",
        labels => [
            raw => $rtPrint, # in raw order as received by collator
            cooked => $rtPrint, # after collation
        ],
        import => "reader",
    );

The processing will go in essentially a pipeline: read the input -> process in the worker threads -> collate -> print in the main thread. Only the worker thread stage spreads into multiple threads, that are then joining the data paths back together in the collator.

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "reader",
        main => \&readerT,
        to => $owner->getName() . "/input",
    );

    for (my $i = 0; $i < $opts->{workers}; $i++) {
        Triceps::Triead::start(
            app => $app->getName(),
            thread => "worker$i",
            main => \&workerT,
            from => $owner->getName() . "/input",
            to => $owner->getName() . "/result",
            delay => ($i == 0? $opts->{delay} : 0),
            workers => $opts->{workers},
            identity => $i,
        );
    }

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "collator",
        main => \&collatorT,
        from => $owner->getName() . "/result",
        to => $owner->getName() . "/print",
    );

    my @rawp; # the print in original order
    my @cookedp; # the print in collated order

    $faPrint->getLabel("raw")->makeChained("lbRawP", undef, sub {
        push @rawp, $_[1]->getRow()->get("text");
    });
    $faPrint->getLabel("cooked")->makeChained("lbCookedP", undef, sub {
        push @cookedp, $_[1]->getRow()->get("text");
    });

    $owner->readyReady();

    $owner->mainLoop();

    print("--- raw ---\n", join("\n", @rawp), "\n");
    print("--- cooked ---\n", join("\n", @cookedp), "\n");
}

The collator will send the data for printing twice: first time in the order it was received ("raw"), second time in the order after collation ("cooked").

sub readerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("readerT", $opts, {@Triceps::Triead::opts,
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faIn = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbRate = $faIn->getLabel("rate");
    my $lbBegin = $faIn->getLabel("_BEGIN_");
    # _END_ is always defined, even if not defined explicitly
    my $lbEnd = $faIn->getLabel("_END_");

This demonstrates that the labels _BEGIN_ and _END_ always get defined in each nexus, even if they are not defined explicitly. Well, here _BEGIN_ was defined explicitly but _END_ was not, and nevertheless it can be found and used.

    my $seq = 0; # the sequence

    $owner->readyReady();

    while(<STDIN>) {
        chomp;

        ++$seq; # starts with 1
        $unit->makeHashCall($lbBegin, "OP_INSERT", seq => $seq);
        my @data = split(/,/); # starts with a string opcode
        $unit->makeArrayCall($lbRate, @data);
        # calling _END_ is an equivalent of flushWriter()
        $unit->makeHashCall($lbEnd, "OP_INSERT");
    }

    {
        # drain the pipeline before shutting down
        my $ad = Triceps::AutoDrain::makeShared($owner);
        $owner->app()->shutdown();
    }
}

Each input row is sent through in a separate transaction, or in another word, a separate tray. The _BEGIN_ label carries the sequence number of the tray. The trays can as well be sent on with flushWriters() or flushWriter(), but I wanted to show that you can also flush it by calling the _END_ label.

sub workerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("workerT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
        delay => [ 0, undef ], # processing delay
        workers => [ undef, \&Triceps::Opt::ck_mandatory ], # how many workers
        identity => [ undef, \&Triceps::Opt::ck_mandatory ], # which one is us
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    my $delay = $opts->{delay};
    my $workers = $opts->{workers};
    my $identity = $opts->{identity};

    my $faIn = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faRes = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbInRate = $faIn->getLabel("rate");
    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

The worker thread starts with the pretty usual boilerplate.

    my $seq; # sequence from the frame labels
    my $compute; # the computation is to be done by this label
    $faIn->getLabel("_BEGIN_")->makeChained("lbInBegin", undef, sub {
        $seq = $_[1]->getRow()->get("seq");
    });

The processing of each transaction starts by remembering its sequence number from the _BEGIN_ label. It doesn't send a _BEGIN_ to the output yet because all the threads get the input, to be able to update its table, but then only one thread produces the output. And the thread doesn't know whether it will be the one producing the output until it knows, what is the primary key in the data. So it can start sending the output only after it had seen the data. This whole scheme works because there is exactly one data row per each transaction. A more general approach might be to have the reader thread decide, which worker will produce the result and put this information (as either a copy of the primary key or the computed thread id) into the _BEGIN_ rowop.

    # the table gets updated for every incoming rate
    $lbInRate->makeChained("lbIn", undef, sub {
        my $ccy1 = $_[1]->getRow()->get("ccy1");
        # decide, whether this thread is to perform the join
        $compute = ((ord(substr($ccy1, 0, 1)) - ord('A')) % $workers == $identity);

        # this relies on every Xtray containing only one rowop,
        # otherwise one Xtray will be split into multiple
        if ($compute) {
            $unit->makeHashCall($lbResBegin, "OP_INSERT", seq => $seq, triead => $identity);
            select(undef, undef, undef, $delay) if ($delay);
        }

        # even with $compute is set, this might produce some output or not,
        # but the frame still goes out every time $compute is set, because
        # _BEGIN_ forces it
        $unit->call($lbRateInput->adopt($_[1]));
    });

There the decision is made of whether this join is to be computed for this thread, remembered in the flag $compute, and used to generate the _BEGIN_ rowop for the output. Then the table gets updated in any case ($lbRateInput is the table's input label). I've skipped over the table creation, it's the same as in the self-join example, and you can always find the full tetx of the example in xForkJoinMt.t.

    $tRate->getOutputLabel()->makeChained("lbCompute", undef, sub {
        return if (!$compute); # not this thread's problem
        ...
                $unit->call($result);
        }
    });
    ##################################################

    $owner->readyReady();

    $owner->mainLoop();
}

Here again I've skipped over the way the result is computed. The important part is that if the $compute flag is not set, the whole self-joining computation is not performed. The _END_ label is not touched, the flushing of transactions is taken care of by the mainLoop(). Note that the _BEGIN_ label is always sent if the data is designated to this thread, even if no output as such is produced. This is done because the collator needs to get an uninterrupted sequence of transactions. Otherwise it would not be able to say if some transaction has been dropped or is only delayed.

 sub collatorT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("collatorT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faRes = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faPrint = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

    my $lbPrintRaw = $faPrint->getLabel("raw");
    my $lbPrintCooked = $faPrint->getLabel("cooked");

    my $seq = 1; # next expected sequence
    my @trays; # trays held for reordering: $trays[0] is the slot for sequence $seq
        # (only of course that slot will be always empty but the following ones may
        # contain the trays that arrived out of order)
    my $curseq; # the sequence of the current arriving tray

The collator thread starts very much as usual. It has its expectation of the next tray in order, which gets set correctly. The trays that arrive out of order will be buffered in the array @trays. Well, more exactly, for simplicity, all the trays get buffered there and then sent on if their turn has come.But it's possible to make an optimized version that would let the data flow through immediately if it's arriving in order.

    # The processing of data after it has been "cooked", i.e. reordered.
    my $bindRes = Triceps::FnBinding->new(
        name => "bindRes",
        on => $faRes->getFnReturn(),
        unit => $unit,
        withTray => 1,
        labels => [
            "_BEGIN_" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("BEGIN"));
            },
            "result" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("result"));
            }
        ],
    );
    $faRes->getFnReturn()->push($bindRes); # will stay permanently

The data gets collected into trays through a binding that gets permanently pushed onto the facet's FnReturn. Then when the tray's order comes, it wil lbe simply called and will produce the print calls for the cooked data order.

    # manipulation of the reordering,
    # and along the way reporting of the raw sequence
    $lbResBegin->makeChained("lbBegin", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("BEGIN"));
        $curseq = $_[1]->getRow()->get("seq");
    });
    $lbResult->makeChained("lbResult", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("result"));
    });
    $lbResEnd->makeChained("lbEnd", undef, sub {
        my $tray = $bindRes->swapTray();
        if ($curseq == $seq) {
            $unit->call($tray);
            shift @trays;
            $seq++;
            while ($#trays >= 0 && defined($trays[0])) {
                # flush the trays that arrived misordered
                $unit->call(shift @trays);
                $seq++;
            }
        } elsif ($curseq > $seq) {
            $trays[$curseq-$seq] = $tray; # remember for the future
        } else {
            # should never happen but just in case
            $unit->call($tray);
        }
    });

    $owner->readyReady();

    $owner->mainLoop();
};

The input rowops get not only collected in the binding's tray but also chained directly to the labels that print the raw order of arrival. The handling of _BEGIN_ also remembers its sequence number.

The handler of _END_ (which rowops get produced implicitly at the end of transaction) then does the heavy lifting. It looks at the sequence number remembered from _BEGIN_ and makes the decision. If the received sequence is the next expected one, the data collected in the tray gets sent on immediately, and then the contents of the @trays array gets sent on until it hits a blank spot of missing data. Or if the received sequence leaves a gap, the tray is placed into an appropriate spot in @trays for later processing.

This whole logic can be encapsulated in a class but I haven't decided yet on the best way to do it. Maybe somewhere in the future.

Thursday, June 6, 2013

reordering the data from multiple threads, part 2

Before going to the example itself, let's talk more about the general issues of the data partitioning.

If the data processing is stateless, it's easy: just partition by the primary key, and each thread can happily work on its own. If the data depends on the previous data with the same primary key, the partitioning is still easy: each thread keeps the state for its part of the keys and updates it with the new data.

But what if you need to join two tables with independent primary keys, where the matches of the keys between them are fairly arbitrary? Then you can partition by one table but you have to give a copy of the second table to each thread. Typically, if one table is larger than the other, you would partition by the key of the big table, and copy the small table everywhere. Since the data rows are referenced in Triceps, the actual data of the smaller table won't be copied, it would be just referenced from multiple threads, but each copy will still have the overhead of its own index.

With some luck, and having enough CPUs, you might be able to save a little overhead by doing a matrix: if you have one table partitioned into the parts A, B and C, and the other table into parts 1, 2 and 3, you can then do a matrix-combination into 9 threads processing the combinations A1, A2, A3, B1, B2, B3, C1, C2, C3. If both tables are of about the same size, this would create a total of 18 indexes, each keeping 1/3 of one original table, so the total size of indexes will b 6 times the size of one original table (or 3 times the combined sizes of both tables). On the other hand, if you were to copy the first table to each thread and split the second table into 9 parts, creating the same 9 threads, the total size of indexes will be 9 times the first table and 1 times the second table, resulting in the 10 times the size of an original table (or 5 times the combined sizes of both tables). There is a win to this, but the catch is that the results from this kind of 3x3 matrix will really have to be restored to the correct order afterwards.

The reason is that when a row in the first table changes, it might make it join, say, in the thread A2 instead of the thread A1. So the thread A1 would generate a DELETE for the join result, and the thread A2 would generate a following INSERT. With two separate threads, the resulting order will be unpredictable, and the INSERT coming before the DELETE would be bad. The post-reordering is really in order.

By contrast, if you just partition the first table and copy the second everywhere, you get 9 threads A, B, C, D E, F, G, H, I, and the change in a row will still keep it in the same thread, so the updates will come out of that thread strictly sequentially. If you don't care about the order changes between different primary keys, you can get away without the post-reordering. Of course, if a key field might change and you care about it being processed in order, you'd still need the post-reordering.

The example I'm going to show is a somewhat of s strange mix. It's the adaptation of the Forex arbitration example from the section 12.13. Self-join done manually. As you can see from the name of the section, it's doing a self-join, kind of like going through the same table 3 times.

The partitioning in this example works as follows:
  • All the data is sent to all the threads. All the threads keep a full copy of the table and update it according to the input. 
  • But then they compute the join only if the first currency name in the update falls into the thread's partition. 
  • The partitioning is done by the first letter of the symbol, with interleaving: the symbols starting with A are handled by the thread 0, with B by thread 1, and so on until the threads end, and then continuing again with the thread 0. A production-ready implementation would use a hash function instead. But the interleaving approach makes much easier to predict, which symbol goes to which thread for the example.
  • Naturally, all this means that the loop of 3 currencies might get created by a change in one pair and then very quickly disappear by a change to another pair.of currencies. So the post-reordering of the result is important to keep the things consistent.
I've also added a tweak allowing to artificially slow down the thread 0, making the incorrect order to show up reliably, and make the reordering code really work. For example, suppose the following input sent quickly:

OP_INSERT,AAA,BBB,1.30
OP_INSERT,BBB,AAA,0.74
OP_INSERT,AAA,CCC,1.98
OP_INSERT,CCC,AAA,0.49
OP_INSERT,BBB,CCC,1.28
OP_INSERT,CCC,BBB,0.78
OP_DELETE,BBB,AAA,0.74
OP_INSERT,BBB,AAA,0.64

With two threads, and thread 0 working slowly, it would produce the raw result:

BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"
BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"

Here the BEGIN lines are generated by the code and show the sequence number of the input row and the id of the thread that did the join. The result lines show the arbitration opportunities produced by the join. Obviously, not every update produces a new result, most of them don't. But the INSERT and DELETE in the result come in the wrong order: the update 7 had overtaken the update 6.

The post-reordering comes to the resque and restores the order:

BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"

As you can see, now the sequence numbers go in the sequential order.

reordering the data from multiple threads, part 1

The pipelined examples shown before had very conveniently preserved the order of the data while spreading the computational load among multiple threads. But it forced the computation to pass sequentially through every thread, increasing the latency and adding overhead. The other frequently used option is to farm out the work to a number of parallel threads and then collect the results back from them, as shown in the Fig. 1. This topology is colloquially known as "diamond" or "fork-join" (having nothing to do with the SQL joins, just that the arrows first fork from one box to multiple and then join back to one box).

Fig. 1. Diamond topology.

There are multiple way to decide, which thread gets which unit of data to process. One possibility that provides the natural load balancing is to keep a common queue of work items, and have the worker threads (B and C in Fig. 1) read the next work item when they become free after processing the last item. But this way has an important limitation: there is no way to tell in advance, which work item will go to which thread, so there is no way for the worker threads to keep any state. All the state would have to be encapsulated into the work item (that would be a tray in the Triceps terms). And at the moment Triceps provides no way to maintain such a shared queue.

The other way is to partition the data between the worker threads based on the primary key. Usually it's done by using a hash of the key, which would distribute the data randomly and hopefully evenly. Then a worker thread can keep the state for its subset of keys, forming a partition (also known as "shard") and process the sequential updates for this partition. The way to send a rowop only to the thread B or only to the thread C would be by having a separate designated nexus for each worker thread, and the thread A making the decision based on the hash of the primary key in the data. The processed data from all the worker threads can then be sent to a single nexus feeding the thread D.

But either way the data will arrive at D in an unpredictable order. The balance of load between the worker threads is not perfect, and there is no way to predict, how long will each tray take to process. A tray following one path may overtake a tray following another path.

If the order of the data is important, D must collate the data back into the original order before processing it further. Obviously, for that it must know what the original order was, so A must tag the data with the sequential identifiers. Since the data rows themselves may get multiplied, the tagging is best done at the tray level (what happens if the trays split in the worker threads is a separate story for the future, for now the solution is simply "keep the tray together").

When the data goes through a nexus, Triceps always keeps a tray as in indivisible bundle. It always gets read from the reading facet together, without being interspersed with the data from the other trays. As the data is sent into a writer facet, it's collected in a tray, and sent on only when the facet gets flushed, with the TrieadOwner::flushWriters() or Facet::flushWriter().

There also are two special labels defined on every nexus, that tell the tray boundaries to the reading thread:

  • _BEGIN_ is called at the start of the tray.
  • _END_ is called at the end of the tray.

These labels can be defined explicitly, or otherwise they become defined implicitly anyway. If they get defined implicitly, they get the empty row type (one with no fields). If you define them explicitly, you can use any row type you please, and this is a good place for the tray sequence id.

And in a writer facet you can send data to these labels. When you want to put a sequence id on a tray, you define the _BEGIN_ label with that sequence id field, and then call the _BEGIN_ label with the appropriate id values. Even if you don't call the _BEGIN_ and _END_ labels, they do get called (not quite called but placed on the tray) automatically anyway, with the opcode of OP_INSERT and all the fields NULL. The direct calling of these labels will also cause the facet to be flushed: _BEGIN_ flushes any data previously collected on the tray and then adds itself; _END_ adds itself and then flushes the tray.

The exact rules of how the _BEGIN_ and _END_ get called are actually somewhat complicated, having to deal with the optimizations for the case when nobody is interested in them, but they do what makes sense intuitively.

The case when these labels get a call with OP_INSERT and no data gets optimized out by not placing it into the actual Xtray, even if it was called explicitly. Then in the reader facet these implicit rowops are re-created but only if there is a chaining for their labels from the facet's FnReturn or if they are defined in the currently pushed FnBinding. So if you do a trace on the reading thread's unit, you will see these implicit rowops to be called only if they are actually used.

Monday, May 27, 2013

how to export a table, or the guts of TQL join exposed

Now to the point of why the multithreaded TQl example got written: the export of a table between two threads.

It all starts in the Tql initialization method. In the multithreaded mode it builds the nexuses for communication. I'll skip the input nexus and show the building of only the output and request-dump nexuses:

    # row type for dump requests and responses
    my $rtRequest = Triceps::RowType->new(
      client => "string", #requesting client
      id => "string", # request id
      name => "string", # the table name, for convenience of requestor
      cmd => "string", # for convenience of requestor, the command that it is executing
    ) or confess "$!";

The request row type is used by the client writer thread to request the table dumps from the core logic, and to get back the notifications about the dumps.

    # build the output side
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
      my $name = $self->{tableNames}[$i];
      my $table = $self->{tables}[$i];

      push @tabtypes, $name, $table->getType()->copyFundamental();
      push @labels, "t.out." . $name, $table->getOutputLabel();
      push @labels, "t.dump." . $name, $table->getDumpLabel();
    }
    push @labels, "control", $rtControl; # pass-through from in to out
    push @labels, "beginDump", $rtRequest; # framing for the table dumps
    push @labels, "endDump", $rtRequest;

    $self->{faOut} = $owner->makeNexus(
      name => $self->{nxprefix} . "out",
      labels => [ @labels ],
      tableTypes => [ @tabtypes ],
      import => "writer",
    );
    $self->{beginDump} = $self->{faOut}->getLabel("beginDump");
    $self->{endDump} = $self->{faOut}->getLabel("endDump");

On the output side each table is represented by 3 elements:
  • its fundamental table type (stripped down to the primary key);
  • its output label for normal updates;
  • its dump label for the responses to the dump requests.
There also are the "beginDump" and "endDump" labels that frame each response to a dump request.

The row type $rtControl and label "control" is used to pass the commands from the client reader to client writer, but it's exact contents is not important here.

The dump request nexus is built in a similar way:

    # build the dump requests, will be coming from below
    undef @labels;
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
      my $name = $self->{tableNames}[$i];
      my $table = $self->{tables}[$i];

      push @labels, "t.rqdump." . $name, $rtRequest;
    }
    $self->{faRqDump} = $owner->makeNexus(
      name => $self->{nxprefix} . "rqdump",
      labels => [ @labels ],
      reverse => 1, # avoids making a loop, and gives priority
      import => "reader",
    );
    # tie together the labels
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
      my $name = $self->{tableNames}[$i];
      my $table = $self->{tables}[$i];

      $self->{faRqDump}->getLabel("t.rqdump." . $name)->makeChained(
        $self->{nxprefix} . "rqdump." . $name, undef,
        \&_dumpTable, $self, $table
      );
    }

The dumps are executed in the function _dumpTable:

sub _dumpTable # ($label, $rowop, $self, $table)
{
  my ($label, $rop, $self, $table) = @_;
  my $unit = $label->getUnit();
  # pass through the client id to the dump
  $unit->call($self->{beginDump}->adopt($rop));
  $table->dumpAll();
  $unit->call($self->{endDump}->adopt($rop));
  $self->{faOut}->flushWriter();
}

The data gets framed around by the "beginDump" and "endDump" labels getting the copies of the original request. This helps the client writer thread keep track of its current spot. The flushing of the writer is not strictly needed. Just in case if multiple dump requests are received in a single tray, it breaks up the responses into a separate tray for each dump, keeping the size of the trays lower. Not that this situation could actually happen yet.

This part taken care of, let's jump around and see how the client writer thread processes a "querysub" request:

      } elsif ($cmd eq "querysub") {
        if ($id eq "" || exists $queries{$id}) {
          printOrShut($app, $fragment, $sock,
            "error,$id,Duplicate id '$id': query ids must be unique,bad_id,$id\n");
          next;
        }
        my $ctx = compileQuery(
          qid => $id,
          qname => $args[0],
          text => $args[1],
          subError => sub {
            chomp $_[2];
            $_[2] =~ s/\n/\\n/g; # no real newlines in the output
            $_[2] =~ s/,/;/g; # no confusing commas in the output
            printOrShut($app, $fragment, $sock, "error,", join(',', @_), "\n");
          },
          faOut => $faOut,
          faRqDump => $faRqDump,
          subPrint => sub {
            printOrShut($app, $fragment, $sock, @_);
          },
        );
        if ($ctx) { # otherwise the error is already reported
          $queries{$id} = $ctx;
          &$runNextRequest($ctx);
        }
      }

The query id is used to keep track of the outstanding queries, so the code makes sure that it's unique, and you can see an example of the query response. The bulk of the work is done in the method compileQuery(). The arguments to it give the details of the query and also provide the closures for the functionality that differs between the single-threaded and multi-threaded versions. The option "subError" is used to send the errors to the client, and "subPrint" is used to send the output to the client, it gets used for building the labels in the "print" command of the query.

compileQuery() returns the query context, which contains a compiled sub-model that executes the query and a set of requests that tell the writer how to connect the query to the incoming data. Or on error it reports the error using subError and returns an undef. If the compilation succeeded, the writer remembers the query and starts the asynchronous execution of the requests. More about the requests later, now let's look at the query compilation and context.

The context is created in compileQuery() thusly:

  my $ctx = {};
  $ctx->{qid} = $opts->{qid};
  $ctx->{qname} = $opts->{qname};

  # .. skipped the parts related to single-threadde TQL

  $ctx->{faOut} = $opts->{faOut};
  $ctx->{faRqDump} = $opts->{faRqDump};
  $ctx->{subPrint} = $opts->{subPrint};
  $ctx->{requests} = []; # dump and subscribe requests that will run the pipeline
  $ctx->{copyTables} = []; # the tables created in this query
    # (have to keep references to the tables or they will disappear)

  # The query will be built in a separate unit
  $ctx->{u} = Triceps::Unit->new($opts->{nxprefix} . "${q}.unit");
  $ctx->{prev} = undef; # will contain the output of the previous command in the pipeline
  $ctx->{id} = 0; # a unique id for auto-generated objects
  # deletion of the context will cause the unit in it to clean
  $ctx->{cleaner} = $ctx->{u}->makeClearingTrigger();

It has some parts common and some parts differing for the single- and multi-threaded varieties, here I've skipped over the single-threaded parts.

One element that is left undefined here is $ctx->{prev}. It's the label created as the output of the previous stage of the query pipeline. As each command in the pipeline builds its piece of processing, it chains its logic from $ctx->{prev} and leaves its result label in $ctx->{next}. Then compileQuery() moves "next" to "prev" and calls the compilation of the next command in the pipeline. The only command that accepts an undefined "prev" (and it must be undefined for it) is "read", that reads the table at the start of the pipeline.

$ctx->{copyTables} also has an important point behind it. When you create a label, it's OK to discard the original reference after you chain the label into the logic, that chaining will keep a reference and the label will stay alive. Not so with a table: if you create a table, chain its input label and then drop the reference to a table, the table will be discarded. Then when the input label will try to send any data to the table, it will die (and unless very recently it outright crashed). So it's important to keep the table reference alive, and that's what this array is for.

$ctx->{id} is used to generate the unique names for the objects build in a query.

Each query is built in its own unit. This is convenient, after the query is done or the compilation encounters an error, the unit with its whole contents gets easily discarded. The clearing trigger placed in the context makes sure that the unit gets properly cleared and discarded.

Next goes the compilation of the join query command, I'll go through it in chunks.

sub _tqlJoin # ($ctx, @args)
{
  my $ctx = shift;
  die "The join command may not be used at the start of a pipeline.\n"
    unless (defined($ctx->{prev}));
  my $opts = {};
  &Triceps::Opt::parse("join", $opts, {
    table => [ undef, \&Triceps::Opt::ck_mandatory ],
    rightIdxPath => [ undef, undef ],
    by => [ undef, undef ],
    byLeft => [ undef, undef ],
    leftFields => [ undef, undef ],
    rightFields => [ undef, undef ],
    type => [ "inner", undef ],
  }, @_);

  my $tabname = bunquote($opts->{table});
  my $unit = $ctx->{u};
  my $table;

  &Triceps::Opt::checkMutuallyExclusive("join", 1, "by", $opts->{by}, "byLeft", $opts->{byLeft});
  my $by = split_braced_final($opts->{by});
  my $byLeft = split_braced_final($opts->{byLeft});

  my $rightIdxPath;
  if (defined $opts->{rightIdxPath}) { # propagate the undef
    $rightIdxPath = split_braced_final($opts->{rightIdxPath});
  }

It starts by parsing the options and converting them to the internal representation, removing the braced quotes.

  if ($ctx->{faOut}) {
    # Potentially, the tables might be reused between multiple joins
    # in the query if the required keys match. But for now keep things
    # simpler by creating a new table from scratch each time.

    my $tt = eval {
      # copy to avoid adding an index to the original type
      $ctx->{faOut}->impTableType($tabname)->copy();
    };
    die ("Join found no such table '$tabname'\n") unless ($tt);

    if (!defined $rightIdxPath) {
      # determine or add the index automatically
      my @workby;
      if (defined $byLeft) { # need to translate
        my @leftfld = $ctx->{prev}->getRowType()->getFieldNames();
        @workby = &Triceps::Fields::filterToPairs("Join option 'byLeft'",
          \@leftfld, [ @$byLeft, "!.*" ]);
      } else {
        @workby = @$by;
      }

      my @idxkeys; # extract the keys for the right side table
      for (my $i = 1; $i <= $#workby; $i+= 2) {
        push @idxkeys, $workby[$i];
      }
      $rightIdxPath = [ $tt->findOrAddIndex(@idxkeys) ];
    }

    # build the table from the type
    $tt->initialize() or confess "$!";
    $table = $ctx->{u}->makeTable($tt, "EM_CALL", "tab" . $ctx->{id} . $tabname);
    push @{$ctx->{copyTables}}, $table;

    # build the request that fills the table with data and then
    # keeps it up to date;
    # the table has to be filled before the query's main flow starts,
    # so put the request at the front
    &_makeQdumpsub($ctx, $tabname, 1, $table->getInputLabel());
  } else {
    die ("Join found no such table '$tabname'\n")
      unless (exists $ctx->{tables}{$tabname});
    $table = $ctx->{tables}{$tabname};
  }

The presence of $ctx->{faOut} means that the query is compiled in the multithreaded context.

The command handles may freely die, and the error messages will be caught by compileQuery() and nicely (at least, sort-of) reported back to the user.

If an explicit rightIdxPath was not requested, it gets found or added automatically. On the way there the index fields need to be determined. Which can be specified as either explicit pairs in the option "by" or the in the name translation syntax in the option "byLeft". If we've got a "byLeft", first it gets translated to the same format as "by", and then the right-side fields are extracted from the format of "by". After that $tt->findOrAddIndex() takes care of all the heavy lifting. It either finds a matching index type in the table type or creates a new one from the specified fields, and either way returns the index path. (An invalid field will make it confess).

It looks a bit anti-climactic, but the three lines of exporting with copyFundamental(), impTableType() and findOrAddIndex() is what this large example is all about.

You might wonder, how come the explicit rightIdxPath is not checked in any way? It will be checked later by LookupJoin(), so not much point in doing the check twice.

After that the table is created in a straightforward way, and rememebered in copyTables. And the requests list gets prepended with a request to dump and subscribe to this table. I'll get back to that, for now let's finish up with _tqlJoin().

  my $isLeft = 0; # default for inner join
  my $type = $opts->{type};
  if ($type eq "inner") {
    # already default
  } elsif ($type eq "left") {
    $isLeft = 1;
  } else {
    die "Unsupported value '$type' of option 'type'.\n"
  }

  my $leftFields = split_braced_final($opts->{leftFields});
  my $rightFields = split_braced_final($opts->{rightFields});

  my $join = Triceps::LookupJoin->new(
    name => "join" . $ctx->{id},
    unit => $unit,
    leftFromLabel => $ctx->{prev},
    rightTable => $table,
    rightIdxPath => $rightIdxPath,
    leftFields => $leftFields,
    rightFields => $rightFields,
    by => $by,
    byLeft => $byLeft,
    isLeft => $isLeft,
    fieldsDropRightKey => 1,
  );

  $ctx->{next} = $join->getOutputLabel();
}

The rest of the options get parsed, and then all the collected data gets forwarded to the LookupJoin constructor. Finally the "next" label is assigned from the join's result.

Now jumping to the _makeQdumpsub(). It's used by both the "read" and "join" query commands to initiate the joins and subscriptions.

sub _makeQdumpsub # ($ctx, $tabname, [$front, $lbNext])
{
  my $ctx = shift;
  my $tabname = shift;
  my $front = shift;
  my $lbNext = shift;

  my $unit = $ctx->{u};

  my $lbrq = eval {
    $ctx->{faRqDump}->getLabel("t.rqdump.$tabname");
  };
  my $lbsrc = eval {
    $ctx->{faOut}->getLabel("t.out.$tabname");
  };
  die ("Found no such table '$tabname'\n") unless ($lbrq && $lbsrc);

  # compute the binding for the data dumps, that would be a cross-unit
  # binding to the original faOut but it's OK
  my $fretOut = $ctx->{faOut}->getFnReturn();
  my $dumpname = "t.dump.$tabname";
  # the dump and following subscription data will merge on this label
  if (!defined $lbNext) {
    $lbNext = $unit->makeDummyLabel(
      $lbsrc->getRowType(), "lb" . $ctx->{id} . "out_$tabname");
  }

  my $bindDump = Triceps::FnBinding->new(
    on => $fretOut,
    name => "bind" . $ctx->{id} . "dump",
    labels => [ $dumpname => $lbNext ],
  );

First it finds all the proper labels. The label $lbNext will accept the merged dump contents and the following subscription, and it might be either auto-generated or received as an argument. A join pass it as an argument, $table->getInputLabel(), so all the data goes to the copied table.

The binding is used to receive the dump. It's a bit of an optimization. Remember, the dump labels are shared between all the clients. Whenever any client requests a dump, all the clients will get the response. A client finds that the incoming dump is destined for it by processing the "beginDump" label. If it contains this client's name, the dump is destined here, and the client reacts by pushing the appropriate binding onto the facet's FnReturn, and the data flows. The matching "endDump" label then pops the binding and the data stops flowing. The binding allows to avoid checking every rowop for whethere it's supposed to be accepted and if yes then where exactly (rememeber, the same table may be dumped independently multiple times by multiple queries). Just check once at the start of the bundle and then let the data flow in bulk.

  # qdumpsub:
  #   * label where to send the dump request to
  #   * source output label, from which a subscription will be set up
  #     at the end of the dump
  #   * target label in the query that will be tied to the source label
  #   * binding to be used during the dump, which also directs the data
  #     to the same target label
  my $request = [ "qdumpsub", $lbrq, $lbsrc, $lbNext, $bindDump ];
  if ($front) {
    unshift @{$ctx->{requests}}, $request;
  } else {
    push @{$ctx->{requests}}, $request;
  }
  return $lbNext;
}

Finally, the created bits and pieces get packaged into a request and added to the list of requests in the query context. The last tricky part is that the request can be added at the back or the front of the list. The "normal" way is to add to the back, however the dimension tables for the joins have to be populated before the main data flow of the query starts. So for them the argument $front is set to 1, and they get added in the front.

Now jumping back to the writer thread logic, after it called compileQuery, it starts the query execution by calling &$runNextRequest(). Which is a closure function defined inside the client writer function, and knows how to process the "qdumpsub"s we've just seen created.

  my $runNextRequest = sub { # ($ctx)
    my $ctx = shift;
    my $requests = $ctx->{requests};
    undef $ctx->{curRequest}; # clear the info of the previous request
    my $r = shift @$requests;
    if (!defined $r) {
      # all done, now just need to pump the data through
      printOrShut($app, $fragment, $sock,
        "querysub,$ctx->{qid},$ctx->{qname}\n");
      return;
    }

First it clears the information about the previous request, if any. This function will be called after each request, to send on the next one, so on all its calls except the first one for a query it will have something to clear.

Then it checks if all the requests are already done. If so, it sends the query confirmation to the client and returns. The subscription part of the query will continue running on its own.

    $ctx->{curRequest} = $r; # remember until completed
    my $cmd = $$r[0];
    if ($cmd eq "qdumpsub") {
      # qdumpsub:
      #   * label where to send the dump request to
      #   * source output label, from which a subscription will be set up
      #     at the end of the dump
      #   * target label in the query that will be tied to the source label
      #   * binding to be used during the dump, which also directs the data
      #     to the same target label
      my $lbrq = $$r[1];
      $unit->makeHashCall($lbrq, "OP_INSERT",
        client => $fragment, id => $ctx->{qid}, name => $ctx->{qname}, cmd => $cmd);

The "qdumpsub" gets forwarded to the core logic. The responses will be processed in the handlers or "beginDump" and "endDump". One of the great pains of this "actor" architecture is that the linear logic gets broken up into many disjointed pieces in the separate handlers.

    } else {
      printOrShut($app, $fragment, $sock,
        "error,", $ctx->{qid}, ",Internal error: unknown request '$cmd',internal,", $cmd, "\n");
      $ctx->{requests} = [];
      undef $ctx->{curRequest};
      # and this will leave the query partially initialized,
      # but it should never happen
      return;
    }
  };

And a catch-all just in case if the query compiler ever decides to produce an invalid request.

Next goes the handling of the dump labels (again, this gets set up during the build of the client reader threads, and then the nature is left to run its course, reacting to the rowops as they come in).

  $faOut->getLabel("beginDump")->makeChained("lbBeginDump", undef, sub {
    my $row = $_[1]->getRow();
    my ($client, $id, $name, $cmd) = $row->toArray();
    return unless ($client eq $fragment);
    if ($cmd eq "qdumpsub") {
      return unless(exists $queries{$id});
      my $ctx = $queries{$id};
      $fretOut->push($ctx->{curRequest}[4]); # the binding for the dump
    } else {
      # .. skipped the handling of dump/dumpsub
    }
  });

All it does is checks if this is the destination client, and if there is an active request with this id, then it pushes the appropriate binding.

  $faOut->getLabel("endDump")->makeChained("lbEndDump", undef, sub {
    my $row = $_[1]->getRow();
    my ($client, $id, $name, $cmd) = $row->toArray();
    return unless ($client eq $fragment);

    if ($cmd eq "qdumpsub") {
      return unless(exists $queries{$id});
      my $ctx = $queries{$id};
      $fretOut->pop($ctx->{curRequest}[4]); # the binding for the dump
      # and chain together all the following updates
      $ctx->{curRequest}[2]->makeChained(
        "qsub$id." . $ctx->{curRequest}[3]->getName(), undef,
        sub {
          # a cross-unit call
          $_[2]->call($_[3]->adopt($_[1]));
        },
        $ctx->{u}, $ctx->{curRequest}[3]
      );

      &$runNextRequest($ctx);
    } else {
      # .. skipped the handling of dump/dumpsub
    }
  });

Same things as the "beginDump", checks if this is the right client, and if it has an outstanding dump request, then pops the binding. After the dump is completed, the subscription has to be set up, so it sets up a label that forwards the normal output of this table to the label specified in the request. Since each query is defined in its own unit, this forwarding is done as a cross-unit call.

And then the next request of this query can be started.

By the way, the cross-unit adopt() didn't work in Perl until I wrote this example. There was a check against it (the C++ API never bothered with this check). But the adoption between the units has turned out to be quite convenient, so I've removed that check.

And that's it. Long and winding but finally completed. It's kind of about only three lines of code, but I think the rest of it also shows the useful techniques of the work with threads.