When a thread is requested to die, its registered file descriptors are revoked, and the signal SIGUSR2 is sent to it to interrupt any ongoing system calls. For this to work correctly, a signal handler must be defined for SIGUSR2, because otherwise the default reaction to it is to kill the process. It doesn't matter which handler it is, as long as some handler is there. The Triceps library defines an empty signal handler but you can also define your own instead.
In Perl, the empty handler for SIGUSR2 is set when the module Triceps.pm is loaded. You can change it afterwards.
In C++ Triceps provides a class Sigusr2, defined in app/Sigusr2.h, to help with this. If you use the class BasicPthread, you don't need to deal with Sigusr2 directly: BasicPthread takes care of it. All the methods of Sigusr2 are static.
static void setup();
Set up an empty handler for SIGUSR2 if it hasn't been done yet. This class has a static flag (synchronized by a mutex) showing whether the handler has been set up. The first call sets the handler and sets the flag. The subsequent calls check the flag and do nothing.
static void markDone();
Just set the flag that the setup has been done. This allows you to set your own handler instead and still cooperate with the logic of Sigusr2 and BasicPthread.
If you set your custom handler before any threads have been started, set up your handler first and then call markDone(), telling Sigusr2 that there is no need to set the handler any more.
If you set your custom handler when the Triceps threads are already running (not the best idea but still a possibility), there is a possibility of a race with another thread calling setup(). To work around that race, set up your handler, call markDone(), then set up your handler again. The second installation covers the case where another thread's setup() has replaced your handler before the flag was set.
static void reSetup();
This replaces the custom handler with the empty one. It always forcibly sets the empty handler (and also the flag).
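The flag-and-mutex logic of these three methods can be sketched in plain POSIX C++ roughly as follows. This is only an illustration under my own assumptions, not the Triceps source: the namespace sketch, the helper installEmpty() and the variable names are all hypothetical.

```cpp
#include <signal.h>
#include <mutex>

// Illustrative stand-in for the logic described above; NOT the Triceps code.
// All names in this namespace are hypothetical.
namespace sketch {

std::mutex flagMutex;      // protects handlerDone
bool handlerDone = false;  // true once some handler is known to be in place

// The empty handler: it does nothing, but its presence keeps SIGUSR2
// from taking the default action of terminating the process.
void emptyHandler(int) {}

// Unconditionally install the empty handler; returns true on success.
bool installEmpty() {
    struct sigaction sa = {};
    sa.sa_handler = emptyHandler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;  // no SA_RESTART, so blocked system calls fail with EINTR
    return sigaction(SIGUSR2, &sa, nullptr) == 0;
}

// Install the empty handler only if nothing has been set up yet.
void setup() {
    std::lock_guard<std::mutex> lock(flagMutex);
    if (handlerDone)
        return;
    installEmpty();
    handlerDone = true;
}

// Only record that a handler (for example, a custom one) is in place.
void markDone() {
    std::lock_guard<std::mutex> lock(flagMutex);
    handlerDone = true;
}

// Forcibly put the empty handler back and set the flag.
void reSetup() {
    std::lock_guard<std::mutex> lock(flagMutex);
    installEmpty();
    handlerDone = true;
}

}  // namespace sketch
```

With this shape, a custom handler installed before markDone() survives any later setup() call, while reSetup() always overrides it.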
Saturday, July 13, 2013
Sunday, June 16, 2013
App reference, Perl, part 1
The App API has two parts to it: The first part keeps track of all the App instances in the program, and allows to list and find them. The second part is the manipulations on a particular instance.
The global part of the API is:
@apps = Triceps::App::listApps();
Returns the array of name-value pairs, values containing the App references, listing all the Apps in the program (more exactly, in this Triceps library: if you compile multiple Triceps libraries together by renaming them, each of them will have its own Apps list). The returned array can be placed into a hash.
$app = Triceps::App::find($name);
Find an App by name. If an App with such a name does not exist, it will confess.
$app = Triceps::App::make($name);
Create a new App, give it the specified name, and put it on the list. The names must not be duplicate with the other existing Apps, or the method will confess.
Triceps::App::drop($app);
Triceps::App::drop($appName);
Drop the App, by reference or by name, from the list. The App is as usual reference-counted and will exist while there are references to it. The global list provides one of these references, so an App is guaranteed to exist while it's still on the list. When dropped, it will still be accessible through the existing references, but obviously will not be listed any more and cannot be found by name.
Moreover, a new App with the same name can be added to the list. Because of this, dropping an App by name requires some care if a new App could be created again with the same name: it creates a potential for a race, and you might end up dropping the new App instead of the old one. Of course, if it's the same thread that drops the old one and creates the new one, then there is no race. Dropping an App by a name that doesn't exist at the moment is an error and will confess.
Dropping the App by reference theoretically avoids the race: a specific object gets dropped, and if it has already been dropped, the call has no effect. Theoretically. In practice, however, Perl has a limitation on passing the object values between threads, and thus the first thing each thread does when it starts is find its App by name. It's a very similar kind of race and is absolutely unavoidable except by making sure that all the App's threads have exited and joined (i.e. harvesting them). So make sure to complete the App's thread harvesting before dropping the App in the harvester thread, and by then it doesn't matter a whole lot if the App is dropped by name or by reference.
Now the second part of the API, working with an App instance.
Many (but not all) of the App methods accept the App either by reference or by name, and they automatically sort it out, doing the internal look-up by name if necessary. So the same method could be used as either of:
$app->method(...);
Triceps::App::method($app, ...);
Triceps::App::method($appName, ...);
Obviously, you can not use the "->" syntax with a name, and obviously if the name is not in the app list, the method will confess. Below I'll show the calls that allow the dual formats as Triceps::App::method($appOrName, ...) but keep in mind that you can use the "->" form of them too with a reference.
$app = Triceps::App::resolve($appOrName);
Do just the automatically-sorting-out part: gets a reference or name and returns a reference either way. A newly created reference is returned in either case (not the argument reference). You can use this resolver before the methods that accept only a reference.
$result = $app->same($app2);
Check if two references are for the same App object. Here they both must be references and not names.
$name = $app->getName();
Get the name of the App, from a reference.
Triceps::App::declareTriead($appOrName, $trieadName);
Declare a Triead (Triceps thread) in advance. Any attempts to look up the nexuses in that thread will then wait for the thread to become ready. (Attempts to look up in an undeclared and undefined thread are errors). This is necessary to prevent a race at the thread creation time. For the most part, the method Triead::start() just does the right thing by calling this method automatically and you don't need to use it manually, except in some very special circumstances.
@trieads = Triceps::App::getTrieads($appOrName);
Get the list of currently defined Trieads, as name-value pairs. Keep in mind that the other threads may be modifying the list of Trieads, so if you do this call multiple times, you may get different results. However the Trieads are returned as references, so they are guaranteed to stay alive and readable even if they get removed from the App, or even if the App gets dropped.
$app->harvester(@options);
Run the harvester in the current thread. The harvester gets notifications from the threads when they are about to exit, and joins them. After all the threads have been joined, it automatically drops the App, and returns.
The harvesting is an absolutely necessary part of the App life cycle, however in most of the usage patterns (such as with Triead::startHere or App::build) the harvester is called implicitly from the wrapping library functions, so you don't need to care about it.
Note also that if you're running the harvester manually, you must call it only after the first thread has been defined or at least declared. Otherwise it will find no threads in the App, consider it dead and immediately drop it.
If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App, unless the option die_on_abort (see below) has been set to 0. This propagates the error to the caller. However there is a catch: if some of the threads don't react properly by exiting on an abort indication, the program will be stuck and you will see no error message until these threads get unstuck, possibly forever.
Options:
die_on_abort => 1/0
(default: 1) If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App. Setting this option to 0 will make the aborts silently ignored. This option does not affect the errors in joining the threads: if any of those are detected, the harvester will still confess after it has disposed of the App.
$dead = $app->harvestOnce();
Do one run of the harvesting. Joins all the threads that have exited since its last call. If no threads have exited since then, returns immediately. Returns 1 if all the threads have exited (and thus the App is dead), 0 otherwise. If a thread join fails, immediately confesses (if multiple threads were ready for joining, the ones queued after the failed one won't be joined, call harvestOnce() once more to join them).
$app->waitNeedHarvest();
Wait for at least one thread to become ready for harvesting. If the App is already dead (i.e. all its threads have exited), returns immediately.
These two methods allow you to write custom harvesters if you're not happy with the default one. The basic harvester logic can be written as:
do {
    $app->waitNeedHarvest();
} while(!$app->harvestOnce());
$app->drop();
However the real harvester also does some smarter things around the error handling. You can look it up in the source code in cpp/app/App.cpp.
$res = Triceps::App::isDead($appOrName);
Returns 1 if the App is dead (i.e. has no more live threads), otherwise 0. Calling this method with a name for the argument is probably a bad idea, since normally the harvester will drop the App quickly after it becomes dead, and you may end up with this method confessing when it could not find the dropped App.
$res = Triceps::App::isShutdown($appOrName);
Returns 1 if the App has been requested to shut down, either normally or by being aborted.
$res = Triceps::App::isAborted($appOrName);
Returns 1 if the App has been aborted. The App may be aborted explicitly by calling the method abortBy(), or the thread wrapper logic automatically converts any unexpected deaths in the App's threads to the aborts. If any thread dies, this aborts the App, which in turn requests the other threads to die on their next thread-related call. Eventually the harvester collects them all and confesses, normally making the whole program die with an error.
($tname, $message) = Triceps::App::getAborted($appOrName);
Get the App abort information: name of the thread that caused the abort, and its error message.
Triceps::App::abortBy($appOrName, $tname, $msg);
Abort the application. The thread name and message will be remembered, and returned later by getAborted() or in the harvester. If abortBy() is called multiple times, only the first pair of thread name and message gets remembered. The reason is that on abort all the threads get interrupted in a fairly rough manner (all their ongoing and following calls to the threading API die), which typically causes them to call abortBy() as well, and there is no point in collecting these spurious messages.
The thread name here doesn't have to be the name of the actual thread that reports the issue. For example, if the thread creation as such fails (maybe because of the OS limit on the thread count), that gets detected by the parent thread but reported in the name of the thread whose creation has failed. And in general you can pass just any string as the thread name, App itself doesn't care, just make it something that makes sense to you.
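The "only the first abort is remembered" rule can be illustrated with a small stand-alone C++ model. This is a toy sketch under my own assumptions, not the Triceps implementation: the class AbortRecord and its members are hypothetical names that merely mirror the abortBy(), isAborted() and getAborted() calls described above.

```cpp
#include <mutex>
#include <string>
#include <utility>

// Toy model of the first-abort-wins rule; NOT the Triceps App class.
class AbortRecord {
public:
    // Remember the thread name and message of the first abort only;
    // the later calls are silently ignored.
    void abortBy(const std::string &tname, const std::string &msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (aborted_)
            return;  // spurious follow-up abort from an interrupted thread
        aborted_ = true;
        tname_ = tname;
        msg_ = msg;
    }

    bool isAborted() {
        std::lock_guard<std::mutex> lock(mutex_);
        return aborted_;
    }

    // Returns the (thread name, message) pair of the first abort,
    // or a pair of empty strings if there was no abort.
    std::pair<std::string, std::string> getAborted() {
        std::lock_guard<std::mutex> lock(mutex_);
        return std::make_pair(tname_, msg_);
    }

private:
    std::mutex mutex_;
    bool aborted_ = false;
    std::string tname_;
    std::string msg_;
};
```

The mutex matters because the follow-up aborts typically arrive from several threads at about the same time, right after the first one interrupts them.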
$res = Triceps::App::isDead($appOrName);
Returns 1 if the App is dead (i.e. it has no alive Trieads, all the defined and declared threads have exited). Right after the App is created, before the first Triead is created, the App is also considered dead, and becomes alive when the first Triead is declared or defined. If an App becomes dead later, when all the Trieads exit, it can still be brought back alive by creating more Trieads. But this is considered bad practice, and will cause a race with the harvester (if you want to do this, you have to make your own custom harvester).
$res = Triceps::App::isShutdown($appOrName);
Returns 1 if the App was requested to shut down. The Trieads might still run for some time, until they properly detect and process the shutdown, and exit. So this condition is not equivalent to Dead, although they are connected. If any new Trieads get started, they will be shut down right away and won't run.
To reiterate: if all the Trieads just exit by themselves, the App becomes dead but not shut down. You could still start more Trieads and bring the App back alive. If the App has been shut down, it won't become immediately dead, but it will send the shutdown indication to all the Trieads, and after all of them eventually exit, the App will become dead too. And after shutdown there is no way to bring the App back alive, since any new Trieads will be shut down right away (OK, there might be a short period until they detect the shutdown, so the App could spike as alive for a short time, but then will become dead again).
Triceps::App::waitDead($appOrName);
Will wait for the App to become dead and return after that. Make sure to not call waitDead() from any of App's Trieads: that would cause a deadlock.
Triceps::App::shutdown($appOrName);
Shut down the App. The shutdown state is sticky, so any repeated calls will have no effect. The call returns immediately and doesn't wait for the App to die. If you want to wait, call waitDead() afterwards. Make sure to not call waitDead() from a Triead: that would cause a deadlock.
Triceps::App::shutdownFragment($appOrName, $fragName);
Shut down a named fragment. This does not shut down the whole App, it just selectively shuts down the Trieads belonging to this fragment. See the explanation of the fragments in http://babkin-cep.blogspot.com/2013/03/triceps-multithreading-concepts.html. The fragment shutdown is not sticky: after a fragment has been shut down, it's possible to create another fragment with the same name. To avoid races, a fragment may be shut down only after all its Trieads are ready. So the caller Triead must call readyReady() before it calls shutdownFragment(). If any of the fragment's Trieads are not ready, the call will confess.
Tuesday, April 30, 2013
ThreadedServer, part 2
The next part of the ThreadedServer is listen(), the function that gets called from the listener thread and takes care of accepting the connections and spawning the per-client threads.
sub listen # ($optName => $optValue, ...)
{
my $myname = "Triceps::X::ThreadedServer::listen";
my $opts = {};
my @myOpts = (
owner => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TrieadOwner") } ],
socket => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "IO::Socket") } ],
prefix => [ undef, \&Triceps::Opt::ck_mandatory ],
handler => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
pass => [ undef, sub { &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
);
&Triceps::Opt::parse($myname, $opts, {
@myOpts,
'*' => [],
}, @_);
my $owner = $opts->{owner};
my $app = $owner->app();
my $prefix = $opts->{prefix};
my $sock = $opts->{socket};
The first part is traditional, the only thing to note is that it also saves the list of options in a variable for the future use (startServer() did that too but I forgot to mention it). And the option "*" is the pass-through: it gets all the unknown options collected in $opts->{"*"}.
my $clid = 0; # client id
while(!$owner->isRqDead()) {
my $client = $sock->accept();
if (!defined $client) {
my $err = "$!"; # or the text message will be reset by isRqDead()
if ($owner->isRqDead()) {
last;
} elsif($!{EAGAIN} || $!{EINTR}) { # numeric codes don't get reset
next;
} else {
confess "$myname: accept failed: $err";
}
}
The accept loop starts. It runs until the listening socket gets revoked by shutdown. The revocation is done by dup2() of a file descriptor from /dev/null over it. Note that this function itself doesn't request to track the file descriptor for revocation. That's the caller's responsibility.
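The dup2() trick itself is easy to try in isolation. Below is a minimal POSIX C++ sketch of it, with a pipe standing in for the socket. The function names revokeFd() and readAfterRevoke() are hypothetical, and opening /dev/null with O_RDWR is my assumption rather than something taken from the Triceps code.

```cpp
#include <fcntl.h>
#include <unistd.h>

// Make fd refer to /dev/null instead of whatever it referred to before.
// The descriptor number stays valid, so the code that still holds it
// doesn't end up touching an unrelated, recycled descriptor.
// Returns 0 on success, -1 on error.
int revokeFd(int fd) {
    int nullfd = open("/dev/null", O_RDWR);
    if (nullfd < 0)
        return -1;
    int res = dup2(nullfd, fd);  // atomically replaces what fd points to
    close(nullfd);
    return res < 0 ? -1 : 0;
}

// Demo: put a byte into a pipe, revoke the reading end, then read from it.
// Returns the result of read(), or -2 if the demo setup failed.
ssize_t readAfterRevoke() {
    int fds[2];
    if (pipe(fds) != 0)
        return -2;
    if (write(fds[1], "x", 1) != 1 || revokeFd(fds[0]) != 0) {
        close(fds[0]);
        close(fds[1]);
        return -2;
    }
    char c;
    ssize_t n = read(fds[0], &c, 1);  // reads /dev/null now: end of file
    close(fds[0]);
    close(fds[1]);
    return n;
}
```

Here read() returns 0 (end of file) even though a byte was waiting in the pipe. On a revoked listening socket, accept() fails instead, because /dev/null is not a socket.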
Oh, and by the way, if you've been tracing the possible ways of thread execution closely, you might be wondering: what if the shutdown happens after the socket is opened but before it is tracked? This can't really happen to the listening socket but can happen with the accepted client sockets. The answer is that the tracking enrollment will check whether the shutdown already happened, and if so, it will revoke the socket right away, before returning. So the reading loop will find the socket revoked right on its first iteration.
The revocation also sends the signal SIGUSR2 to the thread. This is done because the calls like accept() do not return on a simple revocation by dup2() but a signal does interrupt them. Triceps registers an empty handler for SIGUSR2 that just immediately returns, but the accept() system call gets interrupted, and the thread gets a chance to check for shutdown. Even if it calls accept() again, this will return an error and force it to check for shutdown again.
And by the way, Perl's threads::kill doesn't send a real signal, it just sets a flag for the interpreter. If you try it on your own, it won't interrupt the system calls, and now you know why. Instead Triceps gets the POSIX thread identity from the Perl thread and calls the honest pthread_kill() from the C++ code.
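To see the interruption mechanism on its own, here is a small POSIX C++ sketch in which a thread blocked in read() gets kicked out of it by pthread_kill() with SIGUSR2. All the names in it (interruptBlockedRead(), ReadOutcome and so on) are hypothetical, and the handler is installed without SA_RESTART, which is my assumption about what makes the call fail with EINTR rather than restart.

```cpp
#include <atomic>
#include <cerrno>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>

// Hypothetical demo names throughout; none of this is Triceps code.
static void emptyHandler(int) {}  // does nothing, just has to exist

struct ReadOutcome {
    ssize_t n;  // return value of read()
    int err;    // errno captured right after read()
};

static int pipeFds[2];
static std::atomic<bool> readerDone(false);

// Thread body: block in read() on a pipe that nobody ever writes to.
static void *reader(void *arg) {
    ReadOutcome *out = static_cast<ReadOutcome *>(arg);
    char c;
    out->n = read(pipeFds[0], &c, 1);
    out->err = errno;
    readerDone = true;
    return nullptr;
}

// Start the reader, signal it until it gets out of read(), report the result.
// On a setup failure returns {0, 0}.
ReadOutcome interruptBlockedRead() {
    ReadOutcome out = {0, 0};
    struct sigaction sa = {};
    sa.sa_handler = emptyHandler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;  // no SA_RESTART: let read() fail with EINTR
    if (sigaction(SIGUSR2, &sa, nullptr) != 0 || pipe(pipeFds) != 0)
        return out;
    pthread_t tid;
    if (pthread_create(&tid, nullptr, reader, &out) != 0)
        return out;
    // A signal that arrives before read() has started is simply lost,
    // so keep sending it until the reader reports that it is done.
    while (!readerDone) {
        pthread_kill(tid, SIGUSR2);
        usleep(10000);
    }
    pthread_join(tid, nullptr);
    close(pipeFds[0]);
    close(pipeFds[1]);
    return out;
}
```

The retry loop is only there to make the demo robust. In the real server the revoked descriptor plays that role: a thread that missed the signal still gets an error from its next socket call.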
So, the main loop goes by the shutdown condition: isRqDead() tells if this thread has been requested to die. After accept(), it checks for errors. The first thing to check for is again the isRqDead(), because the revocation will manifest as a socket operation error, and there is no point in reporting this spurious error. However, like other Triceps calls, isRqDead() will clear the error text, and the text has to be saved first. If the shutdown is found, the loop exits. Then the check for the spurious interruptions is done, and for them the loop continues. Funny enough, the $!{} uses the numeric part of $! that is independent from its text part and doesn't get cleared by the Triceps calls. And on any other errors the thread confesses. This will unroll the stack, eventually get caught by the Triceps threading code, abort the App, and propagate the error message to the harvester.
$clid++;
my $cliname = "$prefix$clid";
$app->storeCloseFile($cliname, $client);
Triceps::Triead::start(
app => $app->getName(),
thread => $cliname,
fragment => $cliname,
main => $opts->{handler},
socketName => $cliname,
&Triceps::Opt::drop({ @myOpts }, \@_),
);
# Doesn't wait for the new thread(s) to become ready.
}
}
If a proper connection has been received, the socket gets stored into the App with a unique name, for later load by the per-client thread. And then the per-client thread gets started.
The drop passes through all the original options except for the ones handled by this thread. In retrospect, this is not the best solution for this case. It would be better to just use @{$opts->{"*"}} instead. The drop call is convenient when not all the explicitly recognized options but only a part of them has to be dropped.
After starting the thread, the loop doesn't call readyReady() but goes for the next iteration. This is basically because it doesn't care about the started thread and doesn't ever send anything to it. And waiting for the threads to start will make the loop slower, possibly overflowing the socket's listening queue and dropping the incoming connections if they arrive very fast.
And the last part of the ThreadedServer is the printOrShut:
sub printOrShut # ($app, $fragment, $sock, @text)
{
my $app = shift;
my $fragment = shift;
my $sock = shift;
undef $!;
print $sock @_;
$sock->flush();
if ($!) { # can't write, so shutdown
Triceps::App::shutdownFragment($app, $fragment);
}
}
Nothing too complicated. Prints the text into the socket, flushes it and checks for errors. On errors shuts down the fragment. In this case there is no need for draining. After all, the socket leading to the client is dead and there is no way to send anything more through it, so there is no point in worrying about any unsent data. Just shut down as fast as it can, before the threads have generated more data that can't be sent any more. Any data queued in the nexuses for the shut down threads will be discarded.
Monday, April 29, 2013
Multithreaded socket server, part4, socket reader thread
As I've said before, each socket gets served by two threads: one sits reading from the socket and forwards the data into the model, and another one sits getting data from the model and forwards it into the socket. The same thread can't wait for both a socket descriptor and a thread synchronization primitive, so they have to be separate.
The first thread started is the socket reader. Let's go through it bit by bit.
sub chatSockReadT
{
my $opts = {};
&Triceps::Opt::parse("chatSockReadT", $opts, {@Triceps::Triead::opts,
socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
}, @_);
undef @_; # avoids a leak in threads module
my $owner = $opts->{owner};
my $app = $owner->app();
my $unit = $owner->unit();
my $tname = $opts->{thread};
# only dup the socket, the writer thread will consume it
my ($tsock, $sock) = $owner->trackDupSocket($opts->{socketName}, "<");
The beginning is quite usual. Then it loads the socket from the App and gets it tracked with the TrieadOwner. The difference between trackDupSocket() here and the trackGetSocket() used before is that trackDupSocket() leaves the socket copy in the App, to be found by the writer-side thread.
The socket is reopened in this thread as read-only. The writing to the socket from all the threads has to be synchronized to avoid mixing the half-messages. And the easiest way to synchronize is to always write from one thread, and if the other thread wants to write something, it has to pass the data to the writer thread through the control nexus.
# user messages will be sent here
my $faChat = $owner->importNexus(
from => "global/chat",
import => "writer",
);
# control messages to the reader side will be sent here
my $faCtl = $owner->makeNexus(
name => "ctl",
labels => [
ctl => $faChat->impRowType("ctl"),
],
reverse => 1, # gives this nexus a high priority
import => "writer",
);
Imports the chat nexus and creates the private control nexus for communication with the writer side. The name of the chat nexus is hardcoded here, since it's pretty much a solid part of the application. If this were a module, the name of the chat nexus could be passed through the options.
The control nexus is marked as reverse even though it really isn't. But the reverse option has a side effect of making this nexus high-priority. Even if the writer thread has a long queue of messages from the chat nexus, the messages from the control nexus will be read first. Which again isn't strictly necessary here, but I wanted to show how it's done.
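The effect of the priority can be pictured with a toy model in plain Perl. This is only an illustration of the read order, with two arrays standing in for the queues; it says nothing about how Triceps implements the nexus queues internally, and next_message() is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical reader: take from the high-priority queue first and
# fall back to the normal queue only when the former is empty.
sub next_message {
    my ($high, $normal) = @_;
    return shift @$high if @$high;
    return shift @$normal if @$normal;
    return undef;
}

my @ctl  = ("!ready,cliconn1");                 # control messages
my @chat = ("topic1,hello", "topic1,world");    # already queued chat messages
my @order;
while (defined(my $msg = next_message(\@ctl, \@chat))) {
    push @order, $msg;
}
print "$_\n" for @order;
```

Even though the chat messages were queued first, the control message comes out ahead of them.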
The type of the control label is imported from the chat nexus, so it doesn't have to be defined from scratch.
$owner->markConstructed();
Triceps::Triead::start(
app => $opts->{app},
thread => "$tname.rd",
fragment => $opts->{fragment},
main => \&chatSockWriteT,
socketName => $opts->{socketName},
ctlFrom => "$tname/ctl",
);
$owner->readyReady();
Then the construction is done and the writer thread gets started. And then the thread is ready and waits for the writer thread to be ready too. The readyReady() works in the fragments just as it does at the start of the app. Whenever a new thread is started, the App becomes not ready, and stays this way until all the threads report that they are ready. The rest of the App keeps working like nothing happened, at least sort of. Whenever a nexus is imported, the messages from this nexus start collecting for this thread, and if there are many of them, the nexus will become backed up and the threads writing to it will block. The new threads have to call readyReady() as usual to synchronize between themselves, and then everything gets on its way.
Of course, if two connections are received in a quick succession, that would start two sets of threads, and readyReady() will continue only after all of them are ready.
my $lbChat = $faChat->getLabel("msg");
my $lbCtl = $faCtl->getLabel("ctl");
$unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!ready," . $opts->{fragment});
$owner->flushWriters();
A couple of labels get remembered for the future use, and the connection ready message gets sent to the writer thread through the control nexus. By convention of this application, the messages go in the CSV format, with the control messages starting with "!". If this is the first client, this would send
!ready,cliconn1
to the client. It's important to call flushWriters() every time to get the message(s) delivered.
while(<$sock>) {
s/[\r\n]+$//;
my @data = split(/,/);
if ($data[0] eq "exit") {
last; # a special case, handle in this thread
} elsif ($data[0] eq "kill") {
eval {$app->shutdownFragment($data[1]);};
if ($@) {
$unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!error,$@");
$owner->flushWriters();
}
} elsif ($data[0] eq "shutdown") {
$unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
$owner->flushWriters();
Triceps::AutoDrain::makeShared($owner);
eval {$app->shutdown();};
} elsif ($data[0] eq "publish") {
$unit->makeHashCall($lbChat, "OP_INSERT", topic => $data[1], msg => $data[2]);
$owner->flushWriters();
} else {
# this is not something you want to do in a real chat application
# but it's cute for a demonstration
$unit->makeHashCall($lbCtl, "OP_INSERT", cmd => $data[0], arg => $data[1]);
$owner->flushWriters();
}
}
The main loop keeps reading lines from the socket and interpreting them. The lines are in CSV format, and the first field is the command and the rest are the arguments (if any). The commands are:
publish - send a message with a topic to the chat nexus
exit - close the connection
kill - close another connection, by name
shutdown - shut down the server
subscribe - subscribe the client to a topic
unsubscribe - unsubscribe the client from a topic
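One thing to keep in mind with the plain split(/,/) is that a published message containing a comma loses everything after that comma, since only $data[2] is used. A minimal sketch of a parser that keeps the commas in the last field is shown below; parse_command() is a hypothetical helper, not a part of the example, and it assumes that only "publish" takes two arguments.

```perl
use strict;
use warnings;

# Hypothetical parser for one input line: strips the line ending and
# splits off the command, keeping commas inside the last field.
sub parse_command {
    my ($line) = @_;
    $line =~ s/[\r\n]+$//;
    my ($cmd) = split(/,/, $line, 2);
    $cmd = "" unless defined $cmd;
    # publish has two arguments (topic, msg); the others have at most one
    my $limit = $cmd eq "publish" ? 3 : 2;
    return split(/,/, $line, $limit);
}

my @data = parse_command("publish,news,hello, world\r\n");
print join("|", @data), "\n";
```

The limit argument of split() stops the splitting early, so the message field arrives intact.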
The exit just exits the loop, since it works the same if the socket just gets closed from the other side.
The kill shuts down by name the fragment where the threads of the other connection belong. This is a simple application, so it doesn't check any permissions, whether this fragment should be allowed to be shut down. If there is no such fragment, the shutdown call will silently do nothing, so the error check and reporting is really redundant (if something goes grossly wrong in the thread interruption code, an error might still occur, but theoretically this should never happen).
The shutdown sends the notification to the common topic "*" (to which all the clients are subscribed by default), then drains the model and shuts it down. The drain makes sure that all the messages in the model get processed (and even written to the socket) without allowing any new messages to be injected. "Shared" means that there are no special exceptions for some threads.
makeShared() actually creates a drain object that keeps the drain active during its lifetime. Here this object is not assigned anywhere, so it gets immediately destroyed and lifts the drain. So potentially more messages can get squeezed in between this point and shutdown. Which doesn't matter a whole lot here.
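The lifetime difference is a general property of Perl's reference counting and can be seen without Triceps. In the sketch below the class ScopedGuard is a hypothetical stand-in for the drain object: it logs when the "drain" starts on construction and when it gets lifted on destruction.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a scoped drain object: logs when the
# "drain" starts (construction) and when it is lifted (destruction).
package ScopedGuard;
sub new {
    my ($class, $log) = @_;
    push @$log, "drain start";
    return bless { log => $log }, $class;
}
sub DESTROY {
    my ($self) = @_;
    push @{ $self->{log} }, "drain lifted";
}

package main;

my @log;
ScopedGuard->new(\@log);    # not stored: destroyed right away
push @log, "shutdown";

my @log2;
{
    my $guard = ScopedGuard->new(\@log2);    # lives until the block ends
    push @log2, "shutdown";
}
print join(", ", @log), "\n";
print join(", ", @log2), "\n";
```

In the first case the guard is gone before the "shutdown" step, in the second case it stays in effect until the end of the block.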
If it were really important that nothing get sent after the shutdown notification, it could be done like this (this is an untested fragment, so it might contain typos):
} elsif ($data[0] eq "shutdown") {
my $drain = Triceps::AutoDrain::makeExclusive($owner);
$unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
$owner->flushWriters();
$drain->wait();
eval {$app->shutdown();};
}
This starts the drain, but this time the exclusive mode means that this thread is allowed to send more data. When the drain is created, it waits for success, so when the new message is inserted, it will be after all the other messages. $drain->wait() does another wait and makes sure that this last message propagates all the way. And then the app gets shut down, while the drain is still in effect, so no more messages can be sent for sure.
The publish sends the data to the chat nexus (note the flushWriters(), as usual!).
And the rest of the commands (that would be subscribe and unsubscribe but you can do any other commands like "print") get simply forwarded through the control nexus to the writer thread for execution. Sending through the commands like this without testing is not a good practice for a real application but it's cute for a demo.
{
# let the data drain through
my $drain = Triceps::AutoDrain::makeExclusive($owner);
# send the notification - can do it because the drain is excluding itself
$unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!exiting");
$owner->flushWriters();
$drain->wait(); # wait for the notification to drain
$app->shutdownFragment($opts->{fragment});
}
$tsock->close(); # not strictly necessary
}
The last part is when the connection gets closed, either by the "exit" command or when the socket gets closed. Remember, the socket can get closed asymmetrically, in one direction, so even when the reading is closed, the writing may still work and needs to return the responses to any commands received from the socket. And of course the same is true for the "exit" command.
So here the full exclusive drain sequence is used, ending with the shutdown of this thread's own fragment, which will close the socket. Even though only one fragment needs to be shut down, the drain drains the whole model. Because of the potentially complex interdependencies, there is no way to reliably drain only a part, and all the drains are App-wide.
The last part, with $tsock->close(), is not technically necessary since the shutdown of the fragment will get the socket descriptor revoked anyway. Still, it's a good practice: it unregisters the socket from the TrieadOwner and then closes it.
Thursday, April 11, 2013
Multithreaded pipeline, part 2
The reader thread drives the pipeline:
sub ReaderMain # (@opts)
{
my $opts = {};
Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
my $owner = $opts->{owner};
my $unit = $owner->unit();
my $rtPacket = Triceps::RowType->new(
time => "int64", # packet's timestamp, microseconds
local_ip => "string", # string to make easier to read
remote_ip => "string", # string to make easier to read
local_port => "int32",
remote_port => "int32",
bytes => "int32", # size of the packet
) or confess "$!";
my $rtPrint = Triceps::RowType->new(
text => "string", # the text to print (including \n)
) or confess "$!";
my $rtDumprq = Triceps::RowType->new(
what => "string", # identifies, what to dump
) or confess "$!";
my $faOut = $owner->makeNexus(
name => "data",
labels => [
packet => $rtPacket,
print => $rtPrint,
dumprq => $rtDumprq,
],
import => "writer",
);
my $lbPacket = $faOut->getLabel("packet");
my $lbPrint = $faOut->getLabel("print");
my $lbDumprq = $faOut->getLabel("dumprq");
$owner->readyReady();
while(<STDIN>) {
chomp;
# print the input line, as a debugging exercise
$unit->makeArrayCall($lbPrint, "OP_INSERT", "! $_\n");
my @data = split(/,/); # starts with a command, then string opcode
my $type = shift @data;
if ($type eq "new") {
$unit->makeArrayCall($lbPacket, @data);
} elsif ($type eq "dump") {
$unit->makeArrayCall($lbDumprq, "OP_INSERT", $data[0]);
} else {
$unit->makeArrayCall($lbPrint, "OP_INSERT", "Unknown command '$type'\n");
}
$owner->flushWriters();
}
{
# drain the pipeline before shutting down
my $ad = Triceps::AutoDrain::makeShared($owner);
$owner->app()->shutdown();
}
}
It starts by creating the nexus with the initial set of the labels: for the data about the network packets, for the lines to be printed at the end of the pipeline and for the dump requests to the tables in the other threads. It gets exported for the other threads to import, and also imported right back into this thread, for writing. And then the setup is done, readyReady() is called, and the processing starts.
It reads the CSV lines, splits them, makes a decision if it's a data line or dump request, and one way or the other sends it into the nexus. The data sent to a facet doesn't get immediately forwarded to the nexus. It's collected internally in a tray, and then flushWriters() sends it on. The mainLoop() shown in the last post calls flushWriters() automatically after every tray it processes from the input. But when reading from a file you've got to do it yourself. Of course, it's more efficient to send through multiple rows at once, so a smarter implementation would check if multiple lines are available from the file and send them in larger bundles.
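Here is a minimal sketch of the bundling idea in plain Perl. It is a count-based simplification: instead of checking whether more lines are available, it flushes after every $max lines, so on an interactive input it would hold the rows back until the batch fills. The helper read_batched() and its callback are hypothetical; in the real thread the callback would send the rows and call flushWriters().

```perl
use strict;
use warnings;

# Hypothetical batching reader: collects up to $max lines and calls
# $flush once per batch instead of once per line.
sub read_batched {
    my ($fh, $max, $flush) = @_;
    my @batch;
    while (my $line = <$fh>) {
        chomp $line;
        push @batch, $line;
        if (@batch >= $max) {
            $flush->(@batch);
            @batch = ();
        }
    }
    $flush->(@batch) if @batch;    # do not lose the last partial batch
}

# An in-memory file stands in for STDIN; the lines are made-up input.
my $input = "new,OP_INSERT,1\nnew,OP_INSERT,2\ndump,x\n";
open(my $fh, "<", \$input) or die "open failed: $!";
my @sizes;
read_batched($fh, 2, sub { push @sizes, scalar(@_) });
print "batch sizes: @sizes\n";
```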
The last part is the shutdown. After the end of file is reached, it's time to shut down the application. You can't just shut it down right away because there still might be data in the pipeline, and if you shut it down, that data will be lost. The right way is to drain the pipeline first, and then do the shutdown when the app is drained. AutoDrain::makeShared() creates a scoped drain: the drain request for all the threads is started when this object is created, and the object construction completes when the drain succeeds. When the object is destroyed, that lifts the drain. So in this case the drain succeeds and then the app gets shut down.
The shutdown causes the mainLoop() calls in all the other threads to return, and the threads to exit. Then startHere() in the first thread has the special logic in it that joins all the started threads after its own main function returns and before it completes. After that the script continues on its way and is free to exit.