Saturday, June 29, 2013

Facet reference, Perl, part 1

A Facet represents a Nexus endpoint imported into a Triead. A facet is either a reader (reading from the nexus) or a writer (writing into the nexus).

In the Perl API the Facets are created by TrieadOwner::makeNexus() or TrieadOwner::importNexus(). After that the metadata in the Facet is fixed and is available for reading only. Of course, the rowops can then be read or written.

The reading of data from a Facet is done by TrieadOwner::nextXtray(). There is no way to read a tray from a particular facet, nextXtray() reads from all the Triead's imported reader facets, alternating in a fair fashion if more than one of them has data available.

Each Facet has an FnReturn connected to it. The reading from a reader facet happens by forwarding the incoming rowops to that FnReturn. To actually process the data, you can either chain your handler labels directly to the FnReturn labels, or push an FnBinding onto that FnReturn. An incoming Xtray is always processed as a unit, with no intermixing with the other Xtrays.

The writing to a writer facet happens by calling the labels of its FnReturn. Which then has the logic that collects all these rowops into a buffer. Then when the facet is flushed, that buffer becomes an indivisible Xtray that gets sent to the nexus as a unit, and then read by the reader facets as a unit.

The facet metadata consists of:
  • a set of labels, same as for an FnReturn, used to build the facet's internal FnReturn; these labels define the data that can be carried through the nexus;
  • a set of row types that gets exported through the nexus;
  • a set of table types that gets exported through the nexus.
The table types must not contain any references to the Perl functions, or the export will fail. The Perl code snippets in the text format can be used instead.

There are two special labels, named _BEGIN_ and _END_. They may be defined explicitly, but if they aren't, they will be always added implicitly, with an empty row type (i.e. a row type with no fields).

When reading an Xtray, the _BEGIN_ label will always be called first, and _END_ last, thus framing the rest of the data. There are optimizations that skip the calling if there is nothing chained to these labels in FnReturn nor to the top FnBinding, and the rowop as such carries no extra data. The optimization is actually a bit deeper: the _BEGIN_ and _END_ rowops that have no extra data in them aren't even carried in the Xtray through the nexus. They are generated on the fly if there is an interest in them, or otherwise the generation is skipped.

What is meant by the "extra data"? It's either the opcode is not OP_INSERT or there are some non-NULL fields (or both). If the _BEGIN_ and _END_ labels were auto-generated, their row type will contain no fields, so the only way to sent the non-default data in them will be the non-default opcode. But if you define them explicitly with a different row type, you can also send the data in them.

When sending the data into a writer Facet, you don't have to send the _BEGIN_ and _END_ rowops, if you don't, they will be generated automatically as needed, with the default contents (opcode OP_INSERT and NULLs in all the fields). Moreover, they will really be generated automatically on the reader side, thus saving the overhead of passing them through the nexus. Another consequence of this optimization is that it's impossible to create an Xtray consisting of only a default _BEGIN_, a default _END_ and no payload rowops between them. It would be an empty Xtray, that would never be sent through the nexus. Even if you create these _BEGIN_ and _END_ rowops manually (but with the default contents), they will be thrown away when they reach the writer facet. If you want an Xtray to get through, you've got to either send the payload or put something non-default into at least one of the _BEGIN_ or _END_ rowops, at the very minimum a different opcode.

Sending the _BEGIN_ and _END_ rowops into a writer facet also has the effect of flushing it. Even if these rowops have the default contents and become thrown away by the facet, the flushing effect still works. The _BEGIN_ rowop flushes any data that has been collected in the buffer before it. The _END_ rowop gets added to the buffer (or might get thrown away) and then flushes the buffer. If the buffer happens to contain anything at the flush time, that contents forms an Xtray and gets forwarded to the nexus.

It's a long and winding explanation, but really it just does what is intuitively expected.

A Facet has two names, the "full" one and the "short" one:
  • The full name is copied from the nexus and consists of the name of the thread that exported the nexus and the name of the nexus itself separated by a slash, such as "t1/nx1".
  • The short name is the name with which the facet was imported. By default it's taken from the short name of the nexus. But it can also be given a different explicit name during the import, which is known as the "as-name" (because it's similar to the SQL "AS" clause). So if the full name is "t1/nx1", the default short name will be "nx1", but it can be overridden. The facet's FnReturn is named with the facet's short name.

Friday, June 28, 2013

Nexus reference, Perl and C++

The Nexus class is pretty much opaque. It's created and managed entirely inside the App infrastructure, and even the public API for importing a nexus doesn't deal with the Nexus object itself, but only with its name. The only public use of the Nexus object is for the introspection and entertainment value, to see what Trieads export and import what Nexuses: pretty much the only way to get a Nexus reference is by listing the exports or imports of a Triead.

The Perl API of a Nexus is very limited and is very much an afterthought:

$nxname = $nx->getName();

Get the name of the nexus (the short name, inside the thread).

$tname = $nx->getTrieadName();

Get the name of the Triead that exported this nexus.

$result = $nx->same($nx2);

Check whether two Nexus references point to the same object.

$result = $nx->isReverse();

Check whether the Nexus is reverse.

$limit = $nx->queueLimit();

Get the queue limit of the nexus.

In C++ the corresponding methods are:

const string &getName() const;
const string &getTrieadName() const;
bool isReverse() const;
int queueLimit() const;

In C++ the Nexus object is an Mtarget, and safe to access from multiple threads. It's defined in app/Nexus.h.

TrieadOwner reference, C++

The TrieadOwner is defined in app/TrieadOwner.h. Its constructor is protected, the normal way of constructing is by calling App::makeTriead(). This is also called "defining a Triead".

TrieadOwner is an Starget, and must be accessed only from the thread that owns it (though it's possible to create it in the parent thread and then pass to the actual owner thread, as long as the synchronization between the threads is done properly).

Triead *get() const;

Get the public side if the Triead. In C++, unlike Perl, the Triead methods are not duplicated in TrieadOwner. So they are accessed through get(), and for example to get the Triead name, call to->get()->getName(). The TrieadOwner holds a reference to Triead, so the Triead object will never get destroyed as long as the TrieadOwner is alive.

App *app() const;

Get the App where this Triead belongs. The TrieadOwner holds a reference to App, so the App object will never get destroyed as long as the TrieadOwner is alive.

bool isRqDead() const;

Check whether this triead has been requested to die.

void requestMyselfDead();

Request this thread itself to die (see the reasoning for the in the Perl reference).

Unit *unit() const;

Get the main unit of this Triead. It has the same name as the Triead itself.

void addUnit(Autoref<Unit> u);

Keep track of an additional unit. Adding a unit multiple times has no effect. See the other implications in the Perl reference.

bool forgetUnit(Unit *u);

Forget about an additional unit. If the unit is already unknown, has no effect. The main unit can not be forgotten.

typedef list<Autoref<Unit> > UnitList;
const UnitList &listUnits() const;

List the tracked units. The main unit is always included at the front of the list.

void markConstructed();

Advance the Triead state to Constructed. Repeated calls have no effect.

void markReady();

Advance the Triead state to Ready. Repeated calls have no effect. The advancement is cumulative: if the Triead was not constructed yet, it will be automatically advanced first to Constructed and then to Ready. If this is the last Triead to become ready, it will trigger the App topology check, and if the check fails, abort the App and throw an Exception.

void readyReady();

Advance the Triead to the Ready state, and wait for all the Trieads to become ready. The topology check applies in the same way as in markReady().

void markDead();

Advance the Triead to the Dead state. If this Triead was not Ready yet and it's the last one to become so, the topology check will run. If the topology check fails, the App will be aborted but markDead() will not throw an exception.

This method is automatically called from the TrieadOwner destructor, so most of the time there is no need to call it explicitly.

It also clears all the tracked units.

void abort(const string &msg) const;

Abort the App. The name of this thread will be forwarded to the App along with the error message. The error message may be multi-line.

Onceref<Triead> findTriead(const string &tname, bool immed = false);

Find a Triead in the App by name. The flag immed controls whether this method may wait. If immed is false, and the target thread is not constructed yet (but at least declared), the method will sleep until it becomes constructed, and then returns it. If the target thread is not declared, it will throw an Exception. If immed is true, the look-up is immediate: it will return the thread even if it's not constructed but is at least defined. If it's not defined (even if it's declared), an Exception will be thrown. The look-up of this Triead itself is always immediate, irrespective of the immed flag.

An Exception may also be thrown if a circular sequence of Trieads deadlocks waiting for each other.

Onceref<Facet> exportNexus(Autoref<Facet> facet, bool import = true);

Export a Nexus. The Nexus definition is constructed as a Facet object, which is then used by this method to construct and export the Nexus. The same argument Facet reference is then returned back as the result. The import flag tells, whether the Nexus is to be also imported back by connecting the same original Facet object to it. If import is false, the original Facet reference is still returned back but it can't be used for anything, and can only be thrown away. The direction of the import (reading or writing) is defined in the Facet, as well as the nexus name and all the other information.

Throws an Exception on any errors, in particular on the duplicate facet names within the Triead.

Onceref<Facet> exportNexusNoImport(Autoref<Facet> facet);

A convenience wrapper around exportNexus() with import=false.

Onceref<Facet> importNexus(const string &tname, const string &nexname, const string &asname, bool writer, bool immed = false);

Import a Nexus from another Triead. tname is the name of the other thread, nexname is the name of nexus exported from it, asname is the local name for the imported facet ("" means "same as nexname"), the writer flag determines if the import is for writing, and the immed flag has the same meaning as in findTriead(). The import of a nexus involves finding its exporting thread, and the immed flag controls, how this finding is done.

Throws an Exception if anything is not found, or the local import name conflicts with another imported facet.

Onceref<Facet> importNexusImmed(const string &tname, const string &nexname, const string &asname, bool writer);
Onceref<Facet> importReader(const string &tname, const string &nexname, const string &asname = "", bool immed=false);
Onceref<Facet> importWriter(const string &tname, const string &nexname, const string &asname = "", bool immed=false);
Onceref<Facet> importReaderImmed(const string &tname, const string &nexname, const string &asname = "");
Onceref<Facet> importWriterImmed(const string &tname, const string &nexname, const string &asname = "");

Convenience wrappers for importNexus(), providing the default arguments and the more mnemonic names.

typedef map<string, Autoref<Nexus> > NexusMap;
void exports(NexusMap &ret) const;

Get the nexuses exported here. The map argument will be cleared and refilled with the new values.

typedef map<string, Autoref<Facet> > FacetMap;
void imports(FacetMap &ret) const;

Get the facets imported here. The map argument will be cleared and refilled with the new values.


NexusMaker *makeNexusReader(const string &name);
NexusMaker *makeNexusWriter(const string &name);
NexusMaker *makeNexusNoImport(const string &name);


A convenient way to build the nexuses for export in a chained fashion. The name argument is the nexus name. The NexusMaker is an opaque class that has the same building methods as a Facet, plus the method complete() that finishes the export. This call sequence is more convenient than building a Facet and then exporting it. For example:


Autoref<Facet> myfacet = ow->makeNexusReader("my")
    ->addLabel("one", rt1)
    ->addFromLabel("two", lb2)
    ->setContext(new MyFnContext)
    ->setReverse()
    ->complete();


Only one nexus may be built like this at a time, since there is only one instance of NexusMaker per TrieadOwner that gets reused over and over. It keeps the Facet instance being built in it. If you don't complete the build, that Facet instance will be left sitting around until another makeNexus*() calls when it will get thrown away. But in general if you do the calling in the sequence as shown, you can't forget to call complete() at the end, since otherwise the return type would not match and the compiler will fail.


bool flushWriters();


Flush all the writer facets. Returns true on the successfull completion, false if the thread was requested to die, and thus all the output was thrown away.


bool nextXtray(bool wait = true, const struct timespec &abstime = *(const struct timespec *)NULL);


Read and process the next Xtray. Automatically calls the flushWriters() after processing. The rest works in the same way as described in Perl, however this is a combination of Perl's nextXtray(), nextXtrayNoWait() and nextXtrayTimeLimit(). If wait is false, this method will never wait. If wait is true and the abstime reference is not NULL, it might wait but not past that absolute time. Otherwise it will wait until the data becomes available of the thread is requested to die.

Returns true if an Xtray has been processed, false if it wasn't (for any reason, a timeout expiring or thread being requested to die).

 bool nextXtrayNoWait();

A convenience wrapper over nextXtray(false).

bool nextXtrayTimeout(int64_t sec, int32_t nsec);

Another convenience wrapper over nextXtray(): read and process an Xtray, with a timeout limit. The timeout value consists of the seconds and nanoseconds parts.

void mainLoop();

Run the main loop, calling nextXtray() repeatedly until the thread is requested to die.

bool isRqDrain();

Check if a drain is currently requested by any thread (and applies to this thread). In case if an exclusive drain is requested with the exclusion of this thread, this method will return false.

void requestDrainShared();
void requestDrainExclusive();
void waitDrain();
bool isDrained();
void drainShared();
void drainExclusive();
void undrain();

The drain control, same as in Perl. These methods are really wrappers over the corresponding App methods. And generally a better idea is to do the scoped drains with AutoDrain rather than to call these methods directly.

The C++ API provides no methods for the file descriptor tracking as such. Instead these methods are implemented in the class FileInterrupt. TrieadOwner has a public field

Autoref<FileInterrupt> fileInterrupt_;

to keep an instance of the interruptor. TrieadOwner itself has no use for it, nor does any other part of Triceps, it's just a location to keep this reference for the convenience of the application developer. The Perl API makes use of this location.

But how does the thread then get interrupted when it's requested to die? The answer is that the TrieadJoin object also has a reference to the FileInterrupt object, and even creates that object in its own constructor. So when a joiner method is defined for a thread, that supplies the App with the access to its interruptor as well. And to put the file descriptors into the FileInterrupt object, you can either keep a direct reference to it somewhere in your code, or copy that reference into the TrieadOwner object.

Here is for example how the Perl thread joiner is created for the TrieadOwner inside the Perl implementation:

            string tn(tname);
            Autoref<TrieadOwner> to = appv->makeTriead(tn, fragname);
            PerlTrieadJoin *tj = new PerlTrieadJoin(appv->getName(), tname,
                SvIOK(tid)? SvIV(tid): -1,
                SvIOK(handle)? SvIV(handle): 0,
                testfail);
            to->fileInterrupt_ = tj->fileInterrupt();
            appv->defineJoin(tn, tj);

This is somewhat cumbersome, but the native C++ programs can create their threads using the class BasicPthread that takes care of this and more.

TrieadOwner and TrackedFile reference, Perl, part 4

The file interruption part of the API deals with how the thread handles a request to die. It should stop its work and exit, and for the normal threads that read from nexuses, this is fairly straightforward. The difficulty is with the threads that read from the outside sources (sockets and such). They may be in the middle of a read call, with no way to tell when the next chunk of data will arrive. These long calls on the file descriptors need to be interrupted when the thread is requested to die.

The interruption is done by revoking the file descriptors (dupping a /dev/null into it) and sending the signal SIGUSR2 to the thread. Even if the dupping doesn't interrupt the file operation, SIGUSR2 does, and on restart it will find that the file descriptor now returns to /dev/null and return immediately. Triceps defines a SIGUSR2 handler that does nothing, but you can override it with a custom one.

For this to work, Triceps needs to know, which file descriptors are to be revoked, which is achieved by registering for tracking them in the TrieadOwner. To avoid revoking the unrelated descriptors, they must be unregistered before closing. The normal sequence is:
  • open a file descriptor
  • register it for tracking
  • do the file operations
  • unregister the file descriptor
  • close it

The lowest-level calls deal with the raw tracking:

$to->trackFd($fd);

Register the file descriptor (obtained with a fileno()  or such) for tracking. The repeated calls for the same descriptor have no effect.

$to->forgetFd($fd);

Unregister the file descriptor. If the descriptor is not registered, the call is ignored.

The next level of the API deals with the file handles, extracting the file descriptors from them as needed.

$to->track(FILE);

Get a file descriptor from the file handle and track it.

$to->forget(FILE);

Get a file descriptor from the file handle and unregister it.

$to->close(FILE);

Unegister the file handle's descriptor then close the file handle. It's a convenience wrapper, to make the unregistering easier to remember.

The correct sequence might be hard to follow if the code involves some dying and evals catching these deaths. To handle that, the next level of the API provides the automatic tracking by scope. The scoping is done with the class Triceps::TrackedFile.Which is probably easier to describe right here.

A TrackedFile object keeps a reference of a file handle and also knows of its file descriptor being tracked by the TrieadOwner (so yes, it has a reference to the TrieadOwner too). Until the TrackedFile is destroyed, the file handle in it will never have its reference count go to 0, so it will never be automatically closed and destroyed. And when the TrackedFile is destroyed, first it tells the TrieadOwner to forget the file descriptor and only then unreferences the file handle, preserving the correct sequence.

And the scope of the TrackedFile is controlled by the scope of a variable that keeps a reference to it. If it's a local/my variable, TrackedFile will be destroyed on leaving the block, if it's a field in an object, it will be destroyed when the object is destroyed or the field is reset. Basically, the usual Perl scope business in Triceps.

If you want to close the file handle before leaving the scope of TrackedFile, don't call close() on the file handle. Instead, call close on the TrackedFile:

$trf->close();

This will again properly untrack the file descriptor, and then close the file handle, and remember that it has been closed, so no seconds attempt at that will be done when the TrackedFile gets destroyed.

There also are a couple of getter methods on the TrackedFile:

$fd = $trf->fd();

Get the tracked file descriptor. If the file handle has been already closed, will return -1.

$fh = $trf->get();

Get the tracked file handle. If the file handle had already been closed, will confess.

Now we get back to the TrieadOwner.

$trf = $to->makeTrackedFile(FILE);

Create a TrackedFile object from a file handle.

$trf = $to->makeTrackedFileFd(FILE, $fd);

The more low-level way to construct a TrackedFile, with specifying the file descriptor as a separate explicit argument. makeTrackedFile() is pretty much a wrapper that calls fileno() on the file handle and then calls makeTrackedFileFd().

And the following methods combine the loading of a file descriptor from the App and tracking it by the TrieadOwner. They are the most typically used interface for passing around the file handles through the App.

($trf, $file) = $to->trackDupFile($name, $mode);

Load the dupped file handle from the App, create an IO::Handle file object from it according to the $mode (in either r/w/a/r+/w+/a+ or </>/>>/+</+>/+>> format), and make a TrackedFile from it. Returns a pair of the TrackedFile object and the created file handle. The App still keeps the original file descriptor. The $mode must be consistent with the original mode of the file stored into the App. $name is the name used to store the file descriptor into the App.

($trf, $file) = $to->trackGetFile($name, $mode);


Similar to trackDupFile() only the file descriptor is moved from the App, and the App forgets about it.


($trf, $file) = $to->trackDupSocket($name, $mode);
($trf, $file) = $to->trackGetSocket($name, $mode);


Similar to the File versions, only create a file handle object of the class IO::Socket::INET.


($trf, $file) = $to->trackDupClass($name, $mode, $class);
($trf, $file) = $to->trackGetClass($name, $mode, $class);


Similar to the File versions, only creates the file handle of an arbitrary subclass of IO::Handle (with the name specified in $class). So for example if you ever want to load an IPv6 or Unix domain socket, these are the methods to use.

TrieadOwner reference, Perl, part 3

@exports = $to->exports();

The same as the method on the Triead, equivalent to $to->get()->exports(). The returned array contains the name-value pairs of the nexus names and objects.

@imports = $to->imports();

This method is different from the Triead method. It still returns an array of name-value pairs but the values are Facets (not Nexuses, as in Triead). It's a natural difference, since the facets are useful in the owner thread, and available only in it.

$result = $to->flushWriters();

Flush any data collected in the writer facets, sending them to the appropriate nexuses. The data in each facet becomes a tray that is sent to the nexus (if there was no data collected on a facet, nothing will be sent from it). Returns 1 if the flush was completed, 0 if the thread was requested to die and this the data was discarded. The data is never sent out of a facet by itself, it always must be flushed in one of the explicit ways (TrieadOwner::flushWriters(), Facet::flushWriter(), or enqueueing a rowop on the facet's labels _BEGIN_ and _END_). The flush may get stuck if this is an input-only thread and a drain is active, it will wait until the drain is released.

$to->requestMyselfDead();

Request this Triead itself to die. This is the way to disconnect from the nexuses while the thread is exiting on its own. For example, if this thread going to dump its data before exit to a large file that takes half an hour to write, normally the data queued for this thread might fill up the queues in the nexuses, and it's a bad practice to keep the other threads stuck due to the overflowing buffers. Requesting this thread to die disconnects it from the nexuses and prevents the data from collecting.The thread could also be disconnected by marking it dead, but it will keep the harvester stuck waiting to join it while the thread completes its long write, and that's not so  good either. So this call provides the solution, avoiding both pitfalls.

$result = $to->nextXtray();

Process one incoming tray from a single reader nexus (any nexus where data is available). A tray essentially embodies a transaction, and "X" stands for "cross-thread". There actually is the Xtray type that represents the Tray in a special thread-safe format but it's used only inside the nexuses and not visible from outside.

If there is currently no data to process, this method will wait.

The return value is 1 normally, or 0 if the thread was requested to die. So the typical usage is:

while($to->nextXtray()) { ... }

The method mainLoop() encapsulates the most typical usage, and nextXtray() needs to be used directly only in the more unusual circumstances.

The data is read from the reverse nexuses first, at a higher priority. If any reverse nexus has data available, it will always be read before the direct nexuses. A reverse nexus typically completes a topological loop, so this priority creates the preference to cycle the data through the loop until it comes out, before accepting more data into the loop. Since all the nexuses have non-zero-length queues, obviously, there will be multiple data items traveling through the loop, in different phases, but this priority solution limits the amount of data kept in the loop's queues and allows the queue flow control to prevent too much data from entering the loop.

The raised priority of the reverse nexuses can also be used to deliver the urgent messages. Remember, there is nothing preventing you from marking any nexus as reverse (as long as it doesn't create a loop consisting of only the reverse nexuses).

The downside of having the reverse nexuses connected to a thread is that it causes an extra overhead from the check with a mutex synchronization on each nextXtray(). The regular-priority direct nexuses use double-buffering, with locking a mutex only when the second buffer runs dry, and refilling it by swapping its contents with the whole collected first buffer. But the high-priority reverse nexuses have to be checked every time, even if they have no incoming data.

Within the same priority the data is processed in the round-robin order. More exactly, each refill of the double-buffering grabs the data from the first buffer of each facet and moves it to the second buffer. Then the second buffer is processed in the round-robin fashion until it runs out and another refill becomes needed.

The nextXtray() processes all the rowops from the incoming tray by calling them on the facet's FnReturn. Two special rowops are generated automatically even if they haven't been queued up explicitly, on the facet's labels _BEGIN_ and _END_ (to avoid extra overhead, they are actually generated only if there is any processing chained for them).

The nextXtray() automatically flushes the writers after processing a tray.

If a fatal error is encountered during processing (such as some code in a label died), nextXtray() will catch the exception, discard the rest of the tray and confess itself (without flushing the writers).

$result = $to->nextXtrayNoWait();

Similar to nextXtray(), but returns immediately if there is no data to process. Returns 0 if there is either no input data or the thread was requested to die (the way to differentiate between these cases is to call $to->isRqDead()).

$result = $to->nextXtrayTimeout($timeout);

Similar to nextXtrayNoWait(), only if there is no data, waits for up to the length of timeout. The timeout value is floating-point seconds. Returns 0 if the timeout has expired or the thread was requested to die.

$result = $to->nextXtrayTimeLimit($deadline);

Similar to nextXtrayNoWait(), only if there is no data, waits until the absolute deadline. The deadline value is time since epoch floating-point seconds, such as returned by Triceps::now(). Returns 0 if the wait reached the deadline or the thread was requested to die.



$to->mainLoop();


Process the incoming trays until the thread is requested to die. The exact implementation of the main loop (in C++) is:


void TrieadOwner::mainLoop()
{
    while (nextXtray())
        { }
}


It also used to call markDead() after the loop, and my earlier posts might say so, but now I've realized that it would not be advisable in the situations when the thread needs to do a lengthy saving of its state, blocking the harvester, so I've removed it.


The drain API of the TrieadOwner is very similar to the one in the App. The best way to do the drain is by the automatically-scoped AutoDrain class. If a drain doesn't need an automatic scoping, use the TrieadOwner API. And finally if you want to mess with drains from outside an app's Triead and thus don't have a TrieadOwner, only then use the App API.


$to->requestDrainShared();$to->requestDrainExclusive();
$to->waitDrain();
$to->drainShared();
$to->drainExclusive();
$to->undrain();

$result = $to->isDrained();



The methods are used in exactly the same way as the similar App methods, with only the difference of the names on the shared drains.

The exclusive drains always make the exclusion for this Triead. (Only one thread can be excluded from a drain). Normally the exclusive drains should be used only for the input-only threads. They could potentially be used to exclude the non-input-only thread too but I'm not sure, what's the point, and haven't worked out if it would work reliably (it might, or it might not).



$result = $to->isRqDrain();

Check whether a drain request is active. This can be used in the threads that generate data based on the real-time clock yet aren't input-only: if they find that a drain is active, they should refrain from generating the data and go back to the waiting. There is no way for them to find when the drain is released, so they should just continue to the next timeout as usual. Such code must use nextXtrayTimeout() or nextXtrayTimeLimit() for the timeouts, or the drain would never complete. The input-only threads don't have this limitation. And of course keep in mind that the better practice is to deal with the deal time either in the input-only threads or by driving it from outside the model altogether.


Tuesday, June 25, 2013

TrieadOwner reference, Perl, part 2

I've described it before but forgot to mention in the part 1 of the reference, the mark* methods bring the Triead to the target state through all the intermediate states. So if a Triead was not constructed, markReady() will mark it first constructed and then ready.

The same applies to markDead() but with an even more interesting twist. Suppose there is an application with an incorrect topology, and all the Trieads in it but one are ready. That last Triead then experiences an error, and proceeds directly to call markDead() and then exit. This markDead() will involve an intermediate step marking the Triead as ready. Since it's the last Triead to be ready, it will trigger the topology check, and since the topology is incorrect, it will fail. If it happened in markReady(), the method would confess. But in markDead() confessing doesn't make a whole lot of sense: after all, the thread is about to exit anyway. So markDead() will catch all these confessions and throw them away, it will never fail. However the failed check will still abort the App, and the rest of the threads will wake up and fail as usual.

The other thing about markDead() is that it clears and unreferences all the TrieadOwner's registered units, not waiting for the TrieadOwner object to be destroyed. This unravels any potential cyclic references where the code in a label might be referring back to the TrieadOwner.

And now continuing with more methods.

$facet = $to-> makeNexus(@options);

Create, export and (optionally) import a nexus. The result is an imported Facet of this Nexus, except when the options specify the no-import mode (then the result will be undef). Confesses on errors.

The options are:

name => $name
Name of the nexus, it will be used both as the export name and the local imported "as-name" of the facet.

labels => [
  name => $rowType,
  name => $fromLabel,
]
Defines the labels similarly to FnReturn in a referenced array. The array contains the pairs of (label_name, label_definition). The definition may be either a RowType, and then a label of this row type will be created, or a Label, and then a label of the same row type will be created and chained from that original label. The created label objects can be later found from Facets, and used like normal labels, by chaining them or sending rowops to them (but chaining from them is probably not the best idea, although it works anyway).

Optional, or may be an empty array; the implicit labels _BEGIN_ and _END_ will allways be added automatically if not explicitly defined.

The labels are used to construct an implicit FnReturn in the current Triead's main unit, and this is the FnReturn that will be visible in the Facet that gets imported back. If the import mode is "none", the FnReturn will still be  constructed and then abandoned (and freed by the reference count going to 0, as usual). The labels used as $fromLabel must always belong to the Triead's main unit.

rowTypes => [
  name => $rowType,
]
Defines the row types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.

tableTypes => [
  name => $tableType,
]
Defines the table types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.

reverse => 0/1
Flag: this Nexus goes in the reverse direction. The reverse nexuses are used to break up the topological loops, to prevent the deadlocks on the queueing. They have no limit on the queue size, and the data is read from them at a higher priority than from the direct nexuses. Default: 0.

chainFront => 0/1
Flag: when the labels are specified as $fromLabel, chain them at the front of the original labels. Default: 1. The default is this way because chaining at the front is what is typically needed. The reasons are described at length in the pipelined example, but the short gist is that you might want to send the rows from both the inputs, intermediate points, and the end of processing into an output nexus. It's most convenient to create the nexus in one go, after the whole thread's computation is defined. But the rowops from the earlier stages of computations have to come to the nexus before the rowops from the later stage. Chaining at the front ensures that each such label will send the rowop into the nexus first, and only then to the next stage of the computation.

queueLimit => $number
Defines the size limit after which the writes to the queue of this Nexus block. In reality because of the double-buffering the queue may contain up to twice that many trays before the future writes block. This option has no effect on the  reverse nexuses. Default: &Facet::DEFAULT_QUEUE_LIMIT, 500 or so.

import => $importType
A string value, essentially an enum, determining how this Nexus gets immediately imported back into this Triead. The supported values are:
  • reader (or anything starting from "read") - import for reading
  • writer (or anything starting from "write") - import for writing
  • none (or anything starting from "no") - do not import
The upper/lowercase doesn't matter. The use of the canonical strings is recommended.

$facet = $to->importNexus(@options);

Import a nexus into this Triead. Returns the imported Facet. The repeated attempts to import the same Nexus will return references to the same Facet object. Confesses on errors. An attempt to import the same nexus for both reading and writing is an error.

The options are:

from => "$t/$n"
Identifier of the nexus to import, consisting of two parts separated by a slash:
  •   thread name
  •   nexus name
The nexus name will also be used as the name of the local facet, unless overridden by the option "as". The reason for slash separator is that normally both the thread name and the nexus name parts may contain further components separated by dots, and a different separator allows to find the boundary between them. If a dot were used, in "a.b.c" it would be impossible to say, does it mean the thread "a" and nexus "b.c" in it, or thread "a.b" and nexus "c"? However "a/b.c" or "a.b/c" have no such ambiguity. Mutually exclusive with options "fromTriead" and "fromNexus".

fromTriead => $t
fromNexus => $n
The alternative way to specify the source thread and nexus as separate options. Both options must be present or absent at the same time. Mutually exclusive with "from".

as => $name
Specifies an override name for the local facet (and thus also to the FnReturn created in the facet). Logically similar to the SQL clause AS. Default is to reuse the nexus name.

import => $importType
A string value, essentially an enum, determining how this Nexus gets imported. The supported values are the same as for makeNexus (except "none", since there is no point in a no-op import):
  • reader (or anything starting from "read") - import for reading
  • writer (or anything starting from "write") - import for writing
The upper/lowercase doesn't matter.

immed => 0/1
Flag: do not wait for the thread that exported the nexus to be fully constructed. Waiting synchronizes with the exporter and prevents a race of an import attempt trying to find a nexus before it is made and failing. However if two threads are waiting for each other, it becomes a deadlock that gets caught and aborts the App. The immediate import allows to avoid such deadlocks for the circular topologies with helper threads.


The helper threads are the "blind alleys" in the topology: the "main thread" outsources some computation to a "helper thread", sending it the arguments, then later receiving the results and continuing with its logic.

For example, consider the following sequence:
  • thread A creates the nexus O;
  • thread A creates the helper thread B and tells it to import the nexus A/O for its input immediately and create the reverse nexus R for result;
  • thread A requests a (normal) import of the nexus B/R and falls asleep because B is not constructed yet;
  •     thread B starts running;
  •     thread B imports the nexus A/O immediately and succeeds;
  •     thread B defines its result nexus R;
  •     thread B marks itself as constructed and ready;
  • thread A wakes up after B is constructed, finds the nexus B/R and completes its import;
  • then thread A can complete its initialization, export other nexuses etc.
Default: 0, except if importing a nexus that has been exported from the same Triead. Importing from the same Triead is not used often, since the export also imports the nexus back right away, and there is rarely any use in importing separately. But it's possible, and importing back from the same Triead is always treated as immediate to avoid deadlocks.

TrieadOwner reference, Perl, part 1

TrieadOwner is the thread's private interface used to control its state and interact with the App (the App uses the thread's identity to detect the deadlocks). Whenever a Triead is constructed, its OS/Perl thread receives the TrieadOwner object for it.

Normally the TrieadOwner object is constructed inside Triead::start() or Triead::startHere() and passed to the thread's main function. The following constructor is used inside start(), and it's pretty much a private method. The only reason to use it would be if you want to do something very unusual, and even then you probably should write a wrapper method for your unusual thing and then call that wrapper method. The constructor constructs both Triead and TrieadOwner as a two sides of the same item, and registers the thread with the App.

$to = Triceps::TrieadOwner::new($tid, $handle, $appOrName, $tname, $fragname);

Here $tid is the Perl thread id where this TrieadOwner belongs (it can be obtained with $thr->tid()). $handle is the Perl thread's low-level handle (as in $thr->handle_()), it's the underlying POSIX thread handle, used to interrupt the thread on shutdown (the long story is that in the Perl threads the kill() call doesn't actually send a signal to another thread but just sends a flag, to interrupt a sleeping system call a real signal has to be delivered through the POSIX API). $handle is a dangerous argument, and passing a wrong value there may cause a crash.

Both $tid and $handle may be undef. If $tid is undef, the thread won't be joined by the harvester and you can either detach it or join it yourself. If either $tid or $handle is undef, the thread won't be interrupted on shutdown.

The signal used for interruption is SIGUSR2.Triceps sets its default handler that does nothing on this signal, but you can define your own handler instead.

$appOrName is the App object or its name that would be automatically looked up (or will confess if not found). $tname is the name of the previously created thread, that must be unique within the App (though it might be declared before). $fragname is the name of the fragment where the thread belongs, use "" for no fragment.

$app = $to->app();

Get the App where this Triead belongs.

$unit = $to->unit();

Whenever a Triead is constructed, a Unit is automatically  created to execute its logic. This call returns that unit. When the Triead is destroyed, the unit will be cleaned and unreferenced.

The unit is named the same as the thread.

$to->addUnit($moreUnit);

It's possible to split the Triead's logic into multiple units, all running in the same Perl thread. This call puts an extra unit under Triead's control, and has two effects: First, the unit will be referenced for the life of the Triead, and cleaned and unreferenced when the Triead is destroyed. Second, when the Triead's main loop runs, after each incoming rowop it will check all the controlled units for any rowops scheduled in them, and will run them until all such rowops are processed.

The names of the units are not checked in any way, it's your responsibility to name them sensibly and probably differently from each other.

The repeated calls with the same unit will have no effect.

$to->forgetUnit($moreUnit);

Pull a unit out of Triead's control. After that the cleaning of the unit becomes your responsibility. The thread's main unit can not be forgotten, the attempts to forget it will be simply ignored. The same goes for the units that aren't under the Triead's control in the first place, these calls are ignored.

@units = $to->listUnits();

Get the list of units under Triead's control. The main unit (the same as returned with $to->unit()) will always be the first in the list. The list contains only the unit references, not the name-value pairs (and you can always get the names from the unit objects themselves).

$triead = $to->get();

Get the public API of this Triead.

$name = $to->getName();

Get this Triead's name.

$frag = $to->fragment();

Get the name of this Triead's fragment ("" if not in a fragment).

$to->markConstructed();

Advance the Triead to the Constructed state. After that point no more nexuses may be exported in the Triead. Any look-ups by other Trieads for the Nexuses of this Triead will proceed at this point, either succeeding or failing (if no requested nexus is exported).

If the Triead is already in the Constructed or  later state, this call has no effect.

$to->markReady();

Advance the Triead to the Ready (fully initialized) state. After that point no more nexuses may be imported into this Triead.

If the App has been already shut down, this Triead will be immediately requested to die.

If this is the last Triead to become ready, this method will invoke the check for the topological correctness of the App. If the check finds an error (a loop of nexuses of the same direction), it will abort the App and confess with a message describing the nature of the error.

If the Triead is already in the Ready or  later state, this call has no effect.



$to->readyReady();

Mark this Triead as Ready and wait for all the App's Trieads to become Ready. There is no method that just waits for readiness because that would be likely causing a deadlock. When the thread waits for readiness, it must be ready itself, so this call does both. All the error checks of markReady() apply.

It is possible and reasonable to call this method repeatedly: more Trieads may be added to the App later, and it's a good idea to call readyReady() again before communicating with these new threads. Otherwise any rowops sent before these threads become ready will never arrive to these threads.

$to->markDead();

Mark this Triead as Dead. A dead thread will not receive any more input, and any its output will be thrown away. This notifies the harvester that it needs to join the Perl thread, so there should not be too much time left between making this call and exiting the Perl thread. The repeated calls have no effect.

Normally the Triead::start() and startHere() call markDead() automatically in their wrapper logic, and there is no need for a manual call. However if you decide to bypass them, you must call markDead() manually before exiting the thread, or the harvester will be stuck forever waiting for this thread to exit.

$to->abort($msg);

Abort the App with a message. This is a convenience wrapper that translates to App::abortBy().

$result = $to->isRqDead();

Check whether the thread was requested to die. For most threads, mainLoop() does this check automatically, and nextXtray() also returns the same value. However in the special cases, such as doing some long processing in response to a rowop, or doing some timeouts, it's best to do a manual check of isRqDead() periodically and abort the long operation if the thread has been requested to die, since any output will be thrown away anyway.

Note that even when the Triead has been requested to die, it still must call markDead() when it actually dies (normally the Triead::start() or startHere() takes care of it in its wrapper).

$result = $to->isConstructed();
$result = $to->isReady();

$result = $to->isDead();

$result = $to->isInputOnly();

Check the state of the Triead, the same as Triead methods.

Sunday, June 23, 2013

snapshot 1.0.93-20120632

I've decided to do one more snapshot before the release, largely to run it through CPAN's test infrastructure.

I've improved the detection of the NSPR library, and hopefully now it would build out of the box pretty much everywhere. Even if NSPR is not available, it would just use the implementation of the atomic integers through the mutexes.

Another build item, the Perl tests contain a dependency on the locale. They have the English text in some of the error strings received from the OS and Perl, so if you try to build in a non-English locale, these tests failed. To work around this issue, I've added "LANG=C" in the top-level Makefile. However if you run "make test" directly in the perl/Triceps, it has no such override (because the Makefile there is built by Perl). There just do you own "LANG=C make test".

The other items picked up in this snapshot are the last example of the fork-join topology, and other small fixes.

So far I've uploaded the snapshot to CPAN, and soon will upload it to the other locations as well.

Friday, June 21, 2013

Triead reference, C++

Naturally, Triead is a class that can be referenced from multiple threads and inherits from Mtarget. It's defined in app/Triead.h. (By the way, I forgot to mention it for App, but obviously App is also an Mtarget).

The meaning of the C++ methods is exactly the same as in Perl, only the format of values is slightly different. Obviously, the start* methods are Perl-only.

const string &getName() const;
const string &fragment() const;
bool isConstructed() const;
bool isReady() const;
bool isDead() const;
bool isInputOnly() const;

typedef map<string, Autoref<Nexus> > NexusMap;
void exports(NexusMap &ret) const;
void imports(NexusMap &ret) const;
void readerImports(NexusMap &ret) const;
void writerImports(NexusMap &ret) const;

The argument map gets cleared and then filled with the new returned contents.

Onceref<Nexus> findNexus(const string &srcName, const string &appName, const string &name) const;

This is a method unique to the C++ API. It looks up an exported nexus by name without involving the overhead of getting the whole map. Here name is the name of the nexus to look up (its short part, without the thread's name in it). If there is no such nexus, an Exception will be thrown.

The srcName and appName are used for the error message in the Exception: srcName is the name of the thread that requested the look-up, and appName is the name of the App where the threads belong. (It might seem surprising, but a Triead object has no reference to its App, and doesn't know the App's name. It has to do with the avoidance of the circular references).

Triead reference, Perl

The Triead class is the public interface of a Triceps thread, i.e. what of it is visible to the other threads. It's intended pretty much for introspection only, and all its method are only for reading the state. They are all synchronized but of course the thread may change its state at any moment. The Triead objects can be obtained by calling App::getTrieads().

The class also contains some static methods that are used to construct the Trieads.

$result = $t->same($t2);

Check that two Triead references point to the same Triead object.

$name = $t->getName();

Get the Triead's name.

$fragment = $t->fragment();

Get the name of the Triead's fragment. If the Triead doesn't belong to a fragment, returns "".

$result = $t->isConstructed();

Check whether the Triead has been constructed. (For the explanation of the Triead lifecycle states, see http://babkin-cep.blogspot.com/2013/04/the-triead-lifecycle.html.

$result = $t->isReady();

Check whether the Triead is ready.

$result = $t->isDead();

Check whether the Triead is dead.

$result = $t->isInputOnly();

Check whether the Triead is input-only, that is has no reader nexuses imported into it. When the Triead is created, this flag starts its life as false (0 for Perl), and then its correct value is computed when the Triead becomes ready. So, to check this flag correctly, you must first check that the Triead is ready.

@nexuses = $t-> exports();

Get the list of nexuses exported from this Triead,  as name-value pairs, suitable to be assigned into a hash. The values are the references to nexus objects.

@nexuses = $t->imports();
@nexuses = $t->readerImports();
@nexuses = $t->writerImports();


Get the list of nexuses imported into this Triead, as name-value pairs. The imports() returns the full list, without the ability to tell, which nexuses are imported for reading and which for writing, while readImports() and writeImports() return these subsets separately.

The names here are the "as-names" used for the import (the full names of the Nexuses can be obtained from the Nexus objects). The values are the references to nexus objects.

The next part of the API is the static construction methods. They really are wrappers to the TrieadOwner methods but Triead is a shorter name, and thus more convenient.

Triceps::Triead::start(@options);

Start a new Triead in a new Perl thread. The options are:

app => $appname
Name of the App that owns the new Triead. The App object will be looked up by name for the actual construction.

thread => $threadname
Name of the new Triead.

fragment => $fragname
Name of the new Triead's fragment. Default: "", which means no fragment.

immed => 0/1
Flag: when the new thread imports its nexuses, it should import them in the immediate mode. This flag is purely advisory, and the thread's main function is free to use or ignore it depending on its logic. It's provided as a convenience, since it's a typical concern for the helper threads. Default: 0.

main => \&function
The main function of the thread that will be called with all the options of start() plus some more:

    &$func(@opts, owner => $ownerObj)

The extra option of the main are:
owner: the TrieadOwner object constructed for this thread

Also, any other options may be added, and they will be forwarded to the main function without parsing. The main function is then free to parse them by itself, and if it finds any unknown options, it will fail.

For the convenience of writing the main functions, the set of "standard" options is provided in the global valiable

@Triceps::Triead::opts

The main function then uses this variable as a preable for any of its own options, for example:

sub listenerT          
{
    my $opts = {}; 
    &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    ...
}

Another convenience method is:

Triceps::Triead::startHere(@options);


It's very much the same as start() but starts the Triead in the current Perl thread. It's intended for the short-lived Apps that perform some computation and then have to return the result into the original thread. The unit tests are a good example of such apps.


And because of the typical usage, this method has some extra functionality compared to the plain start(): unless told otherwise, it will first create the App (with the name specified by the option "app"), then construct and run it, and after the main function of this thread exits, run the harvester and drop the App. So it just does the whole package, similar to App::build(), only typically the Triead started here runs as a normal real Triead, collects the results of computations and places them into some variables (global or referenced by the user-specific options of the main function).


This extra functionality can be disabled by the additional options (disable by setting them to 0):


harvest => 0/1
After the main function exits, automatically run the harvesrer. If you set it to 0, don't forget to call the harvester after this  function returns. Default: 1.

makeApp => 0/1
Before doing anything, create the App.  Obviously, this App must not exist yet. Default: 1.


These options are not passed through to the main function, unlike all the others.

App reference, C++

As usual, I won't be repeating the descriptions from the Perl reference, but just point out the methods and differences of the C++ version. And there are more detailed descriptions directly in the header files.

The static part of the API is:

static Onceref<App> make(const string &name);
static Onceref<App> find(const string &name);
static void drop(Onceref<App> app);

typedef map<string, Autoref<App> > Map;
static void listApps(Map &ret);

The App constructor is private, and the Apps get constructed with make(). make() throws an Exception if an App with this name already exists, find() throws an exception if an App with this name does not exist, and drop() just does nothing if its argument App had already been dropped.

All the operations on the global list of apps are internally synchronized and thread-safe. listApps() clears its argument map and fills it with the copy of the current list. Of course, after it returns and releases the mutex, other threads may create or delete apps, so the returned list (well, map) may immediately become obsolete. But since it all is done with the reference counters, the App objects will continue to exist and be operable as long as the references to them exist.

The App instance API is as follows. It's also all internally synchronized.

const string &getName() const;

Get the name of the App.

Onceref<TrieadOwner> makeTriead(const string &tname, const string &fragname = "");

Define a new Triead. This method is called either from the OS thread where the Triead will run or from its parent thread (unlike Perl, in C++ it's perfectly possible to pass the reference to the TrieadOwner from a parent OS thread to the thread that will run it). Either way, the OS thread that will run this Triead ends up with the TrieadOwner object reference, and no other thread must have it. The arguments are the thread name and fragment name, and the empty fragment name means that this thread won't belong to any fragment.

And just to reiterate, this does not create the OS thread. Creating the OS thread is your responsibility. This call only creates the Triceps Triead management structures to put it under control of the App.

If a thread with this name has already been defined, or if the thread name is empty, throws an Exception.

void declareTriead(const string &tname);

Declare a new thread. I forgot to tell it in the Perl API description, but declaring a thread more than once, or declaring a thread that has been already defined, is perfectly OK.

void defineJoin(const string &tname, Onceref<TrieadJoin> j);

This is a call without an analog in the Perl API. This defines a way for the harvester to join the thread. For the Perl API it happens to be hardcoded to join the Perl threads. But the C++ API can deal with any threads: POSIX ones, Perl ones, whatever. If a thread wants to be properly joined by the harvester, it must define its join interface, done as a TrieadJoin object. Each kind of threads will define its own subclass of TrieadJoin.

If there is no join defined for a Triead, then when it exits, the harvester will just update the state and manage the Triead object properly but won't do any joining. Which is useful in case if the OS thread is detached (not the best idea but doable) or if the Triead is created from the parent OS thread, and then the actual OS thread creation fails and there is nothing to join.

If the thread is not declared nor defined yet, defineJoin() throws an exception.

It's possible (though unusual) to call this method multiple times for the same thread. That would just replace the joiner object. The joiner is a reference-counted object, so the old object will just have itse reference count decreased. It's possible to pass the joiner as NULL, that would just drop the existing joiner, if any was defined.


typedef map<string, Autoref<Triead> > TrieadMap;
void getTrieads(TrieadMap &ret) const;

List the Trieads in this App. Same as with listing the Apps, the argument map gets cleared and then filled with the current contents.

void harvester(bool throwAbort = true);
bool harvestOnce();
void waitNeedHarvest();

The harvester API is very similar to Perl, with confession replaced by Exception, with the only difference of how the flag for throwing an Exception on detecting an App abort (the Exception will still be thrown only after joining all the App's threads).

The result of harvestOnce() is true if the App is dead. The Exceptions in harvestOnce() originate from the TrieadJoin::join() method that performs the actual joining. All the caveats apply in the same way as in Perl.

bool isDead();

Returns true if the App is dead.

void waitDead();

Wait for the App to become dead.

bool isAborted() const;

Returns true if the App is aborted.

string getAbortedBy() const;
string getAbortedMsg() const;

Get the thread name and message that caused the abort. If the App is not aborted, will return the empty strings.

void abortBy(const string &tname, const string &msg);

Abort the app, by the thread tname, with the error message msg.

bool isShutdown();


Returns true if the App has been requested to shut down.


 void shutdown();

Request the App to shut down. This involves interrupting all the threads in case if they are sleeping.  The interruption is another functionality of the TrieadJoin object. It's possible for the TrieadJoin interruptor to encounter an error and throw an Exception. If this happens, shutdown() will still go through all the Trieads and interrupt them, and then repackage the error messages from all the received Exceptions into one Exception and re-throw it.

Technically, this means that in the Perl API the shutdown might also confess, when its underlying C++ call returns an Exception. This should theoretically never happen, but practically you never know.

void shutdownFragment(const string &fragname);

Shutdown a fragment. All the logic described in the Perl API applies. Again, this involves interruption of all the threads in the fragment, and if any of them through Exceptions, these will be re-thrown as a single Exception.

enum {
    DEFAULT_TIMEOUT = 30,
};
void setTimeout(int sec, int fragsec = -1);

Set the readiness timeouts (main and fragment), in seconds. If the fragment timeout argument is <0, it gets set to the same value as the main timeout.

void setDeadline(const timespec &dl);

Set the deadline (unlike timeout, with fractional seconds) for the main readiness.

void refreshDeadline();

Explicitly refresh the deadline, using the fragment timeout.

void requestDrain();

Request a shared drain.

void requestDrainExclusive(TrieadOwner *to);

Request an exclusive drain, with the argument TrieadOwner. Unlike Perl API, the C++ API supports the methods for requesting the exclusive drain on both App and TrieadOwner classes (the TrieadOwner method is really a wrapper for the App method). In general, using the TrieadOwner method for this purpose probably looks nicer.

Since the TrieadOwner reference is really private to the OS thread that runs it, this method can be called only from that OS thread. Of course, being C++, you could pass it around to the other threads, but don't, TrieadOwner is not thread-safe internally and any operations on it must be done from one thread only.

void waitDrain();

Wait for the drain (either shared or exclusive) to complete.

void drain();

A combination of requestDrain() and waitDrain().

void drainExclusive(TrieadOwner *to);

A combination of requestDrainExclusive() and waitDrain().

bool isDrained();

Quickly check if the App is currently drained (should be used only if the App is known to be requested to drain).

void undrain();

End the drain sequence.

The file descriptor store/load API is really of not much use in C++ as such, in C++ it's easy to pass and share the file descriptors and file objects between the threads as-is. It has been built into the App class for the benefit of Perl and possibly other interpreted languages.

void storeFd(const string &name, int fd);

Store a file descriptor. Unlike the Perl API, the file descriptor is NOT dupped before storing. It's stored as is, and if you want dupping, you have to do it yourself. Throws an Exception if a file descriptor with this name is already stored.

int loadFd(const string &name) const;

Load back the file descriptor. Again, no dupping, returns the stored value as-is. If the name is unknown, returns -1.

bool forgetFd(const string &name);

Forget the file descriptor. Returns true if this name was known and became forgotten, or false if it wasn't known. Normally, you wold load the descriptor, take over its ownership, and then tell the App to forget it.

bool closeFd(const string &name);

Close the file descriptor (if it was known) and then forget it. Returns true if this name was known and became forgotten, or false if it wasn't known.

The rest of the Perl App methods have no analogs in C++. They are just purely Perl convenience wrappers.

App reference, Perl, part 2

Triceps::App::setTimeout($appOrName, $main_to_sec);
Triceps::App::setTimeout($appOrName, $main_to_sec, $frag_to_sec);

Set the readiness timeout. Even though Triceps is quite eager in watching for deadlocks in the threads topology, it's still possible to get some threads, and with them the whole program, stuck during initialization. For example, if you declare a thread and then never define it. These situations ere very unpleasant because you start the program and expect it to work but it doesn't, without any indication to why. So Triceps imposes an initialization timeout. The App (and thus all its threads) must become ready within the timeout after the definition or declaration of the last Triead. If not, the App will be aborted (and the error message will tell, which thread did not initialize in time). The same applies to the creation of Trieads (usually in the fragments) after the App is already running: all the threads must become ready within the timeout since the last thread has been defined or declared.

The default timeout is 30 seconds, or symbolically

&Triceps::App::DEFAULT_TIMEOUT

(subject to possible changes of the exact value in the future).

But the timeout can be changed. Technically, there are two timeouts:

  • one starts when the App is created
  • one restarts when any Triead is defined or declared

For the App to be aborted, both timeouts must expire. By default they are set to the same length, so only the second timeout really matters, since it will always be the last one to start and last one to end. But if you set the first timeout to be longer, you can allow for a longer initialization period at the App start ("main timeout") than later when more threads are added to a running App ("frag timeout", since the threads added later are typically the threads nin fragments).

The one-argument form of setTimeout() sets both timeouts to the same value, The two-argument form sets these timeouts separately.

The timeouts may be set only before the first thread has been created. This is largely due to the historical reasons of implementation, with the current implementation it should actually be safe to allow changing the timeouts at a later point as well, and this limitation may be removed in the future.

The timeout values are represented as integer whole seconds.

Note that it's still possible to get the initialization stuck without any indication if it gets stuck in the other libraries. The reason is that the harvester waits for all the threads to be joined before it propagates the error, so if a thread doesn't get joined, the harvester will be stuck. For example, if you open a file on NFS as a part of Triead initialization, and the NFS server doesn't respond, this thread will be stuck for a long time if not forever. The App will detect a timeout and try to interrupt the threads but the NFS operations are often not interruptable, so the harvester will wait for this thread to complete this operation and exit before it propagates the error, and thus the whole program will be silently stuck forever (and to avoid this, the NFS mounts should be done in the "soft" mode but it's a separate story). This will likely be improved in the future but it needs more thinking about how to do it right.

Triceps::App::setDeadline($appOrName, $deadline_sec);

Set the "main" timeout in the form of an absolute deadline. This is actually closer to the way it works internally: the limit is expressed as a deadline, and setTimeout() just adds the timeout value to the current time to compute the deadline, while setDeadline() sets it straight. Like setTimeout(), this may be called only before any Trieads were created.

The time is represented as floating-point seconds since epoch. Triceps::now() can be used to get the current time with the fractional seconds (or if you don't care about the fractional seconds, you can always use the stock time()).

Triceps::App::refreshDeadline($appOrName);

Restart the  "fragment" timeout. Same as when a Triead is defined or declared, only without involving a Triead. Again, the name has to do with how the time computations are done internally, computing the deadline from both timeouts. This method can be called at any time.

The next few methods have to do with the drains. Generally, using the AutoDrain class to automatically limit the scope of the drains is a better idea. But if the automatic scoping is not desired, the App methods can be used directly.

Triceps::App::requestDrain($appOrName);

Request a shared drain. Does not wait for the drain to complete. This method may be called repeatedly, potentially from multiple threads, which will keep the drain active and increase the recursion count. The drain will be released when undrain() is called the matching number of times.

If an exclusive drain by another thread is active when requestDrain() is called, the call will be stuck until the exclusive drain becomes undrained (and potentially more exclusive drains might be queued up before the shared drain, using the POSIX read-write-lock implementation). Otherwise the call will set the appropriate state and return immediately.

If this thread has requested an exclusive drain previously (and didn't undrain it yet), an attempt to get a shared drain in the same thread will likely deadlock. The same applies in the opposite order as well.

Once the shared drain is requested, the input-only threads will be blocked from sending more data (any their attempts to flush their write facets will get stuck until undrain), and the rest of the threads will continue churning through the data buffered in the nexuses until all the nexuses are empty and there is no more data to process (yes, these threads will continue writing to their write facets).

It is important to not request a shared drain from an input-only thread and then try to write more data into an output facet, that would deadlock. The whole concept is doable, but an exclusive drain must be used instead ("exclusive" means that a designated input-only thread is excluded from blocking).

Triceps::App::waitDrain($appOrName);

Wait for the requested shared drain to complete. This means that all the queues in all the nexuses have become empty.

The effect of waitDrain() without a preceding requestDrain() is undefined. If you definitely know that some other thread has requested a drain and didn't undrain yet, it will work as normal, so you can have any number of threads wait for drain in parallel. (And the semantics, shared or exclusive, will match that currently active request). If no thread requested a drain, it might either return immediately irrespective of the state of the nexuses or might wait for some other thread to request a drain and succeed.

This call may also be used with an exclusive (or shared) drain requested through a TrieadOwner or any drain requested through an AutoDrain (though a better style is to use the same object as used for requesting the drain).

Triceps::App::drain($appOrName);

A combination of requestDrain() and waitDrain() in one call.

Triceps::App::undrain($appOrName);

Release the drain and let the normal processing continue. Make sure to call it exactly the same number of times as the requestDrain().

This call may also be used with a drain requested through a TrieadOwner (though a better style is to use the same object as used for requesting the drain).

$result = Triceps::App::isDrained($appOrName);

Check whether the App is currently drained (i.e. all the nexuses are empty), without waiting for it. Returns 1 if drained, 0 if not. If no drain is active during this call, the result is undefined, not even in the "best effort" sense: it may return 1 even if there are millions of records queued up. The only reliable way is to request a drain first.

This call may also be used with a drain requested through a TrieadOwner or through an AutoDrain.


The next part of the API deals with the passing of the file descriptors between the Perl threads. The concepts have been described in detail before, this is just a short reference. This API works under the hood of TrieadOwner::TrackGetSocket() and friends, those desribed in http://babkin-cep.blogspot.com/2013/04/multithreaded-socket-server-part-3.html and neighboring posts, but can also be used directly.

Triceps::App::storeFd($appOrName, $name, $fd);

Store a file descriptor in the App object, allowing to load it into other threads, and thus pass it around between the threads. $name is the name for this descriptor that will later be used to get the file descriptor back (generally, you want to generate a unique name for each file descriptor stored to avoid confusion, and then pass this name to the target thread). $fd is the file descriptor, an integer number, normally received from fileno(). The file descriptor is dupped before it gets stored, so the original will continue to exist, and if you have no other use for it, should be closed.

If a file descriptor with this name already exists in the App, this call will confess.

$fd = Triceps::App::loadFd($appOrName, $name);

Load back the file descriptor that was previously stored. If no descriptor with such a name is stored in the App, it will confess. The descriptor will keep existing in the App, so to keep things consistent, there are two options:

One is to let your code take over the ownership of the file descriptor, and tell the App to forget about it with forgetFd().

The other one is to never close the received descriptor in your code (a good example would be to dup it right away for the future use and then leave the original alone), and let App keep its ownership.

$fd = Triceps::App::loadDupFd($appOrName, $name);

Very much the same as loadFd(), only does the dupping for you and returns the dupped descriptor. In this case your code is responsible for closing that descriptor. Which method is more suitable, loadFd() or dupFd(), depends on the nature of the code that will use the file descriptor and whether you want to leave the descriptor in the App for more threads to load.

Triceps::App::forgetFd($appOrName, $name);

Forget the file descriptor in the App. It doesn't get closed, so closing it becomes your responsibility, or it will leak. If no descriptor with this name exists, the call will confess.

Triceps::App::closeFd($appOrName, $name);

Close and forget the file descriptor in the App. If no descriptor with this name exists, the call will confess.

Triceps::App::storeFile($appOrName, $name, FILE);

A convenience wrapper for closeFd(), calls fileno() on the file and stores the resulting file descriptor.

Triceps::App::storeCloseFile($appOrName, $name, FILE);

Stores the file descriptor extracted from the file, and closes the original file (since the file descriptor gets dupped on store, that copy continues to exist in the App).

$file = Triceps::App::loadDupFile($appOrName, $name, $mode);

Load a file descriptor and build a file handle object (IO::Handle) from it. The mode string may be specified in either the open() format (</>/>>/+</+>/+>>) or the C stdio format (r/w/a/r+/w+/a+). Note that the mode must match or be a subset of the mode used to originally open the file descriptor. If you open a file read-only, store its descriptor, and load back as a write-only file, you will have a bad time.

There is no corresponding loadFile(), since loadFd() is a more dangerous method that is useful for the low-level operations but doesn't make much sense for the higher level.

$file = Triceps::App::loadDupSocket($appOrName, $name, $mode);

Load a file descriptor and build an IO::Socket::INET object from it. The mode string meaning is the same as for loadDupFile().


$file = Triceps::App::loadDupFileClass($appOrName, $name, $mode, $class);


Load a file descriptor and build an arbitrary class object from it. The class (specified by its name) should normally be a subclass of IO::Handle, or at the very least must implement the method new_from_fd() similar to IO::Handle. This is the common underlying implementation of but loadDupFile() and loadDupSocket().


The last part of the API is the convenience methods for building and starting an App.

Triceps::App::build($name,  \&builder );

Build an App instance. It creates the App instance, then the builder function is called to create the App's nexuses and threads, then the harvester is executed, that eventually destroys the App object after collecting all its threads. For a very basic example:

Triceps::App::build "a1", sub {
    Triceps::App::globalNexus(
        name => "types",
        rowTypes => [
            rt1 => $rt1,
        ],
    );
    Triceps::Triead::start(
        app => $Triceps::App::name,
        thread => "t1",
        main => sub {
            my $opts = {};
            &Triceps::Opt::parse("t1 main", $opts, {@Triceps::Triead::opts}, @_);
            my $to = $opts->{owner};
            $to->importNexus(
                from => "global/types",
                import => "writer", # if importing just for the types, use "writer"!
            );
            $to->readyReady();
        },
    );
};

The builder function runs in the current Perl thread, however from the logical standpoint it runs in the App's first Triead named "global" (this Triead name is hardcoded in build()). This allows you not to worry about the App being technically dead until the first Triead is created: by the time the builder function is called, the first Triead is already created. However it also means that you can't change the readiness timeouts. After the builder function returns, the global Triead exits, and the harvester starts in the same current Perl thread.

The builder has access to a few global variables:

  • $Triceps::App::name is the App name, from the build() first argument.
  • $Triceps::App::app is the reference to the App object.
  • $Triceps::App::global is the reference to the TrieadOwner object of the global Triead.
After the builder function exits, these variables become undefined.

Triceps::App::globalNexus(@nexusOptions);

Creates a nexus with the import mode of "none", on the global Triead. The arguments are the same options as for the normal nexus creation. This is a simple convenience wrapper for the nexus creation. Since the global thread is supposed to exit immediately, there is no point in importing this nexus into it.

The global thread is found by this function from $Triceps::App::global, so this method can only be used from inside the builder function of build().

Tuesday, June 18, 2013

$! !!!!!!

Turns out, the more recent versions of Perl (starting with 5.16.4 or maybe even earlier) have changed the way they treat the error variable $!. This special can not be assigned any arbitrary error text any more, now they want it to be a proper integer OS error code. So all the Triceps error reporting through $! breaks on the newer versions of Perl. The code still works, except that the text of the errors can not be extracted,

Well, looks like it's about time to bite the bullet and finally convert everything to the newer and better error reporting with confessions.

Sunday, June 16, 2013

App reference, Perl, part 1

The App API has two parts to it: The first part keeps track of all the App instances in the program, and allows to list and find them. The second part is the manipulations on a particular instance.

The global part of the API is:

@apps = Triceps::App::listApps();

Returns the array of name-value pairs, values containing the App references, listing all the Apps in the program (more exactly, in this Triceps library, you you compile multiple Triceps libraries together by renaming them, each of them will have its own Apps list). The returned array can be placed into a hash.

$app = Triceps::App::find($name);

Find an App by name. If an App with such a name does not exist, it will confess.

$app = Triceps::App:make($name);

Create a new App, give it the specified name, and put it on the list. The names must not be duplicate with the other existing Apps, or the method will confess.

drop($app);
drop($appName);

Drop the app, by reference or by name, from the list. The App is as usual reference-counted and will exist while there are references to it. The global list provides one of these references, so an App is guaranteed to exist while it's still on the list. When dropped, it will still be accessible through the existing references, but obviously will not be listed any more and could not be found by name.

Moreover, a new App with the same name can be added to the list. Because of this, dropping an App by name requires some care in case if there could be a new App created again with the same name: it creates a potential for a race, and you might end up dropping the new App instead of the old one. Of course, if it's the same thread that drops the old one and creates the new one, then there is no race. Dropping an application by name that doesn't exist at the moment is an error and will confess.

Dropping the App by reference theoretically allows to avoid the race: a specific object gets dropped, and if it already has been dropped, the call has no effect. Theoretically. However in practice Perl has a limitation of passing the object values between threads, and thus whenever each thread starts, first thing it does is finding its App by name. It's a very similar kind of race and is absolutely unavoidable except by making sure that all the App's threads have exited and joined (i.e. harvesting them). So make sure to complete the App's thread harvesting before dropping the App in the harvester thread, and by then it doesn't matter a whole lot if the App is dropped by name or by reference.

Now the second part of the API, working with an App instance.

Many (but not all) of the App methods allow to specify the App either by reference or by name, and they automatically sort it out, doing the internal look-up by name if necessary. So the same method could be used as either of:

$app->method(...);
Triceps::App::method($app, ...);
Triceps::App::method($appName, ...);

Obviously, you can not use the "->" syntax with a name, and obviously if the name is not in the app list, the method will confess. Below I'll show the calls that allow the dual formats as Triceps::App::method($appOrName, ...) but keep in mind that you can use the "->" form of them too with a reference.

$app = Triceps::App::resolve($appOrName);
Do just the automatically-sorting-out part: gets a reference or name and returns a reference either way. A newly created reference is returned in either case (not the argument reference). You can use this resolver before the methods that accept only a reference.

$result = $app->same($app2);

Check if two references are for the same App object. Here they both must be references and not names.

$name = $app->getName();

Get the name of the App, from a reference.

Triceps::App::declareTriead($appOrName, $trieadName);

Declare a Triead (Triceps thread) in advance. Any attempts to look up the nexuses in that thread will then wait for the thread to become ready. (Attempts to look up in an undeclared and undefined thread are errors). This is necessary to prevent a race at the thread creation time.  For the most part, the method Triead::start() just does the right thing by calling this method automatically and you don't need the use it manually, except in some very special circumstances.

@trieads = Triceps::App::getTrieads($appOrName);

Get the list of currently defined Trieads, as name-value pairs. Keep in mind that the other threads may be modifying the list of Trieads, so if you do this call multiple times, you may get different results. However the Trieads are returned as references, so they are guaranteed to stay alive and readable even if they get removed from the App, or even if the App gets dropped.

$app->harvester(@options);

Run the harvester in the current thread. The harvester gets notifications from the threads when they are about to exit, and joins them.  After all the threads have been joined, it automatically drops the App, and returns.
 
The harvesting is an absolutely necessary part of the App life cycle, however in most of the usage patterns (such as with Triead::startHere or App::build) the harvester is called implicitly from the wrapping library functions, so you don't need to care about it.

Note also that if you're running the harvester manually, you must call it only after the first thread has been defined or at least declared. Otherwise it will find no threads in the App, consider it dead and immediately drop it.

If the App was aborted, the harvester will normally confess after if had joined all the threads and disposed of the App, unless the option die_on_abort (see below) has been set to 0. This propagates the error to the caller. However there is a catch: if some of the threads don't react properly by exiting on an abort indication, the program will be stuck and you will see no error message until these threads get unstuck, possibly forever.

 Options:

 die_on_abort => 1/0

(default: 1) If the App was aborted, the harvester will normally confess after if had joined all the threads and disposed of the App.  Setting this option to 0 will make the aborts silently ignored. This option does not affect the errors in joining the threads: if any of those are detected, harvester will still confess after it had disposed of the app.

$dead = $app->harvestOnce();

Do one run of the harvesting.  Joins all the threads that have exited since its last call. If no threads have exited since then, returns immediately. Returns 1 if all the threads have exited (and thus the App is dead), 0 otherwise.  If a thread join fails, immediately confesses (if multiple threads were ready for joining, the ones queued after the failed one won't be joined, call harvestOnce() once more to join them).

$app->waitNeedHarvest();

Wait for at least one thread to become ready for harvesting. If the App is already dead (i.e. all its threads have exited), returns immediately.

These two methods allow to write the custom harvesters if you're not happy with the default one. The basic harvester logic can be written as:

do {
  $app->waitNeedHarvest()
} while(!$app->harvestOnce());
$app->drop();

However the real harvester also does some smarter things around the error handling. You can look it up in the source code in cpp/app/App.cpp.

$res = Triceps::App::isDead($appOrName);

Returns 1 if the App is dead (i.e. has no more live threads), otherwise 0.  Calling this method with a name for the argument is probably a bad idea, since normally the harvester will drop the App quickly after it becomes dead, and you may end up with this method confessing when it could not find the dropped App.

$res = Triceps::App::isShutdown($appOrName);

Returns 1  if the App has been requested to shut down, either normally or by being aborted.

$res = Triceps::App::isAborted($appOrName);

Returns 1 if the App has been aborted. The App may be aborted explicitly by calling the method abortBy(), or the thread wrapper logic automatically converts any unexpected deaths in the App's threads to the aborts. If any thread dies, this aborts the App, which in turn requests the other threads to die on their next thread-related call. Eventually the harvester collects them all and confesses, normally making the whole program die with an error.

($tname, $message) = Triceps::App::getAborted($appOrName);

Get the App abort information: name of the thread that caused the abort, and its error message.

Triceps::App::abortBy($appOrName, $tname, $msg);

Abort the application. The thread name and message will be remembered, and returned later by getAborted() or in the harvester. If abortBy() is called multiple times, only the first pair of thread name and message gets remembered. The reason is that on abort all the threads get interrupted in a fairly rough manner (all their ongoing and following calls to the threading API die), which typically causes them to call abortBy() as well, and there is no point in collecting these spurious messages.

The thread name here doesn't have to be the name of the actual thread that reports the issue. For example, if the thread creation as such fails (maybe because the OS limit on the thread count) that gets detected by the parent thread but reported in the name of the thread whose creation has failed. And in general you can pass just any string as the thread name, App itself doesn't care, just make it something that makes sense to you.

$res = Triceps::App::isDead($appOrName);


Returns 1 if the App is dead (i.e. it has no alive Trieads, all the defined and declared threads have exited). Right after the App is created, before the first Triead is created, the App is also considered dead, and becomes alive when the first Triead is declared or defined. If an App becomes dead later, when all the Trieads exit, it can still be brought back alive by creating more Trieads. But this is considered bad practice, and will cause a race with the harvester (if you want to do this, you have to make your own custom harvester).

$res = Triceps::App::isShutdown($appOrName);


Returns 1 if the App was requested to shut down. The Trieads might still run for some time, until they properly detect and process the shutdown, and exit. So this condition is not equivalent to Dead, althouh they are connected. If any new Trieads get started, they will be shut down right away and won't run.

To reiterate: if all the Trieads just exit by themselves, the App becomes dead but not shut down. You could still start more Trieads and bring the App back alive. If the App has been shut down, it won't become immediately dead, but it will send the shutdown indication to all the Trieads, and after all of them eventually exit, the App will become dead too. And after shutdown there is no way to bring the App back alive, since any new Trieads will be shut down right away (OK, there might be a short period until they detect the shutdown, so the App could spike as alive for a short time, but then will become dead again).

Triceps::App::waitDead($appOrName);


Will wait for the App to become dead and return after that. Make sure to not call waitDead() from any of App's Trieads: that would cause a deadlock.


Triceps::App::shutdown($appOrName);

Shut down the App. The shutdown state is sticky, so any repeated calls will have no effect. The call returns immediately and doesn't wait for the App to die. If you want to wait, call waitDead() afterwards. Make sure to not call waitDead() from a Triead: that would cause a deadlock.


Triceps::App::shutdownFragment($appOrName, $fragName);

Shut down a named fragment. This does not shut down the whole App, it just selectively shuts down the Trieads belonging to this fragment . See the explanation of the fragments in http://babkin-cep.blogspot.com/2013/03/triceps-multithreading-concepts.html. The fragment shutdown is not sticky: after a fragment has been shut down, it's possible to create another fragment with the same name. To avoid races, a fragment may be shut down only after all its Trieads are ready. So the caller Triead must call readyReady() before it calls shutdownFragment(). If any of the fragment's Trieads are not ready, the call will confess.