TrieadJoin is the abstract base class that tells the harvester how to join a thread after it has finished. Naturally, it's present only in the C++ API and not in Perl (and by the way, the reference of the Perl classes in 2.0 has been completed; the remaining classes exist only in C++).
Currently TrieadJoin has two subclasses: PerlTrieadJoin for the Perl threads and BasicPthread for the POSIX threads in C++. I won't be describing PerlTrieadJoin, since it's in the internals of the Perl implementation and was never intended to be used directly by the application developers. If you're interested, you can always look at its source code. I'll describe BasicPthread later.
Well, actually there is not a whole lot of direct use for TrieadJoin either: you need to worry about it only if you want to define a joiner for some other kind of threads, and this is not very likely. But since I've started, I'll complete the write-up of it.
So, if you want to define a joiner for some other kind of threads, you define a subclass of it, with an appropriately defined method join().
TrieadJoin is an Mtarget, naturally referenced from multiple threads (at the very least it's created in the thread to be joined or its parent, and then passed to the harvester thread by calling App::defineJoin()). The methods of TrieadJoin are:
TrieadJoin(const string &name);
The constructor. The name is the name of the Triead, used for the error messages. Due to various synchronization reasons, keeping the name here makes the life of the harvester much easier than trying to look it up from the Triead object.
virtual void join() = 0;
The Most Important joining method to be defined by the subclass. The subclass object must also hold the identity of the thread in it, to know which thread to join. The harvester will call this method.
virtual void interrupt();
The method that interrupts the target thread when it's requested to die. It's called in the context of the thread that triggers the App shutdown (or otherwise requests the target thread to die). By default the TrieadJoin carries a FileInterrupt object in it (it gets created on TrieadJoin construction, and then TrieadJoin keeps a reference to it), and this method calls it to revoke the files. Everything else about the interruption is a part of the threading system, and the base class doesn't know how to do it. So the subclasses must define their own methods, wrapping the base class method.
Both PerlTrieadJoin and BasicPthread add sending the signal SIGUSR2 to the target thread. For that they use the same target thread identity kept in the object as used by the join() call.
FileInterrupt *fileInterrupt() const;
Get a pointer to the FileInterrupt object defined in the TrieadJoin. The most typical use is to pass it to the TrieadOwner object, so that it can be easily found later:
to->fileInterrupt_ = fileInterrupt();
Though of course it could be kept in a separate local Autoref instead.
const string &getName() const;
Get back the name of the joiner's thread.
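To make the subclassing more concrete, here is a minimal sketch of a joiner for std::thread. It is only an illustration under stated assumptions: JoinSketch is a simplified stand-in for TrieadJoin (the real class is an Mtarget and carries a FileInterrupt), and the names StdThreadJoin and runAndJoin are hypothetical, not a part of Triceps.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <thread>
#include <utility>

// Simplified stand-in for TrieadJoin: only the name and the two
// virtual methods discussed above. Not the real Triceps class.
class JoinSketch {
public:
    explicit JoinSketch(const std::string &name) : name_(name) {}
    virtual ~JoinSketch() {}
    virtual void join() = 0;
    virtual void interrupt() {} // the real base class revokes the files here
    const std::string &getName() const { return name_; }
private:
    std::string name_;
};

// Hypothetical joiner for std::thread: it keeps the thread identity
// inside the object, so that join() knows which thread to wait for.
class StdThreadJoin : public JoinSketch {
public:
    StdThreadJoin(const std::string &name, std::thread t)
        : JoinSketch(name), thread_(std::move(t)) {}
    virtual void join() {
        if (thread_.joinable())
            thread_.join();
    }
    virtual void interrupt() {
        JoinSketch::interrupt(); // wrap the base class method first
        // a thread-specific wake-up (such as a signal) would go here
    }
private:
    std::thread thread_;
};

// Plays the role of the harvester: joins the thread and reports
// whether the thread body had run to completion by then.
bool runAndJoin() {
    std::atomic<bool> done(false);
    StdThreadJoin tj("worker", std::thread([&done] { done = true; }));
    assert(tj.getName() == "worker");
    tj.join();
    return done.load();
}
```

In the real API the joiner object would be handed over to the harvester with App::defineJoin() rather than joined by hand as runAndJoin() does.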
This started as my thoughts on the field of Complex Event Processing, mostly about my OpenSource project Triceps. But now it's about all kinds of software-related things.
Saturday, July 13, 2013
Friday, June 28, 2013
TrieadOwner reference, C++
The TrieadOwner is defined in app/TrieadOwner.h. Its constructor is protected, the normal way of constructing is by calling App::makeTriead(). This is also called "defining a Triead".
TrieadOwner is an Starget, and must be accessed only from the thread that owns it (though it's possible to create it in the parent thread and then pass to the actual owner thread, as long as the synchronization between the threads is done properly).
Triead *get() const;
Get the public side of the Triead. In C++, unlike Perl, the Triead methods are not duplicated in TrieadOwner. So they are accessed through get(): for example, to get the Triead name, call to->get()->getName(). The TrieadOwner holds a reference to Triead, so the Triead object will never get destroyed as long as the TrieadOwner is alive.
App *app() const;
Get the App where this Triead belongs. The TrieadOwner holds a reference to App, so the App object will never get destroyed as long as the TrieadOwner is alive.
bool isRqDead() const;
Check whether this triead has been requested to die.
void requestMyselfDead();
Request this thread itself to die (see the reasoning for this in the Perl reference).
Unit *unit() const;
Get the main unit of this Triead. It has the same name as the Triead itself.
void addUnit(Autoref<Unit> u);
Keep track of an additional unit. Adding a unit multiple times has no effect. See the other implications in the Perl reference.
bool forgetUnit(Unit *u);
Forget about an additional unit. If the unit is already unknown, has no effect. The main unit can not be forgotten.
typedef list<Autoref<Unit> > UnitList;
const UnitList &listUnits() const;
List the tracked units. The main unit is always included at the front of the list.
void markConstructed();
Advance the Triead state to Constructed. Repeated calls have no effect.
void markReady();
Advance the Triead state to Ready. Repeated calls have no effect. The advancement is cumulative: if the Triead was not constructed yet, it will be automatically advanced first to Constructed and then to Ready. If this is the last Triead to become ready, it will trigger the App topology check, and if the check fails, abort the App and throw an Exception.
void readyReady();
Advance the Triead to the Ready state, and wait for all the Trieads to become ready. The topology check applies in the same way as in markReady().
void markDead();
Advance the Triead to the Dead state. If this Triead was not Ready yet and it's the last one to become so, the topology check will run. If the topology check fails, the App will be aborted but markDead() will not throw an exception.
This method is automatically called from the TrieadOwner destructor, so most of the time there is no need to call it explicitly.
It also clears all the tracked units.
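The cumulative advancement through the states can be modeled with a few lines of code. This is only a sketch of the described behavior, not the Triceps implementation: the class StateSketch, its state names and the function transitionsAfterMarkReady are hypothetical, and the topology check is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of the Triead state advancement.
class StateSketch {
public:
    enum State { INITIAL, CONSTRUCTED, READY, DEAD };
    StateSketch() : state_(INITIAL) {}
    void markConstructed() { advanceTo(CONSTRUCTED); }
    void markReady() { advanceTo(READY); }
    void markDead() { advanceTo(DEAD); }
    State state() const { return state_; }
    const std::vector<State> &history() const { return history_; }
private:
    // Step through every intermediate state up to the target;
    // a target that is not ahead of the current state is a no-op.
    void advanceTo(State target) {
        while (state_ < target) {
            state_ = static_cast<State>(state_ + 1);
            history_.push_back(state_);
        }
    }
    State state_;
    std::vector<State> history_; // every state entered, in order
};

// Calls markReady() twice on a fresh object and returns the number
// of state transitions that have actually happened.
std::size_t transitionsAfterMarkReady() {
    StateSketch s;
    s.markReady();
    s.markReady(); // repeated call, has no effect
    assert(s.state() == StateSketch::READY);
    return s.history().size();
}
```

The same loop explains why markDead() on a Triead that was never ready still passes through Ready, which is the point where the topology check may run.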
void abort(const string &msg) const;
Abort the App. The name of this thread will be forwarded to the App along with the error message. The error message may be multi-line.
Onceref<Triead> findTriead(const string &tname, bool immed = false);
Find a Triead in the App by name. The flag immed controls whether this method may wait. If immed is false, and the target thread is not constructed yet (but at least declared), the method will sleep until it becomes constructed, and then returns it. If the target thread is not declared, it will throw an Exception. If immed is true, the look-up is immediate: it will return the thread even if it's not constructed but is at least defined. If it's not defined (even if it's declared), an Exception will be thrown. The look-up of this Triead itself is always immediate, irrespective of the immed flag.
An Exception may also be thrown if a circular sequence of Trieads deadlocks waiting for each other.
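The rules for the immed flag are easier to follow as a decision function. The sketch below only restates the text above; the enums and the function lookupOutcome are hypothetical names, CONSTRUCTED stands for "constructed or any later state", and the deadlock detection is not modeled.

```cpp
#include <cassert>

// How far the target thread has progressed, as seen by the App.
enum TargetState { UNKNOWN, DECLARED, DEFINED, CONSTRUCTED };

// What the look-up does for the caller.
enum LookupOutcome { RETURN_NOW, WAIT_FOR_CONSTRUCTED, THROW_EXCEPTION };

// Hypothetical restatement of the findTriead() rules.
// isSelf marks a look-up of the calling Triead itself,
// which is always treated as immediate.
LookupOutcome lookupOutcome(TargetState target, bool immed, bool isSelf = false) {
    if (immed || isSelf) {
        // Immediate: succeed if the thread is at least defined,
        // a thread that is merely declared is an error.
        return target >= DEFINED ? RETURN_NOW : THROW_EXCEPTION;
    }
    if (target == UNKNOWN)
        return THROW_EXCEPTION; // not even declared
    if (target == CONSTRUCTED)
        return RETURN_NOW;
    return WAIT_FOR_CONSTRUCTED; // declared or defined: sleep until constructed
}
```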
Onceref<Facet> exportNexus(Autoref<Facet> facet, bool import = true);
Export a Nexus. The Nexus definition is constructed as a Facet object, which is then used by this method to construct and export the Nexus. The same argument Facet reference is then returned as the result. The import flag tells whether the Nexus is also to be imported back by connecting the same original Facet object to it. If import is false, the original Facet reference is still returned but it can't be used for anything, and can only be thrown away. The direction of the import (reading or writing) is defined in the Facet, as well as the nexus name and all the other information.
Throws an Exception on any errors, in particular on the duplicate facet names within the Triead.
Onceref<Facet> exportNexusNoImport(Autoref<Facet> facet);
A convenience wrapper around exportNexus() with import=false.
Onceref<Facet> importNexus(const string &tname, const string &nexname, const string &asname, bool writer, bool immed = false);
Import a Nexus from another Triead. tname is the name of the other thread, nexname is the name of the nexus exported from it, asname is the local name for the imported facet ("" means "same as nexname"), the writer flag determines if the import is for writing, and the immed flag has the same meaning as in findTriead(). The import of a nexus involves finding its exporting thread, and the immed flag controls how this finding is done.
Throws an Exception if anything is not found, or the local import name conflicts with another imported facet.
Onceref<Facet> importNexusImmed(const string &tname, const string &nexname, const string &asname, bool writer);
Onceref<Facet> importReader(const string &tname, const string &nexname, const string &asname = "", bool immed=false);
Onceref<Facet> importWriter(const string &tname, const string &nexname, const string &asname = "", bool immed=false);
Onceref<Facet> importReaderImmed(const string &tname, const string &nexname, const string &asname = "");
Onceref<Facet> importWriterImmed(const string &tname, const string &nexname, const string &asname = "");
Convenience wrappers for importNexus(), providing the default arguments and the more mnemonic names.
typedef map<string, Autoref<Nexus> > NexusMap;
void exports(NexusMap &ret) const;
Get the nexuses exported here. The map argument will be cleared and refilled with the new values.
typedef map<string, Autoref<Facet> > FacetMap;
void imports(FacetMap &ret) const;
Get the facets imported here. The map argument will be cleared and refilled with the new values.
NexusMaker *makeNexusReader(const string &name);
NexusMaker *makeNexusWriter(const string &name);
NexusMaker *makeNexusNoImport(const string &name);
A convenient way to build the nexuses for export in a chained fashion. The name argument is the nexus name. The NexusMaker is an opaque class that has the same building methods as a Facet, plus the method complete() that finishes the export. This call sequence is more convenient than building a Facet and then exporting it. For example:
Autoref<Facet> myfacet = ow->makeNexusReader("my")
->addLabel("one", rt1)
->addFromLabel("two", lb2)
->setContext(new MyFnContext)
->setReverse()
->complete();
Only one nexus may be built like this at a time, since there is only one instance of NexusMaker per TrieadOwner, and it gets reused over and over. It keeps the Facet instance being built in it. If you don't complete the build, that Facet instance will be left sitting around until the next makeNexus*() call, when it will get thrown away. But in general if you do the calling in the sequence as shown, you can't forget to call complete() at the end, since otherwise the return type would not match and the compilation will fail.
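The reason why the compiler catches a forgotten complete() can be shown on a toy version of the same pattern. This is a sketch with stand-in classes, not the Triceps code: FacetSketch, MakerSketch, start() and buildExample() are hypothetical names, and the labels are reduced to plain strings.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for Facet: only records what was built.
struct FacetSketch {
    FacetSketch() : reverse(false) {}
    std::string name;
    std::vector<std::string> labels;
    bool reverse;
};

// Stand-in for NexusMaker: every building method returns the maker
// itself, and only complete() returns the finished facet.
class MakerSketch {
public:
    MakerSketch *start(const std::string &name) {
        facet_ = FacetSketch(); // an unfinished previous build is thrown away
        facet_.name = name;
        return this;
    }
    MakerSketch *addLabel(const std::string &label) {
        facet_.labels.push_back(label);
        return this;
    }
    MakerSketch *setReverse() {
        facet_.reverse = true;
        return this;
    }
    FacetSketch complete() { return facet_; }
private:
    FacetSketch facet_; // the single instance being built
};

FacetSketch buildExample() {
    MakerSketch maker;
    // Omitting complete() here would not compile: the chain would
    // produce a MakerSketch*, which does not convert to FacetSketch.
    FacetSketch f = maker.start("my")
        ->addLabel("one")
        ->addLabel("two")
        ->setReverse()
        ->complete();
    return f;
}
```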
bool flushWriters();
Flush all the writer facets. Returns true on successful completion, false if the thread was requested to die and thus all the output was thrown away.
bool nextXtray(bool wait = true, const struct timespec &abstime = *(const struct timespec *)NULL);
Read and process the next Xtray. Automatically calls flushWriters() after processing. The rest works in the same way as described in Perl, however this is a combination of Perl's nextXtray(), nextXtrayNoWait() and nextXtrayTimeLimit(). If wait is false, this method will never wait. If wait is true and the abstime reference is not NULL, it might wait but not past that absolute time. Otherwise it will wait until the data becomes available or the thread is requested to die.
Returns true if an Xtray has been processed, false if it wasn't (for any reason, a timeout expiring or thread being requested to die).
bool nextXtrayNoWait();
A convenience wrapper over nextXtray(false).
bool nextXtrayTimeout(int64_t sec, int32_t nsec);
Another convenience wrapper over nextXtray(): read and process an Xtray, with a timeout limit. The timeout value consists of the seconds and nanoseconds parts.
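Since nextXtray() takes an absolute time while nextXtrayTimeout() takes a relative one, a conversion between the two has to happen somewhere. Here is a sketch of how such a conversion could look; the helper addTimeout is a hypothetical name, it is not taken from the Triceps source, and it assumes that both sec and nsec are non-negative.

```cpp
#include <cassert>
#include <stdint.h>
#include <time.h>

// Hypothetical helper: add a relative timeout to a starting time and
// normalize the result so that tv_nsec stays within [0, 1e9).
// Assumes non-negative sec and nsec.
struct timespec addTimeout(const struct timespec &now, int64_t sec, int32_t nsec) {
    const long NSEC_PER_SEC = 1000000000L;
    struct timespec abstime;
    // Whole seconds, including any that are hidden in an oversized nsec.
    abstime.tv_sec = now.tv_sec + static_cast<time_t>(sec + nsec / NSEC_PER_SEC);
    long ns = now.tv_nsec + nsec % NSEC_PER_SEC;
    if (ns >= NSEC_PER_SEC) { // carry into the seconds
        ++abstime.tv_sec;
        ns -= NSEC_PER_SEC;
    }
    abstime.tv_nsec = ns;
    return abstime;
}
```

With the current time obtained from clock_gettime(), the result could then be passed as the abstime argument of nextXtray(true, abstime).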
void mainLoop();
Run the main loop, calling nextXtray() repeatedly until the thread is requested to die.
bool isRqDrain();
Check if a drain is currently requested by any thread (and applies to this thread). If an exclusive drain is requested with the exclusion of this thread, this method will return false.
void requestDrainShared();
void requestDrainExclusive();
void waitDrain();
bool isDrained();
void drainShared();
void drainExclusive();
void undrain();
The drain control, same as in Perl. These methods are really wrappers over the corresponding App methods. And generally a better idea is to do the scoped drains with AutoDrain rather than to call these methods directly.
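The advantage of a scoped drain is that the undrain can't be forgotten, even when the scope is left through an exception. The sketch below shows the general idea with stand-in classes; it is not the actual AutoDrain interface, and OwnerSketch, ScopedDrainSketch and drainedInsideOnly are hypothetical names.

```cpp
#include <cassert>

// Stand-in for the drain part of the TrieadOwner interface; it only
// tracks a flag, while the real methods wrap the App.
class OwnerSketch {
public:
    OwnerSketch() : drained_(false) {}
    void drainExclusive() { drained_ = true; }
    void undrain() { drained_ = false; }
    bool isDrained() const { return drained_; }
private:
    bool drained_;
};

// Hypothetical scope guard in the spirit of AutoDrain: drains in the
// constructor and undrains in the destructor, even on an exception.
class ScopedDrainSketch {
public:
    explicit ScopedDrainSketch(OwnerSketch &ow) : ow_(ow) { ow_.drainExclusive(); }
    ~ScopedDrainSketch() { ow_.undrain(); }
private:
    ScopedDrainSketch(const ScopedDrainSketch &);            // not copyable
    ScopedDrainSketch &operator=(const ScopedDrainSketch &); // not assignable
    OwnerSketch &ow_;
};

// Returns true if the owner was drained inside the scope
// and undrained again after leaving it.
bool drainedInsideOnly() {
    OwnerSketch ow;
    bool inside;
    {
        ScopedDrainSketch guard(ow);
        inside = ow.isDrained();
    }
    return inside && !ow.isDrained();
}
```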
The C++ API provides no methods for the file descriptor tracking as such. Instead these methods are implemented in the class FileInterrupt. TrieadOwner has a public field
Autoref<FileInterrupt> fileInterrupt_;
to keep an instance of the interruptor. TrieadOwner itself has no use for it, nor does any other part of Triceps, it's just a location to keep this reference for the convenience of the application developer. The Perl API makes use of this location.
But how does the thread then get interrupted when it's requested to die? The answer is that the TrieadJoin object also has a reference to the FileInterrupt object, and even creates that object in its own constructor. So when a joiner method is defined for a thread, that supplies the App with the access to its interruptor as well. And to put the file descriptors into the FileInterrupt object, you can either keep a direct reference to it somewhere in your code, or copy that reference into the TrieadOwner object.
Here is for example how the Perl thread joiner is created for the TrieadOwner inside the Perl implementation:
string tn(tname);
Autoref<TrieadOwner> to = appv->makeTriead(tn, fragname);
PerlTrieadJoin *tj = new PerlTrieadJoin(appv->getName(), tname,
SvIOK(tid)? SvIV(tid): -1,
SvIOK(handle)? SvIV(handle): 0,
testfail);
to->fileInterrupt_ = tj->fileInterrupt();
appv->defineJoin(tn, tj);
This is somewhat cumbersome, but the native C++ programs can create their threads using the class BasicPthread that takes care of this and more.
Tuesday, June 25, 2013
TrieadOwner reference, Perl, part 2
I've described it before but forgot to mention in part 1 of the reference: the mark* methods bring the Triead to the target state through all the intermediate states. So if a Triead was not constructed, markReady() will first mark it constructed and then ready.
The same applies to markDead() but with an even more interesting twist. Suppose there is an application with an incorrect topology, and all the Trieads in it but one are ready. That last Triead then experiences an error, and proceeds directly to call markDead() and then exit. This markDead() will involve an intermediate step marking the Triead as ready. Since it's the last Triead to be ready, it will trigger the topology check, and since the topology is incorrect, it will fail. If it happened in markReady(), the method would confess. But in markDead() confessing doesn't make a whole lot of sense: after all, the thread is about to exit anyway. So markDead() will catch all these confessions and throw them away, it will never fail. However the failed check will still abort the App, and the rest of the threads will wake up and fail as usual.
The other thing about markDead() is that it clears and unreferences all the TrieadOwner's registered units, not waiting for the TrieadOwner object to be destroyed. This unravels any potential cyclic references where the code in a label might be referring back to the TrieadOwner.
And now continuing with more methods.
$facet = $to->makeNexus(@options);
Create, export and (optionally) import a nexus. The result is an imported Facet of this Nexus, except when the options specify the no-import mode (then the result will be undef). Confesses on errors.
The options are:
name => $name
Name of the nexus, it will be used both as the export name and the local imported "as-name" of the facet.
labels => [
name => $rowType,
name => $fromLabel,
]
Defines the labels similarly to FnReturn in a referenced array. The array contains the pairs of (label_name, label_definition). The definition may be either a RowType, and then a label of this row type will be created, or a Label, and then a label of the same row type will be created and chained from that original label. The created label objects can be later found from Facets, and used like normal labels, by chaining them or sending rowops to them (but chaining from them is probably not the best idea, although it works anyway).
Optional, or may be an empty array; the implicit labels _BEGIN_ and _END_ will always be added automatically if not explicitly defined.
The labels are used to construct an implicit FnReturn in the current Triead's main unit, and this is the FnReturn that will be visible in the Facet that gets imported back. If the import mode is "none", the FnReturn will still be constructed and then abandoned (and freed by the reference count going to 0, as usual). The labels used as $fromLabel must always belong to the Triead's main unit.
rowTypes => [
name => $rowType,
]
Defines the row types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.
tableTypes => [
name => $tableType,
]
Defines the table types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.
reverse => 0/1
Flag: this Nexus goes in the reverse direction. The reverse nexuses are used to break up the topological loops, to prevent the deadlocks on the queueing. They have no limit on the queue size, and the data is read from them at a higher priority than from the direct nexuses. Default: 0.
chainFront => 0/1
Flag: when the labels are specified as $fromLabel, chain them at the front of the original labels. Default: 1. The default is this way because chaining at the front is what is typically needed. The reasons are described at length in the pipelined example, but the short gist is that you might want to send the rows from both the inputs, intermediate points, and the end of processing into an output nexus. It's most convenient to create the nexus in one go, after the whole thread's computation is defined. But the rowops from the earlier stages of computations have to come to the nexus before the rowops from the later stage. Chaining at the front ensures that each such label will send the rowop into the nexus first, and only then to the next stage of the computation.
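The effect of chaining at the front on the order of rowops can be seen on a toy model. The model is written in C++ and is only a sketch under simplifying assumptions: a label is reduced to an ordered list of handlers, and LabelSketch and nexusOrder are hypothetical names that have nothing to do with the real Label class.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Stand-in for a label: calling it runs the chained handlers in order.
struct LabelSketch {
    typedef std::function<void(const std::string &)> Handler;
    std::deque<Handler> chained;
    void chainBack(Handler f) { chained.push_back(f); }
    void chainFront(Handler f) { chained.push_front(f); }
    void call(const std::string &rowop) const {
        for (std::size_t i = 0; i < chained.size(); ++i)
            chained[i](rowop);
    }
};

// Builds a two-stage computation, taps both stages into an output
// "nexus", sends one rowop and returns what the nexus has received.
std::vector<std::string> nexusOrder(bool chainFront) {
    std::vector<std::string> nexus; // rowops in the order of arrival
    LabelSketch input, result;
    // The computation: the input label passes a derived rowop to the result label.
    input.chainBack([&result](const std::string &r) { result.call(r + "-processed"); });
    // The nexus gets defined afterwards, in one go, tapping both labels.
    LabelSketch::Handler toNexus = [&nexus](const std::string &r) { nexus.push_back(r); };
    if (chainFront) {
        input.chainFront(toNexus);
        result.chainFront(toNexus);
    } else {
        input.chainBack(toNexus);
        result.chainBack(toNexus);
    }
    input.call("row");
    return nexus;
}
```

With chaining at the front the nexus receives the input rowop before the rowop derived from it; with chaining at the back the derived rowop overtakes its source.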
queueLimit => $number
Defines the size limit after which the writes to the queue of this Nexus block. In reality because of the double-buffering the queue may contain up to twice that many trays before the future writes block. This option has no effect on the reverse nexuses. Default: &Facet::DEFAULT_QUEUE_LIMIT, 500 or so.
import => $importType
A string value, essentially an enum, determining how this Nexus gets immediately imported back into this Triead. The supported values are:
$facet = $to->importNexus(@options);
Import a nexus into this Triead. Returns the imported Facet. The repeated attempts to import the same Nexus will return references to the same Facet object. Confesses on errors. An attempt to import the same nexus for both reading and writing is an error.
The options are:
from => "$t/$n"
Identifier of the nexus to import, consisting of two parts separated by a slash:
fromTriead => $t
fromNexus => $n
The alternative way to specify the source thread and nexus as separate options. Both options must be present or absent at the same time. Mutually exclusive with "from".
as => $name
Specifies an override name for the local facet (and thus also to the FnReturn created in the facet). Logically similar to the SQL clause AS. Default is to reuse the nexus name.
import => $importType
A string value, essentially an enum, determining how this Nexus gets imported. The supported values are the same as for makeNexus (except "none", since there is no point in a no-op import):
immed => 0/1
Flag: do not wait for the thread that exported the nexus to be fully constructed. Waiting synchronizes with the exporter and prevents a race of an import attempt trying to find a nexus before it is made and failing. However if two threads are waiting for each other, it becomes a deadlock that gets caught and aborts the App. The immediate import allows to avoid such deadlocks for the circular topologies with helper threads.
The helper threads are the "blind alleys" in the topology: the "main thread" outsources some computation to a "helper thread", sending it the arguments, then later receiving the results and continuing with its logic.
For example, consider the following sequence:
The same applies to markDead() but with an even more interesting twist. Suppose there is an application with an incorrect topology, and all the Trieads in it but one are ready. That last Triead then experiences an error, and proceeds directly to call markDead() and then exit. This markDead() will involve an intermediate step marking the Triead as ready. Since it's the last Triead to be ready, it will trigger the topology check, and since the topology is incorrect, it will fail. If it happened in markReady(), the method would confess. But in markDead() confessing doesn't make a whole lot of sense: after all, the thread is about to exit anyway. So markDead() will catch all these confessions and throw them away, it will never fail. However the failed check will still abort the App, and the rest of the threads will wake up and fail as usual.
The other thing about markDead() is that it clears and unreferences all the TrieadOwner's registered units, not waiting for the TrieadOwner object to be destroyed. This unravels any potential cyclic references where the code in a label might be referring back to the TrieadOwner.
And now continuing with more methods.
$facet = $to->makeNexus(@options);
Create, export and (optionally) import a nexus. The result is an imported Facet of this Nexus, except when the options specify the no-import mode (then the result will be undef). Confesses on errors.
The options are:
name => $name
Name of the nexus, it will be used both as the export name and the local imported "as-name" of the facet.
labels => [
name => $rowType,
name => $fromLabel,
]
Defines the labels similarly to FnReturn in a referenced array. The array contains the pairs of (label_name, label_definition). The definition may be either a RowType, and then a label of this row type will be created, or a Label, and then a label of the same row type will be created and chained from that original label. The created label objects can be later found from Facets, and used like normal labels, by chaining them or sending rowops to them (but chaining from them is probably not the best idea, although it works anyway).
Optional, or may be an empty array; the implicit labels _BEGIN_ and _END_ will always be added automatically if not explicitly defined.
The labels are used to construct an implicit FnReturn in the current Triead's main unit, and this is the FnReturn that will be visible in the Facet that gets imported back. If the import mode is "none", the FnReturn will still be constructed and then abandoned (and freed by the reference count going to 0, as usual). The labels used as $fromLabel must always belong to the Triead's main unit.
rowTypes => [
name => $rowType,
]
Defines the row types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.
tableTypes => [
name => $tableType,
]
Defines the table types exported in this Nexus as a referenced array of name-value pairs. The types imported back into this Triead's facet will be references to the exact same type objects. Optional, or may be empty.
reverse => 0/1
Flag: this Nexus goes in the reverse direction. The reverse nexuses are used to break up the topological loops, to prevent the deadlocks on the queueing. They have no limit on the queue size, and the data is read from them at a higher priority than from the direct nexuses. Default: 0.
chainFront => 0/1
Flag: when the labels are specified as $fromLabel, chain them at the front of the original labels. Default: 1. The default is this way because chaining at the front is what is typically needed. The reasons are described at length in the pipelined example, but the short gist is that you might want to send the rows from both the inputs, intermediate points, and the end of processing into an output nexus. It's most convenient to create the nexus in one go, after the whole thread's computation is defined. But the rowops from the earlier stages of computations have to come to the nexus before the rowops from the later stage. Chaining at the front ensures that each such label will send the rowop into the nexus first, and only then to the next stage of the computation.
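The ordering effect of chaining at the front can be shown on a toy model. This is only a sketch in plain C++, not Triceps code: ToyLabel, chain(), send() and arrivalOrder() are names made up for the illustration, and a label is reduced to a list of handlers called in order.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a label: it forwards each value to the handlers
// chained from it, in list order.
struct ToyLabel {
    std::vector<std::function<void(int)>> chained;

    // Attach a handler either at the front or at the back of the chain.
    void chain(std::function<void(int)> handler, bool front) {
        assert(handler != nullptr); // an empty handler would throw when called
        if (front)
            chained.insert(chained.begin(), std::move(handler));
        else
            chained.push_back(std::move(handler));
    }

    void send(int value) const {
        for (const auto &h : chained)
            h(value);
    }
};

// Build a two-stage pipeline (input -> output), attach the "nexus" to both
// labels afterwards, send one value, and return the order of arrival.
std::vector<std::string> arrivalOrder(bool chainFront) {
    std::vector<std::string> log;
    ToyLabel input, output;

    // The computation is defined first: the input stage feeds the output stage.
    input.chain([&](int v) { output.send(v + 1); }, false);

    // The nexus is attached in one go, after the computation is defined.
    input.chain([&](int) { log.push_back("nexus:input"); }, chainFront);
    output.chain([&](int) { log.push_back("nexus:output"); }, chainFront);

    input.send(1);
    return log;
}
```

In this model arrivalOrder(true) records the input before the output, while arrivalOrder(false) records them in the reverse order, which is the problem that the default of 1 avoids.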
queueLimit => $number
Defines the size limit after which the writes to the queue of this Nexus block. In reality because of the double-buffering the queue may contain up to twice that many trays before the future writes block. This option has no effect on the reverse nexuses. Default: &Facet::DEFAULT_QUEUE_LIMIT, 500 or so.
import => $importType
A string value, essentially an enum, determining how this Nexus gets immediately imported back into this Triead. The supported values are:
- reader (or anything starting with "read") - import for reading
- writer (or anything starting with "write") - import for writing
- none (or anything starting with "no") - do not import
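The prefix rule for these values can be sketched as follows. This is an illustration in plain C++ and not the Triceps parser: the names ImportMode and parseImportMode() are hypothetical, the matching is assumed to be case-sensitive, and the Invalid value stands in for the confession that the real call would produce on a bad value.

```cpp
#include <cassert>
#include <string>

enum class ImportMode { Reader, Writer, None, Invalid };

// True if text begins with prefix.
static bool startsWith(const std::string &text, const std::string &prefix) {
    assert(!prefix.empty()); // an empty prefix would match every string
    return text.compare(0, prefix.size(), prefix) == 0;
}

// Map an option string to a mode by the prefix rules listed above.
ImportMode parseImportMode(const std::string &text) {
    if (startsWith(text, "read")) return ImportMode::Reader;
    if (startsWith(text, "write")) return ImportMode::Writer;
    if (startsWith(text, "no")) return ImportMode::None;
    return ImportMode::Invalid;
}
```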
$facet = $to->importNexus(@options);
Import a nexus into this Triead. Returns the imported Facet. The repeated attempts to import the same Nexus will return references to the same Facet object. Confesses on errors. An attempt to import the same nexus for both reading and writing is an error.
The options are:
from => "$t/$n"
Identifier of the nexus to import, consisting of two parts separated by a slash:
- thread name
- nexus name
fromTriead => $t
fromNexus => $n
The alternative way to specify the source thread and nexus as separate options. Both options must be present or absent at the same time. Mutually exclusive with "from".
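The rules for these three options can be sketched as a small validation function. It is a model in plain C++ under my own assumptions, not the Triceps implementation: resolveSource() is a hypothetical name, an empty string stands for "option not given", the identifier is split at the first slash, and the exception messages are invented.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>

// Resolve the nexus identity from the three options.
// Returns the pair (thread name, nexus name).
std::pair<std::string, std::string> resolveSource(
    const std::string &from,
    const std::string &fromTriead,
    const std::string &fromNexus)
{
    const bool haveSplit = !fromTriead.empty() || !fromNexus.empty();

    if (!from.empty()) {
        if (haveSplit)
            throw std::invalid_argument(
                "\"from\" is mutually exclusive with fromTriead/fromNexus");
        const std::string::size_type slash = from.find('/');
        if (slash == std::string::npos)
            throw std::invalid_argument("\"from\" must look like thread/nexus");
        assert(slash < from.size()); // find() returned a valid index
        return std::make_pair(from.substr(0, slash), from.substr(slash + 1));
    }

    // Without "from", both separate options must be present.
    if (fromTriead.empty() || fromNexus.empty())
        throw std::invalid_argument(
            "either \"from\" or both fromTriead and fromNexus must be given");
    return std::make_pair(fromTriead, fromNexus);
}
```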
as => $name
Specifies an override name for the local facet (and thus also to the FnReturn created in the facet). Logically similar to the SQL clause AS. Default is to reuse the nexus name.
import => $importType
A string value, essentially an enum, determining how this Nexus gets imported. The supported values are the same as for makeNexus (except "none", since there is no point in a no-op import):
- reader (or anything starting with "read") - import for reading
- writer (or anything starting with "write") - import for writing
immed => 0/1
Flag: do not wait for the thread that exported the nexus to be fully constructed. Waiting synchronizes with the exporter and prevents the race where an import attempt looks for a nexus before it is made and fails. However if two threads are waiting for each other, it becomes a deadlock that gets caught and aborts the App. The immediate import makes it possible to avoid such deadlocks for the circular topologies with helper threads.
The helper threads are the "blind alleys" in the topology: the "main thread" outsources some computation to a "helper thread", sending it the arguments, then later receiving the results and continuing with its logic.
For example, consider the following sequence:
- thread A creates the nexus O;
- thread A creates the helper thread B and tells it to import the nexus A/O for its input immediately and create the reverse nexus R for result;
- thread A requests a (normal) import of the nexus B/R and falls asleep because B is not constructed yet;
- thread B starts running;
- thread B imports the nexus A/O immediately and succeeds;
- thread B defines its result nexus R;
- thread B marks itself as constructed and ready;
- thread A wakes up after B is constructed, finds the nexus B/R and completes its import;
- then thread A can complete its initialization, export other nexuses etc.
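The sequence above can be replayed on a single-threaded toy model of the look-up rules. This is a sketch in plain C++ under my own simplifications, not Triceps code: ToyApp, ToyThread and the Lookup result values are hypothetical names, and "falling asleep" is represented by the MustWait result instead of a real wait.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

enum class Lookup { Found, NotFound, MustWait };

// What the model knows about one thread.
struct ToyThread {
    bool constructed = false;
    std::set<std::string> exports;
};

struct ToyApp {
    std::map<std::string, ToyThread> threads;

    // A normal import waits until the exporter is constructed;
    // an immediate import looks at whatever is exported right now.
    Lookup find(const std::string &thread, const std::string &nexus, bool immed) const {
        const auto it = threads.find(thread);
        const bool known = (it != threads.end());
        const bool constructed = known && it->second.constructed;
        if (!immed && !constructed)
            return Lookup::MustWait;
        if (known && it->second.exports.count(nexus) > 0)
            return Lookup::Found;
        return Lookup::NotFound;
    }
};

std::vector<Lookup> replayHelperSequence() {
    std::vector<Lookup> seen;
    ToyApp app;
    app.threads["A"].exports.insert("O");        // A creates the nexus O
    app.threads["B"];                            // A creates the helper thread B
    seen.push_back(app.find("B", "R", false));   // A's normal import of B/R has to wait
    // A is not constructed yet, so a normal import of A/O by B would also wait,
    // and the two threads would deadlock.
    assert(app.find("A", "O", false) == Lookup::MustWait);
    seen.push_back(app.find("A", "O", true));    // B's immediate import of A/O succeeds
    app.threads["B"].exports.insert("R");        // B defines its result nexus R
    app.threads["B"].constructed = true;         // B marks itself as constructed
    seen.push_back(app.find("B", "R", false));   // A wakes up and finds B/R
    return seen;
}
```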
TrieadOwner reference, Perl, part 1
TrieadOwner is the thread's private interface used to control its state and interact with the App (the App uses the thread's identity to detect the deadlocks). Whenever a Triead is constructed, its OS/Perl thread receives the TrieadOwner object for it.
Normally the TrieadOwner object is constructed inside Triead::start() or Triead::startHere() and passed to the thread's main function. The following constructor is used inside start(), and it's pretty much a private method. The only reason to use it would be if you want to do something very unusual, and even then you probably should write a wrapper method for your unusual thing and then call that wrapper method. The constructor constructs both Triead and TrieadOwner as two sides of the same item, and registers the thread with the App.
$to = Triceps::TrieadOwner::new($tid, $handle, $appOrName, $tname, $fragname);
Here $tid is the Perl thread id where this TrieadOwner belongs (it can be obtained with $thr->tid()). $handle is the Perl thread's low-level handle (as in $thr->_handle()), it's the underlying POSIX thread handle, used to interrupt the thread on shutdown (the long story is that in the Perl threads the kill() call doesn't actually send a signal to another thread but just sets a flag; to interrupt a sleeping system call, a real signal has to be delivered through the POSIX API). $handle is a dangerous argument, and passing a wrong value there may cause a crash.
Both $tid and $handle may be undef. If $tid is undef, the thread won't be joined by the harvester and you can either detach it or join it yourself. If either $tid or $handle is undef, the thread won't be interrupted on shutdown.
The signal used for interruption is SIGUSR2. Triceps sets its default handler that does nothing on this signal, but you can define your own handler instead.
$appOrName is the App object or its name that would be automatically looked up (or will confess if not found). $tname is the name of the previously created thread, that must be unique within the App (though it might be declared before). $fragname is the name of the fragment where the thread belongs, use "" for no fragment.
$app = $to->app();
Get the App where this Triead belongs.
$unit = $to->unit();
Whenever a Triead is constructed, a Unit is automatically created to execute its logic. This call returns that unit. When the Triead is destroyed, the unit will be cleaned and unreferenced.
The unit is named the same as the thread.
$to->addUnit($moreUnit);
It's possible to split the Triead's logic into multiple units, all running in the same Perl thread. This call puts an extra unit under Triead's control, and has two effects: First, the unit will be referenced for the life of the Triead, and cleaned and unreferenced when the Triead is destroyed. Second, when the Triead's main loop runs, after each incoming rowop it will check all the controlled units for any rowops scheduled in them, and will run them until all such rowops are processed.
The names of the units are not checked in any way, it's your responsibility to name them sensibly and probably differently from each other.
The repeated calls with the same unit will have no effect.
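The second effect, running all the controlled units until nothing is left scheduled, can be sketched like this. It is a toy model in plain C++, not the Triceps main loop: ToyUnit, drainUnits() and demoDrain() are hypothetical names, and a scheduled rowop is reduced to a callable.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy unit: a queue of scheduled work items.
struct ToyUnit {
    std::deque<std::function<void()>> scheduled;

    bool empty() const { return scheduled.empty(); }

    void runOne() {
        // Take the job out first, since running it may schedule more work.
        auto job = std::move(scheduled.front());
        scheduled.pop_front();
        job();
    }
};

// Run the controlled units until a full pass finds every queue empty.
// A job may schedule work in any unit, so one pass is not enough.
// Returns the number of jobs executed.
int drainUnits(const std::vector<ToyUnit *> &units) {
    int executed = 0;
    bool progressed = true;
    while (progressed) {
        progressed = false;
        for (ToyUnit *u : units) {
            while (!u->empty()) {
                u->runOne();
                ++executed;
                progressed = true;
            }
        }
    }
    return executed;
}

int demoDrain() {
    ToyUnit mainUnit, extra;
    std::vector<ToyUnit *> units = { &mainUnit, &extra };
    mainUnit.scheduled.push_back([] {});
    // A job in the extra unit schedules follow-up work back in the main unit.
    extra.scheduled.push_back([&] { mainUnit.scheduled.push_back([] {}); });
    const int executed = drainUnits(units);
    assert(mainUnit.empty() && extra.empty());
    return executed;
}
```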
$to->forgetUnit($moreUnit);
Pull a unit out of Triead's control. After that the cleaning of the unit becomes your responsibility. The thread's main unit can not be forgotten, the attempts to forget it will be simply ignored. The same goes for the units that aren't under the Triead's control in the first place, these calls are ignored.
@units = $to->listUnits();
Get the list of units under Triead's control. The main unit (the same as returned with $to->unit()) will always be the first in the list. The list contains only the unit references, not the name-value pairs (and you can always get the names from the unit objects themselves).
$triead = $to->get();
Get the public API of this Triead.
$name = $to->getName();
Get this Triead's name.
$frag = $to->fragment();
Get the name of this Triead's fragment ("" if not in a fragment).
$to->markConstructed();
Advance the Triead to the Constructed state. After that point no more nexuses may be exported in the Triead. Any look-ups by other Trieads for the Nexuses of this Triead will proceed at this point, either succeeding or failing (if no requested nexus is exported).
If the Triead is already in the Constructed or later state, this call has no effect.
$to->markReady();
Advance the Triead to the Ready (fully initialized) state. After that point no more nexuses may be imported into this Triead.
If the App has been already shut down, this Triead will be immediately requested to die.
If this is the last Triead to become ready, this method will invoke the check for the topological correctness of the App. If the check finds an error (a loop of nexuses of the same direction), it will abort the App and confess with a message describing the nature of the error.
If the Triead is already in the Ready or later state, this call has no effect.
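To give an idea of what "a loop of nexuses of the same direction" means, here is a sketch of such a check on a simplified graph. It is not the Triceps algorithm, only a plain depth-first search under my own modeling assumptions: the threads are numbered nodes, each nexus connection is an Edge from the writer to the reader, and the names Edge, loopFrom() and hasSameDirectionLoop() are hypothetical.

```cpp
#include <cassert>
#include <vector>

struct Edge {
    int from;     // writer thread
    int to;       // reader thread
    bool reverse; // true for a reverse nexus
};

// Depth-first search for a loop that uses only the edges of one direction.
// state: 0 = not visited, 1 = on the current path, 2 = fully explored.
static bool loopFrom(int node, bool reverse, const std::vector<Edge> &edges,
                     std::vector<int> &state) {
    state[node] = 1;
    for (const Edge &e : edges) {
        if (e.from != node || e.reverse != reverse)
            continue;
        if (state[e.to] == 1)
            return true; // came back to the current path: a loop
        if (state[e.to] == 0 && loopFrom(e.to, reverse, edges, state))
            return true;
    }
    state[node] = 2;
    return false;
}

bool hasSameDirectionLoop(int nodeCount, const std::vector<Edge> &edges) {
    for (const Edge &e : edges)
        assert(e.from >= 0 && e.from < nodeCount && e.to >= 0 && e.to < nodeCount);
    for (bool reverse : {false, true}) {
        std::vector<int> state(nodeCount, 0);
        for (int n = 0; n < nodeCount; ++n)
            if (state[n] == 0 && loopFrom(n, reverse, edges, state))
                return true;
    }
    return false;
}
```

In this model two threads connected by two direct nexuses form an error, while the same pair connected by one direct and one reverse nexus passes the check.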
$to->readyReady();
Mark this Triead as Ready and wait for all the App's Trieads to become Ready. There is no method that just waits for readiness because that would likely cause a deadlock. When the thread waits for readiness, it must be ready itself, so this call does both. All the error checks of markReady() apply.
It is possible and reasonable to call this method repeatedly: more Trieads may be added to the App later, and it's a good idea to call readyReady() again before communicating with these new threads. Otherwise any rowops sent before these threads become ready will never arrive at these threads.
$to->markDead();
Mark this Triead as Dead. A dead thread will not receive any more input, and any of its output will be thrown away. This notifies the harvester that it needs to join the Perl thread, so there should not be too much time left between making this call and exiting the Perl thread. The repeated calls have no effect.
Normally the Triead::start() and startHere() call markDead() automatically in their wrapper logic, and there is no need for a manual call. However if you decide to bypass them, you must call markDead() manually before exiting the thread, or the harvester will be stuck forever waiting for this thread to exit.
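The "no effect if already in that or a later state" rule of the mark*() calls amounts to a state that only moves forward. Here is a toy model of it in plain C++. It is not the Triceps implementation: ToyState, ToyLifecycle and demoEarlyDeath() are hypothetical names, and the starting state Defined is my assumption.

```cpp
#include <cassert>

// The states in the lifecycle order.
enum class ToyState { Defined, Constructed, Ready, Dead };

struct ToyLifecycle {
    ToyState state = ToyState::Defined;
    int readySteps = 0; // how many times the Ready state was entered

    void markConstructed() { advanceTo(ToyState::Constructed); }
    void markReady() { advanceTo(ToyState::Ready); }
    void markDead() { advanceTo(ToyState::Dead); }

private:
    // Step forward through every intermediate state; never go backward.
    void advanceTo(ToyState target) {
        while (state < target) {
            assert(state != ToyState::Dead); // Dead is final, nothing follows it
            state = static_cast<ToyState>(static_cast<int>(state) + 1);
            if (state == ToyState::Ready)
                ++readySteps; // this is the step where a topology check may run
        }
    }
};

ToyLifecycle demoEarlyDeath() {
    ToyLifecycle t;
    t.markDead();        // error path: straight to Dead, stepping through Ready
    t.markReady();       // already past Ready: no effect
    t.markConstructed(); // likewise no effect
    return t;
}
```

After demoEarlyDeath() the model is in the Dead state and has passed through Ready exactly once, which matches the intermediate ready step that markDead() performs.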
$to->abort($msg);
Abort the App with a message. This is a convenience wrapper that translates to App::abortBy().
$result = $to->isRqDead();
Check whether the thread was requested to die. For most threads, mainLoop() does this check automatically, and nextXtray() also returns the same value. However in the special cases, such as doing some long processing in response to a rowop, or doing some timeouts, it's best to do a manual check of isRqDead() periodically and abort the long operation if the thread has been requested to die, since any output will be thrown away anyway.
Note that even when the Triead has been requested to die, it still must call markDead() when it actually dies (normally the Triead::start() or startHere() takes care of it in its wrapper).
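The periodic manual check inside a long operation follows a simple pattern, sketched here in plain C++. This is not Triceps code: processWithChecks() is a hypothetical name, and the isRqDead argument is a stand-in callback for the real TrieadOwner method.

```cpp
#include <cassert>
#include <functional>

// Process totalItems units of work, polling the "requested to die" check
// before every checkEvery-th item. Returns the number of items processed.
int processWithChecks(int totalItems, int checkEvery,
                      const std::function<bool()> &isRqDead) {
    assert(checkEvery > 0);
    int done = 0;
    while (done < totalItems) {
        if (done % checkEvery == 0 && isRqDead())
            break; // the output would be thrown away anyway, so stop early
        ++done;    // stands for one unit of the expensive work
    }
    return done;
}
```

A smaller checkEvery reacts to the request faster at the price of more frequent polling. After such a function returns early, the thread still has to reach markDead() in the usual way.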
$result = $to->isConstructed();
$result = $to->isReady();
$result = $to->isDead();
$result = $to->isInputOnly();
Check the state of the Triead, the same as Triead methods.
Friday, June 21, 2013
Triead reference, C++
Naturally, Triead is a class that can be referenced from multiple threads and inherits from Mtarget. It's defined in app/Triead.h. (By the way, I forgot to mention it for App, but obviously App is also an Mtarget).
The meaning of the C++ methods is exactly the same as in Perl, only the format of values is slightly different. Obviously, the start* methods are Perl-only.
const string &getName() const;
const string &fragment() const;
bool isConstructed() const;
bool isReady() const;
bool isDead() const;
bool isInputOnly() const;
typedef map<string, Autoref<Nexus> > NexusMap;
void exports(NexusMap &ret) const;
void imports(NexusMap &ret) const;
void readerImports(NexusMap &ret) const;
void writerImports(NexusMap &ret) const;
The argument map gets cleared and then filled with the new returned contents.
Onceref<Nexus> findNexus(const string &srcName, const string &appName, const string &name) const;
This is a method unique to the C++ API. It looks up an exported nexus by name without involving the overhead of getting the whole map. Here name is the name of the nexus to look up (its short part, without the thread's name in it). If there is no such nexus, an Exception will be thrown.
The srcName and appName are used for the error message in the Exception: srcName is the name of the thread that requested the look-up, and appName is the name of the App where the threads belong. (It might seem surprising, but a Triead object has no reference to its App, and doesn't know the App's name. It has to do with the avoidance of the circular references).
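The shape of such a look-up can be sketched with standard containers. This is an illustration only, not the Triceps source: ToyNexus, ToyExporter and makeDemoExporter() are hypothetical stand-ins, std::runtime_error replaces the Triceps Exception, and the wording of the message is invented.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>

struct ToyNexus {
    std::string name;
};

typedef std::map<std::string, ToyNexus> ToyNexusMap;

struct ToyExporter {
    std::string name;    // this thread's own name
    ToyNexusMap exports; // exported nexuses by their short names

    // Look up one exported nexus without copying the whole map.
    // srcName and appName are used only to build the error message.
    const ToyNexus &findNexus(const std::string &srcName,
                              const std::string &appName,
                              const std::string &nexusName) const {
        ToyNexusMap::const_iterator it = exports.find(nexusName);
        if (it == exports.end())
            throw std::runtime_error(
                "For thread '" + srcName + "', the nexus '" + nexusName
                + "' is not found in thread '" + name
                + "' of app '" + appName + "'");
        return it->second;
    }
};

ToyExporter makeDemoExporter() {
    ToyExporter t;
    t.name = "t1";
    t.exports["data"] = ToyNexus{"data"};
    assert(t.exports.size() == 1);
    return t;
}
```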
Triead reference, Perl
The Triead class is the public interface of a Triceps thread, i.e. what of it is visible to the other threads. It's intended pretty much for introspection only, and all its methods are only for reading the state. They are all synchronized but of course the thread may change its state at any moment. The Triead objects can be obtained by calling App::getTrieads().
The class also contains some static methods that are used to construct the Trieads.
$result = $t->same($t2);
Check that two Triead references point to the same Triead object.
$name = $t->getName();
Get the Triead's name.
$fragment = $t->fragment();
Get the name of the Triead's fragment. If the Triead doesn't belong to a fragment, returns "".
$result = $t->isConstructed();
Check whether the Triead has been constructed. (For the explanation of the Triead lifecycle states, see http://babkin-cep.blogspot.com/2013/04/the-triead-lifecycle.html.)
$result = $t->isReady();
Check whether the Triead is ready.
$result = $t->isDead();
Check whether the Triead is dead.
$result = $t->isInputOnly();
Check whether the Triead is input-only, that is has no reader nexuses imported into it. When the Triead is created, this flag starts its life as false (0 for Perl), and then its correct value is computed when the Triead becomes ready. So, to check this flag correctly, you must first check that the Triead is ready.
@nexuses = $t->exports();
Get the list of nexuses exported from this Triead, as name-value pairs, suitable to be assigned into a hash. The values are the references to nexus objects.
@nexuses = $t->imports();
@nexuses = $t->readerImports();
@nexuses = $t->writerImports();
Get the list of nexuses imported into this Triead, as name-value pairs. The imports() returns the full list, without the ability to tell which nexuses are imported for reading and which for writing, while readerImports() and writerImports() return these subsets separately.
The names here are the "as-names" used for the import (the full names of the Nexuses can be obtained from the Nexus objects). The values are the references to nexus objects.
The next part of the API is the static construction methods. They really are wrappers to the TrieadOwner methods but Triead is a shorter name, and thus more convenient.
Triceps::Triead::start(@options);
Start a new Triead in a new Perl thread. The options are:
app => $appname
Name of the App that owns the new Triead. The App object will be looked up by name for the actual construction.
thread => $threadname
Name of the new Triead.
fragment => $fragname
Name of the new Triead's fragment. Default: "", which means no fragment.
immed => 0/1
Flag: when the new thread imports its nexuses, it should import them in the immediate mode. This flag is purely advisory, and the thread's main function is free to use or ignore it depending on its logic. It's provided as a convenience, since it's a typical concern for the helper threads. Default: 0.
main => \&function
The main function of the thread that will be called with all the options of start() plus some more:
&$func(@opts, owner => $ownerObj)
The extra option of the main function is:
owner: the TrieadOwner object constructed for this thread
Also, any other options may be added, and they will be forwarded to the main function without parsing. The main function is then free to parse them by itself, and if it finds any unknown options, it will fail.
For the convenience of writing the main functions, the set of "standard" options is provided in the global variable
@Triceps::Triead::opts
The main function then uses this variable as a preamble for any of its own options, for example:
sub listenerT
{
    my $opts = {};
    &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    ...
}
Another convenience method is:
Triceps::Triead::startHere(@options);
It's very much the same as start() but starts the Triead in the current Perl thread. It's intended for the short-lived Apps that perform some computation and then have to return the result into the original thread. The unit tests are a good example of such apps.
And because of the typical usage, this method has some extra functionality compared to the plain start(): unless told otherwise, it will first create the App (with the name specified by the option "app"), then construct and run it, and after the main function of this thread exits, run the harvester and drop the App. So it just does the whole package, similar to App::build(), only typically the Triead started here runs as a normal real Triead, collects the results of computations and places them into some variables (global or referenced by the user-specific options of the main function).
This extra functionality can be disabled by the additional options (disable by setting them to 0):
harvest => 0/1
After the main function exits, automatically run the harvester. If you set it to 0, don't forget to call the harvester after this function returns. Default: 1.
makeApp => 0/1
Before doing anything, create the App. Obviously, this App must not exist yet. Default: 1.
These options are not passed through to the main function, unlike all the others.
The class also contains some static methods that are used to construct the Trieads.
$result = $t->same($t2);
Check that two Triead references point to the same Triead object.
$name = $t->getName();
Get the Triead's name.
$fragment = $t->fragment();
Get the name of the Triead's fragment. If the Triead doesn't belong to a fragment, returns "".
$result = $t->isConstructed();
Check whether the Triead has been constructed. (For the explanation of the Triead lifecycle states, see http://babkin-cep.blogspot.com/2013/04/the-triead-lifecycle.html.
$result = $t->isReady();
Check whether the Triead is ready.
$result = $t->isDead();
Check whether the Triead is dead.
$result = $t->isInputOnly();
Check whether the Triead is input-only, that is has no reader nexuses imported into it. When the Triead is created, this flag starts its life as false (0 for Perl), and then its correct value is computed when the Triead becomes ready. So, to check this flag correctly, you must first check that the Triead is ready.
@nexuses = $t-> exports();
Get the list of nexuses exported from this Triead, as name-value pairs, suitable to be assigned into a hash. The values are the references to nexus objects.
@nexuses = $t->imports();
@nexuses = $t->readerImports();
@nexuses = $t->writerImports();
Get the list of nexuses imported into this Triead, as name-value pairs. The imports() returns the full list, without the ability to tell, which nexuses are imported for reading and which for writing, while readImports() and writeImports() return these subsets separately.
The names here are the "as-names" used for the import (the full names of the Nexuses can be obtained from the Nexus objects). The values are the references to nexus objects.
The next part of the API is the static construction methods. They really are wrappers to the TrieadOwner methods but Triead is a shorter name, and thus more convenient.
Triceps::Triead::start(@options);
Start a new Triead in a new Perl thread. The options are:
app => $appname
Name of the App that owns the new Triead. The App object will be looked up by name for the actual construction.
thread => $threadname
Name of the new Triead.
fragment => $fragname
Name of the new Triead's fragment. Default: "", which means no fragment.
immed => 0/1
Flag: when the new thread imports its nexuses, it should import them in the immediate mode. This flag is purely advisory, and the thread's main function is free to use or ignore it depending on its logic. It's provided as a convenience, since it's a typical concern for the helper threads. Default: 0.
main => \&function
The main function of the thread that will be called with all the options of start() plus some more:
&$func(@opts, owner => $ownerObj)
The extra option of the main are:
owner: the TrieadOwner object constructed for this thread
Also, any other options may be added, and they will be forwarded to the main function without parsing. The main function is then free to parse them by itself, and if it finds any unknown options, it will fail.
For the convenience of writing the main functions, the set of "standard" options is provided in the global valiable
@Triceps::Triead::opts
The main function then uses this variable as a preamble for any of its own options, for example:
sub listenerT
{
    my $opts = {};
    &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    ...
}
Another convenience method is:
Triceps::Triead::startHere(@options);
It's very much the same as start() but starts the Triead in the current Perl thread. It's intended for the short-lived Apps that perform some computation and then have to return the result into the original thread. The unit tests are a good example of such apps.
And because of the typical usage, this method has some extra functionality compared to the plain start(): unless told otherwise, it will first create the App (with the name specified by the option "app"), then construct and run it, and after the main function of this thread exits, run the harvester and drop the App. So it just does the whole package, similar to App::build(), only typically the Triead started here runs as a normal real Triead, collects the results of computations and places them into some variables (global or referenced by the user-specific options of the main function).
This extra functionality can be disabled by the additional options (disable by setting them to 0):
harvest => 0/1
After the main function exits, automatically run the harvester. If you set it to 0, don't forget to call the harvester after this function returns. Default: 1.
makeApp => 0/1
Before doing anything, create the App. Obviously, this App must not exist yet. Default: 1.
These options are not passed through to the main function, unlike all the others.
App reference, Perl, part 2
Triceps::App::setTimeout($appOrName, $main_to_sec);
Triceps::App::setTimeout($appOrName, $main_to_sec, $frag_to_sec);
Set the readiness timeout. Even though Triceps is quite eager in watching for deadlocks in the threads topology, it's still possible to get some threads, and with them the whole program, stuck during initialization, for example if you declare a thread and then never define it. These situations are very unpleasant because you start the program and expect it to work but it doesn't, without any indication as to why. So Triceps imposes an initialization timeout. The App (and thus all its threads) must become ready within the timeout after the definition or declaration of the last Triead. If not, the App will be aborted (and the error message will tell which thread did not initialize in time). The same applies to the creation of Trieads (usually in the fragments) after the App is already running: all the threads must become ready within the timeout since the last thread has been defined or declared.
The default timeout is 30 seconds, or symbolically
&Triceps::App::DEFAULT_TIMEOUT
(subject to possible changes of the exact value in the future).
But the timeout can be changed. Technically, there are two timeouts:
- one starts when the App is created
- one restarts when any Triead is defined or declared
For the App to be aborted, both timeouts must expire. By default they are set to the same length, so only the second timeout really matters, since it will always be the last one to start and the last one to end. But if you set the first timeout to be longer, you can allow for a longer initialization period at the App start ("main timeout") than later when more threads are added to a running App ("frag timeout", since the threads added later are typically the threads in fragments).
The one-argument form of setTimeout() sets both timeouts to the same value. The two-argument form sets these timeouts separately.
The timeouts may be set only before the first thread has been created. This is largely due to the historical reasons of implementation: with the current implementation it should actually be safe to allow changing the timeouts at a later point as well, and this limitation may be removed in the future.
The timeout values are represented as integer whole seconds.
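To make the interaction of the two timeouts more concrete, here is a minimal sketch in plain Perl of how the effective abort time follows from them. It's only a model of the rule described above, not the Triceps code: the function effectiveDeadline() and its arguments are made up for this illustration.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical model: the App gets aborted only after BOTH timeouts expire,
# so the effective deadline is the later of the two.
sub effectiveDeadline {
    my ($created, $mainTo, $lastDefined, $fragTo) = @_;
    my $mainDeadline = $created + $mainTo;     # starts when the App is created
    my $fragDeadline = $lastDefined + $fragTo; # restarts on each Triead definition
    return max($mainDeadline, $fragDeadline);
}

# App created at t=100, main timeout 60 sec, frag timeout 10 sec.
# A Triead defined at t=105 is still covered by the main deadline:
printf "%d\n", effectiveDeadline(100, 60, 105, 10); # 160
# A fragment Triead defined at t=500 gets 10 sec from its definition:
printf "%d\n", effectiveDeadline(100, 60, 500, 10); # 510
```

With both timeouts equal, the second term always wins, which is why by default only the second timeout really matters.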
Note that it's still possible to get the initialization stuck without any indication if it gets stuck in the other libraries. The reason is that the harvester waits for all the threads to be joined before it propagates the error, so if a thread doesn't get joined, the harvester will be stuck. For example, if you open a file on NFS as a part of Triead initialization, and the NFS server doesn't respond, this thread will be stuck for a long time if not forever. The App will detect a timeout and try to interrupt the threads but the NFS operations are often not interruptible, so the harvester will wait for this thread to complete this operation and exit before it propagates the error, and thus the whole program will be silently stuck forever (to avoid this, the NFS mounts should be done in the "soft" mode but it's a separate story). This will likely be improved in the future but it needs more thinking about how to do it right.
Triceps::App::setDeadline($appOrName, $deadline_sec);
Set the "main" timeout in the form of an absolute deadline. This is actually closer to the way it works internally: the limit is expressed as a deadline, and setTimeout() just adds the timeout value to the current time to compute the deadline, while setDeadline() sets it straight. Like setTimeout(), this may be called only before any Trieads were created.
The time is represented as floating-point seconds since epoch. Triceps::now() can be used to get the current time with the fractional seconds (or if you don't care about the fractional seconds, you can always use the stock time()).
Triceps::App::refreshDeadline($appOrName);
Restart the "fragment" timeout. Same as when a Triead is defined or declared, only without involving a Triead. Again, the name has to do with how the time computations are done internally, computing the deadline from both timeouts. This method can be called at any time.
The next few methods have to do with the drains. Generally, using the AutoDrain class to automatically limit the scope of the drains is a better idea. But if the automatic scoping is not desired, the App methods can be used directly.
Triceps::App::requestDrain($appOrName);
Request a shared drain. Does not wait for the drain to complete. This method may be called repeatedly, potentially from multiple threads, which will keep the drain active and increase the recursion count. The drain will be released when undrain() is called the matching number of times.
If an exclusive drain by another thread is active when requestDrain() is called, the call will be stuck until the exclusive drain becomes undrained (and potentially more exclusive drains might be queued up before the shared drain, using the POSIX read-write-lock implementation). Otherwise the call will set the appropriate state and return immediately.
If this thread has requested an exclusive drain previously (and didn't undrain it yet), an attempt to get a shared drain in the same thread will likely deadlock. The same applies in the opposite order as well.
Once the shared drain is requested, the input-only threads will be blocked from sending more data (any of their attempts to flush their write facets will get stuck until undrain), and the rest of the threads will continue churning through the data buffered in the nexuses until all the nexuses are empty and there is no more data to process (yes, these threads will continue writing to their write facets).
It is important not to request a shared drain from an input-only thread and then try to write more data into an output facet: that would deadlock. The whole concept is doable, but an exclusive drain must be used instead ("exclusive" means that a designated input-only thread is excluded from blocking).
Triceps::App::waitDrain($appOrName);
Wait for the requested shared drain to complete. This means that all the queues in all the nexuses have become empty.
The effect of waitDrain() without a preceding requestDrain() is undefined. If you definitely know that some other thread has requested a drain and didn't undrain yet, it will work as normal, so you can have any number of threads wait for drain in parallel (and the semantics, shared or exclusive, will match that of the currently active request). If no thread requested a drain, it might either return immediately irrespective of the state of the nexuses or might wait for some other thread to request a drain and succeed.
This call may also be used with an exclusive (or shared) drain requested through a TrieadOwner or any drain requested through an AutoDrain (though a better style is to use the same object as used for requesting the drain).
Triceps::App::drain($appOrName);
A combination of requestDrain() and waitDrain() in one call.
Triceps::App::undrain($appOrName);
Release the drain and let the normal processing continue. Make sure to call it exactly the same number of times as the requestDrain().
This call may also be used with a drain requested through a TrieadOwner (though a better style is to use the same object as used for requesting the drain).
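The matching of the requestDrain() and undrain() calls can be pictured as a simple counter. The sketch below is a hypothetical plain-Perl model (the DrainCounter class is invented for this illustration and ignores the locking and the exclusive drains), it only shows why the number of calls has to match.

```perl
use strict;
use warnings;

package DrainCounter; # a made-up stand-in for the drain state of an App

sub new     { return bless { depth => 0 }, shift }
sub request { my $self = shift; $self->{depth}++; return }
sub undrain { my $self = shift; $self->{depth}--; return }
sub active  { my $self = shift; return $self->{depth} > 0 }

package main;

my $state = DrainCounter->new;
$state->request; # the first requester
$state->request; # the second requester, possibly from another thread
$state->undrain;
print $state->active ? "still draining\n" : "released\n"; # still draining
$state->undrain;
print $state->active ? "still draining\n" : "released\n"; # released
```

Forget one undrain in this model and the counter never comes back to zero, which in the real App would mean that the input threads stay blocked.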
$result = Triceps::App::isDrained($appOrName);
Check whether the App is currently drained (i.e. all the nexuses are empty), without waiting for it. Returns 1 if drained, 0 if not. If no drain is active during this call, the result is undefined, not even in the "best effort" sense: it may return 1 even if there are millions of records queued up. The only reliable way is to request a drain first.
This call may also be used with a drain requested through a TrieadOwner or through an AutoDrain.
The next part of the API deals with the passing of the file descriptors between the Perl threads. The concepts have been described in detail before, this is just a short reference. This API works under the hood of TrieadOwner::TrackGetSocket() and friends, described in http://babkin-cep.blogspot.com/2013/04/multithreaded-socket-server-part-3.html and neighboring posts, but can also be used directly.
Triceps::App::storeFd($appOrName, $name, $fd);
Store a file descriptor in the App object, allowing to load it into other threads, and thus pass it around between the threads. $name is the name for this descriptor that will later be used to get the file descriptor back (generally, you want to generate a unique name for each file descriptor stored to avoid confusion, and then pass this name to the target thread). $fd is the file descriptor, an integer number, normally received from fileno(). The file descriptor is dupped before it gets stored, so the original will continue to exist, and if you have no other use for it, should be closed.
If a file descriptor with this name already exists in the App, this call will confess.
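The effect of the dupping can be seen without Triceps at all. The following is a small plain-Perl sketch, with POSIX::dup() standing in for what storeFd() is described to do internally; the function name dupSurvivesClose() is made up for the example.

```perl
use strict;
use warnings;
use POSIX ();
use File::Temp qw(tempfile);

# Duplicate a descriptor, close the original, and show that the copy
# is still usable. Returns the line read back from the file.
sub dupSurvivesClose {
    my ($fh, $path) = tempfile(UNLINK => 1);

    my $stored = POSIX::dup(fileno($fh)); # the copy that a store would keep
    defined $stored or die "dup failed: $!";
    close($fh); # the original is not needed any more

    # the copy is an independent descriptor for the same open file
    defined(POSIX::write($stored, "hello\n", 6)) or die "write failed: $!";
    POSIX::close($stored);

    open(my $in, '<', $path) or die "open failed: $!";
    my $line = <$in>;
    close($in);
    return $line;
}

print dupSurvivesClose(); # hello
```

This is also why the original should be closed after storing if you have no other use for it: otherwise there would be two open descriptors where one is enough.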
$fd = Triceps::App::loadFd($appOrName, $name);
Load back the file descriptor that was previously stored. If no descriptor with such a name is stored in the App, it will confess. The descriptor will keep existing in the App, so to keep things consistent, there are two options:
One is to let your code take over the ownership of the file descriptor, and tell the App to forget about it with forgetFd().
The other one is to never close the received descriptor in your code (a good example would be to dup it right away for the future use and then leave the original alone), and let App keep its ownership.
$fd = Triceps::App::loadDupFd($appOrName, $name);
Very much the same as loadFd(), only does the dupping for you and returns the dupped descriptor. In this case your code is responsible for closing that descriptor. Which method is more suitable, loadFd() or loadDupFd(), depends on the nature of the code that will use the file descriptor and whether you want to leave the descriptor in the App for more threads to load.
Triceps::App::forgetFd($appOrName, $name);
Forget the file descriptor in the App. It doesn't get closed, so closing it becomes your responsibility, or it will leak. If no descriptor with this name exists, the call will confess.
Triceps::App::closeFd($appOrName, $name);
Close and forget the file descriptor in the App. If no descriptor with this name exists, the call will confess.
Triceps::App::storeFile($appOrName, $name, FILE);
A convenience wrapper for storeFd(), calls fileno() on the file and stores the resulting file descriptor.
Triceps::App::storeCloseFile($appOrName, $name, FILE);
Stores the file descriptor extracted from the file, and closes the original file (since the file descriptor gets dupped on store, that copy continues to exist in the App).
$file = Triceps::App::loadDupFile($appOrName, $name, $mode);
Load a file descriptor and build a file handle object (IO::Handle) from it. The mode string may be specified in either the open() format (</>/>>/+</+>/+>>) or the C stdio format (r/w/a/r+/w+/a+). Note that the mode must match or be a subset of the mode used to originally open the file descriptor. If you open a file read-only, store its descriptor, and load back as a write-only file, you will have a bad time.
There is no corresponding loadFile(), since loadFd() is a more dangerous method that is useful for the low-level operations but doesn't make much sense for the higher level.
$file = Triceps::App::loadDupSocket($appOrName, $name, $mode);
Load a file descriptor and build an IO::Socket::INET object from it. The mode string meaning is the same as for loadDupFile().
$file = Triceps::App::loadDupFileClass($appOrName, $name, $mode, $class);
Load a file descriptor and build an arbitrary class object from it. The class (specified by its name) should normally be a subclass of IO::Handle, or at the very least must implement the method new_from_fd() similar to IO::Handle. This is the common underlying implementation of both loadDupFile() and loadDupSocket().
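Since new_from_fd() is the only requirement on the class, the general shape of such a loader can be sketched in plain Perl. This is an assumption about the approach, not the actual Triceps code: handleFromFd() is a made-up helper, and POSIX::dup() stands in for the dupping done by the App.

```perl
use strict;
use warnings;
use IO::Handle;
use POSIX ();
use File::Temp qw(tempfile);

# Dup the descriptor and wrap the copy into an object of the given class.
sub handleFromFd {
    my ($fd, $mode, $class) = @_;
    my $dup = POSIX::dup($fd);
    defined $dup or die "dup failed: $!";
    # IO::Handle understands both the open() and the stdio mode formats
    my $h = $class->new_from_fd($dup, $mode)
        or die "new_from_fd failed: $!";
    return $h;
}

my ($orig, $path) = tempfile(UNLINK => 1);
my $out = handleFromFd(fileno($orig), "w", "IO::Handle");
$out->print("via the copy\n");
$out->close(); # closes only the dupped descriptor

open(my $in, '<', $path) or die "open failed: $!";
print scalar(<$in>);
close($in);
close($orig); # the original was still open
```

Any class with a compatible new_from_fd(), such as IO::Socket::INET, could be passed as the third argument in the same way.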
The last part of the API is the convenience methods for building and starting an App.
Triceps::App::build($name, \&builder );
Build an App instance. It creates the App instance, then the builder function is called to create the App's nexuses and threads, then the harvester is executed, which eventually destroys the App object after collecting all its threads. For a very basic example:
Triceps::App::build "a1", sub {
    Triceps::App::globalNexus(
        name => "types",
        rowTypes => [
            rt1 => $rt1,
        ],
    );
    Triceps::Triead::start(
        app => $Triceps::App::name,
        thread => "t1",
        main => sub {
            my $opts = {};
            &Triceps::Opt::parse("t1 main", $opts, {@Triceps::Triead::opts}, @_);
            my $to = $opts->{owner};
            $to->importNexus(
                from => "global/types",
                import => "writer", # if importing just for the types, use "writer"!
            );
            $to->readyReady();
        },
    );
};
The builder function runs in the current Perl thread, however from the logical standpoint it runs in the App's first Triead named "global" (this Triead name is hardcoded in build()). This allows you not to worry about the App being technically dead until the first Triead is created: by the time the builder function is called, the first Triead is already created. However it also means that you can't change the readiness timeouts. After the builder function returns, the global Triead exits, and the harvester starts in the same current Perl thread.
The builder has access to a few global variables:
- $Triceps::App::name is the App name, from the build() first argument.
- $Triceps::App::app is the reference to the App object.
- $Triceps::App::global is the reference to the TrieadOwner object of the global Triead.
Triceps::App::globalNexus(@nexusOptions);
Creates a nexus with the import mode of "none", on the global Triead. The arguments are the same options as for the normal nexus creation. This is a simple convenience wrapper for the nexus creation. Since the global thread is supposed to exit immediately, there is no point in importing this nexus into it.
The global thread is found by this function from $Triceps::App::global, so this method can only be used from inside the builder function of build().
Sunday, June 16, 2013
App reference, Perl, part 1
The App API has two parts to it: The first part keeps track of all the App instances in the program, and allows to list and find them. The second part is the manipulations on a particular instance.
The global part of the API is:
@apps = Triceps::App::listApps();
Returns the array of name-value pairs, values containing the App references, listing all the Apps in the program (more exactly, in this Triceps library: if you compile multiple Triceps libraries together by renaming them, each of them will have its own Apps list). The returned array can be placed into a hash.
$app = Triceps::App::find($name);
Find an App by name. If an App with such a name does not exist, it will confess.
$app = Triceps::App::make($name);
Create a new App, give it the specified name, and put it on the list. The names must not be duplicate with the other existing Apps, or the method will confess.
Triceps::App::drop($app);
Triceps::App::drop($appName);
Drop the app, by reference or by name, from the list. The App is as usual reference-counted and will exist while there are references to it. The global list provides one of these references, so an App is guaranteed to exist while it's still on the list. When dropped, it will still be accessible through the existing references, but obviously will not be listed any more and could not be found by name.
Moreover, a new App with the same name can be added to the list. Because of this, dropping an App by name requires some care if a new App could be created again with the same name: it creates a potential for a race, and you might end up dropping the new App instead of the old one. Of course, if it's the same thread that drops the old one and creates the new one, then there is no race. Dropping an App by a name that doesn't exist at the moment is an error and will confess.
Dropping the App by reference theoretically allows to avoid the race: a specific object gets dropped, and if it already has been dropped, the call has no effect. Theoretically. However in practice Perl has a limitation of passing the object values between threads, and thus whenever each thread starts, first thing it does is finding its App by name. It's a very similar kind of race and is absolutely unavoidable except by making sure that all the App's threads have exited and joined (i.e. harvesting them). So make sure to complete the App's thread harvesting before dropping the App in the harvester thread, and by then it doesn't matter a whole lot if the App is dropped by name or by reference.
Now the second part of the API, working with an App instance.
Many (but not all) of the App methods allow specifying the App either by reference or by name, and they automatically sort it out, doing the internal look-up by name if necessary. So the same method could be used as either of:
$app->method(...);
Triceps::App::method($app, ...);
Triceps::App::method($appName, ...);
Obviously, you can not use the "->" syntax with a name, and obviously if the name is not in the app list, the method will confess. Below I'll show the calls that allow the dual formats as Triceps::App::method($appOrName, ...) but keep in mind that you can use the "->" form of them too with a reference.
$app = Triceps::App::resolve($appOrName);
Do just the automatically-sorting-out part: gets a reference or name and returns a reference either way. A newly created reference is returned in either case (not the argument reference). You can use this resolver before the methods that accept only a reference.
$result = $app->same($app2);
Check if two references are for the same App object. Here they both must be references and not names.
$name = $app->getName();
Get the name of the App, from a reference.
Triceps::App::declareTriead($appOrName, $trieadName);
Declare a Triead (Triceps thread) in advance. Any attempts to look up the nexuses in that thread will then wait for the thread to become ready. (Attempts to look up in an undeclared and undefined thread are errors). This is necessary to prevent a race at the thread creation time. For the most part, the method Triead::start() just does the right thing by calling this method automatically and you don't need to use it manually, except in some very special circumstances.
@trieads = Triceps::App::getTrieads($appOrName);
Get the list of currently defined Trieads, as name-value pairs. Keep in mind that the other threads may be modifying the list of Trieads, so if you do this call multiple times, you may get different results. However the Trieads are returned as references, so they are guaranteed to stay alive and readable even if they get removed from the App, or even if the App gets dropped.
$app->harvester(@options);
Run the harvester in the current thread. The harvester gets notifications from the threads when they are about to exit, and joins them. After all the threads have been joined, it automatically drops the App, and returns.
The harvesting is an absolutely necessary part of the App life cycle, however in most of the usage patterns (such as with Triead::startHere or App::build) the harvester is called implicitly from the wrapping library functions, so you don't need to care about it.
Note also that if you're running the harvester manually, you must call it only after the first thread has been defined or at least declared. Otherwise it will find no threads in the App, consider it dead and immediately drop it.
If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App, unless the option die_on_abort (see below) has been set to 0. This propagates the error to the caller. However there is a catch: if some of the threads don't react properly by exiting on an abort indication, the program will be stuck and you will see no error message until these threads get unstuck, possibly forever.
Options:
die_on_abort => 1/0
(default: 1) If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App. Setting this option to 0 will make the aborts silently ignored. This option does not affect the errors in joining the threads: if any of those are detected, the harvester will still confess after it has disposed of the App.
$dead = $app->harvestOnce();
Do one run of the harvesting. Joins all the threads that have exited since its last call. If no threads have exited since then, returns immediately. Returns 1 if all the threads have exited (and thus the App is dead), 0 otherwise. If a thread join fails, immediately confesses (if multiple threads were ready for joining, the ones queued after the failed one won't be joined, call harvestOnce() once more to join them).
$app->waitNeedHarvest();
Wait for at least one thread to become ready for harvesting. If the App is already dead (i.e. all its threads have exited), returns immediately.
These two methods allow you to write custom harvesters if you're not happy with the default one. The basic harvester logic can be written as:
do {
    $app->waitNeedHarvest();
} while(!$app->harvestOnce());
$app->drop();
However the real harvester also does some smarter things around the error handling. You can look it up in the source code in cpp/app/App.cpp.
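As an illustration of what "smarter" might mean, here is a minimal sketch of a custom harvester that keeps joining after a failed join, following the description of harvestOnce() above. It is not the logic from App.cpp. It uses only waitNeedHarvest(), harvestOnce() and drop(); the function name harvestAll and the MockApp class are made up for this example, the mock being there only so that the sketch can run without Triceps.

```perl
use strict;
use warnings;

# Hypothetical helper: join everything, remember the join errors,
# drop the App, and only then report the errors to the caller.
sub harvestAll # ($app)
{
    my $app = shift;
    my @errors;
    my $dead = 0;
    while (!$dead) {
        $app->waitNeedHarvest();
        # harvestOnce() confesses on a failed join and leaves the threads
        # queued after the failed one unjoined, so catch the error and
        # call it again on the next iteration
        $dead = eval { $app->harvestOnce() };
        if ($@) {
            push @errors, $@;
            $dead = 0;
        }
    }
    $app->drop();
    die join("", @errors) if @errors;
}

# Stand-in for a Triceps::App (an assumption for the demo, not the real
# class): "queue" holds the exited threads, a true "fail" makes a join die.
{
    package MockApp;
    sub new {
        my ($class, @queue) = @_;
        return bless { queue => [@queue], dropped => 0 }, $class;
    }
    sub waitNeedHarvest { }
    sub harvestOnce {
        my $self = shift;
        while (my $t = shift @{$self->{queue}}) {
            die "failed to join $t->{name}\n" if $t->{fail};
        }
        return 1; # everything has been joined
    }
    sub drop { $_[0]{dropped} = 1; }
}

my $app = MockApp->new({name => "t1"}, {name => "t2", fail => 1}, {name => "t3"});
eval { harvestAll($app) };
print "harvester reported: $@"; # prints: harvester reported: failed to join t2
```

The point of the design is that the App gets dropped even when some joins fail, and the caller still sees the errors afterwards.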
$res = Triceps::App::isDead($appOrName);
Returns 1 if the App is dead (i.e. has no more live threads), otherwise 0. Calling this method with a name for the argument is probably a bad idea, since normally the harvester will drop the App quickly after it becomes dead, and you may end up with this method confessing when it could not find the dropped App.
$res = Triceps::App::isShutdown($appOrName);
Returns 1 if the App has been requested to shut down, either normally or by being aborted.
$res = Triceps::App::isAborted($appOrName);
Returns 1 if the App has been aborted. The App may be aborted explicitly by calling the method abortBy(), or the thread wrapper logic automatically converts any unexpected deaths in the App's threads to the aborts. If any thread dies, this aborts the App, which in turn requests the other threads to die on their next thread-related call. Eventually the harvester collects them all and confesses, normally making the whole program die with an error.
($tname, $message) = Triceps::App::getAborted($appOrName);
Get the App abort information: name of the thread that caused the abort, and its error message.
Triceps::App::abortBy($appOrName, $tname, $msg);
Abort the application. The thread name and message will be remembered, and returned later by getAborted() or in the harvester. If abortBy() is called multiple times, only the first pair of thread name and message gets remembered. The reason is that on abort all the threads get interrupted in a fairly rough manner (all their ongoing and following calls to the threading API die), which typically causes them to call abortBy() as well, and there is no point in collecting these spurious messages.
The thread name here doesn't have to be the name of the actual thread that reports the issue. For example, if the thread creation as such fails (maybe because the OS limit on the thread count) that gets detected by the parent thread but reported in the name of the thread whose creation has failed. And in general you can pass just any string as the thread name, App itself doesn't care, just make it something that makes sense to you.
$res = Triceps::App::isDead($appOrName);
Returns 1 if the App is dead (i.e. it has no alive Trieads, all the defined and declared threads have exited). Right after the App is created, before the first Triead is created, the App is also considered dead, and becomes alive when the first Triead is declared or defined. If an App becomes dead later, when all the Trieads exit, it can still be brought back alive by creating more Trieads. But this is considered bad practice, and will cause a race with the harvester (if you want to do this, you have to make your own custom harvester).
$res = Triceps::App::isShutdown($appOrName);
Returns 1 if the App was requested to shut down. The Trieads might still run for some time, until they properly detect and process the shutdown, and exit. So this condition is not equivalent to Dead, although they are connected. If any new Trieads get started, they will be shut down right away and won't run.
To reiterate: if all the Trieads just exit by themselves, the App becomes dead but not shut down. You could still start more Trieads and bring the App back alive. If the App has been shut down, it won't become immediately dead, but it will send the shutdown indication to all the Trieads, and after all of them eventually exit, the App will become dead too. And after shutdown there is no way to bring the App back alive, since any new Trieads will be shut down right away (OK, there might be a short period until they detect the shutdown, so the App could spike as alive for a short time, but then will become dead again).
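The relation between the two states can be shown with a toy model. This is only a sketch of the rules described above, not the Triceps implementation: the class ToyApp and its methods startTriead() and exitTriead() are made up for the illustration, and the real App tracks much more than a counter.

```perl
use strict;
use warnings;

# Toy model of the "dead" and "shutdown" states, NOT the real App.
package ToyApp;

sub new { return bless { alive => 0, shutdown => 0 }, shift; }
# a Triead gets declared or defined
sub startTriead { $_[0]{alive}++; }
# a Triead exits
sub exitTriead { $_[0]{alive}--; }
# the shutdown request is sticky: once set, it never resets
sub shutdown { $_[0]{shutdown} = 1; }
sub isShutdown { return $_[0]{shutdown}; }
# dead means no live Trieads, whether or not a shutdown was requested
sub isDead { return $_[0]{alive} == 0 ? 1 : 0; }

package main;

sub show # ($what, $app)
{
    my ($what, $app) = @_;
    printf("%-20s dead=%d shutdown=%d\n", $what, $app->isDead(), $app->isShutdown());
}

my $app = ToyApp->new();
show("new", $app);               # dead, not shut down
$app->startTriead();
show("first Triead", $app);      # alive
$app->exitTriead();
show("Triead exited", $app);     # dead again, still not shut down
$app->startTriead();
show("another Triead", $app);    # brought back alive
$app->shutdown();
show("shutdown requested", $app); # shut down but not dead yet
$app->exitTriead();
show("last Triead exited", $app); # shut down and dead
```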
Triceps::App::waitDead($appOrName);
Will wait for the App to become dead and return after that. Make sure to not call waitDead() from any of App's Trieads: that would cause a deadlock.
Triceps::App::shutdown($appOrName);
Shut down the App. The shutdown state is sticky, so any repeated calls will have no effect. The call returns immediately and doesn't wait for the App to die. If you want to wait, call waitDead() afterwards. Make sure to not call waitDead() from a Triead: that would cause a deadlock.
Triceps::App::shutdownFragment($appOrName, $fragName);
Shut down a named fragment. This does not shut down the whole App, it just selectively shuts down the Trieads belonging to this fragment. See the explanation of the fragments in http://babkin-cep.blogspot.com/2013/03/triceps-multithreading-concepts.html. The fragment shutdown is not sticky: after a fragment has been shut down, it's possible to create another fragment with the same name. To avoid races, a fragment may be shut down only after all its Trieads are ready. So the caller Triead must call readyReady() before it calls shutdownFragment(). If any of the fragment's Trieads are not ready, the call will confess.
Monday, May 27, 2013
TQL server with multithreading
The next big example I've been talking about is finally ready. It's the adaptation of the TQL to work with the multithreaded server framework. The big reason for this example is the export of the table types through a nexus and creation of tables from them. And we'll get to that, but first let's look at the new abilities of the TQL.
TQL is still not of a production quality, in either single- or multi-threaded variety, and contains a large number of simplifying assumptions in its code. As the single-threaded version works symbiotically with the SimpleServer, the multithreaded version works with the ThreadedServer.
One thread created by the programmer contains the "core logic" of the model. It doesn't technically have to be all in a single thread: the data can be forwarded to the other threads and then the results forwarded back from them. But a single core logic thread is a convenient simplification. This thread has some input labels, to receive data from the outside, and some tables with the computed results that can be read by TQL. Of course, it's entirely realistic to have also just the output labels without tables, sending a stream of computed rowops, but again for simplicity I've left this out for now.
This core logic thread creates a TQL instance, which listens on a socket, accepts the connections, forwards the input data to the core logic, performs queries on the tables from the core logic and sends the results back to the client. To this end, the TQL instance creates a few nexuses in the core logic thread and uses them to communicate between all the fragments. The input labels and tables in the core thread also get properly connected to these nexuses. The following figure shows the thread architecture, I'll use it for the reference throughout the discussion:
The core logic thread then goes into its main loop and performs as its name says the core logic computations.
Here is a very simple example of a TQL application:
sub appCoreT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("appCoreT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    # build the core logic
    my $rtTrade = Triceps::RowType->new(
        id => "int32", # trade unique id
        symbol => "string", # symbol traded
        price => "float64",
        size => "float64", # number of shares traded
    ) or confess "$!";
    my $ttWindow = Triceps::TableType->new($rtTrade)
        ->addSubIndex("byId",
            Triceps::SimpleOrderedIndex->new(id => "ASC")
        )
        or confess "$!";
    $ttWindow->initialize() or confess "$!";
    # Represents the static information about a company.
    my $rtSymbol = Triceps::RowType->new(
        symbol => "string", # symbol name
        name => "string", # the official company name
        eps => "float64", # last quarter earnings per share
    ) or confess "$!";
    my $ttSymbol = Triceps::TableType->new($rtSymbol)
        ->addSubIndex("bySymbol",
            Triceps::SimpleOrderedIndex->new(symbol => "ASC")
        )
        or confess "$!";
    $ttSymbol->initialize() or confess "$!";
    my $tWindow = $unit->makeTable($ttWindow, "EM_CALL", "tWindow")
        or confess "$!";
    my $tSymbol = $unit->makeTable($ttSymbol, "EM_CALL", "tSymbol")
        or confess "$!";
    # export the endpoints for TQL (it starts the listener)
    my $tql = Triceps::X::Tql->new(
        name => "tql",
        trieadOwner => $owner,
        socketName => $opts->{socketName},
        tables => [
            $tWindow,
            $tSymbol,
        ],
        tableNames => [
            "window",
            "symbol",
        ],
        inputs => [
            $tWindow->getInputLabel(),
            $tSymbol->getInputLabel(),
        ],
        inputNames => [
            "window",
            "symbol",
        ],
    );
    $owner->readyReady();
    $owner->mainLoop();
}
{
    my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
        app => "appTql",
        main => \&appCoreT,
        port => 0,
        fork => -1, # create a thread, not a process
    );
}
This core logic is very simple: all it does is create two tables and then send the input data into them. The server gets started in a background thread (fork => -1) because this code is taken from a test that then goes and runs the expect with the SimpleClient.
The specification of inputs and tables for TQL is somewhat ugly but I kept it as it was historically (it was done this way to keep the parsing of the options simpler). The new options compared to the single-threaded TQL are the "trieadOwner", "inputs" and "inputNames". The "trieadOwner" is how TQL knows that it must run in the multithreaded mode, and it's used to create the nexuses for communication between the core logic and the rest of TQL. The inputs are needed because the multithreaded TQL parses and forwards the input data, unlike the single-threaded version that relies on the SimpleServer to do that according to the user-defined dispatch table.
The names options don't have to be used: if you name your labels and tables nicely and suitably for the external viewing, the renaming-for-export can be skipped.
Similar to the single-threaded version, if any of the options "tables" or "inputs" is used, the TQL object gets initialized automatically, otherwise the tables and inputs can be added piecemeal with addTable(), addNamedTable(), addInput(), addNamedInput(), and then the whole thing initialized manually.
Then the clients can establish the connections with the TQL server, send in the data and the queries. To jump in, here is a trace of a simple session that sends some data, then does some table dumps and subscribes, not touching the queries yet. I'll go through it fragment by fragment and explain the meaning. The dumps and subscribes were the warm-up exercises before writing the full queries, but they're useful in their own right, and here they serve as the warm-up exercises for the making of the queries!
> connect c1
c1|ready
The "connect" is not an actual command sent but just the indication in the trace that the connection was set up by the client "c1" (it's a trace from the SimpleClient, so it follows the usual conventions). The "ready" response is sent when the connection is opened, similar to the chat server shown before.
> c1|subscribe,s1,symbol
c1|subscribe,s1,symbol
This is a subscription request. It means "I'm not interested in the current state of a table but send me all the updates". The response is the mirror of the request, so that the client knows that the request has been processed. "s1" is the unique identifier of the request, so that the client can match together the responses it received to the requests it sent (and keeping the uniqueness is up to the client, the server may refuse the requests with duplicate identifiers). And "symbol" is the name of the table. Once a subscription is in place, there is no way to unsubscribe other than by disconnecting the client (it's doable but adds complications, and I wanted to skip over the nonessential parts). Subscribing multiple times to the same table will send a confirmation every time but the repeated confirmations will have no effect: only one copy of the data will be sent anyway.
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
This sends the data into the model. And since it propagates through the subscription, the data gets sent back too. The "symbol" here means two different things: on the input side it's the name of the label where the data is sent, on the output side it's the name of the table that has been subscribed to.
The data lines start with the command "d" (since the data is sent much more frequently than the commands, I've picked a short one-letter "command name" for it), then the label/table name, opcode and the row fields in CSV format.
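For a client that wants to process the data lines, the format can be taken apart with a plain split. This is a minimal sketch, not a part of TQL: the helper name parseDataLine is made up, it expects the line without the "c1|" prefix that the SimpleClient adds in the trace, and it assumes that the fields contain no quoted or escaped commas.

```perl
use strict;
use warnings;

# Hypothetical client-side helper: split a data line into its parts.
# Assumes plain comma separation with no quoting or escaping.
sub parseDataLine # ($line)
{
    my $line = shift;
    chomp $line;
    # the limit of -1 keeps the empty fields at the end of the row
    my ($cmd, $name, $opcode, @fields) = split(/,/, $line, -1);
    return undef unless defined $cmd && $cmd eq "d";
    return { name => $name, opcode => $opcode, fields => \@fields };
}

my $row = parseDataLine("d,symbol,OP_INSERT,ABC,ABC Corp,1.0");
print "$row->{name} $row->{opcode}: ", join("|", @{$row->{fields}}), "\n";
# prints: symbol OP_INSERT: ABC|ABC Corp|1.0
```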
> c1|confirm,cf1
c1|confirm,cf1,,,
The "confirm" command provides a way for the client to check that the data it sent has propagated through the model. And it doesn't have to subscribe back to the data and read them. Send some data lines, then send the "confirm" command and wait for it to come back (again, the unique id allows keeping multiple confirmations in flight if you please). This command doesn't guarantee that all the clients have seen the results from that data. It only guarantees that the core logic has seen the data, and more weakly guarantees that the data has been processed by the core logic, and that this particular client has already seen all the results from it.
Why weakly? It has to do with the way it works inside, and it depends on the core logic. If the core logic consists of one thread, the guarantee is quite strong. But if the core logic farms out the work from the main thread to the other threads and then collects the results back, the guarantee breaks.
In Fig. 1 you can see that unlike the chat server shown before, TQL doesn't have any private nexuses for communication between the reader and writer threads of a client. Instead it relies on the same input and output nexuses, adding a control label to them, to forward the commands from the reader to the writer. The TQL object in the core logic thread creates a short-circuit connection between the control labels in the input and output nexuses, forwarding the commands. And if the core logic all runs in one thread, this creates a natural pipeline: the data comes in, gets processed, comes out, the "confirm" command comes in, comes out after the data. But if the core logic farms out the work to more threads, the confirmation can "jump the line" because its path is a direct short circuit.
> c1|drain,dr1
c1|drain,dr1,,,
The "drain" is an analog of "confirm" but more reliable and slower: the reader thread drains the whole model before sending the command on. This guarantees that all the processing is done, and all the output from it has been sent to all the clients.
> c1|dump,d2,symbol
c1|startdump,d2,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dump,d2,symbol
The "dump" command dumps the current contents of a table. Its result starts with "startdump", and the same id and table name as in the request, then comes the data (all with OP_INSERT), finishing with the completion confirmation echoing the original command. The dump is atomic, the contents of the table don't change in the middle of the dump. However if a subscription on this table is active, the data rows from that subscription may come before and after the dump.
I'm not going to describe the error reporting, but it's worth mentioning that if a command contains errors, its "confirmation" will be an error line with the same identifier.
> c1|dumpsub,ds3,symbol
c1|startdump,ds3,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dumpsub,ds3,symbol
The "dumpsub" command is a combination of a dump and subscribe: get the initial state and then get all the updates. The confirmation of "dumpsub" marks the boundary between the original dump and the following updates.
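On the client side, that boundary makes it easy to separate the initial state from the updates. Here is a minimal sketch under a few assumptions: the helper name splitDumpsub is made up, the lines come without the "c1|" trace prefix, and any rows for the table that arrive before "startdump" (from an earlier subscription) are simply skipped.

```perl
use strict;
use warnings;

# Hypothetical helper: given the request id, the table name and the
# response lines, separate the initial dump from the later updates.
sub splitDumpsub # ($id, $table, @lines)
{
    my ($id, $table, @lines) = @_;
    my (@dump, @updates);
    my $state = "before"; # before -> dumping -> after
    foreach my $line (@lines) {
        my ($cmd, $second) = split(/,/, $line);
        next unless defined $cmd && defined $second;
        if ($cmd eq "startdump" && $second eq $id) {
            $state = "dumping";
        } elsif ($cmd eq "dumpsub" && $second eq $id) {
            $state = "after"; # the confirmation marks the boundary
        } elsif ($cmd eq "d" && $second eq $table) {
            push @dump, $line if $state eq "dumping";
            push @updates, $line if $state eq "after";
        }
    }
    return (\@dump, \@updates);
}

my ($dump, $updates) = splitDumpsub("ds4", "window",
    "startdump,ds4,window",
    "d,window,OP_INSERT,1,ABC,101,10",
    "dumpsub,ds4,window",
    "d,window,OP_INSERT,2,ABC,102,12",
);
print "dump rows: ", scalar(@$dump), ", update rows: ", scalar(@$updates), "\n";
# prints: dump rows: 1, update rows: 1
```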
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,symbol,OP_INSERT,DEF,Defense Corp,2
Send some more data, and it comes back only once, even though the subscription was done twice: once in "subscribe" and once in "dumpsub". The repeated subscription requests simply get consumed into one subscription.
> c1|d,window,OP_INSERT,1,ABC,101,10
This sends a row to the other table but nothing comes back because there is no subscription to that table.
> c1|dumpsub,ds4,window
c1|startdump,ds4,window
c1|d,window,OP_INSERT,1,ABC,101,10
c1|dumpsub,ds4,window
> c1|d,window,OP_INSERT,2,ABC,102,12
c1|d,window,OP_INSERT,2,ABC,102,12
This demonstrates the pure dump-and-subscribe without any interventions.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown command works the same as in the chat server, draning and then shutting down the whole server.
Now on to the queries.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
Starts a client connection and sends some data.
> c1|querysub,q1,query1,{read table symbol}{print tokenized 0}
c1|d,query1,OP_INSERT,ABC,ABC Corp,1
c1|querysub,q1,query1
The "querysub" command does the "query-and-subscribe": reads the initial state of the table, processed through the query, and then subscribes to any future updates. The single-threaded variety of TQL doesn't do this, it does just the one-time queries. The multithreaded TQL could also do the one-time queries, and also just the subscribes without the initial state, but I've been cutting corners for this example and the only thing that's actually available is the combination of two, the "querysub".
"q1" is similar to the other command, the command identifier. The next field "query1" is the name for the query, it's the name that will be shown for the data lines coming out of the query. And then goes the query in the brace-quoted format, same as the single-threaded TQL (and there is no further splitting by commas, so the commas can be used freely in the query).
The identified and the name for the query sound kind of redundant. But the client may generate them in different ways and need both. The name has the more symbolic character. The identifier can be generated as a sequence of numbers, so that the client can keep track of its progress more easily. And the error reports include the identifier but not the query name in them.
For the query, there is no special line coming out before the initial dump. Supposedly, there would not be more than one query in flight with the same name, so this could be easily told apart based on the name in the data lines. There is also an underlying consideration that when the query involves a join, in the future the initial dump might be happening in multiple chunks, requiring to either surround every chunk with the start-end lines or just let them go without the extra notifications, as they are now.
And the initial dump ends as usual with getting the echo of the command (without the query part) back.
This particular query is very simple and equivalent to a "dumpsub".
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,query1,OP_INSERT,DEF,Defense Corp,2
Send more data and it will come out of the query.
> c1|querysub,q2,query2,{read table symbol}{where istrue {$%symbol =~ /^A/}}{project fields {symbol eps}}
c1|t,query2,query2 OP_INSERT symbol="ABC" eps="1"
c1|querysub,q2,query2
This query is more complicated, doing a selection (the "where" query command) and projection. It also prints the results in the tokenized format (the "print" command gets added automatically if it wasn't used explicitly, and the default options for it enable the tokenized format).
The tokenized lines come out with the command "t", query name and then the contents of the row. The query name happens to be sent twice, and I'm not sure yet if it's a feature or a bug.
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
c1|d,query1,OP_INSERT,AAA,Absolute Auto Analytics Inc,3
c1|t,query2,query2 OP_INSERT symbol="AAA" eps="3"
> c1|d,symbol,OP_DELETE,DEF,Defense Corp,2.0
c1|d,query1,OP_DELETE,DEF,Defense Corp,2
More examples of the data sent, getting processed by both queries. In the second case the "where" filters out the row from query2, so only query1 produces the result.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown as usual.
Now the "piece de resistance": queries with joins.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,2.0
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,window,OP_INSERT,1,AAA,12,100
Connect and send some starting data.
> c1|querysub,q1,query1,{read table window}{join table symbol byLeft {symbol} type left}
c1|t,query1,query1 OP_INSERT id="1" symbol="AAA" price="12" size="100" name="Absolute Auto Analytics Inc" eps="3"
c1|querysub,q1,query1
A left join of the tables "window" and "symbol", by the field "symbol" as join condition.
Note that unlike the previous single-threaded TQL examples, the index type path for the table "symbol" is not explicitly specified. It's the result of the new method TableType::findIndexPathForKeys() described before, now the index gets found automatically. And the single-threaded TQL now has this feature too. If you really want, you can still specify the index path but usually there is no need to.
The TQL joins, even in the multithreaded mode, are still implemented internally as LookupJoin, driven only by the main flow of the query. So the changes to the joined dimension tables will not update the query results, and will be visible only when a change on the main flow picks them up, potentially creating inconsistencies in the output. This is wrong, but fixing it presents complexities that I've left alone until some later time.
> c1|d,window,OP_INSERT,2,ABC,13,100
c1|t,query1,query1 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2"
> c1|d,window,OP_INSERT,3,AAA,11,200
c1|t,query1,query1 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Absolute Auto Analytics Inc" eps="3"
Sending data updates the results of the query.
> c1|d,symbol,OP_DELETE,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,symbol,OP_INSERT,AAA,Alcoholic Abstract Aliens,3.0
As described above, the modifications of the dimension table are mot visible in the query directly.
> c1|d,window,OP_DELETE,1
c1|t,query1,query1 OP_DELETE id="1" symbol="AAA" price="12" size="100" name="Alcoholic Abstract Aliens" eps="3"
But an update on the main flow brings them up (an in this case inconsistently, the row getting deleted is not exactly the same as the row inserted before).
> c1|querysub,q2,query2,{read table window}{join table symbol byLeft {symbol} type left}{join table symbol byLeft {eps} type left rightFields {symbol/symbol2}}
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="ABC"
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="DEF"
c1|t,query2,query2 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Alcoholic Abstract Aliens" eps="3" symbol2="AAA"
c1|querysub,q2,query2
This is a more complicated query, involving two joins, with the same dimension table "symbol". The second join by "eps" makes no real-world sense whatsoever but it's interesting from the technical perspective: if you check the table type of this table at the start of the post, you'll find that it has no index on the field "eps". The join adds this index on demand!
The way it works, all the dimension tables are copied into the client's writer thread, created from the table types exported by the core logic throuhg the output nexus. (And if a table is used in the same query twice, it's currently also copied twice). This provides a nice opportunity to amend the table type by adding any necessary secondary index before creating the table, and TQL makes a good use of it.
The details are forthcoming in the next post.
TQL is still not of production quality, in either the single- or multi-threaded variety, and contains a large number of simplifying assumptions in its code. Just as the single-threaded version works symbiotically with the SimpleServer, the multithreaded version works with the ThreadedServer.
One thread created by the programmer contains the "core logic" of the model. It doesn't technically have to be all in a single thread: the data can be forwarded to the other threads and then the results forwarded back from them. But a single core logic thread is a convenient simplification. This thread has some input labels, to receive data from the outside, and some tables with the computed results that can be read by TQL. Of course, it's entirely realistic to also have just the output labels without tables, sending a stream of computed rowops, but again for simplicity I've left this out for now.
This core logic thread creates a TQL instance, which listens on a socket, accepts the connections, forwards the input data to the core logic, performs queries on the tables from the core logic and sends the results back to the client. To this end, the TQL instance creates a few nexuses in the core logic thread and uses them to communicate between all the fragments. The input labels and tables in the core thread also get properly connected to these nexuses. The following figure shows the thread architecture; I'll refer to it throughout the discussion:
Fig. 1. TQL application.
The core logic thread then goes into its main loop and performs, as its name says, the core logic computations.
Here is a very simple example of a TQL application:
sub appCoreT # (@opts)
{
    my $opts = {};
    &Triceps::Opt::parse("appCoreT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    # build the core logic
    my $rtTrade = Triceps::RowType->new(
        id => "int32", # trade unique id
        symbol => "string", # symbol traded
        price => "float64",
        size => "float64", # number of shares traded
    ) or confess "$!";
    my $ttWindow = Triceps::TableType->new($rtTrade)
        ->addSubIndex("byId",
            Triceps::SimpleOrderedIndex->new(id => "ASC")
        )
    or confess "$!";
    $ttWindow->initialize() or confess "$!";
    # Represents the static information about a company.
    my $rtSymbol = Triceps::RowType->new(
        symbol => "string", # symbol name
        name => "string", # the official company name
        eps => "float64", # last quarter earnings per share
    ) or confess "$!";
    my $ttSymbol = Triceps::TableType->new($rtSymbol)
        ->addSubIndex("bySymbol",
            Triceps::SimpleOrderedIndex->new(symbol => "ASC")
        )
    or confess "$!";
    $ttSymbol->initialize() or confess "$!";
    my $tWindow = $unit->makeTable($ttWindow, "EM_CALL", "tWindow")
        or confess "$!";
    my $tSymbol = $unit->makeTable($ttSymbol, "EM_CALL", "tSymbol")
        or confess "$!";
    # export the endpoints for TQL (it starts the listener)
    my $tql = Triceps::X::Tql->new(
        name => "tql",
        trieadOwner => $owner,
        socketName => $opts->{socketName},
        tables => [
            $tWindow,
            $tSymbol,
        ],
        tableNames => [
            "window",
            "symbol",
        ],
        inputs => [
            $tWindow->getInputLabel(),
            $tSymbol->getInputLabel(),
        ],
        inputNames => [
            "window",
            "symbol",
        ],
    );
    $owner->readyReady();
    $owner->mainLoop();
}
{
    my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
        app => "appTql",
        main => \&appCoreT,
        port => 0,
        fork => -1, # create a thread, not a process
    );
}
This core logic is very simple: all it does is create two tables and then send the input data into them. The server gets started in a background thread (fork => -1) because this code is taken from a test that then goes and runs the expect with the SimpleClient.
The specification of inputs and tables for TQL is somewhat ugly, but I kept it as it was historically (it was done this way to keep the parsing of the options simpler). The new options compared to the single-threaded TQL are "trieadOwner", "inputs" and "inputNames". The "trieadOwner" option is how TQL knows that it must run in the multithreaded mode, and it's used to create the nexuses for communication between the core logic and the rest of TQL. The inputs are needed because the multithreaded TQL parses and forwards the input data, unlike the single-threaded version that relies on the SimpleServer to do that according to the user-defined dispatch table.
The name options don't have to be used: if you name your labels and tables nicely and suitably for external viewing, the renaming-for-export can be skipped.
Similar to the single-threaded version, if any of the options "tables" or "inputs" is used, the TQL object gets initialized automatically, otherwise the tables and inputs can be added piecemeal with addTable(), addNamedTable(), addInput(), addNamedInput(), and then the whole thing initialized manually.
Then the clients can establish the connections with the TQL server, send in the data and the queries. To jump in, here is a trace of a simple session that sends some data, then does some table dumps and subscribes, not touching the queries yet. I'll go through it fragment by fragment and explain the meaning. The dumps and subscribes were the warm-up exercises before writing the full queries, but they're useful in their own right, and here they serve as the warm-up exercises for the making of the queries!
> connect c1
c1|ready
The "connect" is not an actual command sent but just the indication in the trace that the connection was set up by the client "c1" (it's a trace from the SimpleClient, so it follows the usual conventions). The "ready" response is sent when the connection is opened, similar to the chat server shown before.
> c1|subscribe,s1,symbol
c1|subscribe,s1,symbol
This is a subscription request. It means "I'm not interested in the current state of a table but send me all the updates". The response is the mirror of the request, so that the client knows that the request has been processed. "s1" is the unique identifier of the request, so that the client can match together the responses it received to the requests it sent (and keeping the uniqueness is up to the client, the server may refuse the requests with duplicate identifiers). And "symbol" is the name of the table. Once a subscription is in place, there is no way to unsubscribe other than by disconnecting the client (it's doable but adds complications, and I wanted to skip over the nonessential parts). Subscribing multiple times to the same table will send a confirmation every time but the repeated confirmations will have no effect: only one copy of the data will be sent anyway.
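The matching of responses to requests by identifier could be done on the client side along the following lines. This is only a minimal sketch in plain Perl, not a part of TQL or SimpleClient: the names %pending, noteSent() and noteReceived() are made up for the illustration, and the lines are assumed to arrive without the "c1|" prefix that the trace adds.

```perl
use strict;
use warnings;

# Hypothetical client-side bookkeeping: remember the identifiers of
# the requests sent, and clear them as their echoes come back.
my %pending;

sub noteSent # ($line)
{
    my ($cmd, $id) = split(/,/, $_[0]);
    $pending{$id} = $cmd;
}

# Returns 1 if the line is the echo of a pending request, 0 otherwise.
sub noteReceived # ($line)
{
    my ($cmd, $id) = split(/,/, $_[0]);
    return 0 unless defined $id && exists $pending{$id}
        && $pending{$id} eq $cmd;
    delete $pending{$id};
    return 1;
}

noteSent("subscribe,s1,symbol");
noteSent("confirm,cf1");
print noteReceived("subscribe,s1,symbol") ? "s1 done\n" : "s1 unexpected\n";
print scalar(keys %pending), " request(s) still pending\n";
```

The data lines and the "startdump" lines don't match any pending command, so they fall through and can be handled separately.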
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
This sends the data into the model. And since it propagates through the subscription, the data gets sent back too. The "symbol" here means two different things: on the input side it's the name of the label where the data is sent, on the output side it's the name of the table that has been subscribed to.
The data lines start with the command "d" (since the data is sent much more frequently than the commands, I've picked a short one-letter "command name" for it), then the label/table name, opcode and the row fields in CSV format.
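For illustration, here is how a client might take such a data line apart. It's a sketch under assumptions, not the code from TQL: the function parseDataLine() is hypothetical, the "c1|" prefix of the trace is assumed to be already removed, and the field values are assumed to contain no commas or quoting.

```perl
use strict;
use warnings;

# Hypothetical helper: split a data line into the label/table name,
# the opcode and the row fields. Assumes plain comma separation
# with no quoting or escaping.
sub parseDataLine # ($line)
{
    my ($line) = @_;
    chomp $line;
    # the limit of -1 keeps the trailing empty fields
    my ($cmd, $name, $opcode, @fields) = split(/,/, $line, -1);
    return unless defined $cmd && $cmd eq "d";
    return { name => $name, opcode => $opcode, fields => \@fields };
}

my $r = parseDataLine("d,symbol,OP_INSERT,ABC,ABC Corp,1");
print "$r->{name} $r->{opcode} [", join("|", @{$r->{fields}}), "]\n";
# prints: symbol OP_INSERT [ABC|ABC Corp|1]
```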
> c1|confirm,cf1
c1|confirm,cf1,,,
The "confirm" command provides a way for the client to check that the data it sent has propagated through the model, without having to subscribe back to the data and read it. Send some data lines, then send the "confirm" command and wait for it to come back (again, the unique id allows keeping multiple confirmations in flight if you please). This command doesn't guarantee that all the clients have seen the results from that data. It only guarantees that the core logic has seen the data, and more weakly guarantees that the data has been processed by the core logic and that this particular client has already seen all the results from it.
Why weakly? It has to do with the way it works inside, and it depends on the core logic. If the core logic consists of one thread, the guarantee is quite strong. But if the core logic farms out the work from the main thread to the other threads and then collects the results back, the guarantee breaks.
In Fig. 1 you can see that, unlike the chat server shown before, TQL doesn't have any private nexuses for communication between the reader and writer threads of a client. Instead it relies on the same input and output nexuses, adding a control label to them, to forward the commands from the reader to the writer. The TQL object in the core logic thread creates a short-circuit connection between the control labels in the input and output nexuses, forwarding the commands. And if the core logic all runs in one thread, this creates a natural pipeline: the data comes in, gets processed, comes out, then the "confirm" command comes in and comes out after the data. But if the core logic farms out the work to more threads, the confirmation can "jump the line" because its path is a direct short circuit.
> c1|drain,dr1
c1|drain,dr1,,,
The "drain" is an analog of "confirm" but more reliable and slower: the reader thread drains the whole model before sending the command on. This guarantees that all the processing is done, and all the output from it has been sent to all the clients.
> c1|dump,d2,symbol
c1|startdump,d2,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dump,d2,symbol
The "dump" command dumps the current contents of a table. Its result starts with "startdump" and the same id and table name as in the request, then goes the data (all with OP_INSERT), finishing with the completion confirmation echoing the original command. The dump is atomic: the contents of the table don't change in the middle of the dump. However, if a subscription on this table is active, the data rows from that subscription may come before and after the dump.
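Since the dump is atomic, a client can pick out the rows of a dump as everything between the "startdump" line and the echo of the command. A minimal sketch of such a collector follows; collectDump() is a made-up name, the lines are assumed to come without the "c1|" trace prefix, the XYZ row is invented to stand for a subscription update arriving before the dump, and only the plain "dump" echo is handled.

```perl
use strict;
use warnings;

# Hypothetical collector: gathers the rows of one dump, identified by
# its request id, out of a sequence of response lines.
sub collectDump # ($id, @lines)
{
    my ($id, @lines) = @_;
    my @rows;
    my $inDump = 0;
    foreach my $line (@lines) {
        my ($cmd, $arg) = split(/,/, $line);
        next unless defined $arg;
        if ($cmd eq "startdump" && $arg eq $id) {
            $inDump = 1;
        } elsif ($cmd eq "dump" && $arg eq $id) {
            last; # the echo of the request ends the dump
        } elsif ($inDump && $cmd eq "d") {
            push @rows, $line;
        }
    }
    return @rows;
}

my @rows = collectDump("d2",
    "d,symbol,OP_INSERT,XYZ,Early Update,5", # invented subscription row
    "startdump,d2,symbol",
    "d,symbol,OP_INSERT,ABC,ABC Corp,1",
    "dump,d2,symbol",
);
print scalar(@rows), " row(s) in the dump: @rows\n";
```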
I'm not going to describe the error reporting, but it's worth mentioning that if a command contains errors, its "confirmation" will be an error line with the same identifier.
> c1|dumpsub,ds3,symbol
c1|startdump,ds3,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dumpsub,ds3,symbol
The "dumpsub" command is a combination of a dump and subscribe: get the initial state and then get all the updates. The confirmation of "dumpsub" marks the boundary between the original dump and the following updates.
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,symbol,OP_INSERT,DEF,Defense Corp,2
Send some more data, and it comes back only once, even though the subscription was done twice: once in "subscribe" and once in "dumpsub". The repeated subscription requests simply get consumed into one subscription.
> c1|d,window,OP_INSERT,1,ABC,101,10
This sends a row to the other table but nothing comes back because there is no subscription to that table.
> c1|dumpsub,ds4,window
c1|startdump,ds4,window
c1|d,window,OP_INSERT,1,ABC,101,10
c1|dumpsub,ds4,window
> c1|d,window,OP_INSERT,2,ABC,102,12
c1|d,window,OP_INSERT,2,ABC,102,12
This demonstrates the pure dump-and-subscribe without any interventions.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown command works the same as in the chat server, draining and then shutting down the whole server.
Now on to the queries.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
Starts a client connection and sends some data.
> c1|querysub,q1,query1,{read table symbol}{print tokenized 0}
c1|d,query1,OP_INSERT,ABC,ABC Corp,1
c1|querysub,q1,query1
The "querysub" command does the "query-and-subscribe": reads the initial state of the table, processed through the query, and then subscribes to any future updates. The single-threaded variety of TQL doesn't do this, it does just the one-time queries. The multithreaded TQL could also do the one-time queries, and also just the subscribes without the initial state, but I've been cutting corners for this example and the only thing that's actually available is the combination of the two, the "querysub".
As in the other commands, "q1" is the command identifier. The next field, "query1", is the name of the query: it's the name that will be shown in the data lines coming out of the query. And then goes the query in the brace-quoted format, same as in the single-threaded TQL (there is no further splitting by commas, so the commas can be used freely in the query).
The identifier and the name of the query sound kind of redundant. But the client may generate them in different ways and need both. The name has the more symbolic character. The identifier can be generated as a sequence of numbers, so that the client can keep track of its progress more easily. And the error reports include the identifier but not the query name.
For the query, there is no special line coming out before the initial dump. Supposedly, there would not be more than one query in flight with the same name, so the results can be easily told apart by the name in the data lines. There is also an underlying consideration that when the query involves a join, in the future the initial dump might be happening in multiple chunks, which would require either surrounding every chunk with the start-end lines or just letting them go without the extra notifications, as they are now.
And the initial dump ends as usual with getting the echo of the command (without the query part) back.
This particular query is very simple and equivalent to a "dumpsub".
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,query1,OP_INSERT,DEF,Defense Corp,2
Send more data and it will come out of the query.
> c1|querysub,q2,query2,{read table symbol}{where istrue {$%symbol =~ /^A/}}{project fields {symbol eps}}
c1|t,query2,query2 OP_INSERT symbol="ABC" eps="1"
c1|querysub,q2,query2
This query is more complicated, doing a selection (the "where" query command) and projection. It also prints the results in the tokenized format (the "print" command gets added automatically if it wasn't used explicitly, and the default options for it enable the tokenized format).
The tokenized lines come out with the command "t", query name and then the contents of the row. The query name happens to be sent twice, and I'm not sure yet if it's a feature or a bug.
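A client could take a tokenized line apart with a couple of regular expressions. The following is a sketch under assumptions rather than a definitive parser: parseTokenized() is a hypothetical name, the "c1|" trace prefix is assumed to be removed, and the values are assumed to be double-quoted without any embedded or escaped quotes.

```perl
use strict;
use warnings;

# Hypothetical parser for a tokenized result line. Assumes that the
# values are double-quoted and contain no embedded quotes.
sub parseTokenized # ($line)
{
    my ($line) = @_;
    # command "t", the query name, the repeated name, then the opcode
    my ($query, $opcode, $rest) =
        $line =~ /^t,([^,]+),\S+ (OP_\w+)(.*)$/ or return;
    # the rest is a sequence of name="value" pairs
    my %fields = $rest =~ /(\w+)="([^"]*)"/g;
    return { query => $query, opcode => $opcode, fields => \%fields };
}

my $r = parseTokenized('t,query2,query2 OP_INSERT symbol="ABC" eps="1"');
print "$r->{query} $r->{opcode} symbol=$r->{fields}{symbol} eps=$r->{fields}{eps}\n";
# prints: query2 OP_INSERT symbol=ABC eps=1
```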
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
c1|d,query1,OP_INSERT,AAA,Absolute Auto Analytics Inc,3
c1|t,query2,query2 OP_INSERT symbol="AAA" eps="3"
> c1|d,symbol,OP_DELETE,DEF,Defense Corp,2.0
c1|d,query1,OP_DELETE,DEF,Defense Corp,2
More examples of the data sent, getting processed by both queries. In the second case the "where" filters out the row from query2, so only query1 produces the result.
> c1|shutdown
c1|shutdown,,,,
c1|__EOF__
And the shutdown as usual.
Now the "piece de resistance": queries with joins.
> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,2.0
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,window,OP_INSERT,1,AAA,12,100
Connect and send some starting data.
> c1|querysub,q1,query1,{read table window}{join table symbol byLeft {symbol} type left}
c1|t,query1,query1 OP_INSERT id="1" symbol="AAA" price="12" size="100" name="Absolute Auto Analytics Inc" eps="3"
c1|querysub,q1,query1
A left join of the tables "window" and "symbol", by the field "symbol" as join condition.
Note that unlike the previous single-threaded TQL examples, the index type path for the table "symbol" is not explicitly specified. Thanks to the new method TableType::findIndexPathForKeys() described before, the index now gets found automatically. The single-threaded TQL now has this feature too. If you really want, you can still specify the index path, but usually there is no need to.
The TQL joins, even in the multithreaded mode, are still implemented internally as LookupJoin, driven only by the main flow of the query. So the changes to the joined dimension tables will not update the query results, and will be visible only when a change on the main flow picks them up, potentially creating inconsistencies in the output. This is wrong, but fixing it presents complexities that I've left alone until some later time.
> c1|d,window,OP_INSERT,2,ABC,13,100
c1|t,query1,query1 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2"
> c1|d,window,OP_INSERT,3,AAA,11,200
c1|t,query1,query1 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Absolute Auto Analytics Inc" eps="3"
Sending data updates the results of the query.
> c1|d,symbol,OP_DELETE,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,symbol,OP_INSERT,AAA,Alcoholic Abstract Aliens,3.0
As described above, the modifications of the dimension table are not visible in the query directly.
> c1|d,window,OP_DELETE,1
c1|t,query1,query1 OP_DELETE id="1" symbol="AAA" price="12" size="100" name="Alcoholic Abstract Aliens" eps="3"
But an update on the main flow brings them up (and in this case inconsistently: the row getting deleted is not exactly the same as the row inserted before).
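The cause of this inconsistency can be shown with a toy model in plain Perl. It is not the Triceps LookupJoin, just an imitation of its behavior as described above: every rowop on the main flow gets joined with whatever the dimension table contains at that moment. The names %symbol, @output and mainFlow() are made up for this sketch.

```perl
use strict;
use warnings;

# A toy model of a lookup join, not the Triceps LookupJoin itself:
# each rowop on the main flow is joined with the current contents
# of the dimension table.
my %symbol = (AAA => "Absolute Auto Analytics Inc");
my @output;

sub mainFlow # ($opcode, $id, $sym)
{
    my ($opcode, $id, $sym) = @_;
    push @output, "$opcode id=$id symbol=$sym name=$symbol{$sym}";
}

mainFlow("OP_INSERT", 1, "AAA");
# The dimension row changes; nothing is sent to the output.
$symbol{AAA} = "Alcoholic Abstract Aliens";
# The delete on the main flow picks up the new name.
mainFlow("OP_DELETE", 1, "AAA");
print "$_\n" foreach @output;
```

The two output lines carry different names for the same id, just like in the trace.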
> c1|querysub,q2,query2,{read table window}{join table symbol byLeft {symbol} type left}{join table symbol byLeft {eps} type left rightFields {symbol/symbol2}}
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="ABC"
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="DEF"
c1|t,query2,query2 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Alcoholic Abstract Aliens" eps="3" symbol2="AAA"
c1|querysub,q2,query2
This is a more complicated query, involving two joins, with the same dimension table "symbol". The second join by "eps" makes no real-world sense whatsoever but it's interesting from the technical perspective: if you check the table type of this table at the start of the post, you'll find that it has no index on the field "eps". The join adds this index on demand!
The way it works, all the dimension tables are copied into the client's writer thread, created from the table types exported by the core logic through the output nexus. (And if a table is used in the same query twice, it's currently also copied twice.) This provides a nice opportunity to amend the table type by adding any necessary secondary index before creating the table, and TQL makes good use of it.
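The idea of indexing a private copy can be imitated in plain Perl, with a hash standing in for the secondary index. This is only a sketch of the concept and says nothing about how TQL actually amends the table type; the names @symbolCopy and %byEps are invented, and the rows are the contents of the table "symbol" at the time of the last query in the trace.

```perl
use strict;
use warnings;

# A toy model of a private copy of the dimension table.
my @symbolCopy = (
    { symbol => "ABC", name => "ABC Corp", eps => 2 },
    { symbol => "DEF", name => "Defense Corp", eps => 2 },
    { symbol => "AAA", name => "Alcoholic Abstract Aliens", eps => 3 },
);

# Build the index by "eps" that the original table type doesn't have.
my %byEps;
foreach my $row (@symbolCopy) {
    push @{$byEps{$row->{eps}}}, $row;
}

# Look up all the symbols with the same eps as ABC.
my @matches = map { $_->{symbol} } @{$byEps{2}};
print "eps=2: @matches\n";
# prints: eps=2: ABC DEF
```

The two matches for the eps of 2 correspond to the two result rows with symbol2 of "ABC" and "DEF" in the trace.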
The details are forthcoming in the next post.
Thursday, April 11, 2013
Multithreaded pipeline, part 4
Let's look at the aggregation by the hour. First, the short version that skips over the actual logic and concentrates on how the nexuses are connected.
sub RawToHourlyMain # (@opts)
{
    my $opts = {};
    Triceps::Opt::parse("traffic main", $opts, {
        @Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    my $faIn = $owner->importNexus(
        from => $opts->{from},
        as => "input",
        import => "reader",
    );
    # ... create the table and aggregation ...
    my $faOut = $owner->makeNexus(
        name => "data",
        labels => [
            $faIn->getFnReturn()->getLabelHash(),
            hourly => $lbHourlyFiltered,
        ],
        import => "writer",
    );
    # ... connect the input nexus to the table ...
    # ... create the table dump logic ...
    $owner->readyReady();
    $owner->mainLoop(); # all driven by the reader
}
This function inherits the options from Triead::start() as usual and adds the option "from" of its own. This option's value is then used as the name of the nexus to import for reading. The row types of the labels from that imported facet are then used to create the table and aggregation. But they aren't connected to the input labels yet.
First, the output nexus is created. The creation passes through all the incoming data, short-circuiting the input and output, and adds the extra label for the aggregated output. After that the rest of the logic can be connected to the inputs (and to the outputs too).
The reason why this connection order is important is that the labels get called in the order they are chained from the input label. And when this thread reacts to some event, we want the original event to pass through to the output first, and only then send the reaction to it.
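The effect of the chaining order can be imitated with a list of closures. This is a toy model in plain Perl and not the Triceps label API; the names chainHandler() and callLabel() are made up for the illustration.

```perl
use strict;
use warnings;

# A toy model of a label with chained handlers, not the Triceps API:
# the handlers run in the order in which they were chained.
my @chain;
my @log;

sub chainHandler # ($handler)
{
    push @chain, $_[0];
}

sub callLabel # ($event)
{
    my ($event) = @_;
    foreach my $handler (@chain) {
        $handler->($event);
    }
}

# Chained first: pass the original event through to the output.
chainHandler(sub { push @log, "pass-through: $_[0]"; });
# Chained second: the reaction of this thread to the event.
chainHandler(sub { push @log, "reaction to: $_[0]"; });

callLabel("packet");
print "$_\n" foreach @log;
```

Swapping the two chainHandler() calls would put the reaction into the output ahead of the event that caused it, which is exactly what the connection order in RawToHourlyMain() avoids.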
And after that it's all usual readyReady() and mainLoop().
The full text of the function follows. The logic is based on the previous example from chapter 13, and the only big change is the use of SimpleAggregator instead of a manually-built one. The HourlyToDailyMain() is very similar, so I won't even show it; you can find the full text in SVN.
# compute an hour-rounded timestamp (in microseconds)
sub hourStamp # (time)
{
    return $_[0] - ($_[0] % (1000*1000*3600));
}
sub RawToHourlyMain # (@opts)
{
    my $opts = {};
    Triceps::Opt::parse("traffic main", $opts, {
        @Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    # The current hour stamp that keeps being updated;
    # any aggregated data will be propagated when it is in the
    # current hour (to avoid the propagation of the aggregator clearing).
    my $currentHour;
    my $faIn = $owner->importNexus(
        from => $opts->{from},
        as => "input",
        import => "reader",
    );
    # the full stats for the recent time
    my $ttPackets = Triceps::TableType->new($faIn->getLabel("packet")->getRowType())
        ->addSubIndex("byHour",
            Triceps::IndexType->newPerlSorted("byHour", undef, sub {
                return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
            })
            ->addSubIndex("byIP",
                Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
                ->addSubIndex("group",
                    Triceps::IndexType->newFifo()
                )
            )
        )
    or confess "$!";
    # type for a periodic summary, used for hourly, daily etc. updates
    my $rtSummary;
    Triceps::SimpleAggregator::make(
        tabType => $ttPackets,
        name => "hourly",
        idxPath => [ "byHour", "byIP", "group" ],
        result => [
            # time period's (here hour's) start timestamp, microseconds
            time => "int64", "last", sub {&hourStamp($_[0]->get("time"));},
            local_ip => "string", "last", sub {$_[0]->get("local_ip");},
            remote_ip => "string", "last", sub {$_[0]->get("remote_ip");},
            # bytes sent in a time period, here an hour
            bytes => "int64", "sum", sub {$_[0]->get("bytes");},
        ],
        saveRowTypeTo => \$rtSummary,
    );
    $ttPackets->initialize() or confess "$!";
    my $tPackets = $unit->makeTable($ttPackets,
        &Triceps::EM_CALL, "tPackets") or confess "$!";
    # Filter the aggregator output to match the current hour.
    my $lbHourlyFiltered = $unit->makeDummyLabel($rtSummary, "hourlyFiltered");
    $tPackets->getAggregatorLabel("hourly")->makeChained("hourlyFilter", undef, sub {
        if ($_[1]->getRow()->get("time") == $currentHour) {
            $unit->call($lbHourlyFiltered->adopt($_[1]));
        }
    });
    # It's important to connect the pass-through data first,
    # before chaining anything to the labels of the faIn, to
    # make sure that any requests and raw inputs get through before
    # our reactions to them.
    my $faOut = $owner->makeNexus(
        name => "data",
        labels => [
            $faIn->getFnReturn()->getLabelHash(),
            hourly => $lbHourlyFiltered,
        ],
        import => "writer",
    );
    my $lbPrint = $faOut->getLabel("print");
    # update the notion of the current hour before the table
    $faIn->getLabel("packet")->makeChained("processPackets", undef, sub {
        my $row = $_[1]->getRow();
        $currentHour = &hourStamp($row->get("time"));
        # skip the timestamp updates without data
        if (defined $row->get("bytes")) {
            $unit->call($tPackets->getInputLabel()->adopt($_[1]));
        }
    });
    # the dump request processing
    $tPackets->getDumpLabel()->makeChained("printDump", undef, sub {
        $unit->makeArrayCall($lbPrint, "OP_INSERT", $_[1]->getRow()->printP() . "\n");
    });
    $faIn->getLabel("dumprq")->makeChained("dump", undef, sub {
        if ($_[1]->getRow()->get("what") eq "packets") {
            $tPackets->dumpAll();
        }
    });
    $owner->readyReady();
    $owner->mainLoop(); # all driven by the reader
}
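To make the hour rounding and the current-hour filter more tangible, here is a small worked example in plain Perl. It repeats the arithmetic of hourStamp() to stay self-contained and imitates the "hourlyFilter" check with a grep; the timestamps are made up for the illustration, and no Triceps objects are involved.

```perl
use strict;
use warnings;

# Same arithmetic as hourStamp() in the listing above, repeated here
# to keep the sketch self-contained.
sub hourStamp # (time)
{
    return $_[0] - ($_[0] % (1000*1000*3600));
}

my $hour = 1000*1000*3600; # microseconds in an hour
# A made-up packet timestamp: 90 seconds into the third hour.
my $t = 2*$hour + 90*1000*1000;
my $currentHour = hourStamp($t);

# Made-up summary timestamps, as the aggregator might produce them:
# one for the current hour and one for an older hour.
my @summaries = (2*$hour, 1*$hour);
my @passed = grep { $_ == $currentHour } @summaries;

print "current hour: ", $currentHour / $hour, "\n";
print "summaries passed: ", scalar(@passed), "\n";
```

Only the summary stamped with the current hour gets through, which matches the stated purpose of the filter: the updates for the older hours are not propagated.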
sub RawToHourlyMain # (@opts)
{
    my $opts = {};
    Triceps::Opt::parse("traffic main", $opts, {
        @Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    my $faIn = $owner->importNexus(
        from => $opts->{from},
        as => "input",
        import => "reader",
    );
    # ... create the table and aggregation ...
    my $faOut = $owner->makeNexus(
        name => "data",
        labels => [
            $faIn->getFnReturn()->getLabelHash(),
            hourly => $lbHourlyFiltered,
        ],
        import => "writer",
    );
    # ... connect the input nexus to the table ...
    # ... create the table dump logic ...
    $owner->readyReady();
    $owner->mainLoop(); # all driven by the reader
}
This function inherits the options from Triead::start() as usual and adds its own option "from". The value of this option is then used as the name of the nexus to import for reading. The row types of the labels from that imported facet are then used to create the table and aggregation. But they aren't connected to the input labels yet.
First, the output nexus is created. The creation passes through all the incoming data, short-circuiting the input and output, and adds the extra label for the aggregated output. After that the rest of the logic can be connected to the inputs (and to the outputs too).
The reason why this connection order is important is that the labels get called in the order they are chained from the input label. And when this thread reacts to some event, we want the original event to pass through to the output first, and only then send the reaction to it.
And after that it's all the usual readyReady() and mainLoop().
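The effect of the chaining order can be illustrated with a small stand-alone sketch. It doesn't use Triceps at all: the ToyLabel class and its chain() method are made up for this illustration and only imitate the one property that matters here, that the handlers chained to a label get called in the order in which they were chained.

```perl
use strict;
use warnings;

# A toy stand-in for a label: a name plus a list of handlers that
# are called in the order in which they were chained.
# (Hypothetical helper for illustration, not a part of the Triceps API.)
package ToyLabel;

sub new # ($class, $name)
{
    my ($class, $name) = @_;
    return bless { name => $name, chained => [] }, $class;
}

sub chain # ($self, $handler)
{
    my ($self, $handler) = @_;
    push @{$self->{chained}}, $handler;
}

sub call # ($self, $data)
{
    my ($self, $data) = @_;
    $_->($data) for @{$self->{chained}};
}

package main;

my @output; # what the downstream thread would see, in order

my $lbInput = ToyLabel->new("dumprq");
# Chained first: the pass-through to the output.
$lbInput->chain(sub { push @output, "request: $_[0]"; });
# Chained second: this thread's own reaction to the request.
$lbInput->chain(sub { push @output, "reaction to: $_[0]"; });

$lbInput->call("packets");
print "$_\n" for @output;
```

If the two chain() calls were swapped, the reaction would reach the output before the request that caused it, which is exactly what creating the output nexus first is meant to prevent.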
The full text of the function follows. The logic is based on the previous example from chapter 13, and the only big change is the use of SimpleAggregator instead of a manually-built aggregator. The HourlyToDailyMain() is very similar, so I won't even show it; you can find the full text in SVN.
# compute an hour-rounded timestamp (in microseconds)
sub hourStamp # (time)
{
    return $_[0] - ($_[0] % (1000*1000*3600));
}
sub RawToHourlyMain # (@opts)
{
    my $opts = {};
    Triceps::Opt::parse("traffic main", $opts, {
        @Triceps::Triead::opts,
        from => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    my $owner = $opts->{owner};
    my $unit = $owner->unit();
    # The current hour stamp that keeps being updated;
    # any aggregated data will be propagated when it is in the
    # current hour (to avoid the propagation of the aggregator clearing).
    my $currentHour;
    my $faIn = $owner->importNexus(
        from => $opts->{from},
        as => "input",
        import => "reader",
    );
    # the full stats for the recent time
    my $ttPackets = Triceps::TableType->new($faIn->getLabel("packet")->getRowType())
        ->addSubIndex("byHour",
            Triceps::IndexType->newPerlSorted("byHour", undef, sub {
                return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
            })
            ->addSubIndex("byIP",
                Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
                ->addSubIndex("group",
                    Triceps::IndexType->newFifo()
                )
            )
        )
    or confess "$!";
    # type for a periodic summary, used for hourly, daily etc. updates
    my $rtSummary;
    Triceps::SimpleAggregator::make(
        tabType => $ttPackets,
        name => "hourly",
        idxPath => [ "byHour", "byIP", "group" ],
        result => [
            # time period's (here hour's) start timestamp, microseconds
            time => "int64", "last", sub {&hourStamp($_[0]->get("time"));},
            local_ip => "string", "last", sub {$_[0]->get("local_ip");},
            remote_ip => "string", "last", sub {$_[0]->get("remote_ip");},
            # bytes sent in a time period, here an hour
            bytes => "int64", "sum", sub {$_[0]->get("bytes");},
        ],
        saveRowTypeTo => \$rtSummary,
    );
    $ttPackets->initialize() or confess "$!";
    my $tPackets = $unit->makeTable($ttPackets,
        &Triceps::EM_CALL, "tPackets") or confess "$!";
    # Filter the aggregator output to match the current hour.
    my $lbHourlyFiltered = $unit->makeDummyLabel($rtSummary, "hourlyFiltered");
    $tPackets->getAggregatorLabel("hourly")->makeChained("hourlyFilter", undef, sub {
        if ($_[1]->getRow()->get("time") == $currentHour) {
            $unit->call($lbHourlyFiltered->adopt($_[1]));
        }
    });
    # It's important to connect the pass-through data first,
    # before chaining anything to the labels of the faIn, to
    # make sure that any requests and raw inputs get through before
    # our reactions to them.
    my $faOut = $owner->makeNexus(
        name => "data",
        labels => [
            $faIn->getFnReturn()->getLabelHash(),
            hourly => $lbHourlyFiltered,
        ],
        import => "writer",
    );
    my $lbPrint = $faOut->getLabel("print");
    # update the notion of the current hour before the table
    $faIn->getLabel("packet")->makeChained("processPackets", undef, sub {
        my $row = $_[1]->getRow();
        $currentHour = &hourStamp($row->get("time"));
        # skip the timestamp updates without data
        if (defined $row->get("bytes")) {
            $unit->call($tPackets->getInputLabel()->adopt($_[1]));
        }
    });
    # the dump request processing
    $tPackets->getDumpLabel()->makeChained("printDump", undef, sub {
        $unit->makeArrayCall($lbPrint, "OP_INSERT", $_[1]->getRow()->printP() . "\n");
    });
    $faIn->getLabel("dumprq")->makeChained("dump", undef, sub {
        if ($_[1]->getRow()->get("what") eq "packets") {
            $tPackets->dumpAll();
        }
    });
    $owner->readyReady();
    $owner->mainLoop(); # all driven by the reader
}
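To see what hourStamp() and the current-hour filter do with actual numbers, here is a small stand-alone sketch. It copies hourStamp() from the listing above and applies the same comparison as the "hourlyFilter" handler to plain timestamps instead of rowops. The timestamp values are made up for the illustration, and it assumes a Perl built with 64-bit integers, since the microsecond values don't fit into 32 bits.

```perl
use strict;
use warnings;

# same rounding as in the example: truncate to the start of the hour
sub hourStamp # (time in microseconds)
{
    return $_[0] - ($_[0] % (1000*1000*3600));
}

my $usecPerHour = 1000*1000*3600;
# made-up timestamps: 1 hour 30 minutes and 2 hours 5 seconds
# after the epoch, in microseconds
my $t1 = 1*$usecPerHour + 30*60*1000*1000;
my $t2 = 2*$usecPerHour + 5*1000*1000;

print hourStamp($t1), "\n"; # the start of hour 1
print hourStamp($t2), "\n"; # the start of hour 2

# The filter: an aggregated row passes only if its hour is the current one.
# Here the last packet seen was at $t2, so the current hour is hour 2.
my $currentHour = hourStamp($t2);
my @verdicts;
for my $rowTime (hourStamp($t1), hourStamp($t2)) {
    push @verdicts, ($rowTime == $currentHour ? "passes" : "dropped");
    print "$rowTime $verdicts[-1]\n";
}
```

The update for hour 1 is dropped because by that time the current hour has already moved on to hour 2. As the comment on $currentHour in the listing says, this is what keeps the updates caused by the aggregator clearing of the older hours from being propagated.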