Showing posts with label app. Show all posts

Thursday, May 1, 2014

auto-detecting the file handle class when passing it through the App

I've been editing the documentation for the passing of the file descriptors between threads, and I've realized that there is no need to specify the class of the Perl file handle when it gets loaded from the App. Instead the name of the class can be easily stored along with the file descriptor and then extracted back. So I went and changed the code to do that. The modifications are:

In the App class the method storeFd() takes an extra argument:

$app->storeFd($name, $fd, $className);

The $className specifies the class of the file object. The empty string can be used as a synonym for "IO::Handle", since ref() returns an empty string for the globs of the old-fashioned file handles.

The methods loadFd() and loadDupFd() now return two values:

($fd, $fclass) = $app->loadFd($name);
($fd, $fclass) = $app->loadDupFd($name);

The second returned value is the class name, as it was stored by storeFd().

And the methods App::loadDupSocket(), TrieadOwner::trackDupSocket() and TrieadOwner::trackGetSocket() have been removed. Instead App::loadDupFile(), TrieadOwner::trackDupFile() and TrieadOwner::trackGetFile() have been updated to get the stored file handle class and use it transparently, so they just work correctly for the sockets now.

The methods App::loadDupFileClass(), TrieadOwner::trackDupClass() and TrieadOwner::trackGetClass() are still present in case you want to override the class name, but now they should almost never be needed, since the class names are handled automatically without any overrides.

And the C++ App class got changed a little bit as well, with the extended versions of storeFd() and loadFd():

void storeFd(const string &name, int fd);
void storeFd(const string &name, int fd, const string &className);
int loadFd(const string &name, string *className = NULL) const;

The new interface is backwards-compatible with the old one but also has the provision for storing and loading the file class name.
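To show how such a backwards-compatible pair of calls can fit together, here is a minimal standalone sketch. It is not the Triceps code: the class FdStore and its internals are hypothetical, only the three method signatures follow the ones listed above. The sketch also assumes that an unknown name leaves the caller's class name string untouched, which the description above does not spell out.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the App's descriptor table; not the Triceps code.
class FdStore
{
public:
    // Old-style call: no class name, so the empty string gets stored.
    void storeFd(const std::string &name, int fd)
    {
        storeFd(name, fd, "");
    }

    // Extended call: remembers the class name next to the descriptor.
    void storeFd(const std::string &name, int fd, const std::string &className)
    {
        if (!fds_.emplace(name, Entry{fd, className}).second)
            throw std::runtime_error("file descriptor '" + name + "' is already stored");
    }

    // Returns -1 for an unknown name; fills *className only if the caller asked for it.
    int loadFd(const std::string &name, std::string *className = nullptr) const
    {
        std::map<std::string, Entry>::const_iterator it = fds_.find(name);
        if (it == fds_.end())
            return -1;
        if (className != nullptr)
            *className = it->second.className;
        return it->second.fd;
    }

private:
    struct Entry { int fd; std::string className; };
    std::map<std::string, Entry> fds_;
};

// Usage: the old two-argument call and the new three-argument call coexist.
void fdStoreExample()
{
    FdStore store;
    store.storeFd("plain", 3);
    store.storeFd("sock", 4, "IO::Socket::INET");

    std::string cls;
    assert(store.loadFd("sock", &cls) == 4 && cls == "IO::Socket::INET");
    assert(store.loadFd("plain") == 3);     // old-style call still works
    assert(store.loadFd("missing") == -1);
}
```

The defaulted pointer argument is what keeps the old callers compiling: they never see the class name unless they ask for it.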

Sunday, July 14, 2013

BasicPthread reference (C++)

As you can see from the previous descriptions, building a new Triead is a serious business, containing many moving parts. Doing it every time from scratch would be hugely annoying and error-prone. The class BasicPthread, defined in app/BasicPthread.h, takes care of wrapping all that complicated logic.

It originated as a subclass of pw::pwthread, and even though it ended up easier to copy and modify the code (okay, maybe this means that pwthread can be made more flexible), the usage is still very similar to it. You define a new subclass of BasicPthread, and define the virtual function execute() in it. Then you instantiate the object and call the method start() with the App argument.

For a very simple example:

class MainLoopPthread : public BasicPthread
{
public:
    MainLoopPthread(const string &name):
        BasicPthread(name)
    {
    }

    // overrides BasicPthread::execute
    virtual void execute(TrieadOwner *to)
    {
        to->readyReady();
        to->mainLoop();
    }
};

...

Autoref<MainLoopPthread> pt3 = new MainLoopPthread("t3");
pt3->start(myapp);

It will properly create the Triead, TrieadOwner, register the thread joiner and start the execution. The TrieadOwner will pass through to the execute() method, and its field fi_ will contain the reference to the FileInterrupt object. After execute() returns, it will take care of marking the thread as dead.

It also wraps the call of execute() into a try/catch block, so any Exceptions thrown will be caught and cause the App to abort. In short, it's very similar to the Triead management in Perl.
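The try/catch wrapping can be pictured with a small standalone sketch. Everything in it is hypothetical: MiniApp, GuardedThread and FailingThread are made-up names, std::exception stands in for the Triceps Exception, and no real OS thread is created (run() is called directly). It only shows the order of events described above: call execute(), turn an escaped exception into an App abort, and mark the thread dead either way.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical miniature of an App that only remembers who aborted it and why.
struct MiniApp
{
    bool aborted = false;
    std::string abortedBy;
    std::string abortedMsg;

    void abortBy(const std::string &tname, const std::string &msg)
    {
        aborted = true;
        abortedBy = tname;
        abortedMsg = msg;
    }
};

// Hypothetical base class; run() plays the role of the thread entry point.
class GuardedThread
{
public:
    explicit GuardedThread(const std::string &name): name_(name), dead_(false) { }
    virtual ~GuardedThread() { }

    virtual void execute() = 0; // the subclass's logic goes here

    void run(MiniApp &app)
    {
        try {
            execute();
        } catch (const std::exception &e) {
            app.abortBy(name_, e.what()); // an escaped exception aborts the App
        }
        dead_ = true; // marked dead whether execute() returned or threw
    }

    bool isDead() const { return dead_; }

private:
    std::string name_;
    bool dead_;
};

// A thread whose logic fails right away.
class FailingThread : public GuardedThread
{
public:
    explicit FailingThread(const std::string &name): GuardedThread(name) { }
    virtual void execute() { throw std::runtime_error("boom"); }
};

void guardedThreadExample()
{
    MiniApp app;
    FailingThread t("t3");
    t.run(app);
    assert(app.aborted && app.abortedBy == "t3" && app.abortedMsg == "boom");
    assert(t.isDead());
}
```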

You don't need to keep the reference to the thread object afterwards, you can even do the construction and start in one go:

(new MainLoopPthread("t3"))->start(myapp);

The internals of BasicPthread will make sure that the object will be dereferenced (and thus, in the absence of other references, destroyed) after the thread gets joined by the harvester.

Of course, if you need to pass more arguments to the thread, you can define them as fields in your subclass, set them in the constructor (or by other means between constructing the object and calling start()), and then execute() can access them. Remember, execute() is a method, so it receives not only the TrieadOwner as an argument but also the BasicPthread object as this.

BasicPthread is implemented as a subclass of TrieadJoin, and thus is an Mtarget. It provides the concrete implementation of the joiner's virtual methods, join() and interrupt(). The method interrupt() calls the method of the base class, then sends the signal SIGUSR2 to the target thread.

And finally the actual reference:

BasicPthread(const string &name);

Constructor. The name of the thread is passed through to App::makeTriead(). The Triead will be constructed in start(), the BasicPthread constructor just collects together the arguments.

void start(Autoref<App> app);

Construct the Triead, create the POSIX thread, and start the execution there.

void start(Autoref<TrieadOwner> to);

Similar to the other version of start() but uses a pre-constructed TrieadOwner object. This version is useful mostly for the tests, and should not be used much in real life.

virtual void execute(TrieadOwner *to);

Method that must be redefined by the subclass, containing the thread's logic.

virtual void join();
virtual void interrupt();

Methods inherited from TrieadJoin, providing the proper implementations for the POSIX threads.

And unless I've missed something, this concludes the description of the Triceps threads API.

Friday, July 12, 2013

AutoDrain reference, Perl

The AutoDrain class creates the drains on an App with the automatic scoping. When the returned AutoDrain object gets destroyed, the drain gets released. So placing the object into a lexically-scoped variable in a block will cause the undrain on the block exit. Placing it into another object will cause the undrain on deletion of that object. And just not storing the object anywhere works as a barrier: the drain gets completed and then immediately undrained, guaranteeing that all the previously sent data is processed and then continuing with the processing of the new data.
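The same scoping idea can be sketched in standalone C++ with a constructor/destructor pair. This is not the AutoDrain implementation: DrainCounter and ScopedDrain are hypothetical names, the drain state is reduced to a bare recursion counter, and nothing here waits for any queues to empty. It only illustrates the three placements of the object: in a block, inside another object's lifetime, and as a temporary that acts as a barrier.

```cpp
#include <cassert>

// Hypothetical stand-in for the App's drain state: just a recursion counter.
class DrainCounter
{
public:
    void requestDrain() { ++count_; }
    void undrain() { assert(count_ > 0); --count_; }
    bool isDrainActive() const { return count_ > 0; }

private:
    int count_ = 0;
};

// Requests the drain on construction and releases it on destruction.
class ScopedDrain
{
public:
    explicit ScopedDrain(DrainCounter &app): app_(app) { app_.requestDrain(); }
    ~ScopedDrain() { app_.undrain(); }

    ScopedDrain(const ScopedDrain &) = delete;
    ScopedDrain &operator=(const ScopedDrain &) = delete;

private:
    DrainCounter &app_;
};

void scopedDrainExample()
{
    DrainCounter app;
    {
        ScopedDrain outer(app);        // drained for the rest of this block
        {
            ScopedDrain inner(app);    // nested requests just bump the counter
            assert(app.isDrainActive());
        }
        assert(app.isDrainActive());   // still held by the outer object
    }                                  // leaving the block undrains
    assert(!app.isDrainActive());

    (void)ScopedDrain(app);            // a temporary: drain and release at once
    assert(!app.isDrainActive());
}
```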

All the drain caveats described in the App class apply to the automatic drains too.

$ad = Triceps::AutoDrain::makeShared($app);
$ad = Triceps::AutoDrain::makeShared($to);

Create a shared drain and wait for it to complete. A drain may be created from either an App or a TrieadOwner object. Returns the AutoDrain object.

$ad = Triceps::AutoDrain::makeSharedNoWait($app);
$ad = Triceps::AutoDrain::makeSharedNoWait($to);

Same as makeShared() but doesn't wait for the drain to complete before returning. May still sleep if an exclusive drain is currently active.

$ad = Triceps::AutoDrain::makeExclusive($to);

Create an exclusive drain on a TrieadOwner and wait for it to complete. Returns the AutoDrain object. Normally the excluded thread should be input-only. Such an input-only thread is allowed to send more data in without blocking (to wait for the App to become drained again after that, use the method wait()).

$ad = Triceps::AutoDrain::makeExclusiveNoWait($to);

Same as makeExclusive() but doesn't wait for the drain to complete before returning. May still sleep if a shared or another exclusive drain is currently active.

$ad->wait();

Wait for the drain to complete. Particularly useful after the NoWait creation, but can also be used to wait for the App to become drained again after injecting some rowops through the excluded Triead of the exclusive drain.

$ad->same($ad2);

Check that two AutoDrain references point to the same object.

Friday, June 21, 2013

Triead reference, Perl

The Triead class is the public interface of a Triceps thread, i.e. the part of it that is visible to the other threads. It's intended pretty much for introspection only, and all its methods are only for reading the state. They are all synchronized but of course the thread may change its state at any moment. The Triead objects can be obtained by calling App::getTrieads().

The class also contains some static methods that are used to construct the Trieads.

$result = $t->same($t2);

Check that two Triead references point to the same Triead object.

$name = $t->getName();

Get the Triead's name.

$fragment = $t->fragment();

Get the name of the Triead's fragment. If the Triead doesn't belong to a fragment, returns "".

$result = $t->isConstructed();

Check whether the Triead has been constructed. (For the explanation of the Triead lifecycle states, see http://babkin-cep.blogspot.com/2013/04/the-triead-lifecycle.html.)

$result = $t->isReady();

Check whether the Triead is ready.

$result = $t->isDead();

Check whether the Triead is dead.

$result = $t->isInputOnly();

Check whether the Triead is input-only, that is has no reader nexuses imported into it. When the Triead is created, this flag starts its life as false (0 for Perl), and then its correct value is computed when the Triead becomes ready. So, to check this flag correctly, you must first check that the Triead is ready.

@nexuses = $t->exports();

Get the list of nexuses exported from this Triead, as name-value pairs, suitable to be assigned into a hash. The values are the references to nexus objects.

@nexuses = $t->imports();
@nexuses = $t->readerImports();
@nexuses = $t->writerImports();


Get the list of nexuses imported into this Triead, as name-value pairs. The imports() returns the full list, without the ability to tell which nexuses are imported for reading and which for writing, while readerImports() and writerImports() return these subsets separately.

The names here are the "as-names" used for the import (the full names of the Nexuses can be obtained from the Nexus objects). The values are the references to nexus objects.

The next part of the API is the static construction methods. They really are wrappers to the TrieadOwner methods but Triead is a shorter name, and thus more convenient.

Triceps::Triead::start(@options);

Start a new Triead in a new Perl thread. The options are:

app => $appname
Name of the App that owns the new Triead. The App object will be looked up by name for the actual construction.

thread => $threadname
Name of the new Triead.

fragment => $fragname
Name of the new Triead's fragment. Default: "", which means no fragment.

immed => 0/1
Flag: when the new thread imports its nexuses, it should import them in the immediate mode. This flag is purely advisory, and the thread's main function is free to use or ignore it depending on its logic. It's provided as a convenience, since it's a typical concern for the helper threads. Default: 0.

main => \&function
The main function of the thread that will be called with all the options of start() plus some more:

    &$func(@opts, owner => $ownerObj)

The extra option of the main function is:
owner: the TrieadOwner object constructed for this thread

Also, any other options may be added, and they will be forwarded to the main function without parsing. The main function is then free to parse them by itself, and if it finds any unknown options, it will fail.

For the convenience of writing the main functions, the set of "standard" options is provided in the global variable

@Triceps::Triead::opts

The main function then uses this variable as a preamble for any of its own options, for example:

sub listenerT
{
    my $opts = {};
    &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    ...
}

Another convenience method is:

Triceps::Triead::startHere(@options);


It's very much the same as start() but starts the Triead in the current Perl thread. It's intended for the short-lived Apps that perform some computation and then have to return the result into the original thread. The unit tests are a good example of such apps.


And because of the typical usage, this method has some extra functionality compared to the plain start(): unless told otherwise, it will first create the App (with the name specified by the option "app"), then construct and run it, and after the main function of this thread exits, run the harvester and drop the App. So it just does the whole package, similar to App::build(), only typically the Triead started here runs as a normal real Triead, collects the results of computations and places them into some variables (global or referenced by the user-specific options of the main function).


This extra functionality can be disabled by the additional options (disable by setting them to 0):


harvest => 0/1
After the main function exits, automatically run the harvester. If you set it to 0, don't forget to call the harvester after this function returns. Default: 1.

makeApp => 0/1
Before doing anything, create the App. Obviously, this App must not exist yet. Default: 1.


These options are not passed through to the main function, unlike all the others.

App reference, C++

As usual, I won't be repeating the descriptions from the Perl reference, but just point out the methods and differences of the C++ version. And there are more detailed descriptions directly in the header files.

The static part of the API is:

static Onceref<App> make(const string &name);
static Onceref<App> find(const string &name);
static void drop(Onceref<App> app);

typedef map<string, Autoref<App> > Map;
static void listApps(Map &ret);

The App constructor is private, and the Apps get constructed with make(). make() throws an Exception if an App with this name already exists, find() throws an exception if an App with this name does not exist, and drop() just does nothing if its argument App had already been dropped.

All the operations on the global list of apps are internally synchronized and thread-safe. listApps() clears its argument map and fills it with the copy of the current list. Of course, after it returns and releases the mutex, other threads may create or delete apps, so the returned list (well, map) may immediately become obsolete. But since it all is done with the reference counters, the App objects will continue to exist and be operable as long as the references to them exist.
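Here is a minimal standalone sketch of why an obsolete list is still safe to use. It is not the Triceps implementation: ToyApp and ToyRegistry are hypothetical names, std::shared_ptr stands in for Autoref/Onceref, std::runtime_error stands in for the Triceps Exception, and the registry is an ordinary object rather than a static list.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical miniature App: nothing but a name.
struct ToyApp
{
    explicit ToyApp(const std::string &n): name(n) { }
    std::string name;
};

// Hypothetical registry with the same make/find/drop/list behavior as described.
class ToyRegistry
{
public:
    typedef std::map<std::string, std::shared_ptr<ToyApp> > Map;

    std::shared_ptr<ToyApp> make(const std::string &name)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (apps_.count(name))
            throw std::runtime_error("App '" + name + "' already exists");
        std::shared_ptr<ToyApp> app = std::make_shared<ToyApp>(name);
        apps_[name] = app;
        return app;
    }

    std::shared_ptr<ToyApp> find(const std::string &name) const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        Map::const_iterator it = apps_.find(name);
        if (it == apps_.end())
            throw std::runtime_error("App '" + name + "' is not found");
        return it->second;
    }

    // Dropping an App that is not in the list any more silently does nothing.
    void drop(const std::shared_ptr<ToyApp> &app)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        Map::iterator it = apps_.find(app->name);
        if (it != apps_.end() && it->second == app)
            apps_.erase(it);
    }

    // Replaces the contents of ret with a copy of the current list.
    void listApps(Map &ret) const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        ret = apps_;
    }

private:
    mutable std::mutex mutex_;
    Map apps_;
};

void toyRegistryExample()
{
    ToyRegistry reg;
    std::shared_ptr<ToyApp> a = reg.make("a1");

    ToyRegistry::Map snapshot;
    reg.listApps(snapshot);
    reg.drop(a);
    reg.drop(a);                           // dropping twice is harmless
    a.reset();

    assert(snapshot.count("a1") == 1);     // the snapshot is obsolete by now...
    assert(snapshot["a1"]->name == "a1");  // ...but the object is still alive

    bool notFound = false;
    try { reg.find("a1"); } catch (const std::runtime_error &) { notFound = true; }
    assert(notFound);
}
```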

The App instance API is as follows. It's also all internally synchronized.

const string &getName() const;

Get the name of the App.

Onceref<TrieadOwner> makeTriead(const string &tname, const string &fragname = "");

Define a new Triead. This method is called either from the OS thread where the Triead will run or from its parent thread (unlike Perl, in C++ it's perfectly possible to pass the reference to the TrieadOwner from a parent OS thread to the thread that will run it). Either way, the OS thread that will run this Triead ends up with the TrieadOwner object reference, and no other thread must have it. The arguments are the thread name and fragment name, and the empty fragment name means that this thread won't belong to any fragment.

And just to reiterate, this does not create the OS thread. Creating the OS thread is your responsibility. This call only creates the Triceps Triead management structures to put it under control of the App.

If a thread with this name has already been defined, or if the thread name is empty, throws an Exception.

void declareTriead(const string &tname);

Declare a new thread. I forgot to mention it in the Perl API description, but declaring a thread more than once, or declaring a thread that has already been defined, is perfectly OK.

void defineJoin(const string &tname, Onceref<TrieadJoin> j);

This is a call without an analog in the Perl API. This defines a way for the harvester to join the thread. For the Perl API it happens to be hardcoded to join the Perl threads. But the C++ API can deal with any threads: POSIX ones, Perl ones, whatever. If a thread wants to be properly joined by the harvester, it must define its join interface, done as a TrieadJoin object. Each kind of threads will define its own subclass of TrieadJoin.

If there is no join defined for a Triead, then when it exits, the harvester will just update the state and manage the Triead object properly but won't do any joining. This is useful if the OS thread is detached (not the best idea but doable) or if the Triead is created from the parent OS thread, and then the actual OS thread creation fails and there is nothing to join.

If the thread is neither declared nor defined yet, defineJoin() throws an Exception.

It's possible (though unusual) to call this method multiple times for the same thread. That would just replace the joiner object. The joiner is a reference-counted object, so the old object will just have its reference count decreased. It's possible to pass the joiner as NULL; that would just drop the existing joiner, if any was defined.


typedef map<string, Autoref<Triead> > TrieadMap;
void getTrieads(TrieadMap &ret) const;

List the Trieads in this App. Same as with listing the Apps, the argument map gets cleared and then filled with the current contents.

void harvester(bool throwAbort = true);
bool harvestOnce();
void waitNeedHarvest();

The harvester API is very similar to Perl, with confession replaced by an Exception. The only difference is in how the flag for throwing an Exception on detecting an App abort is passed: here it is the plain argument throwAbort (the Exception will still be thrown only after joining all the App's threads).

The result of harvestOnce() is true if the App is dead. The Exceptions in harvestOnce() originate from the TrieadJoin::join() method that performs the actual joining. All the caveats apply in the same way as in Perl.

bool isDead();

Returns true if the App is dead.

void waitDead();

Wait for the App to become dead.

bool isAborted() const;

Returns true if the App is aborted.

string getAbortedBy() const;
string getAbortedMsg() const;

Get the thread name and message that caused the abort. If the App is not aborted, will return the empty strings.

void abortBy(const string &tname, const string &msg);

Abort the app, by the thread tname, with the error message msg.

bool isShutdown();


Returns true if the App has been requested to shut down.


void shutdown();

Request the App to shut down. This involves interrupting all the threads in case they are sleeping. The interruption is another functionality of the TrieadJoin object. It's possible for the TrieadJoin interruptor to encounter an error and throw an Exception. If this happens, shutdown() will still go through all the Trieads and interrupt them, and then repackage the error messages from all the received Exceptions into one Exception and re-throw it.
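The "interrupt everybody first, complain afterwards" pattern is easy to get wrong, so here is a minimal standalone sketch of it. The function interruptAll() is hypothetical, plain std::function callbacks stand in for the TrieadJoin objects, std::exception stands in for the Triceps Exception, and joining the messages with newlines is just one possible way of repackaging them.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Calls every interruptor even if some of them throw,
// then reports all the collected failures as one exception.
void interruptAll(const std::vector<std::function<void()> > &interruptors)
{
    std::string errors;
    for (const std::function<void()> &interrupt : interruptors) {
        try {
            interrupt();
        } catch (const std::exception &e) {
            if (!errors.empty())
                errors += "\n";
            errors += e.what();
        }
    }
    if (!errors.empty())
        throw std::runtime_error(errors);
}

void interruptAllExample()
{
    int calls = 0;
    std::vector<std::function<void()> > interruptors;
    interruptors.push_back([&calls]() { ++calls; throw std::runtime_error("t1 failed"); });
    interruptors.push_back([&calls]() { ++calls; });
    interruptors.push_back([&calls]() { ++calls; throw std::runtime_error("t3 failed"); });

    std::string msg;
    try {
        interruptAll(interruptors);
    } catch (const std::runtime_error &e) {
        msg = e.what();
    }
    assert(calls == 3);                        // nobody was skipped
    assert(msg == "t1 failed\nt3 failed");     // both failures are reported together
}
```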

Technically, this means that in the Perl API the shutdown might also confess, when its underlying C++ call returns an Exception. This should theoretically never happen, but practically you never know.

void shutdownFragment(const string &fragname);

Shut down a fragment. All the logic described in the Perl API applies. Again, this involves interruption of all the threads in the fragment, and if any of them throw Exceptions, these will be re-thrown as a single Exception.

enum {
    DEFAULT_TIMEOUT = 30,
};
void setTimeout(int sec, int fragsec = -1);

Set the readiness timeouts (main and fragment), in seconds. If the fragment timeout argument is <0, it gets set to the same value as the main timeout.

void setDeadline(const timespec &dl);

Set the deadline (unlike timeout, with fractional seconds) for the main readiness.

void refreshDeadline();

Explicitly refresh the deadline, using the fragment timeout.

void requestDrain();

Request a shared drain.

void requestDrainExclusive(TrieadOwner *to);

Request an exclusive drain, with the argument TrieadOwner. Unlike Perl API, the C++ API supports the methods for requesting the exclusive drain on both App and TrieadOwner classes (the TrieadOwner method is really a wrapper for the App method). In general, using the TrieadOwner method for this purpose probably looks nicer.

Since the TrieadOwner reference is really private to the OS thread that runs it, this method can be called only from that OS thread. Of course, being C++, you could pass it around to the other threads, but don't, TrieadOwner is not thread-safe internally and any operations on it must be done from one thread only.

void waitDrain();

Wait for the drain (either shared or exclusive) to complete.

void drain();

A combination of requestDrain() and waitDrain().

void drainExclusive(TrieadOwner *to);

A combination of requestDrainExclusive() and waitDrain().

bool isDrained();

Quickly check if the App is currently drained (should be used only if the App is known to be requested to drain).

void undrain();

End the drain sequence.

The file descriptor store/load API is not of much use in C++ as such, since in C++ it's easy to pass and share the file descriptors and file objects between the threads as-is. It has been built into the App class for the benefit of Perl and possibly other interpreted languages.

void storeFd(const string &name, int fd);

Store a file descriptor. Unlike the Perl API, the file descriptor is NOT dupped before storing. It's stored as is, and if you want dupping, you have to do it yourself. Throws an Exception if a file descriptor with this name is already stored.

int loadFd(const string &name) const;

Load back the file descriptor. Again, no dupping, returns the stored value as-is. If the name is unknown, returns -1.

bool forgetFd(const string &name);

Forget the file descriptor. Returns true if this name was known and became forgotten, or false if it wasn't known. Normally, you would load the descriptor, take over its ownership, and then tell the App to forget it.

bool closeFd(const string &name);

Close the file descriptor (if it was known) and then forget it. Returns true if this name was known and became forgotten, or false if it wasn't known.

The rest of the Perl App methods have no analogs in C++. They are just purely Perl convenience wrappers.

App reference, Perl, part 2

Triceps::App::setTimeout($appOrName, $main_to_sec);
Triceps::App::setTimeout($appOrName, $main_to_sec, $frag_to_sec);

Set the readiness timeout. Even though Triceps is quite eager in watching for deadlocks in the threads topology, it's still possible to get some threads, and with them the whole program, stuck during initialization. For example, if you declare a thread and then never define it. These situations are very unpleasant because you start the program and expect it to work but it doesn't, without any indication as to why. So Triceps imposes an initialization timeout. The App (and thus all its threads) must become ready within the timeout after the definition or declaration of the last Triead. If not, the App will be aborted (and the error message will tell which thread did not initialize in time). The same applies to the creation of Trieads (usually in the fragments) after the App is already running: all the threads must become ready within the timeout since the last thread has been defined or declared.

The default timeout is 30 seconds, or symbolically

&Triceps::App::DEFAULT_TIMEOUT

(subject to possible changes of the exact value in the future).

But the timeout can be changed. Technically, there are two timeouts:

  • one starts when the App is created
  • one restarts when any Triead is defined or declared

For the App to be aborted, both timeouts must expire. By default they are set to the same length, so only the second timeout really matters, since it will always be the last one to start and last one to end. But if you set the first timeout to be longer, you can allow for a longer initialization period at the App start ("main timeout") than later when more threads are added to a running App ("frag timeout", since the threads added later are typically the threads in fragments).

The one-argument form of setTimeout() sets both timeouts to the same value. The two-argument form sets these timeouts separately.
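The interplay of the two timeouts can be modeled with a few lines of standalone code. This is only a sketch under assumptions, not the actual App logic: ReadinessClock and its methods are hypothetical, the time is passed in explicitly as plain seconds to keep the example deterministic, and the fragment deadline is assumed to start at the creation time until the first Triead refreshes it. The rule "both timeouts must expire" is expressed as comparing the current time against the later of the two deadlines.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical model of the readiness deadlines; all times are in seconds.
class ReadinessClock
{
public:
    static const int DEFAULT_TIMEOUT = 30;

    // The main timeout starts running when the App is created.
    explicit ReadinessClock(double now)
        : fragSec_(DEFAULT_TIMEOUT),
          mainDeadline_(now + DEFAULT_TIMEOUT),
          fragDeadline_(now)
    { }

    // With fragSec < 0 (the one-argument form) both timeouts get the same value.
    void setTimeout(double now, int sec, int fragSec = -1)
    {
        mainDeadline_ = now + sec;
        fragSec_ = (fragSec < 0) ? sec : fragSec;
    }

    // Called whenever a Triead is defined or declared.
    void refreshDeadline(double now)
    {
        fragDeadline_ = now + fragSec_;
    }

    // The App is overdue only after both deadlines have passed.
    bool isOverdue(double now) const
    {
        return now > std::max(mainDeadline_, fragDeadline_);
    }

private:
    int fragSec_;
    double mainDeadline_;
    double fragDeadline_;
};

void readinessClockExample()
{
    ReadinessClock clock(0);
    clock.setTimeout(0, 60, 10);    // long allowance at start, short one for fragments
    clock.refreshDeadline(5);       // a Triead is defined at t=5: fragment deadline is 15
    assert(!clock.isOverdue(20));   // fragment deadline passed, main deadline (60) has not
    assert(clock.isOverdue(61));    // now both have passed

    clock.refreshDeadline(70);      // a fragment thread is added to the running App
    assert(!clock.isOverdue(75));   // it gets its 10 seconds, until t=80
    assert(clock.isOverdue(81));
}
```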

The timeouts may be set only before the first thread has been created. This is largely due to historical reasons in the implementation: with the current implementation it should actually be safe to allow changing the timeouts at a later point as well, and this limitation may be removed in the future.

The timeout values are represented as integer whole seconds.

Note that it's still possible to get the initialization stuck without any indication if it gets stuck in the other libraries. The reason is that the harvester waits for all the threads to be joined before it propagates the error, so if a thread doesn't get joined, the harvester will be stuck. For example, if you open a file on NFS as a part of Triead initialization, and the NFS server doesn't respond, this thread will be stuck for a long time if not forever. The App will detect a timeout and try to interrupt the threads but the NFS operations are often not interruptible, so the harvester will wait for this thread to complete this operation and exit before it propagates the error, and thus the whole program will be silently stuck forever (to avoid this, the NFS mounts should be done in the "soft" mode, but that's a separate story). This will likely be improved in the future but it needs more thinking about how to do it right.

Triceps::App::setDeadline($appOrName, $deadline_sec);

Set the "main" timeout in the form of an absolute deadline. This is actually closer to the way it works internally: the limit is expressed as a deadline, and setTimeout() just adds the timeout value to the current time to compute the deadline, while setDeadline() sets it straight. Like setTimeout(), this may be called only before any Trieads were created.

The time is represented as floating-point seconds since epoch. Triceps::now() can be used to get the current time with the fractional seconds (or if you don't care about the fractional seconds, you can always use the stock time()).

Triceps::App::refreshDeadline($appOrName);

Restart the "fragment" timeout. Same as when a Triead is defined or declared, only without involving a Triead. Again, the name has to do with how the time computations are done internally, computing the deadline from both timeouts. This method can be called at any time.

The next few methods have to do with the drains. Generally, using the AutoDrain class to automatically limit the scope of the drains is a better idea. But if the automatic scoping is not desired, the App methods can be used directly.

Triceps::App::requestDrain($appOrName);

Request a shared drain. Does not wait for the drain to complete. This method may be called repeatedly, potentially from multiple threads, which will keep the drain active and increase the recursion count. The drain will be released when undrain() is called the matching number of times.

If an exclusive drain by another thread is active when requestDrain() is called, the call will be stuck until the exclusive drain becomes undrained (and potentially more exclusive drains might be queued up before the shared drain, using the POSIX read-write-lock implementation). Otherwise the call will set the appropriate state and return immediately.

If this thread has requested an exclusive drain previously (and didn't undrain it yet), an attempt to get a shared drain in the same thread will likely deadlock. The same applies in the opposite order as well.

Once the shared drain is requested, the input-only threads will be blocked from sending more data (any of their attempts to flush their write facets will get stuck until undrain), and the rest of the threads will continue churning through the data buffered in the nexuses until all the nexuses are empty and there is no more data to process (yes, these threads will continue writing to their write facets).

It is important not to request a shared drain from an input-only thread and then try to write more data into an output facet: that would deadlock. The whole concept is doable, but an exclusive drain must be used instead ("exclusive" means that a designated input-only thread is excluded from blocking).

Triceps::App::waitDrain($appOrName);

Wait for the requested shared drain to complete. This means that all the queues in all the nexuses have become empty.

The effect of waitDrain() without a preceding requestDrain() is undefined. If you definitely know that some other thread has requested a drain and didn't undrain yet, it will work as normal, so you can have any number of threads wait for the drain in parallel. (And the semantics, shared or exclusive, will match that of the currently active request.) If no thread requested a drain, it might either return immediately irrespective of the state of the nexuses or might wait for some other thread to request a drain and succeed.

This call may also be used with an exclusive (or shared) drain requested through a TrieadOwner or any drain requested through an AutoDrain (though a better style is to use the same object as used for requesting the drain).

Triceps::App::drain($appOrName);

A combination of requestDrain() and waitDrain() in one call.

Triceps::App::undrain($appOrName);

Release the drain and let the normal processing continue. Make sure to call it exactly the same number of times as the requestDrain().

This call may also be used with a drain requested through a TrieadOwner (though a better style is to use the same object as used for requesting the drain).

$result = Triceps::App::isDrained($appOrName);

Check whether the App is currently drained (i.e. all the nexuses are empty), without waiting for it. Returns 1 if drained, 0 if not. If no drain is active during this call, the result is undefined, not even in the "best effort" sense: it may return 1 even if there are millions of records queued up. The only reliable way is to request a drain first.

This call may also be used with a drain requested through a TrieadOwner or through an AutoDrain.


The next part of the API deals with the passing of the file descriptors between the Perl threads. The concepts have been described in detail before, this is just a short reference. This API works under the hood of TrieadOwner::trackGetSocket() and friends, those described in http://babkin-cep.blogspot.com/2013/04/multithreaded-socket-server-part-3.html and neighboring posts, but can also be used directly.

Triceps::App::storeFd($appOrName, $name, $fd);

Store a file descriptor in the App object, allowing to load it into other threads, and thus pass it around between the threads. $name is the name for this descriptor that will later be used to get the file descriptor back (generally, you want to generate a unique name for each file descriptor stored to avoid confusion, and then pass this name to the target thread). $fd is the file descriptor, an integer number, normally received from fileno(). The file descriptor is dupped before it gets stored, so the original will continue to exist, and if you have no other use for it, should be closed.

If a file descriptor with this name already exists in the App, this call will confess.

$fd = Triceps::App::loadFd($appOrName, $name);

Load back the file descriptor that was previously stored. If no descriptor with such a name is stored in the App, it will confess. The descriptor will keep existing in the App, so to keep things consistent, there are two options:

One is to let your code take over the ownership of the file descriptor, and tell the App to forget about it with forgetFd().

The other one is to never close the received descriptor in your code (a good example would be to dup it right away for the future use and then leave the original alone), and let App keep its ownership.

$fd = Triceps::App::loadDupFd($appOrName, $name);

Very much the same as loadFd(), only does the dupping for you and returns the dupped descriptor. In this case your code is responsible for closing that descriptor. Which method is more suitable, loadFd() or loadDupFd(), depends on the nature of the code that will use the file descriptor and whether you want to leave the descriptor in the App for more threads to load.
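The ownership rules are easier to follow with real descriptors, so here is a minimal standalone sketch that mimics the dup-on-store and dup-on-load behavior with the POSIX calls pipe(), dup() and close(). The class DupStore is hypothetical and runs in a single thread, with std::runtime_error standing in for the confession. The point is only to show who has to close which copy.

```cpp
#include <cassert>
#include <cstring>
#include <map>
#include <stdexcept>
#include <string>
#include <unistd.h>

// Hypothetical stand-in for the descriptor table inside the App.
class DupStore
{
public:
    // Stores a duplicate, so the caller still owns (and should close) fd.
    void storeFd(const std::string &name, int fd)
    {
        if (fds_.count(name))
            throw std::runtime_error("descriptor '" + name + "' is already stored");
        int copy = dup(fd);
        if (copy < 0)
            throw std::runtime_error("dup() failed");
        fds_[name] = copy;
    }

    // Returns the stored descriptor itself; the store keeps owning it.
    int loadFd(const std::string &name) const
    {
        std::map<std::string, int>::const_iterator it = fds_.find(name);
        if (it == fds_.end())
            throw std::runtime_error("descriptor '" + name + "' is not stored");
        return it->second;
    }

    // Returns a fresh duplicate that the caller must close.
    int loadDupFd(const std::string &name) const
    {
        int copy = dup(loadFd(name));
        if (copy < 0)
            throw std::runtime_error("dup() failed");
        return copy;
    }

    // Closes the stored copy and forgets the name.
    void closeFd(const std::string &name)
    {
        close(loadFd(name));
        fds_.erase(name);
    }

private:
    std::map<std::string, int> fds_;
};

void dupStoreExample()
{
    int p[2];
    if (pipe(p) != 0)
        throw std::runtime_error("pipe() failed");

    DupStore store;
    store.storeFd("out", p[1]);
    close(p[1]);                       // the original is not needed any more

    int w = store.loadDupFd("out");    // the receiving side gets its own copy
    if (write(w, "hi", 2) != 2)
        throw std::runtime_error("write() failed");
    close(w);                          // the receiver closes its copy
    store.closeFd("out");              // and the store drops the stored one

    char buf[8];
    ssize_t n = read(p[0], buf, sizeof(buf));
    assert(n == 2 && std::memcmp(buf, "hi", 2) == 0);
    n = read(p[0], buf, sizeof(buf));
    assert(n == 0);                    // every write end is closed, so EOF
    close(p[0]);
}
```

If any one of the three write-side copies were left open, the last read() would block instead of returning the end-of-file, which is the kind of leak the text warns about.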

Triceps::App::forgetFd($appOrName, $name);

Forget the file descriptor in the App. It doesn't get closed, so closing it becomes your responsibility, or it will leak. If no descriptor with this name exists, the call will confess.

Triceps::App::closeFd($appOrName, $name);

Close and forget the file descriptor in the App. If no descriptor with this name exists, the call will confess.

Triceps::App::storeFile($appOrName, $name, FILE);

A convenience wrapper for storeFd(): calls fileno() on the file and stores the resulting file descriptor.

Triceps::App::storeCloseFile($appOrName, $name, FILE);

Stores the file descriptor extracted from the file, and closes the original file (since the file descriptor gets dupped on store, that copy continues to exist in the App).

$file = Triceps::App::loadDupFile($appOrName, $name, $mode);

Load a file descriptor and build a file handle object (IO::Handle) from it. The mode string may be specified in either the open() format (</>/>>/+</+>/+>>) or the C stdio format (r/w/a/r+/w+/a+). Note that the mode must match or be a subset of the mode used to originally open the file descriptor. If you open a file read-only, store its descriptor, and load back as a write-only file, you will have a bad time.
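
The correspondence between the two mode formats can be sketched as a simple lookup. This is only an illustration of the mapping named above; normalize_mode() is a hypothetical helper, not something Triceps exports.

```perl
use strict;
use warnings;

# Hypothetical helper: translate a C stdio mode into the equivalent
# Perl open() mode. Strings already in the open() format pass through.
my %STDIO_TO_OPEN = (
    'r'  => '<',  'w'  => '>',  'a'  => '>>',
    'r+' => '+<', 'w+' => '+>', 'a+' => '+>>',
);
my %OPEN_MODES = map { $_ => 1 } values %STDIO_TO_OPEN;

sub normalize_mode {
    my ($mode) = @_;
    return $mode if $OPEN_MODES{$mode};
    return $STDIO_TO_OPEN{$mode} if exists $STDIO_TO_OPEN{$mode};
    die "unknown file mode '$mode'\n";
}

print normalize_mode('r+'), "\n"; # prints +<
print normalize_mode('>>'), "\n"; # prints >>
```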

There is no corresponding loadFile(), since loadFd() is a more dangerous method that is useful for the low-level operations but doesn't make much sense for the higher level.

$file = Triceps::App::loadDupSocket($appOrName, $name, $mode);

Load a file descriptor and build an IO::Socket::INET object from it. The mode string meaning is the same as for loadDupFile().


$file = Triceps::App::loadDupFileClass($appOrName, $name, $mode, $class);


Load a file descriptor and build an arbitrary class object from it. The class (specified by its name) should normally be a subclass of IO::Handle, or at the very least must implement the method new_from_fd() similar to IO::Handle. This is the common underlying implementation of both loadDupFile() and loadDupSocket().
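
Building a handle of a class chosen by name boils down to calling new_from_fd() on that class. Here is a minimal sketch in core Perl, assuming the class is already loaded; handle_from_fd() is a hypothetical helper that only imitates the idea, and the fallback to IO::Handle for an empty class name is my assumption for the sketch.

```perl
use strict;
use warnings;
use POSIX ();
use IO::Handle;
use IO::File;

# Hypothetical helper: build a handle of the named class around a raw
# descriptor. Assumes the class is already loaded and provides
# new_from_fd($fd, $mode), as IO::Handle and its subclasses do.
sub handle_from_fd {
    my ($fd, $mode, $class) = @_;
    $class = "IO::Handle" unless defined $class && $class ne "";
    $class->can("new_from_fd")
        or die "class '$class' has no new_from_fd()\n";
    my $h = $class->new_from_fd($fd, $mode)
        or die "new_from_fd failed: $!\n";
    return $h;
}

pipe(my $rd, my $wr) or die "pipe failed: $!";
my $fd = POSIX::dup(fileno($wr)); # our own copy of the write end
defined $fd or die "dup failed: $!";
close($wr);

my $out = handle_from_fd($fd, "w", "IO::File");
print ref($out), "\n";  # prints IO::File
$out->print("ping\n");
$out->close();
print scalar(<$rd>);    # prints ping
```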


The last part of the API is the convenience methods for building and starting an App.

Triceps::App::build($name,  \&builder );

Build an App instance. It creates the App instance, then the builder function is called to create the App's nexuses and threads, then the harvester is executed, that eventually destroys the App object after collecting all its threads. For a very basic example:

Triceps::App::build "a1", sub {
    Triceps::App::globalNexus(
        name => "types",
        rowTypes => [
            rt1 => $rt1,
        ],
    );
    Triceps::Triead::start(
        app => $Triceps::App::name,
        thread => "t1",
        main => sub {
            my $opts = {};
            &Triceps::Opt::parse("t1 main", $opts, {@Triceps::Triead::opts}, @_);
            my $to = $opts->{owner};
            $to->importNexus(
                from => "global/types",
                import => "writer", # if importing just for the types, use "writer"!
            );
            $to->readyReady();
        },
    );
};

The builder function runs in the current Perl thread, however from the logical standpoint it runs in the App's first Triead named "global" (this Triead name is hardcoded in build()). This allows you not to worry about the App being technically dead until the first Triead is created: by the time the builder function is called, the first Triead is already created. However it also means that you can't change the readiness timeouts. After the builder function returns, the global Triead exits, and the harvester starts in the same current Perl thread.

The builder has access to a few global variables:

  • $Triceps::App::name is the App name, from the build() first argument.
  • $Triceps::App::app is the reference to the App object.
  • $Triceps::App::global is the reference to the TrieadOwner object of the global Triead.
After the builder function exits, these variables become undefined.

Triceps::App::globalNexus(@nexusOptions);

Creates a nexus with the import mode of "none", on the global Triead. The arguments are the same options as for the normal nexus creation. This is a simple convenience wrapper for the nexus creation. Since the global thread is supposed to exit immediately, there is no point in importing this nexus into it.

The global thread is found by this function from $Triceps::App::global, so this method can only be used from inside the builder function of build().

Sunday, June 16, 2013

App reference, Perl, part 1

The App API has two parts to it: The first part keeps track of all the App instances in the program, and allows to list and find them. The second part is the manipulations on a particular instance.

The global part of the API is:

@apps = Triceps::App::listApps();

Returns the array of name-value pairs, values containing the App references, listing all the Apps in the program (more exactly, in this Triceps library: if you compile multiple Triceps libraries together by renaming them, each of them will have its own Apps list). The returned array can be placed into a hash.

$app = Triceps::App::find($name);

Find an App by name. If an App with such a name does not exist, it will confess.

$app = Triceps::App::make($name);

Create a new App, give it the specified name, and put it on the list. The names must not be duplicate with the other existing Apps, or the method will confess.

Triceps::App::drop($app);
Triceps::App::drop($appName);

Drop the app, by reference or by name, from the list. The App is as usual reference-counted and will exist while there are references to it. The global list provides one of these references, so an App is guaranteed to exist while it's still on the list. When dropped, it will still be accessible through the existing references, but obviously will not be listed any more and could not be found by name.

Moreover, a new App with the same name can be added to the list. Because of this, dropping an App by name requires some care if a new App could be created again with the same name: it creates a potential for a race, and you might end up dropping the new App instead of the old one. Of course, if it's the same thread that drops the old one and creates the new one, then there is no race. Dropping an application by name that doesn't exist at the moment is an error and will confess.

Dropping the App by reference theoretically allows to avoid the race: a specific object gets dropped, and if it already has been dropped, the call has no effect. Theoretically. However in practice Perl has a limitation of passing the object values between threads, and thus whenever each thread starts, first thing it does is finding its App by name. It's a very similar kind of race and is absolutely unavoidable except by making sure that all the App's threads have exited and joined (i.e. harvesting them). So make sure to complete the App's thread harvesting before dropping the App in the harvester thread, and by then it doesn't matter a whole lot if the App is dropped by name or by reference.
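
The difference between dropping by name and dropping by reference can be shown on a toy registry. MiniRegistry below is a made-up stand-in for the global App list, written only to illustrate the semantics described above; it is single-threaded and is not how Triceps implements it.

```perl
use strict;
use warnings;

# A toy stand-in for the global App list; not the Triceps implementation.
package MiniRegistry;

my %list; # name => object reference

sub make {
    my ($name) = @_;
    die "duplicate name '$name'\n" if exists $list{$name};
    return $list{$name} = bless { name => $name }, "MiniRegistry";
}

# Drop by name: removes whatever object currently holds the name.
sub drop_by_name {
    my ($name) = @_;
    exists $list{$name} or die "no such name '$name'\n";
    delete $list{$name};
}

# Drop by reference: removes the entry only if it is this very object.
sub drop_by_ref {
    my ($obj) = @_;
    my $cur = $list{ $obj->{name} };
    delete $list{ $obj->{name} } if defined $cur && $cur == $obj;
}

sub is_listed { return exists $list{ $_[0] } ? 1 : 0 }

package main;

my $old = MiniRegistry::make("a1");
MiniRegistry::drop_by_ref($old);           # the old one leaves the list
my $new = MiniRegistry::make("a1");        # the name gets reused

MiniRegistry::drop_by_ref($old);           # stale reference: no effect
print MiniRegistry::is_listed("a1"), "\n"; # prints 1, the new one survives

MiniRegistry::drop_by_name("a1");          # a stale name would hit the new one
print MiniRegistry::is_listed("a1"), "\n"; # prints 0
```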

Now the second part of the API, working with an App instance.

Many (but not all) of the App methods allow to specify the App either by reference or by name, and they automatically sort it out, doing the internal look-up by name if necessary. So the same method could be used as either of:

$app->method(...);
Triceps::App::method($app, ...);
Triceps::App::method($appName, ...);

Obviously, you can not use the "->" syntax with a name, and obviously if the name is not in the app list, the method will confess. Below I'll show the calls that allow the dual formats as Triceps::App::method($appOrName, ...) but keep in mind that you can use the "->" form of them too with a reference.

$app = Triceps::App::resolve($appOrName);

Does just the automatic sorting-out part: takes a reference or a name and returns a reference either way. A newly created reference is returned in either case (not the argument reference). You can use this resolver before the methods that accept only a reference.

$result = $app->same($app2);

Check if two references are for the same App object. Here they both must be references and not names.

$name = $app->getName();

Get the name of the App, from a reference.

Triceps::App::declareTriead($appOrName, $trieadName);

Declare a Triead (Triceps thread) in advance. Any attempts to look up the nexuses in that thread will then wait for the thread to become ready. (Attempts to look up in an undeclared and undefined thread are errors). This is necessary to prevent a race at the thread creation time. For the most part, the method Triead::start() just does the right thing by calling this method automatically and you don't need to use it manually, except in some very special circumstances.

@trieads = Triceps::App::getTrieads($appOrName);

Get the list of currently defined Trieads, as name-value pairs. Keep in mind that the other threads may be modifying the list of Trieads, so if you do this call multiple times, you may get different results. However the Trieads are returned as references, so they are guaranteed to stay alive and readable even if they get removed from the App, or even if the App gets dropped.

$app->harvester(@options);

Run the harvester in the current thread. The harvester gets notifications from the threads when they are about to exit, and joins them.  After all the threads have been joined, it automatically drops the App, and returns.
 
The harvesting is an absolutely necessary part of the App life cycle, however in most of the usage patterns (such as with Triead::startHere or App::build) the harvester is called implicitly from the wrapping library functions, so you don't need to care about it.

Note also that if you're running the harvester manually, you must call it only after the first thread has been defined or at least declared. Otherwise it will find no threads in the App, consider it dead and immediately drop it.

If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App, unless the option die_on_abort (see below) has been set to 0. This propagates the error to the caller. However there is a catch: if some of the threads don't react properly by exiting on an abort indication, the program will be stuck and you will see no error message until these threads get unstuck, possibly forever.

 Options:

 die_on_abort => 1/0

(default: 1) If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App. Setting this option to 0 will make the aborts silently ignored. This option does not affect the errors in joining the threads: if any of those are detected, the harvester will still confess after it has disposed of the App.

$dead = $app->harvestOnce();

Do one run of the harvesting.  Joins all the threads that have exited since its last call. If no threads have exited since then, returns immediately. Returns 1 if all the threads have exited (and thus the App is dead), 0 otherwise.  If a thread join fails, immediately confesses (if multiple threads were ready for joining, the ones queued after the failed one won't be joined, call harvestOnce() once more to join them).

$app->waitNeedHarvest();

Wait for at least one thread to become ready for harvesting. If the App is already dead (i.e. all its threads have exited), returns immediately.

These two methods allow to write the custom harvesters if you're not happy with the default one. The basic harvester logic can be written as:

do {
  $app->waitNeedHarvest()
} while(!$app->harvestOnce());
$app->drop();

However the real harvester also does some smarter things around the error handling. You can look it up in the source code in cpp/app/App.cpp.
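
As an illustration of what "smarter things around the error handling" could mean, here is one plausible variation of the loop: keep harvesting after a failed join, remember the first error, and report it only at the end. This is my sketch under stated assumptions, not a transcription of App.cpp; MockApp is a made-up stand-in with the same two method names so that the loop can be run without Triceps.

```perl
use strict;
use warnings;

# Hypothetical stand-in for an App: each queued entry is a "thread" that
# either joins cleanly (undef) or fails to join (an error string).
package MockApp;

sub new {
    my ($class, @results) = @_;
    return bless { queue => [@results] }, $class;
}

sub waitNeedHarvest { return } # nothing to wait for in the mock

sub harvestOnce {
    my ($self) = @_;
    while (@{ $self->{queue} }) {
        my $err = shift @{ $self->{queue} };
        die "$err\n" if defined $err; # a failed join stops this pass
    }
    return 1; # all the "threads" have exited
}

package main;

# Keep harvesting after a failed join, remember the first error, and
# return it only after everything has been collected.
sub harvest_all {
    my ($app) = @_;
    my $first_err;
    my $dead = 0;
    until ($dead) {
        $app->waitNeedHarvest();
        $dead = eval { $app->harvestOnce() } || 0;
        $first_err //= $@ if $@;
    }
    return $first_err;
}

my $err = harvest_all(MockApp->new(undef, "join of t2 failed", undef));
print defined $err ? "error: $err" : "clean exit\n";
```

With a real App the caller would then drop the App and confess with the remembered message.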

$res = Triceps::App::isDead($appOrName);

Returns 1 if the App is dead (i.e. has no more live threads), otherwise 0.  Calling this method with a name for the argument is probably a bad idea, since normally the harvester will drop the App quickly after it becomes dead, and you may end up with this method confessing when it could not find the dropped App.

$res = Triceps::App::isShutdown($appOrName);

Returns 1  if the App has been requested to shut down, either normally or by being aborted.

$res = Triceps::App::isAborted($appOrName);

Returns 1 if the App has been aborted. The App may be aborted explicitly by calling the method abortBy(), or the thread wrapper logic automatically converts any unexpected deaths in the App's threads to the aborts. If any thread dies, this aborts the App, which in turn requests the other threads to die on their next thread-related call. Eventually the harvester collects them all and confesses, normally making the whole program die with an error.

($tname, $message) = Triceps::App::getAborted($appOrName);

Get the App abort information: name of the thread that caused the abort, and its error message.

Triceps::App::abortBy($appOrName, $tname, $msg);

Abort the application. The thread name and message will be remembered, and returned later by getAborted() or in the harvester. If abortBy() is called multiple times, only the first pair of thread name and message gets remembered. The reason is that on abort all the threads get interrupted in a fairly rough manner (all their ongoing and following calls to the threading API die), which typically causes them to call abortBy() as well, and there is no point in collecting these spurious messages.

The thread name here doesn't have to be the name of the actual thread that reports the issue. For example, if the thread creation as such fails (maybe because of the OS limit on the thread count), that gets detected by the parent thread but reported in the name of the thread whose creation has failed. And in general you can pass just any string as the thread name, App itself doesn't care, just make it something that makes sense to you.

$res = Triceps::App::isDead($appOrName);


Returns 1 if the App is dead (i.e. it has no alive Trieads, all the defined and declared threads have exited). Right after the App is created, before the first Triead is created, the App is also considered dead, and becomes alive when the first Triead is declared or defined. If an App becomes dead later, when all the Trieads exit, it can still be brought back alive by creating more Trieads. But this is considered bad practice, and will cause a race with the harvester (if you want to do this, you have to make your own custom harvester).

$res = Triceps::App::isShutdown($appOrName);


Returns 1 if the App was requested to shut down. The Trieads might still run for some time, until they properly detect and process the shutdown, and exit. So this condition is not equivalent to Dead, although they are connected. If any new Trieads get started, they will be shut down right away and won't run.

To reiterate: if all the Trieads just exit by themselves, the App becomes dead but not shut down. You could still start more Trieads and bring the App back alive. If the App has been shut down, it won't become immediately dead, but it will send the shutdown indication to all the Trieads, and after all of them eventually exit, the App will become dead too. And after shutdown there is no way to bring the App back alive, since any new Trieads will be shut down right away (OK, there might be a short period until they detect the shutdown, so the App could spike as alive for a short time, but then will become dead again).

Triceps::App::waitDead($appOrName);


Will wait for the App to become dead and return after that. Make sure to not call waitDead() from any of App's Trieads: that would cause a deadlock.


Triceps::App::shutdown($appOrName);

Shut down the App. The shutdown state is sticky, so any repeated calls will have no effect. The call returns immediately and doesn't wait for the App to die. If you want to wait, call waitDead() afterwards. Make sure to not call waitDead() from a Triead: that would cause a deadlock.


Triceps::App::shutdownFragment($appOrName, $fragName);

Shut down a named fragment. This does not shut down the whole App, it just selectively shuts down the Trieads belonging to this fragment. See the explanation of the fragments in http://babkin-cep.blogspot.com/2013/03/triceps-multithreading-concepts.html. The fragment shutdown is not sticky: after a fragment has been shut down, it's possible to create another fragment with the same name. To avoid races, a fragment may be shut down only after all its Trieads are ready. So the caller Triead must call readyReady() before it calls shutdownFragment(). If any of the fragment's Trieads are not ready, the call will confess.

Monday, April 29, 2013

ThreadedServer, part 1

And now I want to show the internals of the ThreadedServer methods. It shows how to store the socket file handles into the App, how the threads are harvested, and how the connections get accepted.

sub startServer # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::startServer";
    my $opts = {};
    my @myOpts = (
        app => [ undef, \&Triceps::Opt::ck_mandatory ],
        thread => [ "global", undef ],
        main => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        port => [ undef, \&Triceps::Opt::ck_mandatory ],
        socketName => [ undef, undef ],
        fork => [ 1, undef ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);

    if (!defined $opts->{socketName}) {
        $opts->{socketName} = $opts->{thread} . ".listen";
    }

    my $srvsock = IO::Socket::INET->new(
        Proto => "tcp",
        LocalPort => $opts->{port},
        Listen => 10,
    ) or confess "$myname: socket creation failed: $!";
    my $port = $srvsock->sockport() or confess "$myname: sockport failed: $!";

So far it's pretty standard: get the options and open the socket for listening.

    if ($opts->{fork} > 0)  {
        my $pid = fork();
        confess "$myname: fork failed: $!" unless defined $pid;
        if ($pid) {
            # parent
            $srvsock->close();
            return ($port, $pid);
        }
        # for the child, fall through
    }

This handles the process forking option: if forking is requested, it executes and then the parent process returns the PID while the child process continues with the rest of the logic. By the way, your success with forking a process that has multiple running threads may vary. The resulting process usually has one running thread (continuing from the point where the original thread called fork()) but the synchronization primitives in the new process can be inherited in any state, so attempting to continue the threaded processing is usually not a good idea. It's usually best to fork first, before any more threads are created.

    # make the app explicitly, to put the socket into it first
    my $app = Triceps::App::make($opts->{app});
    $app->storeCloseFile($opts->{socketName}, $srvsock);
    Triceps::Triead::start(
        app => $opts->{app},
        thread => $opts->{thread},
        main => $opts->{main},
        socketName => $opts->{socketName},
        &Triceps::Opt::drop({ @myOpts }, \@_),
    );

Then the App gets created. Previously I was showing starting the App with startHere() that created the App and did a bunch of services implicitly. Here everything will be done manually. The listening socket has to be stored into the app before the listener thread gets started, so that the listener thread can find it.

Triceps keeps a global list of all its Apps in the process, and after an App is created, it's placed into that list and can be found by name from any thread. The App object will exist while there are references to it, including the reference from that global list. On the other hand, it's possible to remove the App from the list while it's still running but that's a bad practice because it will break any attempts from its threads to find it by name.
 
So the App is made, then the file handle gets stored. storeCloseFile() gets the file descriptor from the socket, dups it, stores it into the App, and then closes the original file handle. All this monkeying with dupping and closing is needed because there is no way to extract the file descriptor from a Perl file handle without the handle trying to close it afterwards. But as a result of this roundabout way, the file descriptor gets transferred into the App.

Then the listener thread is started, and as shown before, it's responsible for starting all the other threads. In the meantime, the startServer() continues.

    my $tharvest;
    if ($opts->{fork} < 0) {
        @_ = (); # prevent the Perl object leaks
        $tharvest = threads->create(sub {
            # In case of errors, the Perl's join() will transmit the error
            # message through.
            Triceps::App::find($_[0])->harvester();
        }, $opts->{app}); # app has to be passed by name
    } else {
        $app->harvester();
    }

Then the harvester logic is started. Each App must have its harvester. startHere() runs the harvester implicitly, unless told otherwise, but here the harvester has to be run manually. It can be run either in this thread or in another thread, as determined by the option "fork". If it says to make another thread, $tharvest will contain that thread's identity. A special thing about starting threads with threads->create() is that it's sensitive to anything in @_. If @_ contains anything, it will be leaked (though the more recent versions of Perl should have it fixed). So @_ gets cleared before starting the thread.

And one way or the other, the harvester is started. What does it do? It joins the App's threads as they exit. After all of them exit, it removes the App from the global list of Apps, which will allow to collect the App's memory when the last reference to it is gone, and then the harvester returns.

If any of the threads die, they cause the App to be aborted. The aborted App shuts down immediately and remembers the identity of the failed thread and its error message (only the first message is saved because the abort is likely to cause the other threads to die too, and there is no point in seeing these derivative messages). The harvester, in turn, collects this message from the App, and after all its cleanup work is done, dies propagating this message. Then if the harvester is not running in the first thread, that message will be propagated further by Perl's join().

A catch is that the errors are not reported until the harvester completes. Normally all the App's threads should exit immediately when shut down but if they don't, the program will be stuck without any indication of what happened.

It's also possible to disable this propagation of dying by using the option "die_on_abort":

$app->harvester(die_on_abort => 0);

Then the last part:

    if ($opts->{fork} > 0) {
        exit 0; # the forked child process
    }

    return ($port, $tharvest);
}



 If this was the child process forked before, it exits at this point. Otherwise the port and the harvester's thread object are returned.

Sunday, April 28, 2013

Multithreaded socket server, part 1, dynamic threads overview

As a quick announcement, I've renamed some of the methods described in the last post, and updated the post.

Now, to the new stuff. The threads can be used to run a TCP server that accepts the connections and then starts the new client communication thread(s) for each connection.  This thread can then communicate with the rest of the model, feeding and receiving data, as usual, through the nexuses.

The challenge here is that there must be a way to create the threads dynamically, and later when the client closes connection, to dispose of them. There are two possible general approaches:

  • dynamically create and delete the threads in the same App;
  • create a new App per connection and connect it to the main App.

Both have their own advantages  and difficulties, but the approach with the dynamic creation and deletion of threads ended up looking easier, and that's what Triceps has. The second approach is not particularly well supported yet. You can create multiple Apps in the program, and you can connect them by making two Triceps Trieads run in the same OS thread and ferry the data around. But it's extremely cumbersome. This will be improved in the future, but for now the first approach is the ticket.

The dynamically created threads are grouped into the fragments. This is done by specifying the fragment name option when creating a thread. The threads in a fragment have a few special properties.

One, it's possible to shut down the whole fragment in one fell swoop. There is no user-accessible way to shut down the individual threads, you can shut down either the whole App or a fragment. Shutting down individual threads is dangerous, since it can mess up the application in many non-obvious ways. But shutting down a fragment is OK, since the fragment serves a single logical function, such as servicing one TCP connection, and it's OK to shut down the whole logical function.

Two, when a thread in the fragment exits, it's really gone, and takes all its nexuses with it. Well, technically, the nexuses continue to exist as long as there are threads connected to them, but no new connections can be created after this point. Since usually the whole fragment will be gone together, and since the nexuses defined by the fragment's thread are normally used only by the other threads of the same fragment, a fragment shutdown cleans up its state like the fragment had never existed. By contrast, when a normal thread exits, the nexuses defined by it stay present and accessible until the App shuts down.

Another, somewhat surprising, challenge is the interaction of the threads and sockets (or file descriptors in general) in Perl.  I've already touched upon it, but there is a lot more.

To show how all this stuff works, I've created an example of a "chat server". It's not really a human-oriented chat, it's more of a machine-oriented publish-subscribe, and specially tilted to work through the running of a socket server with threads.

In this case the core logic is absolutely empty. All there is of it, is a nexus that passes messages through it. The clients read from this nexus to get the messages, and write to this nexus to send the messages.

When the App starts, it has only one thread, the listener thread that listens on a socket for the incoming connections. The listener doesn't even care about the common nexus and doesn't import it. When a connection comes in, the listener creates two threads to serve it: the reader reads the socket and sends to the nexus, and the writer receives from the nexus and writes to the socket. These two threads constitute a fragment for this client. They also create their own private nexus, allowing the reader to send control messages to the writer. That could also have been done through the central common nexus, but I wanted to show that there are different ways of doing things.

With a couple of clients connected, threads and sockets start looking like this:

client1 reader ----> client1 control nexus ------> client1 writer
         \                                           /
          ----------->\          /------------------>
                       chat nexus
          ----------->/          \------------------>
         /                                           \
client2 reader ----> client2 control nexus ------> client2 writer

And the listener thread still stays on the side.

Tuesday, April 23, 2013

Perl, threads and sockets

I've started writing an example that does a TCP server with threads and found that Perl doesn't allow to pass the file descriptors between the threads. Well, you sort of can pass them as arguments to another thread but then it ends up printing the error messages like these and corrupting the reference counts:

Unbalanced string table refcount: (1) for "GEN1" during global destruction.
Unbalanced string table refcount: (1) for "/usr/lib/perl5/5.10.0/Symbol.pm" during global destruction.
Scalars leaked: 1

If you try to pass a file descriptor through threads::shared, it honestly won't allow you, while the thread arguments pretend that they can and then fail.

So, to get around this issue I've added the file descriptor passing service to Triceps::App. The easy way to pass a socket around is:

# in one thread
$app->storeFile('name', $socket);
close($socket);
# or both in one call
$app->storeCloseFile('name', $socket);

# in another thread
my $socket = $app->loadDupSocket('name', 'r+');
$app->closeFd('name');

The file descriptor gets extracted from the socket file handle, and its dup is stored in the App. Pick a unique name by which you can get it back. Then you can get it back and create an IO::Socket::INET object with it. When you get it back, it gets dupped again, and you can get it back multiple times, from the same or different threads. Then to avoid leaking sockets, you tell the App to close the file descriptor and forget it.

A limitation is that you can't really share the same descriptor between two threads; Perl is not happy about that. So you have to dup it and have two separate file descriptors for the same socket. You still need to be careful about writing to the same socket from two threads: the best idea is to write from one thread only (and the same applies to reading, though it might be the same or different thread as the writing one).

The other ways to create the file handle objects are:

my $fd = $app->loadDupFile('name', 'r+');

Gets the basic IO::Handle.

my $fd = $app-> loadDupFileClass('name', 'r+', $className);

Get a file handle of any named subclass of the IO::Handle, for example "IO::Socket::UNIX".

There is also the API for the raw file descriptors, that you can get with fileno():

$app->storeFd('name', $fd);

Store a dup of file descriptor. Same thing as the file handle, the original descriptor stays open and needs to be closed separately.

my $fd = $app->loadDupFd('name');

Get back a dup of the stored file descriptor. You can use it with IO::Handle or to open a named file descriptor in an old-fashioned way:

open(MYFILE, '+<&=' . $app->loadDupFd('name'));


Note the "=" in the opening mode, which tells open() to directly adopt the descriptor from the argument and not do another dup of it.


my $fd = $app->loadFd('name');

Get back the original file descriptor.  This is dangerous because now you'd have the same file descriptor in two places: in your thread and in the App. There are two ways out of this situation. One is to have it dupped on open:

open(MYFILE, '+<&' . $app->loadFd('name'));


Note, no "=" in this snippet's open mode, unlike the previous one.


The other way is to open without dupping and tell the App to simply forget this descriptor:


open(MYFILE, '+<&=' . $app->loadFd('name'));
$app->forgetFd('name');
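
The difference between the two open modes is easy to check in plain Perl by comparing the descriptor numbers. This is a small sketch that uses a pipe instead of a descriptor loaded from the App; open_on_fd() is a hypothetical helper, and it uses the three-argument form of open() rather than the string concatenation shown above.

```perl
use strict;
use warnings;

# Hypothetical helper: open a new read handle on descriptor $fd, either
# adopting that descriptor ("<&=") or making a dup of it ("<&").
sub open_on_fd {
    my ($fd, $adopt) = @_;
    open(my $fh, ($adopt ? "<&=" : "<&"), $fd) or die "open failed: $!";
    return $fh;
}

pipe(my $rd, my $wr) or die "pipe failed: $!";
my $fd = fileno($rd);

# Without "=": the new handle gets its own descriptor.
my $dupped = open_on_fd($fd, 0);
print "dup:   ", (fileno($dupped) == $fd ? "same" : "different"), " descriptor\n";

# With "=": the new handle sits on the very same descriptor, so now two
# handles believe they own it. This is the situation that forgetFd()
# resolves on the App side.
my $adopted = open_on_fd($fd, 1);
print "adopt: ", (fileno($adopted) == $fd ? "same" : "different"), " descriptor\n";
```

This prints "different" for the dup and "same" for the adoption.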

In general, it's safer and easier to stay with the dupping, and to use the file handle interface instead of messing with the file descriptors.

P.S. The names of the load methods used to be different, I changed them to be shorter, and added storeCloseFile().

Thursday, April 11, 2013

Multithreaded pipeline, part 2

The reader thread drives the pipeline:

sub ReaderMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $rtPacket = Triceps::RowType->new(
    time => "int64", # packet's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    local_port => "int32",
    remote_port => "int32",
    bytes => "int32", # size of the packet
  ) or confess "$!";

  my $rtPrint = Triceps::RowType->new(
    text => "string", # the text to print (including \n)
  ) or confess "$!";

  my $rtDumprq = Triceps::RowType->new(
    what => "string", # identifies, what to dump
  ) or confess "$!";

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      packet => $rtPacket,
      print => $rtPrint,
      dumprq => $rtDumprq,
    ],
    import => "writer",
  );

  my $lbPacket = $faOut->getLabel("packet");
  my $lbPrint = $faOut->getLabel("print");
  my $lbDumprq = $faOut->getLabel("dumprq");

  $owner->readyReady();

  while(<STDIN>) {
    chomp;
    # print the input line, as a debugging exercise
    $unit->makeArrayCall($lbPrint, "OP_INSERT", "! $_\n");

    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
      $unit->makeArrayCall($lbPacket, @data);
    } elsif ($type eq "dump") {
      $unit->makeArrayCall($lbDumprq, "OP_INSERT", $data[0]);
    } else {
      $unit->makeArrayCall($lbPrint, "OP_INSERT", "Unknown command '$type'\n");
    }
    $owner->flushWriters();
  }

  {
    # drain the pipeline before shutting down
    my $ad = Triceps::AutoDrain::makeShared($owner);
    $owner->app()->shutdown();
  }
}

It starts by creating the nexus with the initial set of the labels: for the data about the network packets, for the lines to be printed at the end of the pipeline and for the dump requests to the tables in the other threads. It gets exported for the other threads to import, and also imported right back into this thread, for writing. And then the setup is done, readyReady() is called, and the processing starts.

It reads the CSV lines, splits them, makes a decision if it's a data line or dump request, and one way or the other sends it into the nexus. The data sent to a facet doesn't get immediately forwarded to the nexus. It's collected internally in a tray, and then flushWriters() sends it on. The mainLoop() shown in the last post calls flushWriters() automatically after every tray it processes from the input. But when reading from a file you've got to do it yourself. Of course, it's more efficient to send through multiple rows at once, so a smarter implementation would check if multiple lines are available from the file and send them in larger bundles.
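The idea of sending the rows in larger bundles can be sketched in plain Perl, without the Triceps calls. Both classify_line() and bundle_lines() are hypothetical helpers invented for this illustration, and the sample input lines are made up; in the real thread each bundle would be followed by one flushWriters() call.

```perl
use strict;
use warnings;

# Hypothetical helper: decide what the reader loop would send for one line.
# Returns [label name, arguments that would go to makeArrayCall()].
sub classify_line {
    my ($line) = @_;
    chomp $line;
    my @data = split(/,/, $line);
    my $type = shift @data;
    $type = '' unless defined $type;  # an empty line has no command
    if ($type eq 'new') {
        return ['packet', @data];
    } elsif ($type eq 'dump') {
        return ['dumprq', 'OP_INSERT', $data[0]];
    }
    return ['print', 'OP_INSERT', "Unknown command '$type'\n"];
}

# Hypothetical helper: group the classified lines into bundles of up to
# $batch_size rows; each bundle would be sent with a single flush.
sub bundle_lines {
    my ($batch_size, @lines) = @_;
    my (@bundles, @pending);
    for my $line (@lines) {
        push @pending, classify_line($line);
        if (@pending >= $batch_size) {
            push @bundles, [@pending];
            @pending = ();
        }
    }
    push @bundles, [@pending] if @pending;  # the last partial bundle
    return @bundles;
}

my @bundles = bundle_lines(2,
    "new,OP_INSERT,1,1.2.3.4,5.6.7.8,2000,80,100\n",
    "dump,packets\n",
    "bogus\n",
);
printf "%d bundles, sizes: %s\n",
    scalar(@bundles), join(",", map { scalar @$_ } @bundles);
# prints "2 bundles, sizes: 2,1"
```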

The last part is the shutdown. After the end of file is reached, it's time to shut down the application. You can't just shut it down right away because there still might be data in the pipeline, and if you shut it down, that data will be lost. The right way is to drain the pipeline first, and then do the shutdown when the app is drained. AutoDrain::makeShared() creates a scoped drain: the drain request for all the threads is started when this object is created, and the object construction completes when the drain succeeds. When the object is destroyed, that lifts the drain. So in this case the drain succeeds and then the app gets shut down.
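The scoped behavior relies on Perl destroying an object as soon as its last reference goes out of scope. Here is a minimal sketch of that pattern, with a hypothetical class ScopedDrainSketch that only records the events in a log; it is not the Triceps AutoDrain implementation, just an illustration of the ordering.

```perl
use strict;
use warnings;

package ScopedDrainSketch;

# The constructor stands in for "request the drain and wait for it".
sub new {
    my ($class, $log) = @_;
    push @$log, 'drained';
    return bless { log => $log }, $class;
}

# The destructor stands in for "lift the drain".
sub DESTROY {
    my ($self) = @_;
    push @{ $self->{log} }, 'drain lifted';
}

package main;

my @log;
{
    my $ad = ScopedDrainSketch->new(\@log);
    push @log, 'shutdown';  # happens while the drain is still in effect
}
print join(", ", @log), "\n";
# prints "drained, shutdown, drain lifted"
```

The block around the two statements is what limits the lifetime of the drain object, the same as the bare block at the end of ReaderMain().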

The shutdown causes the mainLoop() calls in all the other threads to return, and the threads to exit. Then startHere() in the first thread has the special logic in it that joins all the started threads after its own main function returns and before it completes. After that the script continues on its way and is free to exit.

Thursday, April 4, 2013

the Triead lifecycle

Each Triead goes through a few stages in its life:
  • declared
  • defined
  • constructed
  • ready
  • waited ready
  • requested dead
  • dead

Note, by the way, that these are the stages of the Triead object; the OS-level thread as such doesn't know much about them, even though these stages do have some connections to its state.

These stages always go in order and can not be skipped. However for convenience you can move directly to a further stage, this will just automatically pass through all the intermediate stages. Although, well, there is one exception: the "waited ready" and "requested dead" stages can get skipped on the way to "dead". Other than that, there is always the sequence, so if you find out that a Triead is dead, you can be sure that it's also declared, defined, constructed and ready. The attempts to go to a previous stage are silently ignored.
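The ordering rule can be pictured as a small state machine. The sketch below is a simplified model, not the Triceps code: advance() and has_passed() are hypothetical names, and the exception for "waited ready" and "requested dead" being skipped on the way to "dead" is deliberately not modeled.

```perl
use strict;
use warnings;

my @STAGES = ('declared', 'defined', 'constructed', 'ready',
    'waited ready', 'requested dead', 'dead');
my %RANK;
@RANK{@STAGES} = (0 .. $#STAGES);

# Move forward to $target, implicitly passing the intermediate stages.
# An attempt to go to a previous (or the same) stage is silently ignored.
sub advance {
    my ($current, $target) = @_;
    die "unknown stage '$target'" unless exists $RANK{$target};
    return $RANK{$target} > $RANK{$current} ? $target : $current;
}

# True if a thread in stage $current has already gone through $stage.
sub has_passed {
    my ($current, $stage) = @_;
    return $RANK{$current} >= $RANK{$stage} ? 1 : 0;
}

my $stage = 'declared';
$stage = advance($stage, 'ready');    # jumps through defined, constructed
$stage = advance($stage, 'defined');  # going backwards is ignored
print "stage=$stage constructed=", has_passed($stage, 'constructed'), "\n";
# prints "stage=ready constructed=1"
```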

Now, what do these stages mean?

Declared: The App knows the name of the thread and that this thread will eventually exist. When an App is asked to find the resources from this thread (such as Nexuses, and by the way, the nexuses are associated with the threads that created them) it will know to wait until this thread becomes constructed, and then look for the resources. This closes an important race condition: the code that defines the Triead normally runs in a new OS thread but there is no way to tell when exactly it will run and do its work. If you spawned a new thread and then attempted to get a nexus from it before it actually runs, the App would tell you that there is no such thread and fail. To get around it, you declare the thread first and then start it. Most of the time there is no need to declare explicitly, the library code that wraps the thread creation does it for you.

Defined: The Triead object has been created and connected to the App. Since this is normally done from the new OS thread, it also implies that the thread is running and is busy constructing the nexuses and whatever internal resources it needs.

Constructed: The Triead has constructed and exported all the nexuses that it planned to. This means that now these nexuses can be imported by the other threads (i.e. connected to the other threads). After this point the thread can not construct any more nexuses. However it can keep importing the nexuses from the other threads. It's actually a good idea to do all your exports, mark the thread constructed, and only then start importing. This order guarantees the absence of deadlocks (which would otherwise be detected and cause the App to be aborted). There are some special cases when you need to import a nexus from a thread that is not fully constructed yet, and it's possible, but requires more attention and a special override. I'll talk about it in more detail later.

Ready: The thread has imported all the nexuses it wanted and fully initialized all its internals (for example, if it needs to load data from a file, it would do that before telling that it's ready). After this point no more nexuses can be imported. A fine point is that the other threads may still be created, and they may do their exporting and importing, but once a thread is marked as ready, it's cast in bronze. And in the simple cases you don't need to worry about separating the constructed and ready stages, just initialize everything and mark the thread as ready.

Waited ready: Before proceeding further, the thread has to wait for all the threads in App to be ready, or it would lose data when it tries to communicate with them. It's essentially a barrier. Normally both the stages "ready" and "waited ready" are advanced to with a single call readyReady(), the thread says "I'm ready, and let me continue when everyone is ready". After that the actual work can begin. It's still possible to create more threads after that (normally parts of the transient fragments), and until they all become ready, the App may temporarily become unready again, but that's a whole separate advanced topic.

Requested dead: This is a way to request a thread to exit. Normally some control thread will decide that the App needs to exit and will request all its threads to die. The threads will get these requests, perform their last rites and exit. The threads don't have to get this request to exit, they can also always decide to exit on their own. When a thread is requested to die, all the data communication with it stops. No more data will get to it through the nexuses and any data it sends will be discarded. It might churn a little bit through the data in its input buffers but any results produced will be discarded. The good practice is to make sure that all the data is drained before requesting a thread to die. Note that the nexuses created by this thread aren't affected at all, they keep working as usual. It's the data connections between this thread and any nexuses that get broken.

Dead: The thread has completed its execution and exited. Normally you don't need to mark this explicitly. When the thread's main function exits, the library will do it for you. Marking the thread dead also drives the harvesting of the OS threads: the harvesting logic will perform a join() (not to be confused with SQL join) of the thread and thus free the OS resources. The dead Trieads are still visible in the App (except for some special cases with the fragments), and their nexuses continue working as usual (even including the special cases with the fragments), the other threads can keep communicating through them for as long as they want.

Thursday, March 28, 2013

Triceps Multithreading Concepts

The multithreading support has solidified to the point where I can start documenting it. The full description will have to wait until I finalize the Perl API but the concepts are pretty much settled.

The idea of the multithreading support in Triceps is to make writing the multithreaded model easier. To make writing the good code easy and writing the bad code hard. But of course you don't have to use it, you can always make your own if you wish (just as you could before now).

Without further ado, the diagram of a multithreaded Triceps application:

Fig. 1. Triceps application.

The Triceps application is embodied in the class App. It's possible to have multiple Apps in one program.

Each thread has multiple parts to it. First, of course, there is the OS-level (or, technically, library-level, or Perl-level) thread where the code executes. And then there is a class that represents this thread and its place in the App. To reduce the naming conflict, this class is creatively named Triead (pronounced still "thread"). In the discussion I use the word "thread" for both concepts, the OS-level thread and the Triead, and it's usually clear from the context which one I mean. But sometimes it's particularly important to make the distinction, and then I name one or the other explicitly.

The class Triead itself is largely opaque, allowing only a few methods for introspection. But there is a control interface to it, called TrieadOwner. The Triead is visible from the outside, the TrieadOwner object is visible only in the OS thread that owns the Triead. The TrieadOwner manages the thread state and acts as the intermediary in the thread's communications with the App.

The data is passed between the threads through the Nexuses. A Nexus is unidirectional, with data going only one way, however it may have multiple writers and multiple readers. All the readers see the exact same data, with rowops going in the exact same order (well, there will be other policies in the future as well, but for now there is only one policy).

A Nexus passes through the data for multiple labels, very much like an FnReturn does (and indeed there is a special connection between them). A Nexus also allows exporting the row types and table types from one thread to another.

A Nexus gets connected to the Trieads through the Facets. A Facet is a connection point between the Nexus and the Triead. Each Facet is for either reading or writing. And there may be only one Facet between a given Nexus and a given Triead, you can't make multiple connections between them. As a consequence, a thread can't both write to and read from the same Nexus, it can do only one thing. This might actually be an overly restrictive limitation and might change in the future but that's how things work now.

Each Nexus also has a direction: either direct ("downwards") or reverse ("upwards"). And yes, the reverse Nexuses allow building the models with loops. However the loops consisting of only the direct Nexuses are not allowed, nor of only reverse Nexuses. They would mess up the flow control. The proper loops must contain a mix of direct and reverse Nexuses.
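The rule for the loops can be written down as a one-line check. This is only an illustration of the rule as stated here, with a hypothetical function loop_is_allowed() that takes the directions of the Nexuses along one loop; it says nothing about how Triceps itself detects the loops.

```perl
use strict;
use warnings;

# A loop is acceptable only if it mixes both directions.
sub loop_is_allowed {
    my (@directions) = @_;
    my %seen = map { $_ => 1 } @directions;
    return ($seen{direct} && $seen{reverse}) ? 1 : 0;
}

print loop_is_allowed('direct', 'direct', 'reverse'), "\n";  # prints 1
print loop_is_allowed('direct', 'direct'), "\n";             # prints 0
print loop_is_allowed('reverse', 'reverse'), "\n";           # prints 0
```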

The direct Nexuses have a limited queue size and stop the writers when the queue fills up, until the data gets consumed, thus providing the flow control. The reverse Nexuses have an unlimited queue size, which makes it possible to avoid the circular deadlocks.

Normally an App is built once and keeps running in this configuration until it stops. But there is a strong need to have the threads dynamically added and deleted too. For example, if the App is running as a server and clients connect to it, each client needs to have its thread(s) added on connection and then deleted when the client disconnects. This is handled through the concept of fragments. There is no Fragment class but when you create a Triead, you can specify a fragment name for it. Then it becomes possible to shut down and dispose of the threads in a fragment after the fragment's work is done.