Tuesday, June 25, 2013

TrieadOwner reference, Perl, part 2

I've described it before but forgot to mention in the part 1 of the reference, the mark* methods bring the Triead to the target state through all the intermediate states. So if a Triead was not constructed, markReady() will mark it first constructed and then ready.

The same applies to markDead() but with an even more interesting twist. Suppose there is an application with an incorrect topology, and all the Trieads in it but one are ready. That last Triead then experiences an error, and proceeds directly to call markDead() and then exit. This markDead() will involve an intermediate step marking the Triead as ready. Since it's the last Triead to be ready, it will trigger the topology check, and since the topology is incorrect, it will fail. If it happened in markReady(), the method would confess. But in markDead() confessing doesn't make a whole lot of sense: after all, the thread is about to exit anyway. So markDead() will catch all these confessions and throw them away, it will never fail. However the failed check will still abort the App, and the rest of the threads will wake up and fail as usual.

The other thing about markDead() is that it clears and unreferences all the TrieadOwner's registered units, not waiting for the TrieadOwner object to be destroyed. This unravels any potential cyclic references where the code in a label might be referring back to the TrieadOwner.

And now continuing with more methods.

$facet = $to-> makeNexus(@options);

Create, export and (optionally) import a nexus. The result is an imported Facet of this Nexus, except when the options specify the no-import mode (then the result will be undef). Confesses on errors.

The options are:

name => $name
Name of the nexus, it will be used both as the export name and the local imported "as-name" of the facet.

labels => [
  name => $rowType,
  name => $fromLabel,
]
Defines the labels similarly to FnReturn in a referenced array. The array contains the pairs of (label_name, label_definition). The definition may be either a RowType, and then a label of this row type will be created, or a Label, and then a label of the same row type will be created and chained from that original label. The created label objects can be later found from Facets, and used like normal labels, by chaining them or sending rowops to them (but chaining from them is probably not the best idea, although it works anyway).

Optional, or may be an empty array; the implicit labels _BEGIN_ and _END_ will allways be added automatically if not explicitly defined.

The labels are used to construct an implicit FnReturn in the current Triead's main unit, and this is the FnReturn that will be visible in the Facet that gets imported back. If the import mode is "none", the FnReturn will still be  constructed and then abandoned (and freed by the reference count going to 0, as usual). The labels used as $fromLabel must always belong to the Triead's main unit.

rowTypes => [
  name => $rowType,
]
Defines the row types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.

tableTypes => [
  name => $tableType,
]
Defines the table types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.

reverse => 0/1
Flag: this Nexus goes in the reverse direction. The reverse nexuses are used to break up the topological loops, to prevent the deadlocks on the queueing. They have no limit on the queue size, and the data is read from them at a higher priority than from the direct nexuses. Default: 0.

chainFront => 0/1
Flag: when the labels are specified as $fromLabel, chain them at the front of the original labels. Default: 1. The default is this way because chaining at the front is what is typically needed. The reasons are described at length in the pipelined example, but the short gist is that you might want to send the rows from both the inputs, intermediate points, and the end of processing into an output nexus. It's most convenient to create the nexus in one go, after the whole thread's computation is defined. But the rowops from the earlier stages of computations have to come to the nexus before the rowops from the later stage. Chaining at the front ensures that each such label will send the rowop into the nexus first, and only then to the next stage of the computation.

queueLimit => $number
Defines the size limit after which the writes to the queue of this Nexus block. In reality because of the double-buffering the queue may contain up to twice that many trays before the future writes block. This option has no effect on the  reverse nexuses. Default: &Facet::DEFAULT_QUEUE_LIMIT, 500 or so.

import => $importType
A string value, essentially an enum, determining how this Nexus gets immediately imported back into this Triead. The supported values are:
  • reader (or anything starting from "read") - import for reading
  • writer (or anything starting from "write") - import for writing
  • none (or anything starting from "no") - do not import
The upper/lowercase doesn't matter. The use of the canonical strings is recommended.

$facet = $to->importNexus(@options);

Import a nexus into this Triead. Returns the imported Facet. The repeated attempts to import the same Nexus will return references to the same Facet object. Confesses on errors. An attempt to import the same nexus for both reading and writing is an error.

The options are:

from => "$t/$n"
Identifier of the nexus to import, consisting of two parts separated by a slash:
  •   thread name
  •   nexus name
The nexus name will also be used as the name of the local facet, unless overridden by the option "as". The reason for slash separator is that normally both the thread name and the nexus name parts may contain further components separated by dots, and a different separator allows to find the boundary between them. If a dot were used, in "a.b.c" it would be impossible to say, does it mean the thread "a" and nexus "b.c" in it, or thread "a.b" and nexus "c"? However "a/b.c" or "a.b/c" have no such ambiguity. Mutually exclusive with options "fromTriead" and "fromNexus".

fromTriead => $t
fromNexus => $n
The alternative way to specify the source thread and nexus as separate options. Both options must be present or absent at the same time. Mutually exclusive with "from".

as => $name
Specifies an override name for the local facet (and thus also to the FnReturn created in the facet). Logically similar to the SQL clause AS. Default is to reuse the nexus name.

import => $importType
A string value, essentially an enum, determining how this Nexus gets imported. The supported values are the same as for makeNexus (except "none", since there is no point in a no-op import):
  • reader (or anything starting from "read") - import for reading
  • writer (or anything starting from "write") - import for writing
The upper/lowercase doesn't matter.

immed => 0/1
Flag: do not wait for the thread that exported the nexus to be fully constructed. Waiting synchronizes with the exporter and prevents a race of an import attempt trying to find a nexus before it is made and failing. However if two threads are waiting for each other, it becomes a deadlock that gets caught and aborts the App. The immediate import allows to avoid such deadlocks for the circular topologies with helper threads.


The helper threads are the "blind alleys" in the topology: the "main thread" outsources some computation to a "helper thread", sending it the arguments, then later receiving the results and continuing with its logic.

For example, consider the following sequence:
  • thread A creates the nexus O;
  • thread A creates the helper thread B and tells it to import the nexus A/O for its input immediately and create the reverse nexus R for result;
  • thread A requests a (normal) import of the nexus B/R and falls asleep because B is not constructed yet;
  •     thread B starts running;
  •     thread B imports the nexus A/O immediately and succeeds;
  •     thread B defines its result nexus R;
  •     thread B marks itself as constructed and ready;
  • thread A wakes up after B is constructed, finds the nexus B/R and completes its import;
  • then thread A can complete its initialization, export other nexuses etc.
Default: 0, except if importing a nexus that has been exported from the same Triead. Importing from the same Triead is not used often, since the export also imports the nexus back right away, and there is rarely any use in importing separately. But it's possible, and importing back from the same Triead is always treated as immediate to avoid deadlocks.

TrieadOwner reference, Perl, part 1

TrieadOwner is the thread's private interface used to control its state and interact with the App (the App uses the thread's identity to detect the deadlocks). Whenever a Triead is constructed, its OS/Perl thread receives the TrieadOwner object for it.

Normally the TrieadOwner object is constructed inside Triead::start() or Triead::startHere() and passed to the thread's main function. The following constructor is used inside start(), and it's pretty much a private method. The only reason to use it would be if you want to do something very unusual, and even then you probably should write a wrapper method for your unusual thing and then call that wrapper method. The constructor constructs both Triead and TrieadOwner as a two sides of the same item, and registers the thread with the App.

$to = Triceps::TrieadOwner::new($tid, $handle, $appOrName, $tname, $fragname);

Here $tid is the Perl thread id where this TrieadOwner belongs (it can be obtained with $thr->tid()). $handle is the Perl thread's low-level handle (as in $thr->handle_()), it's the underlying POSIX thread handle, used to interrupt the thread on shutdown (the long story is that in the Perl threads the kill() call doesn't actually send a signal to another thread but just sends a flag, to interrupt a sleeping system call a real signal has to be delivered through the POSIX API). $handle is a dangerous argument, and passing a wrong value there may cause a crash.

Both $tid and $handle may be undef. If $tid is undef, the thread won't be joined by the harvester and you can either detach it or join it yourself. If either $tid or $handle is undef, the thread won't be interrupted on shutdown.

The signal used for interruption is SIGUSR2.Triceps sets its default handler that does nothing on this signal, but you can define your own handler instead.

$appOrName is the App object or its name that would be automatically looked up (or will confess if not found). $tname is the name of the previously created thread, that must be unique within the App (though it might be declared before). $fragname is the name of the fragment where the thread belongs, use "" for no fragment.

$app = $to->app();

Get the App where this Triead belongs.

$unit = $to->unit();

Whenever a Triead is constructed, a Unit is automatically  created to execute its logic. This call returns that unit. When the Triead is destroyed, the unit will be cleaned and unreferenced.

The unit is named the same as the thread.

$to->addUnit($moreUnit);

It's possible to split the Triead's logic into multiple units, all running in the same Perl thread. This call puts an extra unit under Triead's control, and has two effects: First, the unit will be referenced for the life of the Triead, and cleaned and unreferenced when the Triead is destroyed. Second, when the Triead's main loop runs, after each incoming rowop it will check all the controlled units for any rowops scheduled in them, and will run them until all such rowops are processed.

The names of the units are not checked in any way, it's your responsibility to name them sensibly and probably differently from each other.

The repeated calls with the same unit will have no effect.

$to->forgetUnit($moreUnit);

Pull a unit out of Triead's control. After that the cleaning of the unit becomes your responsibility. The thread's main unit can not be forgotten, the attempts to forget it will be simply ignored. The same goes for the units that aren't under the Triead's control in the first place, these calls are ignored.

@units = $to->listUnits();

Get the list of units under Triead's control. The main unit (the same as returned with $to->unit()) will always be the first in the list. The list contains only the unit references, not the name-value pairs (and you can always get the names from the unit objects themselves).

$triead = $to->get();

Get the public API of this Triead.

$name = $to->getName();

Get this Triead's name.

$frag = $to->fragment();

Get the name of this Triead's fragment ("" if not in a fragment).

$to->markConstructed();

Advance the Triead to the Constructed state. After that point no more nexuses may be exported in the Triead. Any look-ups by other Trieads for the Nexuses of this Triead will proceed at this point, either succeeding or failing (if no requested nexus is exported).

If the Triead is already in the Constructed or  later state, this call has no effect.

$to->markReady();

Advance the Triead to the Ready (fully initialized) state. After that point no more nexuses may be imported into this Triead.

If the App has been already shut down, this Triead will be immediately requested to die.

If this is the last Triead to become ready, this method will invoke the check for the topological correctness of the App. If the check finds an error (a loop of nexuses of the same direction), it will abort the App and confess with a message describing the nature of the error.

If the Triead is already in the Ready or  later state, this call has no effect.



$to->readyReady();

Mark this Triead as Ready and wait for all the App's Trieads to become Ready. There is no method that just waits for readiness because that would be likely causing a deadlock. When the thread waits for readiness, it must be ready itself, so this call does both. All the error checks of markReady() apply.

It is possible and reasonable to call this method repeatedly: more Trieads may be added to the App later, and it's a good idea to call readyReady() again before communicating with these new threads. Otherwise any rowops sent before these threads become ready will never arrive to these threads.

$to->markDead();

Mark this Triead as Dead. A dead thread will not receive any more input, and any its output will be thrown away. This notifies the harvester that it needs to join the Perl thread, so there should not be too much time left between making this call and exiting the Perl thread. The repeated calls have no effect.

Normally the Triead::start() and startHere() call markDead() automatically in their wrapper logic, and there is no need for a manual call. However if you decide to bypass them, you must call markDead() manually before exiting the thread, or the harvester will be stuck forever waiting for this thread to exit.

$to->abort($msg);

Abort the App with a message. This is a convenience wrapper that translates to App::abortBy().

$result = $to->isRqDead();

Check whether the thread was requested to die. For most threads, mainLoop() does this check automatically, and nextXtray() also returns the same value. However in the special cases, such as doing some long processing in response to a rowop, or doing some timeouts, it's best to do a manual check of isRqDead() periodically and abort the long operation if the thread has been requested to die, since any output will be thrown away anyway.

Note that even when the Triead has been requested to die, it still must call markDead() when it actually dies (normally the Triead::start() or startHere() takes care of it in its wrapper).

$result = $to->isConstructed();
$result = $to->isReady();

$result = $to->isDead();

$result = $to->isInputOnly();

Check the state of the Triead, the same as Triead methods.

Sunday, June 23, 2013

snapshot 1.0.93-20120632

I've decided to do one more snapshot before the release, largely to run it through CPAN's test infrastructure.

I've improved the detection of the NSPR library, and hopefully now it would build out of the box pretty much everywhere. Even if NSPR is not available, it would just use the implementation of the atomic integers through the mutexes.

Another build item, the Perl tests contain a dependency on the locale. They have the English text in some of the error strings received from the OS and Perl, so if you try to build in a non-English locale, these tests failed. To work around this issue, I've added "LANG=C" in the top-level Makefile. However if you run "make test" directly in the perl/Triceps, it has no such override (because the Makefile there is built by Perl). There just do you own "LANG=C make test".

The other items picked up in this snapshot are the last example of the fork-join topology, and other small fixes.

So far I've uploaded the snapshot to CPAN, and soon will upload it to the other locations as well.

Friday, June 21, 2013

Triead reference, C++

Naturally, Triead is a class that can be referenced from multiple threads and inherits from Mtarget. It's defined in app/Triead.h. (By the way, I forgot to mention it for App, but obviously App is also an Mtarget).

The meaning of the C++ methods is exactly the same as in Perl, only the format of values is slightly different. Obviously, the start* methods are Perl-only.

const string &getName() const;
const string &fragment() const;
bool isConstructed() const;
bool isReady() const;
bool isDead() const;
bool isInputOnly() const;

typedef map<string, Autoref<Nexus> > NexusMap;
void exports(NexusMap &ret) const;
void imports(NexusMap &ret) const;
void readerImports(NexusMap &ret) const;
void writerImports(NexusMap &ret) const;

The argument map gets cleared and then filled with the new returned contents.

Onceref<Nexus> findNexus(const string &srcName, const string &appName, const string &name) const;

This is a method unique to the C++ API. It looks up an exported nexus by name without involving the overhead of getting the whole map. Here name is the name of the nexus to look up (its short part, without the thread's name in it). If there is no such nexus, an Exception will be thrown.

The srcName and appName are used for the error message in the Exception: srcName is the name of the thread that requested the look-up, and appName is the name of the App where the threads belong. (It might seem surprising, but a Triead object has no reference to its App, and doesn't know the App's name. It has to do with the avoidance of the circular references).

Triead reference, Perl

The Triead class is the public interface of a Triceps thread, i.e. what of it is visible to the other threads. It's intended pretty much for introspection only, and all its method are only for reading the state. They are all synchronized but of course the thread may change its state at any moment. The Triead objects can be obtained by calling App::getTrieads().

The class also contains some static methods that are used to construct the Trieads.

$result = $t->same($t2);

Check that two Triead references point to the same Triead object.

$name = $t->getName();

Get the Triead's name.

$fragment = $t->fragment();

Get the name of the Triead's fragment. If the Triead doesn't belong to a fragment, returns "".

$result = $t->isConstructed();

Check whether the Triead has been constructed. (For the explanation of the Triead lifecycle states, see http://babkin-cep.blogspot.com/2013/04/the-triead-lifecycle.html.

$result = $t->isReady();

Check whether the Triead is ready.

$result = $t->isDead();

Check whether the Triead is dead.

$result = $t->isInputOnly();

Check whether the Triead is input-only, that is has no reader nexuses imported into it. When the Triead is created, this flag starts its life as false (0 for Perl), and then its correct value is computed when the Triead becomes ready. So, to check this flag correctly, you must first check that the Triead is ready.

@nexuses = $t-> exports();

Get the list of nexuses exported from this Triead,  as name-value pairs, suitable to be assigned into a hash. The values are the references to nexus objects.

@nexuses = $t->imports();
@nexuses = $t->readerImports();
@nexuses = $t->writerImports();


Get the list of nexuses imported into this Triead, as name-value pairs. The imports() returns the full list, without the ability to tell, which nexuses are imported for reading and which for writing, while readImports() and writeImports() return these subsets separately.

The names here are the "as-names" used for the import (the full names of the Nexuses can be obtained from the Nexus objects). The values are the references to nexus objects.

The next part of the API is the static construction methods. They really are wrappers to the TrieadOwner methods but Triead is a shorter name, and thus more convenient.

Triceps::Triead::start(@options);

Start a new Triead in a new Perl thread. The options are:

app => $appname
Name of the App that owns the new Triead. The App object will be looked up by name for the actual construction.

thread => $threadname
Name of the new Triead.

fragment => $fragname
Name of the new Triead's fragment. Default: "", which means no fragment.

immed => 0/1
Flag: when the new thread imports its nexuses, it should import them in the immediate mode. This flag is purely advisory, and the thread's main function is free to use or ignore it depending on its logic. It's provided as a convenience, since it's a typical concern for the helper threads. Default: 0.

main => \&function
The main function of the thread that will be called with all the options of start() plus some more:

    &$func(@opts, owner => $ownerObj)

The extra option of the main are:
owner: the TrieadOwner object constructed for this thread

Also, any other options may be added, and they will be forwarded to the main function without parsing. The main function is then free to parse them by itself, and if it finds any unknown options, it will fail.

For the convenience of writing the main functions, the set of "standard" options is provided in the global valiable

@Triceps::Triead::opts

The main function then uses this variable as a preable for any of its own options, for example:

sub listenerT          
{
    my $opts = {}; 
    &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    ...
}

Another convenience method is:

Triceps::Triead::startHere(@options);


It's very much the same as start() but starts the Triead in the current Perl thread. It's intended for the short-lived Apps that perform some computation and then have to return the result into the original thread. The unit tests are a good example of such apps.


And because of the typical usage, this method has some extra functionality compared to the plain start(): unless told otherwise, it will first create the App (with the name specified by the option "app"), then construct and run it, and after the main function of this thread exits, run the harvester and drop the App. So it just does the whole package, similar to App::build(), only typically the Triead started here runs as a normal real Triead, collects the results of computations and places them into some variables (global or referenced by the user-specific options of the main function).


This extra functionality can be disabled by the additional options (disable by setting them to 0):


harvest => 0/1
After the main function exits, automatically run the harvesrer. If you set it to 0, don't forget to call the harvester after this  function returns. Default: 1.

makeApp => 0/1
Before doing anything, create the App.  Obviously, this App must not exist yet. Default: 1.


These options are not passed through to the main function, unlike all the others.

App reference, C++

As usual, I won't be repeating the descriptions from the Perl reference, but just point out the methods and differences of the C++ version. And there are more detailed descriptions directly in the header files.

The static part of the API is:

static Onceref<App> make(const string &name);
static Onceref<App> find(const string &name);
static void drop(Onceref<App> app);

typedef map<string, Autoref<App> > Map;
static void listApps(Map &ret);

The App constructor is private, and the Apps get constructed with make(). make() throws an Exception if an App with this name already exists, find() throws an exception if an App with this name does not exist, and drop() just does nothing if its argument App had already been dropped.

All the operations on the global list of apps are internally synchronized and thread-safe. listApps() clears its argument map and fills it with the copy of the current list. Of course, after it returns and releases the mutex, other threads may create or delete apps, so the returned list (well, map) may immediately become obsolete. But since it all is done with the reference counters, the App objects will continue to exist and be operable as long as the references to them exist.

The App instance API is as follows. It's also all internally synchronized.

const string &getName() const;

Get the name of the App.

Onceref<TrieadOwner> makeTriead(const string &tname, const string &fragname = "");

Define a new Triead. This method is called either from the OS thread where the Triead will run or from its parent thread (unlike Perl, in C++ it's perfectly possible to pass the reference to the TrieadOwner from a parent OS thread to the thread that will run it). Either way, the OS thread that will run this Triead ends up with the TrieadOwner object reference, and no other thread must have it. The arguments are the thread name and fragment name, and the empty fragment name means that this thread won't belong to any fragment.

And just to reiterate, this does not create the OS thread. Creating the OS thread is your responsibility. This call only creates the Triceps Triead management structures to put it under control of the App.

If a thread with this name has already been defined, or if the thread name is empty, throws an Exception.

void declareTriead(const string &tname);

Declare a new thread. I forgot to tell it in the Perl API description, but declaring a thread more than once, or declaring a thread that has been already defined, is perfectly OK.

void defineJoin(const string &tname, Onceref<TrieadJoin> j);

This is a call without an analog in the Perl API. This defines a way for the harvester to join the thread. For the Perl API it happens to be hardcoded to join the Perl threads. But the C++ API can deal with any threads: POSIX ones, Perl ones, whatever. If a thread wants to be properly joined by the harvester, it must define its join interface, done as a TrieadJoin object. Each kind of threads will define its own subclass of TrieadJoin.

If there is no join defined for a Triead, then when it exits, the harvester will just update the state and manage the Triead object properly but won't do any joining. Which is useful in case if the OS thread is detached (not the best idea but doable) or if the Triead is created from the parent OS thread, and then the actual OS thread creation fails and there is nothing to join.

If the thread is not declared nor defined yet, defineJoin() throws an exception.

It's possible (though unusual) to call this method multiple times for the same thread. That would just replace the joiner object. The joiner is a reference-counted object, so the old object will just have itse reference count decreased. It's possible to pass the joiner as NULL, that would just drop the existing joiner, if any was defined.


typedef map<string, Autoref<Triead> > TrieadMap;
void getTrieads(TrieadMap &ret) const;

List the Trieads in this App. Same as with listing the Apps, the argument map gets cleared and then filled with the current contents.

void harvester(bool throwAbort = true);
bool harvestOnce();
void waitNeedHarvest();

The harvester API is very similar to Perl, with confession replaced by Exception, with the only difference of how the flag for throwing an Exception on detecting an App abort (the Exception will still be thrown only after joining all the App's threads).

The result of harvestOnce() is true if the App is dead. The Exceptions in harvestOnce() originate from the TrieadJoin::join() method that performs the actual joining. All the caveats apply in the same way as in Perl.

bool isDead();

Returns true if the App is dead.

void waitDead();

Wait for the App to become dead.

bool isAborted() const;

Returns true if the App is aborted.

string getAbortedBy() const;
string getAbortedMsg() const;

Get the thread name and message that caused the abort. If the App is not aborted, will return the empty strings.

void abortBy(const string &tname, const string &msg);

Abort the app, by the thread tname, with the error message msg.

bool isShutdown();


Returns true if the App has been requested to shut down.


 void shutdown();

Request the App to shut down. This involves interrupting all the threads in case if they are sleeping.  The interruption is another functionality of the TrieadJoin object. It's possible for the TrieadJoin interruptor to encounter an error and throw an Exception. If this happens, shutdown() will still go through all the Trieads and interrupt them, and then repackage the error messages from all the received Exceptions into one Exception and re-throw it.

Technically, this means that in the Perl API the shutdown might also confess, when its underlying C++ call returns an Exception. This should theoretically never happen, but practically you never know.

void shutdownFragment(const string &fragname);

Shutdown a fragment. All the logic described in the Perl API applies. Again, this involves interruption of all the threads in the fragment, and if any of them through Exceptions, these will be re-thrown as a single Exception.

enum {
    DEFAULT_TIMEOUT = 30,
};
void setTimeout(int sec, int fragsec = -1);

Set the readiness timeouts (main and fragment), in seconds. If the fragment timeout argument is <0, it gets set to the same value as the main timeout.

void setDeadline(const timespec &dl);

Set the deadline (unlike timeout, with fractional seconds) for the main readiness.

void refreshDeadline();

Explicitly refresh the deadline, using the fragment timeout.

void requestDrain();

Request a shared drain.

void requestDrainExclusive(TrieadOwner *to);

Request an exclusive drain, with the argument TrieadOwner. Unlike Perl API, the C++ API supports the methods for requesting the exclusive drain on both App and TrieadOwner classes (the TrieadOwner method is really a wrapper for the App method). In general, using the TrieadOwner method for this purpose probably looks nicer.

Since the TrieadOwner reference is really private to the OS thread that runs it, this method can be called only from that OS thread. Of course, being C++, you could pass it around to the other threads, but don't, TrieadOwner is not thread-safe internally and any operations on it must be done from one thread only.

void waitDrain();

Wait for the drain (either shared or exclusive) to complete.

void drain();

A combination of requestDrain() and waitDrain().

void drainExclusive(TrieadOwner *to);

A combination of requestDrainExclusive() and waitDrain().

bool isDrained();

Quickly check if the App is currently drained (should be used only if the App is known to be requested to drain).

void undrain();

End the drain sequence.

The file descriptor store/load API is really of not much use in C++ as such, in C++ it's easy to pass and share the file descriptors and file objects between the threads as-is. It has been built into the App class for the benefit of Perl and possibly other interpreted languages.

void storeFd(const string &name, int fd);

Store a file descriptor. Unlike the Perl API, the file descriptor is NOT dupped before storing. It's stored as is, and if you want dupping, you have to do it yourself. Throws an Exception if a file descriptor with this name is already stored.

int loadFd(const string &name) const;

Load back the file descriptor. Again, no dupping, returns the stored value as-is. If the name is unknown, returns -1.

bool forgetFd(const string &name);

Forget the file descriptor. Returns true if this name was known and became forgotten, or false if it wasn't known. Normally, you wold load the descriptor, take over its ownership, and then tell the App to forget it.

bool closeFd(const string &name);

Close the file descriptor (if it was known) and then forget it. Returns true if this name was known and became forgotten, or false if it wasn't known.

The rest of the Perl App methods have no analogs in C++. They are just purely Perl convenience wrappers.

App reference, Perl, part 2

Triceps::App::setTimeout($appOrName, $main_to_sec);
Triceps::App::setTimeout($appOrName, $main_to_sec, $frag_to_sec);

Set the readiness timeout. Even though Triceps is quite eager in watching for deadlocks in the threads topology, it's still possible to get some threads, and with them the whole program, stuck during initialization. For example, if you declare a thread and then never define it. These situations ere very unpleasant because you start the program and expect it to work but it doesn't, without any indication to why. So Triceps imposes an initialization timeout. The App (and thus all its threads) must become ready within the timeout after the definition or declaration of the last Triead. If not, the App will be aborted (and the error message will tell, which thread did not initialize in time). The same applies to the creation of Trieads (usually in the fragments) after the App is already running: all the threads must become ready within the timeout since the last thread has been defined or declared.

The default timeout is 30 seconds, or symbolically

&Triceps::App::DEFAULT_TIMEOUT

(subject to possible changes of the exact value in the future).

But the timeout can be changed. Technically, there are two timeouts:

  • one starts when the App is created
  • one restarts when any Triead is defined or declared

For the App to be aborted, both timeouts must expire. By default they are set to the same length, so only the second timeout really matters, since it will always be the last one to start and last one to end. But if you set the first timeout to be longer, you can allow for a longer initialization period at the App start ("main timeout") than later when more threads are added to a running App ("frag timeout", since the threads added later are typically the threads nin fragments).

The one-argument form of setTimeout() sets both timeouts to the same value, The two-argument form sets these timeouts separately.

The timeouts may be set only before the first thread has been created. This is largely due to the historical reasons of implementation, with the current implementation it should actually be safe to allow changing the timeouts at a later point as well, and this limitation may be removed in the future.

The timeout values are represented as integer whole seconds.

Note that it's still possible to get the initialization stuck without any indication if it gets stuck in the other libraries. The reason is that the harvester waits for all the threads to be joined before it propagates the error, so if a thread doesn't get joined, the harvester will be stuck. For example, if you open a file on NFS as a part of Triead initialization, and the NFS server doesn't respond, this thread will be stuck for a long time if not forever. The App will detect a timeout and try to interrupt the threads but the NFS operations are often not interruptable, so the harvester will wait for this thread to complete this operation and exit before it propagates the error, and thus the whole program will be silently stuck forever (and to avoid this, the NFS mounts should be done in the "soft" mode but it's a separate story). This will likely be improved in the future but it needs more thinking about how to do it right.

Triceps::App::setDeadline($appOrName, $deadline_sec);

Set the "main" timeout in the form of an absolute deadline. This is actually closer to the way it works internally: the limit is expressed as a deadline, and setTimeout() just adds the timeout value to the current time to compute the deadline, while setDeadline() sets it straight. Like setTimeout(), this may be called only before any Trieads were created.

The time is represented as floating-point seconds since epoch. Triceps::now() can be used to get the current time with the fractional seconds (or if you don't care about the fractional seconds, you can always use the stock time()).

Triceps::App::refreshDeadline($appOrName);

Restart the  "fragment" timeout. Same as when a Triead is defined or declared, only without involving a Triead. Again, the name has to do with how the time computations are done internally, computing the deadline from both timeouts. This method can be called at any time.

The next few methods have to do with the drains. Generally, using the AutoDrain class to automatically limit the scope of the drains is a better idea. But if the automatic scoping is not desired, the App methods can be used directly.

Triceps::App::requestDrain($appOrName);

Request a shared drain. Does not wait for the drain to complete. This method may be called repeatedly, potentially from multiple threads, which will keep the drain active and increase the recursion count. The drain will be released when undrain() is called the matching number of times.

If an exclusive drain by another thread is active when requestDrain() is called, the call will be stuck until the exclusive drain becomes undrained (and potentially more exclusive drains might be queued up before the shared drain, using the POSIX read-write-lock implementation). Otherwise the call will set the appropriate state and return immediately.

If this thread has requested an exclusive drain previously (and didn't undrain it yet), an attempt to get a shared drain in the same thread will likely deadlock. The same applies in the opposite order as well.

Once the shared drain is requested, the input-only threads will be blocked from sending more data (any their attempts to flush their write facets will get stuck until undrain), and the rest of the threads will continue churning through the data buffered in the nexuses until all the nexuses are empty and there is no more data to process (yes, these threads will continue writing to their write facets).

It is important to not request a shared drain from an input-only thread and then try to write more data into an output facet, that would deadlock. The whole concept is doable, but an exclusive drain must be used instead ("exclusive" means that a designated input-only thread is excluded from blocking).

Triceps::App::waitDrain($appOrName);

Wait for the requested shared drain to complete. This means that all the queues in all the nexuses have become empty.

The effect of waitDrain() without a preceding requestDrain() is undefined. If you definitely know that some other thread has requested a drain and didn't undrain yet, it will work as normal, so you can have any number of threads wait for drain in parallel. (And the semantics, shared or exclusive, will match that currently active request). If no thread requested a drain, it might either return immediately irrespective of the state of the nexuses or might wait for some other thread to request a drain and succeed.

This call may also be used with an exclusive (or shared) drain requested through a TrieadOwner or any drain requested through an AutoDrain (though a better style is to use the same object as used for requesting the drain).

Triceps::App::drain($appOrName);

A combination of requestDrain() and waitDrain() in one call.

Triceps::App::undrain($appOrName);

Release the drain and let the normal processing continue. Make sure to call it exactly the same number of times as the requestDrain().

This call may also be used with a drain requested through a TrieadOwner (though a better style is to use the same object as used for requesting the drain).

$result = Triceps::App::isDrained($appOrName);

Check whether the App is currently drained (i.e. all the nexuses are empty), without waiting for it. Returns 1 if drained, 0 if not. If no drain is active during this call, the result is undefined, not even in the "best effort" sense: it may return 1 even if there are millions of records queued up. The only reliable way is to request a drain first.

This call may also be used with a drain requested through a TrieadOwner or through an AutoDrain.


The next part of the API deals with the passing of the file descriptors between the Perl threads. The concepts have been described in detail before, this is just a short reference. This API works under the hood of TrieadOwner::TrackGetSocket() and friends, those desribed in http://babkin-cep.blogspot.com/2013/04/multithreaded-socket-server-part-3.html and neighboring posts, but can also be used directly.

Triceps::App::storeFd($appOrName, $name, $fd);

Store a file descriptor in the App object, allowing to load it into other threads, and thus pass it around between the threads. $name is the name for this descriptor that will later be used to get the file descriptor back (generally, you want to generate a unique name for each file descriptor stored to avoid confusion, and then pass this name to the target thread). $fd is the file descriptor, an integer number, normally received from fileno(). The file descriptor is dupped before it gets stored, so the original will continue to exist, and if you have no other use for it, should be closed.

If a file descriptor with this name already exists in the App, this call will confess.

$fd = Triceps::App::loadFd($appOrName, $name);

Load back the file descriptor that was previously stored. If no descriptor with such a name is stored in the App, it will confess. The descriptor will keep existing in the App, so to keep things consistent, there are two options:

One is to let your code take over the ownership of the file descriptor, and tell the App to forget about it with forgetFd().

The other one is to never close the received descriptor in your code (a good example would be to dup it right away for the future use and then leave the original alone), and let App keep its ownership.

$fd = Triceps::App::loadDupFd($appOrName, $name);

Very much the same as loadFd(), only does the dupping for you and returns the dupped descriptor. In this case your code is responsible for closing that descriptor. Which method is more suitable, loadFd() or dupFd(), depends on the nature of the code that will use the file descriptor and whether you want to leave the descriptor in the App for more threads to load.

Triceps::App::forgetFd($appOrName, $name);

Forget the file descriptor in the App. It doesn't get closed, so closing it becomes your responsibility, or it will leak. If no descriptor with this name exists, the call will confess.

Triceps::App::closeFd($appOrName, $name);

Close and forget the file descriptor in the App. If no descriptor with this name exists, the call will confess.

Triceps::App::storeFile($appOrName, $name, FILE);

A convenience wrapper for closeFd(), calls fileno() on the file and stores the resulting file descriptor.

Triceps::App::storeCloseFile($appOrName, $name, FILE);

Stores the file descriptor extracted from the file, and closes the original file (since the file descriptor gets dupped on store, that copy continues to exist in the App).

$file = Triceps::App::loadDupFile($appOrName, $name, $mode);

Load a file descriptor and build a file handle object (IO::Handle) from it. The mode string may be specified in either the open() format (</>/>>/+</+>/+>>) or the C stdio format (r/w/a/r+/w+/a+). Note that the mode must match or be a subset of the mode used to originally open the file descriptor. If you open a file read-only, store its descriptor, and load back as a write-only file, you will have a bad time.

There is no corresponding loadFile(), since loadFd() is a more dangerous method that is useful for the low-level operations but doesn't make much sense for the higher level.

$file = Triceps::App::loadDupSocket($appOrName, $name, $mode);

Load a file descriptor and build an IO::Socket::INET object from it. The mode string meaning is the same as for loadDupFile().


$file = Triceps::App::loadDupFileClass($appOrName, $name, $mode, $class);


Load a file descriptor and build an arbitrary class object from it. The class (specified by its name) should normally be a subclass of IO::Handle, or at the very least must implement the method new_from_fd() similar to IO::Handle. This is the common underlying implementation of but loadDupFile() and loadDupSocket().


The last part of the API is the convenience methods for building and starting an App.

Triceps::App::build($name,  \&builder );

Build an App instance. It creates the App instance, then the builder function is called to create the App's nexuses and threads, then the harvester is executed, that eventually destroys the App object after collecting all its threads. For a very basic example:

Triceps::App::build "a1", sub {
    Triceps::App::globalNexus(
        name => "types",
        rowTypes => [
            rt1 => $rt1,
        ],
    );
    Triceps::Triead::start(
        app => $Triceps::App::name,
        thread => "t1",
        main => sub {
            my $opts = {};
            &Triceps::Opt::parse("t1 main", $opts, {@Triceps::Triead::opts}, @_);
            my $to = $opts->{owner};
            $to->importNexus(
                from => "global/types",
                import => "writer", # if importing just for the types, use "writer"!
            );
            $to->readyReady();
        },
    );
};

The builder function runs in the current Perl thread, however from the logical standpoint it runs in the App's first Triead named "global" (this Triead name is hardcoded in build()). This allows you not to worry about the App being technically dead until the first Triead is created: by the time the builder function is called, the first Triead is already created. However it also means that you can't change the readiness timeouts. After the builder function returns, the global Triead exits, and the harvester starts in the same current Perl thread.

The builder has access to a few global variables:

  • $Triceps::App::name is the App name, from the build() first argument.
  • $Triceps::App::app is the reference to the App object.
  • $Triceps::App::global is the reference to the TrieadOwner object of the global Triead.
After the builder function exits, these variables become undefined.

Triceps::App::globalNexus(@nexusOptions);

Creates a nexus with the import mode of "none", on the global Triead. The arguments are the same options as for the normal nexus creation. This is a simple convenience wrapper for the nexus creation. Since the global thread is supposed to exit immediately, there is no point in importing this nexus into it.

The global thread is found by this function from $Triceps::App::global, so this method can only be used from inside the builder function of build().

Tuesday, June 18, 2013

$! !!!!!!

Turns out, the more recent versions of Perl (starting with 5.16.4 or maybe even earlier) have changed the way they treat the error variable $!. This special can not be assigned any arbitrary error text any more, now they want it to be a proper integer OS error code. So all the Triceps error reporting through $! breaks on the newer versions of Perl. The code still works, except that the text of the errors can not be extracted,

Well, looks like it's about time to bite the bullet and finally convert everything to the newer and better error reporting with confessions.

Sunday, June 16, 2013

App reference, Perl, part 1

The App API has two parts to it: The first part keeps track of all the App instances in the program, and allows to list and find them. The second part is the manipulations on a particular instance.

The global part of the API is:

@apps = Triceps::App::listApps();

Returns the array of name-value pairs, values containing the App references, listing all the Apps in the program (more exactly, in this Triceps library, you you compile multiple Triceps libraries together by renaming them, each of them will have its own Apps list). The returned array can be placed into a hash.

$app = Triceps::App::find($name);

Find an App by name. If an App with such a name does not exist, it will confess.

$app = Triceps::App:make($name);

Create a new App, give it the specified name, and put it on the list. The names must not be duplicate with the other existing Apps, or the method will confess.

drop($app);
drop($appName);

Drop the app, by reference or by name, from the list. The App is as usual reference-counted and will exist while there are references to it. The global list provides one of these references, so an App is guaranteed to exist while it's still on the list. When dropped, it will still be accessible through the existing references, but obviously will not be listed any more and could not be found by name.

Moreover, a new App with the same name can be added to the list. Because of this, dropping an App by name requires some care in case if there could be a new App created again with the same name: it creates a potential for a race, and you might end up dropping the new App instead of the old one. Of course, if it's the same thread that drops the old one and creates the new one, then there is no race. Dropping an application by name that doesn't exist at the moment is an error and will confess.

Dropping the App by reference theoretically allows to avoid the race: a specific object gets dropped, and if it already has been dropped, the call has no effect. Theoretically. However in practice Perl has a limitation of passing the object values between threads, and thus whenever each thread starts, first thing it does is finding its App by name. It's a very similar kind of race and is absolutely unavoidable except by making sure that all the App's threads have exited and joined (i.e. harvesting them). So make sure to complete the App's thread harvesting before dropping the App in the harvester thread, and by then it doesn't matter a whole lot if the App is dropped by name or by reference.

Now the second part of the API, working with an App instance.

Many (but not all) of the App methods allow to specify the App either by reference or by name, and they automatically sort it out, doing the internal look-up by name if necessary. So the same method could be used as either of:

$app->method(...);
Triceps::App::method($app, ...);
Triceps::App::method($appName, ...);

Obviously, you can not use the "->" syntax with a name, and obviously if the name is not in the app list, the method will confess. Below I'll show the calls that allow the dual formats as Triceps::App::method($appOrName, ...) but keep in mind that you can use the "->" form of them too with a reference.

$app = Triceps::App::resolve($appOrName);
Do just the automatically-sorting-out part: gets a reference or name and returns a reference either way. A newly created reference is returned in either case (not the argument reference). You can use this resolver before the methods that accept only a reference.

$result = $app->same($app2);

Check if two references are for the same App object. Here they both must be references and not names.

$name = $app->getName();

Get the name of the App, from a reference.

Triceps::App::declareTriead($appOrName, $trieadName);

Declare a Triead (Triceps thread) in advance. Any attempts to look up the nexuses in that thread will then wait for the thread to become ready. (Attempts to look up in an undeclared and undefined thread are errors). This is necessary to prevent a race at the thread creation time.  For the most part, the method Triead::start() just does the right thing by calling this method automatically and you don't need the use it manually, except in some very special circumstances.

@trieads = Triceps::App::getTrieads($appOrName);

Get the list of currently defined Trieads, as name-value pairs. Keep in mind that the other threads may be modifying the list of Trieads, so if you do this call multiple times, you may get different results. However the Trieads are returned as references, so they are guaranteed to stay alive and readable even if they get removed from the App, or even if the App gets dropped.

$app->harvester(@options);

Run the harvester in the current thread. The harvester gets notifications from the threads when they are about to exit, and joins them.  After all the threads have been joined, it automatically drops the App, and returns.
 
The harvesting is an absolutely necessary part of the App life cycle, however in most of the usage patterns (such as with Triead::startHere or App::build) the harvester is called implicitly from the wrapping library functions, so you don't need to care about it.

Note also that if you're running the harvester manually, you must call it only after the first thread has been defined or at least declared. Otherwise it will find no threads in the App, consider it dead and immediately drop it.

If the App was aborted, the harvester will normally confess after if had joined all the threads and disposed of the App, unless the option die_on_abort (see below) has been set to 0. This propagates the error to the caller. However there is a catch: if some of the threads don't react properly by exiting on an abort indication, the program will be stuck and you will see no error message until these threads get unstuck, possibly forever.

 Options:

 die_on_abort => 1/0

(default: 1) If the App was aborted, the harvester will normally confess after if had joined all the threads and disposed of the App.  Setting this option to 0 will make the aborts silently ignored. This option does not affect the errors in joining the threads: if any of those are detected, harvester will still confess after it had disposed of the app.

$dead = $app->harvestOnce();

Do one run of the harvesting.  Joins all the threads that have exited since its last call. If no threads have exited since then, returns immediately. Returns 1 if all the threads have exited (and thus the App is dead), 0 otherwise.  If a thread join fails, immediately confesses (if multiple threads were ready for joining, the ones queued after the failed one won't be joined, call harvestOnce() once more to join them).

$app->waitNeedHarvest();

Wait for at least one thread to become ready for harvesting. If the App is already dead (i.e. all its threads have exited), returns immediately.

These two methods allow to write the custom harvesters if you're not happy with the default one. The basic harvester logic can be written as:

do {
  $app->waitNeedHarvest()
} while(!$app->harvestOnce());
$app->drop();

However the real harvester also does some smarter things around the error handling. You can look it up in the source code in cpp/app/App.cpp.

$res = Triceps::App::isDead($appOrName);

Returns 1 if the App is dead (i.e. has no more live threads), otherwise 0.  Calling this method with a name for the argument is probably a bad idea, since normally the harvester will drop the App quickly after it becomes dead, and you may end up with this method confessing when it could not find the dropped App.

$res = Triceps::App::isShutdown($appOrName);

Returns 1  if the App has been requested to shut down, either normally or by being aborted.

$res = Triceps::App::isAborted($appOrName);

Returns 1 if the App has been aborted. The App may be aborted explicitly by calling the method abortBy(), or the thread wrapper logic automatically converts any unexpected deaths in the App's threads to the aborts. If any thread dies, this aborts the App, which in turn requests the other threads to die on their next thread-related call. Eventually the harvester collects them all and confesses, normally making the whole program die with an error.

($tname, $message) = Triceps::App::getAborted($appOrName);

Get the App abort information: name of the thread that caused the abort, and its error message.

Triceps::App::abortBy($appOrName, $tname, $msg);

Abort the application. The thread name and message will be remembered, and returned later by getAborted() or in the harvester. If abortBy() is called multiple times, only the first pair of thread name and message gets remembered. The reason is that on abort all the threads get interrupted in a fairly rough manner (all their ongoing and following calls to the threading API die), which typically causes them to call abortBy() as well, and there is no point in collecting these spurious messages.

The thread name here doesn't have to be the name of the actual thread that reports the issue. For example, if the thread creation as such fails (maybe because the OS limit on the thread count) that gets detected by the parent thread but reported in the name of the thread whose creation has failed. And in general you can pass just any string as the thread name, App itself doesn't care, just make it something that makes sense to you.

$res = Triceps::App::isDead($appOrName);


Returns 1 if the App is dead (i.e. it has no alive Trieads, all the defined and declared threads have exited). Right after the App is created, before the first Triead is created, the App is also considered dead, and becomes alive when the first Triead is declared or defined. If an App becomes dead later, when all the Trieads exit, it can still be brought back alive by creating more Trieads. But this is considered bad practice, and will cause a race with the harvester (if you want to do this, you have to make your own custom harvester).

$res = Triceps::App::isShutdown($appOrName);


Returns 1 if the App was requested to shut down. The Trieads might still run for some time, until they properly detect and process the shutdown, and exit. So this condition is not equivalent to Dead, althouh they are connected. If any new Trieads get started, they will be shut down right away and won't run.

To reiterate: if all the Trieads just exit by themselves, the App becomes dead but not shut down. You could still start more Trieads and bring the App back alive. If the App has been shut down, it won't become immediately dead, but it will send the shutdown indication to all the Trieads, and after all of them eventually exit, the App will become dead too. And after shutdown there is no way to bring the App back alive, since any new Trieads will be shut down right away (OK, there might be a short period until they detect the shutdown, so the App could spike as alive for a short time, but then will become dead again).

Triceps::App::waitDead($appOrName);


Will wait for the App to become dead and return after that. Make sure to not call waitDead() from any of App's Trieads: that would cause a deadlock.


Triceps::App::shutdown($appOrName);

Shut down the App. The shutdown state is sticky, so any repeated calls will have no effect. The call returns immediately and doesn't wait for the App to die. If you want to wait, call waitDead() afterwards. Make sure to not call waitDead() from a Triead: that would cause a deadlock.


Triceps::App::shutdownFragment($appOrName, $fragName);

Shut down a named fragment. This does not shut down the whole App, it just selectively shuts down the Trieads belonging to this fragment . See the explanation of the fragments in http://babkin-cep.blogspot.com/2013/03/triceps-multithreading-concepts.html. The fragment shutdown is not sticky: after a fragment has been shut down, it's possible to create another fragment with the same name. To avoid races, a fragment may be shut down only after all its Trieads are ready. So the caller Triead must call readyReady() before it calls shutdownFragment(). If any of the fragment's Trieads are not ready, the call will confess.

Tuesday, June 11, 2013

options passing through

I've already shown it in the examples, but here is also the official description: you can accept the arbitrary options, typically if your function is a wrapper to another function, and you just want to process a few options and let the others through. The Triead::start() is a good example, passing the options through to the main function of the thread.

You specify the acceptance of the arbitrary options by using "*" in the Opt::parse() arguments. For example:

  &Triceps::Opt::parse($myname, $opts, {
    app => [ undef, \&Triceps::Opt::ck_mandatory ],
    thread => [ undef, \&Triceps::Opt::ck_mandatory ],
    fragment => [ "", undef ],
    main => [ undef, sub { &Triceps::Opt::ck_ref(@_, "CODE") } ],
    '*' => [],
  }, @_);

The specification array for "*" is empty. The unknown options will be collected in the array referred to from $opts->{'*'}, that is @{$opts->{'*'}}.

From there on your wrapper has the choice of either passing through all the options to the wrapped function, using @_, or explicitly specifying a few options and  passing through the  rest from @{$opts->{'*'}}.

There is also the third possibility: filter out only some of the incoming options. This can be done with Opt::drop(). For example, Triead::startHere() works like this:

our @startOpts = (
  app => [ undef, \&Triceps::Opt::ck_mandatory ],
  thread => [ undef, \&Triceps::Opt::ck_mandatory ],
  fragment => [ "", undef ],
  main => [ undef, sub { &Triceps::Opt::ck_ref(@_, "CODE") } ],
);

sub startHere # (@opts)
{
  my $myname = "Triceps::Triead::start";
  my $opts = {};
  my @myOpts = ( # options that don't propagate through
    harvest => [ 1, undef ],
    makeApp => [ 1, undef ],
  );

  &Triceps::Opt::parse($myname, $opts, {
    @startOpts,
    @myOpts,
    '*' => [],
  }, @_);

  my @args = &Triceps::Opt::drop({
    @myOpts
  }, \@_);
  @_ = (); # workaround for threads leaking objects

  # no need to declare the Triead, since all the code executes synchronously anyway
  my $app;
  if ($opts->{makeApp}) {
    $app = &Triceps::App::make($opts->{app});
  } else {
    $app = &Triceps::App::resolve($opts->{app});
  }
  my $owner = Triceps::TrieadOwner->new(undef, undef, $app, $opts->{thread}, $opts->{fragment});
  push(@args, "owner", $owner);
  eval { &{$opts->{main}}(@args) };
...

The @startOpts are both used by the startHere() and passed through. The @myOpts are only used in startHere() and do not pass through. And the rest of the options pass through without baing used in startHere(). So the options from @myOpts get dropped from @_, and the result goes to the main thread.

The Opt::drop() takes the specification of the options to drop as a hash reference, the same as Opt::parse(). The values in the hash are not important in this case, only the keys are used. But it's simpler to store the same specification of the options and reuse it for both parse() and drop() than to write it twice.

There is also an opposite function, Opt::dropExcept(). It passes through only the listed options and drops the rest. It can come handy if your wrapper wants to pass different subsets of its incoming options to multiple functions.

The functions drop() and dropExcept() can really be used on any name-value arrays, not just the options as such. And the same goes for the Fields::filter() and friends. So you can use them interchangeably: you can use Opt::drop() on the row type specifications and Fields::filter() on the options if you feel that it makes your code simpler.

Monday, June 10, 2013

checking a row for emptiness

I've mentioned that the rowops with the empty rows (i.e. rows with all fields NULL) on the _BEGIN_ and _END_ labels in the facets are treated specially. The same check is also available as the directly callable method, in case if you have some other uses for it.

In Perl it is:

$result = $row->isEmpty();

It returns 1 if all the fields are NULL and 0 otherwise.

In C++ this is done as a method of the RowType class:

virtual bool RowType::isRowEmpty(const Row *row) const;

And is used like this:

bool res = type->isRowEmpty(row);

It's also available as a convenience wrapper method on the Rowref:

Rowref r1(...);
if (r1.isRowEmpty()) { ... }

Friday, June 7, 2013

reordering the data from multiple threads, part 3

And finally the code of the example. I've written it with the simple approach of read stdin, print to stdout. It's easier this way, and anyway to demonstrate the rowop reordering, all the input has to be sent quickly in one chunk. The application starts as:

Triceps::Triead::startHere(
    app => "ForkJoin",
    thread => "main",
    main => \&mainT,
    workers => 2,
    delay => 0.02,
);

The main thread is:

sub mainT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("mainT", $opts, {@Triceps::Triead::opts,
        workers => [ 1, undef ], # number of worker threads
        delay => [ 0, undef ], # artificial delay for the 0th thread
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();


So far the pretty standard boilerplate with the argument parsing.

    my $rtRate = Triceps::RowType->new( # an exchange rate between two currencies
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        rate => "float64", # multiplier when exchanging ccy1 to ccy2
    ) or confess "$!";

    # the resulting trade recommendations
    my $rtResult = Triceps::RowType->new(
        triead => "int32", # id of the thread that produced it
        ccy1 => "string", # currency code
        ccy2 => "string", # currency code
        ccy3 => "string", # currency code
        rate1 => "float64",
        rate2 => "float64",
        rate3 => "float64",
        looprate => "float64",
    ) or confess "$!";

The row types originate from the self-join example code, same as before.

    # each tray gets sequentially numbered and framed
    my $rtFrame = Triceps::RowType->new(
        seq => "int64", # sequence number
        triead => "int32", # id of the thread that produced it (optional)
    ) or confess "$!";

    # the plain-text output of the result
    my $rtPrint = Triceps::RowType->new(
        text => "string",
    ) or confess "$!";

These are the new additions. The frame row type is used to send the information about the sequence number in the _BEGIN_ label. The print row type is used to send the text for printing back to the main thread.

    # the input data
    my $faIn = $owner->makeNexus(
        name => "input",
        labels => [
            rate => $rtRate,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    # the raw result collected from the workers
    my $faRes = $owner->makeNexus(
        name => "result",
        labels => [
            result => $rtResult,
            _BEGIN_ => $rtFrame,
        ],
        import => "none",
    );

    my $faPrint = $owner->makeNexus(
        name => "print",
        labels => [
            raw => $rtPrint, # in raw order as received by collator
            cooked => $rtPrint, # after collation
        ],
        import => "reader",
    );

The processing will go in essentially a pipeline: read the input -> process in the worker threads -> collate -> print in the main thread. Only the worker thread stage spreads into multiple threads, that are then joining the data paths back together in the collator.

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "reader",
        main => \&readerT,
        to => $owner->getName() . "/input",
    );

    for (my $i = 0; $i < $opts->{workers}; $i++) {
        Triceps::Triead::start(
            app => $app->getName(),
            thread => "worker$i",
            main => \&workerT,
            from => $owner->getName() . "/input",
            to => $owner->getName() . "/result",
            delay => ($i == 0? $opts->{delay} : 0),
            workers => $opts->{workers},
            identity => $i,
        );
    }

    Triceps::Triead::start(
        app => $app->getName(),
        thread => "collator",
        main => \&collatorT,
        from => $owner->getName() . "/result",
        to => $owner->getName() . "/print",
    );

    my @rawp; # the print in original order
    my @cookedp; # the print in collated order

    $faPrint->getLabel("raw")->makeChained("lbRawP", undef, sub {
        push @rawp, $_[1]->getRow()->get("text");
    });
    $faPrint->getLabel("cooked")->makeChained("lbCookedP", undef, sub {
        push @cookedp, $_[1]->getRow()->get("text");
    });

    $owner->readyReady();

    $owner->mainLoop();

    print("--- raw ---\n", join("\n", @rawp), "\n");
    print("--- cooked ---\n", join("\n", @cookedp), "\n");
}

The collator will send the data for printing twice: first time in the order it was received ("raw"), second time in the order after collation ("cooked").

sub readerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("readerT", $opts, {@Triceps::Triead::opts,
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faIn = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbRate = $faIn->getLabel("rate");
    my $lbBegin = $faIn->getLabel("_BEGIN_");
    # _END_ is always defined, even if not defined explicitly
    my $lbEnd = $faIn->getLabel("_END_");

This demonstrates that the labels _BEGIN_ and _END_ always get defined in each nexus, even if they are not defined explicitly. Well, here _BEGIN_ was defined explicitly but _END_ was not, and nevertheless it can be found and used.

    my $seq = 0; # the sequence

    $owner->readyReady();

    while(<STDIN>) {
        chomp;

        ++$seq; # starts with 1
        $unit->makeHashCall($lbBegin, "OP_INSERT", seq => $seq);
        my @data = split(/,/); # starts with a string opcode
        $unit->makeArrayCall($lbRate, @data);
        # calling _END_ is an equivalent of flushWriter()
        $unit->makeHashCall($lbEnd, "OP_INSERT");
    }

    {
        # drain the pipeline before shutting down
        my $ad = Triceps::AutoDrain::makeShared($owner);
        $owner->app()->shutdown();
    }
}

Each input row is sent through in a separate transaction, or in another word, a separate tray. The _BEGIN_ label carries the sequence number of the tray. The trays can as well be sent on with flushWriters() or flushWriter(), but I wanted to show that you can also flush it by calling the _END_ label.

sub workerT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("workerT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
        delay => [ 0, undef ], # processing delay
        workers => [ undef, \&Triceps::Opt::ck_mandatory ], # how many workers
        identity => [ undef, \&Triceps::Opt::ck_mandatory ], # which one is us
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    my $delay = $opts->{delay};
    my $workers = $opts->{workers};
    my $identity = $opts->{identity};

    my $faIn = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faRes = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbInRate = $faIn->getLabel("rate");
    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

The worker thread starts with the pretty usual boilerplate.

    my $seq; # sequence from the frame labels
    my $compute; # the computation is to be done by this label
    $faIn->getLabel("_BEGIN_")->makeChained("lbInBegin", undef, sub {
        $seq = $_[1]->getRow()->get("seq");
    });

The processing of each transaction starts by remembering its sequence number from the _BEGIN_ label. It doesn't send a _BEGIN_ to the output yet because all the threads get the input, to be able to update its table, but then only one thread produces the output. And the thread doesn't know whether it will be the one producing the output until it knows, what is the primary key in the data. So it can start sending the output only after it had seen the data. This whole scheme works because there is exactly one data row per each transaction. A more general approach might be to have the reader thread decide, which worker will produce the result and put this information (as either a copy of the primary key or the computed thread id) into the _BEGIN_ rowop.

    # the table gets updated for every incoming rate
    $lbInRate->makeChained("lbIn", undef, sub {
        my $ccy1 = $_[1]->getRow()->get("ccy1");
        # decide, whether this thread is to perform the join
        $compute = ((ord(substr($ccy1, 0, 1)) - ord('A')) % $workers == $identity);

        # this relies on every Xtray containing only one rowop,
        # otherwise one Xtray will be split into multiple
        if ($compute) {
            $unit->makeHashCall($lbResBegin, "OP_INSERT", seq => $seq, triead => $identity);
            select(undef, undef, undef, $delay) if ($delay);
        }

        # even with $compute is set, this might produce some output or not,
        # but the frame still goes out every time $compute is set, because
        # _BEGIN_ forces it
        $unit->call($lbRateInput->adopt($_[1]));
    });

There the decision is made of whether this join is to be computed for this thread, remembered in the flag $compute, and used to generate the _BEGIN_ rowop for the output. Then the table gets updated in any case ($lbRateInput is the table's input label). I've skipped over the table creation, it's the same as in the self-join example, and you can always find the full tetx of the example in xForkJoinMt.t.

    $tRate->getOutputLabel()->makeChained("lbCompute", undef, sub {
        return if (!$compute); # not this thread's problem
        ...
                $unit->call($result);
        }
    });
    ##################################################

    $owner->readyReady();

    $owner->mainLoop();
}

Here again I've skipped over the way the result is computed. The important part is that if the $compute flag is not set, the whole self-joining computation is not performed. The _END_ label is not touched, the flushing of transactions is taken care of by the mainLoop(). Note that the _BEGIN_ label is always sent if the data is designated to this thread, even if no output as such is produced. This is done because the collator needs to get an uninterrupted sequence of transactions. Otherwise it would not be able to say if some transaction has been dropped or is only delayed.

 sub collatorT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("collatorT", $opts, {@Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ], # src nexus
        to => [ undef, \&Triceps::Opt::ck_mandatory ], # dest nexus
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $unit = $owner->unit();

    my $faRes = $owner->importNexus(
        from => $opts->{from},
        import => "reader",
    );

    my $faPrint = $owner->importNexus(
        from => $opts->{to},
        import => "writer",
    );

    my $lbResult = $faRes->getLabel("result");
    my $lbResBegin = $faRes->getLabel("_BEGIN_");
    my $lbResEnd = $faRes->getLabel("_END_");

    my $lbPrintRaw = $faPrint->getLabel("raw");
    my $lbPrintCooked = $faPrint->getLabel("cooked");

    my $seq = 1; # next expected sequence
    my @trays; # trays held for reordering: $trays[0] is the slot for sequence $seq
        # (only of course that slot will be always empty but the following ones may
        # contain the trays that arrived out of order)
    my $curseq; # the sequence of the current arriving tray

The collator thread starts very much as usual. It has its expectation of the next tray in order, which gets set correctly. The trays that arrive out of order will be buffered in the array @trays. Well, more exactly, for simplicity, all the trays get buffered there and then sent on if their turn has come.But it's possible to make an optimized version that would let the data flow through immediately if it's arriving in order.

    # The processing of data after it has been "cooked", i.e. reordered.
    my $bindRes = Triceps::FnBinding->new(
        name => "bindRes",
        on => $faRes->getFnReturn(),
        unit => $unit,
        withTray => 1,
        labels => [
            "_BEGIN_" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("BEGIN"));
            },
            "result" => sub {
                $unit->makeHashCall($lbPrintCooked, "OP_INSERT", text => $_[1]->printP("result"));
            }
        ],
    );
    $faRes->getFnReturn()->push($bindRes); # will stay permanently

The data gets collected into trays through a binding that gets permanently pushed onto the facet's FnReturn. Then when the tray's order comes, it wil lbe simply called and will produce the print calls for the cooked data order.

    # manipulation of the reordering,
    # and along the way reporting of the raw sequence
    $lbResBegin->makeChained("lbBegin", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("BEGIN"));
        $curseq = $_[1]->getRow()->get("seq");
    });
    $lbResult->makeChained("lbResult", undef, sub {
        $unit->makeHashCall($lbPrintRaw, "OP_INSERT", text => $_[1]->printP("result"));
    });
    $lbResEnd->makeChained("lbEnd", undef, sub {
        my $tray = $bindRes->swapTray();
        if ($curseq == $seq) {
            $unit->call($tray);
            shift @trays;
            $seq++;
            while ($#trays >= 0 && defined($trays[0])) {
                # flush the trays that arrived misordered
                $unit->call(shift @trays);
                $seq++;
            }
        } elsif ($curseq > $seq) {
            $trays[$curseq-$seq] = $tray; # remember for the future
        } else {
            # should never happen but just in case
            $unit->call($tray);
        }
    });

    $owner->readyReady();

    $owner->mainLoop();
};

The input rowops get not only collected in the binding's tray but also chained directly to the labels that print the raw order of arrival. The handling of _BEGIN_ also remembers its sequence number.

The handler of _END_ (which rowops get produced implicitly at the end of transaction) then does the heavy lifting. It looks at the sequence number remembered from _BEGIN_ and makes the decision. If the received sequence is the next expected one, the data collected in the tray gets sent on immediately, and then the contents of the @trays array gets sent on until it hits a blank spot of missing data. Or if the received sequence leaves a gap, the tray is placed into an appropriate spot in @trays for later processing.

This whole logic can be encapsulated in a class but I haven't decided yet on the best way to do it. Maybe somewhere in the future.

Thursday, June 6, 2013

reordering the data from multiple threads, part 2

Before going to the example itself, let's talk more about the general issues of the data partitioning.

If the data processing is stateless, it's easy: just partition by the primary key, and each thread can happily work on its own. If the data depends on the previous data with the same primary key, the partitioning is still easy: each thread keeps the state for its part of the keys and updates it with the new data.

But what if you need to join two tables with independent primary keys, where the matches of the keys between them are fairly arbitrary? Then you can partition by one table but you have to give a copy of the second table to each thread. Typically, if one table is larger than the other, you would partition by the key of the big table, and copy the small table everywhere. Since the data rows are referenced in Triceps, the actual data of the smaller table won't be copied, it would be just referenced from multiple threads, but each copy will still have the overhead of its own index.

With some luck, and having enough CPUs, you might be able to save a little overhead by doing a matrix: if you have one table partitioned into the parts A, B and C, and the other table into parts 1, 2 and 3, you can then do a matrix-combination into 9 threads processing the combinations A1, A2, A3, B1, B2, B3, C1, C2, C3. If both tables are of about the same size, this would create a total of 18 indexes, each keeping 1/3 of one original table, so the total size of indexes will b 6 times the size of one original table (or 3 times the combined sizes of both tables). On the other hand, if you were to copy the first table to each thread and split the second table into 9 parts, creating the same 9 threads, the total size of indexes will be 9 times the first table and 1 times the second table, resulting in the 10 times the size of an original table (or 5 times the combined sizes of both tables). There is a win to this, but the catch is that the results from this kind of 3x3 matrix will really have to be restored to the correct order afterwards.

The reason is that when a row in the first table changes, it might make it join, say, in the thread A2 instead of the thread A1. So the thread A1 would generate a DELETE for the join result, and the thread A2 would generate a following INSERT. With two separate threads, the resulting order will be unpredictable, and the INSERT coming before the DELETE would be bad. The post-reordering is really in order.

By contrast, if you just partition the first table and copy the second everywhere, you get 9 threads A, B, C, D E, F, G, H, I, and the change in a row will still keep it in the same thread, so the updates will come out of that thread strictly sequentially. If you don't care about the order changes between different primary keys, you can get away without the post-reordering. Of course, if a key field might change and you care about it being processed in order, you'd still need the post-reordering.

The example I'm going to show is a somewhat of s strange mix. It's the adaptation of the Forex arbitration example from the section 12.13. Self-join done manually. As you can see from the name of the section, it's doing a self-join, kind of like going through the same table 3 times.

The partitioning in this example works as follows:
  • All the data is sent to all the threads. All the threads keep a full copy of the table and update it according to the input. 
  • But then they compute the join only if the first currency name in the update falls into the thread's partition. 
  • The partitioning is done by the first letter of the symbol, with interleaving: the symbols starting with A are handled by the thread 0, with B by thread 1, and so on until the threads end, and then continuing again with the thread 0. A production-ready implementation would use a hash function instead. But the interleaving approach makes much easier to predict, which symbol goes to which thread for the example.
  • Naturally, all this means that the loop of 3 currencies might get created by a change in one pair and then very quickly disappear by a change to another pair.of currencies. So the post-reordering of the result is important to keep the things consistent.
I've also added a tweak allowing to artificially slow down the thread 0, making the incorrect order to show up reliably, and make the reordering code really work. For example, suppose the following input sent quickly:

OP_INSERT,AAA,BBB,1.30
OP_INSERT,BBB,AAA,0.74
OP_INSERT,AAA,CCC,1.98
OP_INSERT,CCC,AAA,0.49
OP_INSERT,BBB,CCC,1.28
OP_INSERT,CCC,BBB,0.78
OP_DELETE,BBB,AAA,0.74
OP_INSERT,BBB,AAA,0.64

With two threads, and thread 0 working slowly, it would produce the raw result:

BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"
BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"

Here the BEGIN lines are generated by the code and show the sequence number of the input row and the id of the thread that did the join. The result lines show the arbitration opportunities produced by the join. Obviously, not every update produces a new result, most of them don't. But the INSERT and DELETE in the result come in the wrong order: the update 7 had overtaken the update 6.

The post-reordering comes to the resque and restores the order:

BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"

As you can see, now the sequence numbers go in the sequential order.