The App API has two parts: the first part keeps track of all the App instances in the program and allows listing and finding them. The second part is the manipulation of a particular instance.
The global part of the API is:
@apps = Triceps::App::listApps();
Returns the array of name-value pairs, values containing the App references, listing all the Apps in the program (more exactly, in this Triceps library: if you compile multiple Triceps libraries together by renaming them, each of them will have its own Apps list). The returned array can be placed into a hash.
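As a minimal sketch of that last point, a flat list of name-value pairs converts to a hash by plain assignment. The list below is a hand-made stand-in (strings in place of the App references, and the made-up names app1 and app2), so that the fragment runs without Triceps and without any live Apps:

```perl
use strict;
use warnings;

# Stand-in for the result of Triceps::App::listApps(): a flat list of
# name-value pairs. The real values would be App references; plain
# strings are used here so that the sketch runs without Triceps.
my @apps = (
    app1 => "reference to app1",
    app2 => "reference to app2",
);

# A flat list of pairs converts to a hash by a plain assignment.
my %appsByName = @apps;

foreach my $name (sort keys %appsByName) {
    print "$name: $appsByName{$name}\n";
}
```

The same idiom should apply to any other call that returns name-value pairs.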
$app = Triceps::App::find($name);
Find an App by name. If an App with such a name does not exist, it will confess.
$app = Triceps::App::make($name);
Create a new App, give it the specified name, and put it on the list. The name must not duplicate the name of any other existing App, or the method will confess.
drop($app);
drop($appName);
Drop the app, by reference or by name, from the list. The App is as usual reference-counted and will exist while there are references to it. The global list provides one of these references, so an App is guaranteed to exist while it's still on the list. When dropped, it will still be accessible through the existing references, but obviously will not be listed any more and could not be found by name.
Moreover, a new App with the same name can be added to the list. Because of this, dropping an App by name requires some care if a new App could be created again with the same name: it creates a potential for a race, and you might end up dropping the new App instead of the old one. Of course, if it's the same thread that drops the old one and creates the new one, then there is no race. Dropping by name an App that doesn't exist at the moment is an error and will confess.
Dropping the App by reference theoretically allows avoiding the race: a specific object gets dropped, and if it already has been dropped, the call has no effect. Theoretically. However, in practice Perl has a limitation on passing the object values between threads, and thus whenever each thread starts, the first thing it does is find its App by name. It's a very similar kind of race and is absolutely unavoidable except by making sure that all the App's threads have exited and joined (i.e. harvesting them). So make sure to complete the App's thread harvesting before dropping the App in the harvester thread, and by then it doesn't matter a whole lot if the App is dropped by name or by reference.
Now the second part of the API, working with an App instance.
Many (but not all) of the App methods allow specifying the App either by reference or by name, and they automatically sort it out, doing the internal look-up by name if necessary. So the same method could be used as either of:
$app->method(...);
Triceps::App::method($app, ...);
Triceps::App::method($appName, ...);
Obviously, you can not use the "->" syntax with a name, and obviously if the name is not in the app list, the method will confess. Below I'll show the calls that allow the dual formats as Triceps::App::method($appOrName, ...) but keep in mind that you can use the "->" form of them too with a reference.
$app = Triceps::App::resolve($appOrName);
Do just the automatically-sorting-out part: gets a reference or name and returns a reference either way. A newly created reference is returned in either case (not the argument reference). You can use this resolver before the methods that accept only a reference.
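The by-reference-or-by-name logic can be illustrated with a toy sketch in plain Perl. Everything in it is hypothetical (the %registry hash, the ToyApp class and resolveApp() are stand-ins, not the Triceps implementation), it only shows the general idea of sorting out the argument:

```perl
use strict;
use warnings;
use Carp;

# Hypothetical stand-in for the global list of Apps: name => object.
my %registry;

package ToyApp;

sub new {
    my ($class, $name) = @_;
    return bless { name => $name }, $class;
}

sub getName { return $_[0]->{name}; }

package main;

# Accept either an object reference or a name, return a reference.
# Unlike the real resolve(), this toy returns the argument itself
# when it's already a reference.
sub resolveApp {
    my ($appOrName) = @_;
    return $appOrName if ref($appOrName);
    my $app = $registry{$appOrName};
    confess "App '$appOrName' is not found" unless defined $app;
    return $app;
}

$registry{a1} = ToyApp->new("a1");
my $byName = resolveApp("a1");      # look-up by name
my $byRef = resolveApp($byName);    # a reference passes through
my $missing = eval { resolveApp("nosuch") };  # confesses, caught by eval
my $err = $@;
print "found: ", $byName->getName(), "\n";
```

Note that a look-up of a missing name dies, so the callers that are not sure about the name have to wrap the call in eval, as shown.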
$result = $app->same($app2);
Check if two references are for the same App object. Here they both must be references and not names.
$name = $app->getName();
Get the name of the App, from a reference.
Triceps::App::declareTriead($appOrName, $trieadName);
Declare a Triead (Triceps thread) in advance. Any attempts to look up the nexuses in that thread will then wait for the thread to become ready. (Attempts to look up in an undeclared and undefined thread are errors). This is necessary to prevent a race at the thread creation time. For the most part, the method Triead::start() just does the right thing by calling this method automatically and you don't need to use it manually, except in some very special circumstances.
@trieads = Triceps::App::getTrieads($appOrName);
Get the list of currently defined Trieads, as name-value pairs. Keep in mind that the other threads may be modifying the list of Trieads, so if you do this call multiple times, you may get different results. However the Trieads are returned as references, so they are guaranteed to stay alive and readable even if they get removed from the App, or even if the App gets dropped.
$app->harvester(@options);
Run the harvester in the current thread. The harvester gets notifications from the threads when they are about to exit, and joins them. After all the threads have been joined, it automatically drops the App, and returns.
The harvesting is an absolutely necessary part of the App life cycle, however in most of the usage patterns (such as with Triead::startHere or App::build) the harvester is called implicitly from the wrapping library functions, so you don't need to care about it.
Note also that if you're running the harvester manually, you must call it only after the first thread has been defined or at least declared. Otherwise it will find no threads in the App, consider it dead and immediately drop it.
If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App, unless the option die_on_abort (see below) has been set to 0. This propagates the error to the caller. However there is a catch: if some of the threads don't react properly by exiting on an abort indication, the program will be stuck and you will see no error message until these threads get unstuck, possibly forever.
Options:
die_on_abort => 1/0
(default: 1) If the App was aborted, the harvester will normally confess after it has joined all the threads and disposed of the App. Setting this option to 0 will make the aborts silently ignored. This option does not affect the errors in joining the threads: if any of those are detected, the harvester will still confess after it has disposed of the App.
$dead = $app->harvestOnce();
Do one run of the harvesting. Joins all the threads that have exited since its last call. If no threads have exited since then, returns immediately. Returns 1 if all the threads have exited (and thus the App is dead), 0 otherwise. If a thread join fails, immediately confesses (if multiple threads were ready for joining, the ones queued after the failed one won't be joined, call harvestOnce() once more to join them).
$app->waitNeedHarvest();
Wait for at least one thread to become ready for harvesting. If the App is already dead (i.e. all its threads have exited), returns immediately.
These two methods allow writing custom harvesters if you're not happy with the default one. The basic harvester logic can be written as:
do {
    $app->waitNeedHarvest();
} while(!$app->harvestOnce());
$app->drop();
However the real harvester also does some smarter things around the error handling. You can look it up in the source code in cpp/app/App.cpp.
$res = Triceps::App::isDead($appOrName);
Returns 1 if the App is dead (i.e. has no more live threads), otherwise 0. Calling this method with a name for the argument is probably a bad idea, since normally the harvester will drop the App quickly after it becomes dead, and you may end up with this method confessing when it could not find the dropped App.
$res = Triceps::App::isShutdown($appOrName);
Returns 1 if the App has been requested to shut down, either normally or by being aborted.
$res = Triceps::App::isAborted($appOrName);
Returns 1 if the App has been aborted. The App may be aborted explicitly by calling the method abortBy(), or the thread wrapper logic automatically converts any unexpected deaths in the App's threads to the aborts. If any thread dies, this aborts the App, which in turn requests the other threads to die on their next thread-related call. Eventually the harvester collects them all and confesses, normally making the whole program die with an error.
($tname, $message) = Triceps::App::getAborted($appOrName);
Get the App abort information: name of the thread that caused the abort, and its error message.
Triceps::App::abortBy($appOrName, $tname, $msg);
Abort the application. The thread name and message will be remembered, and returned later by getAborted() or in the harvester. If abortBy() is called multiple times, only the first pair of thread name and message gets remembered. The reason is that on abort all the threads get interrupted in a fairly rough manner (all their ongoing and following calls to the threading API die), which typically causes them to call abortBy() as well, and there is no point in collecting these spurious messages.
The thread name here doesn't have to be the name of the actual thread that reports the issue. For example, if the thread creation as such fails (maybe because the OS limit on the thread count) that gets detected by the parent thread but reported in the name of the thread whose creation has failed. And in general you can pass just any string as the thread name, App itself doesn't care, just make it something that makes sense to you.
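The "only the first pair gets remembered" rule can be shown with a toy model in plain Perl. This is a sketch of the rule only, with the made-up names %abort and toyAbortBy(); it is not how Triceps stores the abort information:

```perl
use strict;
use warnings;

# Toy model of the abort record (not the Triceps implementation):
# only the first pair of thread name and message is kept.
my %abort = (aborted => 0, tname => undef, msg => undef);

sub toyAbortBy {
    my ($tname, $msg) = @_;
    return if $abort{aborted};  # the later reports are ignored
    @abort{qw(aborted tname msg)} = (1, $tname, $msg);
    return;
}

toyAbortBy("t1", "file not found");            # the root cause
toyAbortBy("t2", "interrupted by the abort");  # a spurious follow-up

print "aborted by $abort{tname}: $abort{msg}\n";
```

The follow-up report from t2 is dropped, so the information that reaches the harvester points at the root cause.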
$res = Triceps::App::isDead($appOrName);
Returns 1 if the App is dead (i.e. it has no alive Trieads, all the defined and declared threads have exited). Right after the App is created, before the first Triead is created, the App is also considered dead, and becomes alive when the first Triead is declared or defined. If an App becomes dead later, when all the Trieads exit, it can still be brought back alive by creating more Trieads. But this is considered bad practice, and will cause a race with the harvester (if you want to do this, you have to make your own custom harvester).
$res = Triceps::App::isShutdown($appOrName);
Returns 1 if the App was requested to shut down. The Trieads might still run for some time, until they properly detect and process the shutdown, and exit. So this condition is not equivalent to Dead, although they are connected. If any new Trieads get started, they will be shut down right away and won't run.
To reiterate: if all the Trieads just exit by themselves, the App becomes dead but not shut down. You could still start more Trieads and bring the App back alive. If the App has been shut down, it won't become immediately dead, but it will send the shutdown indication to all the Trieads, and after all of them eventually exit, the App will become dead too. And after shutdown there is no way to bring the App back alive, since any new Trieads will be shut down right away (OK, there might be a short period until they detect the shutdown, so the App could spike as alive for a short time, but then will become dead again).
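The relation between the two conditions can be traced on a toy model. The ToyAppState class below is purely hypothetical: it keeps a count of alive Trieads and a sticky shutdown flag, and it doesn't model the Trieads reacting to the shutdown (the exits are called by hand):

```perl
use strict;
use warnings;

# A toy model of the two conditions (not the Triceps implementation).
package ToyAppState;

sub new { my $class = shift; return bless { alive => 0, shutdown => 0 }, $class; }
sub addTriead { my $self = shift; $self->{alive}++; return; }
sub exitTriead { my $self = shift; $self->{alive}-- if $self->{alive} > 0; return; }
# The shutdown request is sticky, there is no way to reset it.
sub requestShutdown { my $self = shift; $self->{shutdown} = 1; return; }
sub isDead { my $self = shift; return $self->{alive} == 0 ? 1 : 0; }
sub isShutdown { my $self = shift; return $self->{shutdown}; }

package main;

my $app = ToyAppState->new();
my @trace;
# Record the step name with the values of isDead and isShutdown.
my $snap = sub { push @trace, join(",", $_[0], $app->isDead(), $app->isShutdown()); };

$snap->("created");             # dead, not shut down
$app->addTriead();
$snap->("first Triead");        # alive
$app->exitTriead();
$snap->("all exited");          # dead again but not shut down
$app->addTriead();
$snap->("revived");             # brought back alive
$app->requestShutdown();
$snap->("shutdown requested");  # shut down but not dead yet
$app->exitTriead();
$snap->("after exit");          # both dead and shut down

print "$_\n" foreach @trace;
```

Each printed line has the step name followed by the values of isDead and isShutdown at that step.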
Triceps::App::waitDead($appOrName);
Will wait for the App to become dead and return after that. Make sure to not call waitDead() from any of App's Trieads: that would cause a deadlock.
Triceps::App::shutdown($appOrName);
Shut down the App. The shutdown state is sticky, so any repeated calls will have no effect. The call returns immediately and doesn't wait for the App to die. If you want to wait, call waitDead() afterwards. Make sure to not call waitDead() from a Triead: that would cause a deadlock.
Triceps::App::shutdownFragment($appOrName, $fragName);
Shut down a named fragment. This does not shut down the whole App, it just selectively shuts down the Trieads belonging to this fragment. See the explanation of the fragments in http://babkin-cep.blogspot.com/2013/03/triceps-multithreading-concepts.html. The fragment shutdown is not sticky: after a fragment has been shut down, it's possible to create another fragment with the same name. To avoid races, a fragment may be shut down only after all its Trieads are ready. So the caller Triead must call readyReady() before it calls shutdownFragment(). If any of the fragment's Trieads are not ready, the call will confess.
This started as my thoughts on the field of Complex Event Processing, mostly about my OpenSource project Triceps. But now it's about all kinds of software-related things.
Sunday, June 16, 2013
Monday, April 29, 2013
Multithreaded socket server, part 4, socket reader thread
As I've said before, each socket gets served by two threads: one sits reading from the socket and forwards the data into the model, and another one sits getting data from the model and forwards it into the socket. The same thread can't wait for both a socket descriptor and a thread synchronization primitive, so they have to be separate.
The first thread started is the socket reader. Let's go through it bit by bit.
sub chatSockReadT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockReadT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    my $tname = $opts->{thread};
    # only dup the socket, the writer thread will consume it
    my ($tsock, $sock) = $owner->trackDupSocket($opts->{socketName}, "<");
The beginning is quite usual. Then it loads the socket from the App and gets it tracked with the TrieadOwner. The difference between trackDupSocket() here and the trackGetSocket() used before is that trackDupSocket() leaves the socket copy in the App, to be found by the writer-side thread.
The socket is reopened in this thread as read-only. The writing to the socket from all the threads has to be synchronized to avoid mixing the half-messages. And the easiest way to synchronize is to always write from one thread, and if the other thread wants to write something, it has to pass the data to the writer thread through the control nexus.
    # user messages will be sent here
    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "writer",
    );
    # control messages to the reader side will be sent here
    my $faCtl = $owner->makeNexus(
        name => "ctl",
        labels => [
            ctl => $faChat->impRowType("ctl"),
        ],
        reverse => 1, # gives this nexus a high priority
        import => "writer",
    );
Imports the chat nexus and creates the private control nexus for communication with the writer side. The name of the chat nexus is hardcoded here, since it's pretty much a solid part of the application. If this were a module, the name of the chat nexus could be passed through the options.
The control nexus is marked as reverse even though it really isn't. But the reverse option has a side effect of making this nexus high-priority. Even if the writer thread has a long queue of messages from the chat nexus, the messages from the control nexus will be read first. Which again isn't strictly necessary here, but I wanted to show how it's done.
The type of the control label is imported from the chat nexus, so it doesn't have to be defined from scratch.
    $owner->markConstructed();
    Triceps::Triead::start(
        app => $opts->{app},
        thread => "$tname.rd",
        fragment => $opts->{fragment},
        main => \&chatSockWriteT,
        socketName => $opts->{socketName},
        ctlFrom => "$tname/ctl",
    );
    $owner->readyReady();
Then the construction is done and the writer thread gets started. And then the thread is ready and waits for the writer thread to be ready too. The readyReady() works in the fragments just as it does at the start of the app. Whenever a new thread is started, the App becomes not ready, and stays this way until all the threads report that they are ready. The rest of the App keeps working like nothing happened, at least sort of. Whenever a nexus is imported, the messages from this nexus start collecting for this thread, and if there are many of them, the nexus will become backed up and the threads writing to it will block. The new threads have to call readyReady() as usual to synchronize between themselves, and then everything gets on its way.
Of course, if two connections are received in a quick succession, that would start two sets of threads, and readyReady() will continue only after all of them are ready.
    my $lbChat = $faChat->getLabel("msg");
    my $lbCtl = $faCtl->getLabel("ctl");
    $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!ready," . $opts->{fragment});
    $owner->flushWriters();
A couple of labels get remembered for the future use, and the connection ready message gets sent to the writer thread through the control nexus. By convention of this application, the messages go in the CSV format, with the control messages starting with "!". If this is the first client, this would send
!ready,cliconn1
to the client. It's important to call flushWriters() every time to get the message(s) delivered.
    while(<$sock>) {
        s/[\r\n]+$//;
        my @data = split(/,/);
        if ($data[0] eq "exit") {
            last; # a special case, handle in this thread
        } elsif ($data[0] eq "kill") {
            eval {$app->shutdownFragment($data[1]);};
            if ($@) {
                $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!error,$@");
                $owner->flushWriters();
            }
        } elsif ($data[0] eq "shutdown") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            Triceps::AutoDrain::makeShared($owner);
            eval {$app->shutdown();};
        } elsif ($data[0] eq "publish") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => $data[1], msg => $data[2]);
            $owner->flushWriters();
        } else {
            # this is not something you want to do in a real chat application
            # but it's cute for a demonstration
            $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => $data[0], arg => $data[1]);
            $owner->flushWriters();
        }
    }
The main loop keeps reading lines from the socket and interpreting them. The lines are in CSV format, and the first field is the command and the rest are the arguments (if any). The commands are:
publish - send a message with a topic to the chat nexus
exit - close the connection
kill - close another connection, by name
shutdown - shut down the server
subscribe - subscribe the client to a topic
unsubscribe - unsubscribe the client from a topic
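The line parsing from the loop above can be tried out separately from the sockets and threads. Here is a small stand-alone sketch of it; parseCommand() is a made-up helper name, the example doesn't have such a function:

```perl
use strict;
use warnings;

# Split one input line into the command and its arguments, the same
# way as the main loop does: strip the line ending, split on commas.
sub parseCommand {
    my ($line) = @_;
    $line =~ s/[\r\n]+$//;
    my ($cmd, @args) = split(/,/, $line);
    return ($cmd, @args);
}

my ($cmd, @args) = parseCommand("publish,topic1,hello\r\n");
print "command: $cmd\n";
print "arguments: @args\n";

# A comma inside the message text produces extra fields.
my @fields = parseCommand("publish,topic1,hello, world");
print "fields: ", scalar(@fields), "\n";
```

Since this simple split has no quoting, a message with a comma in it gets broken into multiple fields, and the main loop as written would publish only the part before the comma. An empty line produces no fields at all, so the command comes back undefined.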
The exit just exits the loop, since it works the same if the socket just gets closed from the other side.
The kill shuts down by name the fragment where the threads of the other connection belong. This is a simple application, so it doesn't check any permissions, whether this fragment should be allowed to be shut down. If there is no such fragment, the shutdown call will silently do nothing, so the error check and reporting is really redundant (if something goes grossly wrong in the thread interruption code, an error might still occur, but theoretically this should never happen).
The shutdown sends the notification to the common topic "*" (to which all the clients are subscribed by default), then drains the model and shuts it down. The drain makes sure that all the messages in the model get processed (and even written to the socket) without allowing any new messages to be injected. "Shared" means that there are no special exceptions for some threads.
makeShared() actually creates a drain object that keeps the drain active during its lifetime. Here this object is not assigned anywhere, so it gets immediately destroyed and lifts the drain. So potentially more messages can get squeezed in between this point and shutdown. Which doesn't matter a whole lot here.
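The effect of not assigning the object anywhere is the general Perl object lifetime at work, and can be seen on a toy class. ToyDrain below is a hypothetical stand-in that just flips a flag; it has nothing to do with the internals of Triceps::AutoDrain:

```perl
use strict;
use warnings;

# ToyDrain is a hypothetical stand-in for a drain object: the "drain"
# is on while the object exists and gets lifted in the destructor.
package ToyDrain;

sub new {
    my ($class, $flagRef) = @_;
    $$flagRef = 1;  # the drain starts
    return bless { flag => $flagRef }, $class;
}

sub DESTROY {
    my $self = shift;
    ${$self->{flag}} = 0;  # the drain gets lifted
}

package main;

my $draining = 0;

# The result is not assigned anywhere, so the object is destroyed
# as soon as this statement completes.
ToyDrain->new(\$draining);
my $afterVoid = $draining;

my $inside;
{
    # Held in a variable, the object lives until the end of the block.
    my $drain = ToyDrain->new(\$draining);
    $inside = $draining;
}
my $afterScope = $draining;

print "after unassigned call: $afterVoid\n";
print "inside the block: $inside\n";
print "after the block: $afterScope\n";
```

So to keep a scoped object like this in effect across multiple statements, it has to be kept in a variable for as long as it's needed.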
If it were really important that nothing get sent after the shutdown notification, it could be done like this (this is an untested fragment, so it might contain typos):
        } elsif ($data[0] eq "shutdown") {
            my $drain = Triceps::AutoDrain::makeExclusive($owner);
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            $drain->wait();
            eval {$app->shutdown();};
        }
This starts the drain, but this time the exclusive mode means that this thread is allowed to send more data. When the drain is created, it waits for success, so when the new message is inserted, it will be after all the other messages. $drain->wait() does another wait and makes sure that this last message propagates all the way. And then the app gets shut down, while the drain is still in effect, so no more messages can be sent for sure.
The publish sends the data to the chat nexus (note the flushWriters(), as usual!).
And the rest of the commands (that would be subscribe and unsubscribe but you can do any other commands like "print") get simply forwarded to the writer thread for execution. Sending through the commands like this without testing is not a good practice for a real application but it's cute for a demo.
    {
        # let the data drain through
        my $drain = Triceps::AutoDrain::makeExclusive($owner);
        # send the notification - can do it because the drain is excluding itself
        $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!exiting");
        $owner->flushWriters();
        $drain->wait(); # wait for the notification to drain
        $app->shutdownFragment($opts->{fragment});
    }
    $tsock->close(); # not strictly necessary
}
The last part is when the connection gets closed, either by the "exit" command or when the socket gets closed. Remember, the socket can get closed asymmetrically, in one direction, so even when the reading is closed, the writing may still work and needs to return the responses to any commands received from the socket. And of course the same is true for the "exit" command.
So here the full exclusive drain sequence is used, ending with the shutdown of this thread's own fragment, which will close the socket. Even though only one fragment needs to be shut down, the drain drains the whole model. Because of the potentially complex interdependencies, there is no way to reliably drain only a part, and all the drains are App-wide.
The last part, with $tsock->close(), is not technically necessary since the shutdown of the fragment will get the socket descriptor revoked anyway. But other than that, it's a good practice that unregisters the socket from the TrieadOwner and then closes it.
Sunday, April 28, 2013
Multithreaded socket server, part 1, dynamic threads overview
As a quick announcement, I've renamed some of the methods described in the last post, and updated the post.
Now, to the new stuff. The threads can be used to run a TCP server that accepts the connections and then starts the new client communication thread(s) for each connection. This thread can then communicate with the rest of the model, feeding and receiving data, as usual, through the nexuses.
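The challenge here is that there must be a way to create the threads dynamically, and later, when the client closes the connection, to dispose of them. There are two possible general approaches:
dynamically create and delete the threads in the existing App;
create a new App per connection and connect it to the main App.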
Both have their own advantages and difficulties, but the approach with the dynamic creation and deletion of threads ended up looking easier, and that's what Triceps has. The second approach is not particularly well supported yet. You can create multiple Apps in the program, and you can connect them by making two Triceps Trieads run in the same OS thread and ferry the data around. But it's extremely cumbersome. This will be improved in the future, but for now the first approach is the ticket.
The dynamically created threads are grouped into the fragments. This is done by specifying the fragment name option when creating a thread. The threads in a fragment have a few special properties.
One, it's possible to shut down the whole fragment in one fell swoop. There is no user-accessible way to shut down the individual threads, you can shut down either the whole App or a fragment. Shutting down individual threads is dangerous, since it can mess up the application in many non-obvious ways. But shutting down a fragment is OK, since the fragment serves a single logical function, such as service one TCP connection, and it's OK to shut down the whole logical function.
Two, when a thread in the fragment exits, it's really gone, and takes all its nexuses with it. Well, technically, the nexuses continue to exist as long as there are threads connected to them, but no new connections can be created after this point. Since usually the whole fragment will be gone together, and since the nexuses defined by the fragment's thread are normally used only by the other threads of the same fragment, a fragment shutdown cleans up its state like the fragment had never existed. By contrast, when a normal thread exists, the nexuses defined by it stay present and accessible until the App shuts down.
Another, somewhat surprising, challenge is the interaction of the threads and sockets (or file descriptors in general) in Perl. I've already touched upon it, but there is a lot more.
To show how all this stuff works, I've created an example of a "chat server". It's not really a human-oriented chat, it's more of a machine-oriented publish-subscribe, and specially tilted to work through the running of a socket server with threads.
In this case the core logic is absolutely empty. All there is of it, is a nexus that passes messages through it. The clients read from this nexus to get the messages, and write to this nexus to send the messages.
When the App starts, it has only one thread, the listener thread that listens on a socket for the incoming connections. The listener doesn't even care about the common nexus and doesn't import it. When a connection comes in, the listener creates two threads to serve it: the reader reads the socket and sends to the nexus, and the writer receives from the nexus and writes to the socket. These two threads constitute a fragment for this client. They also create their own private nexus, allowing the reader to send control messages to the writer. That could also have been done through the central common nexus, but I wanted to show that there are different ways of doing things.
With a couple of clients connected, threads and sockets start looking like this:
----------->/ \------------------>
And the listener thread still stays on the side.
Now, to the new stuff. The threads can be used to run a TCP server that accepts the connections and then starts the new client communication thread(s) for each connection. These threads can then communicate with the rest of the model, feeding and receiving data, as usual, through the nexuses.
The challenge here is that there must be a way to create the threads dynamically, and later, when the client closes the connection, to dispose of them. There are two possible general approaches:
- dynamically create and delete the threads in the same App;
- create a new App per connection and connect it to the main App.
Both have their own advantages and difficulties, but the approach with the dynamic creation and deletion of threads ended up looking easier, and that's what Triceps has. The second approach is not particularly well supported yet. You can create multiple Apps in the program, and you can connect them by making two Triceps Trieads run in the same OS thread and ferry the data around. But it's extremely cumbersome. This will be improved in the future, but for now the first approach is the ticket.
The dynamically created threads are grouped into the fragments. This is done by specifying the fragment name option when creating a thread. The threads in a fragment have a few special properties.
One, it's possible to shut down the whole fragment in one fell swoop. There is no user-accessible way to shut down the individual threads; you can shut down either the whole App or a fragment. Shutting down individual threads is dangerous, since it can mess up the application in many non-obvious ways. But shutting down a fragment is OK, since the fragment serves a single logical function, such as servicing one TCP connection, and it's OK to shut down the whole logical function.
Two, when a thread in the fragment exits, it's really gone, and takes all its nexuses with it. Well, technically, the nexuses continue to exist as long as there are threads connected to them, but no new connections can be created after this point. Since usually the whole fragment will be gone together, and since the nexuses defined by the fragment's thread are normally used only by the other threads of the same fragment, a fragment shutdown cleans up its state as if the fragment had never existed. By contrast, when a normal thread exits, the nexuses defined by it stay present and accessible until the App shuts down.
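The two fragment properties can be sketched with a toy model in plain Perl. This is not the Triceps API: the %threads hash, the add_thread(), shutdown_fragment() and nexuses() helpers, and the thread and nexus names are all made up for illustration, and attaching the "chat" nexus to the listener record is an arbitrary choice of the model. The model also ignores the detail that a real nexus lingers while some thread is still connected to it.

```perl
use strict;
use warnings;

# Toy model, NOT the Triceps API: every name here is hypothetical.
# thread name => { fragment => name or undef, nexuses => [names defined by it] }
my %threads;

sub add_thread {
    my ($name, $fragment, @nexuses) = @_;
    $threads{$name} = { fragment => $fragment, nexuses => [@nexuses] };
}

# Remove every thread tagged with the fragment, together with the nexuses
# those threads defined. Returns the sorted names of the removed threads.
sub shutdown_fragment {
    my ($fragment) = @_;
    my @match = grep {
        defined $threads{$_}{fragment} && $threads{$_}{fragment} eq $fragment
    } keys %threads;
    delete @threads{@match};
    my @gone = sort @match;
    return @gone;
}

# Sorted names of all the nexuses defined by the surviving threads.
sub nexuses {
    my @names = map { @{ $_->{nexuses} } } values %threads;
    my @sorted = sort @names;
    return @sorted;
}

# A non-fragment thread and two client fragments (names are assumptions).
add_thread("listener", undef, "chat");
add_thread("client1.reader", "client1", "client1.control");
add_thread("client1.writer", "client1");
add_thread("client2.reader", "client2", "client2.control");
add_thread("client2.writer", "client2");

my @gone = shutdown_fragment("client1");
print "removed: @gone\n";
print "threads left: ", join(", ", sort keys %threads), "\n";
print "nexuses left: ", join(", ", nexuses()), "\n";
```

After the shutdown of the "client1" fragment, both of its threads and its control nexus are gone in one step, while the non-fragment thread and the other client's fragment are untouched.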
Another, somewhat surprising, challenge is the interaction of the threads and sockets (or file descriptors in general) in Perl. I've already touched upon it, but there is a lot more.
To show how all this stuff works, I've created an example of a "chat server". It's not really a human-oriented chat, it's more of a machine-oriented publish-subscribe, and specially tilted to work through the running of a socket server with threads.
In this case the core logic is absolutely empty. All there is of it, is a nexus that passes messages through it. The clients read from this nexus to get the messages, and write to this nexus to send the messages.
When the App starts, it has only one thread, the listener thread that listens on a socket for the incoming connections. The listener doesn't even care about the common nexus and doesn't import it. When a connection comes in, the listener creates two threads to serve it: the reader reads the socket and sends to the nexus, and the writer receives from the nexus and writes to the socket. These two threads constitute a fragment for this client. They also create their own private nexus, allowing the reader to send control messages to the writer. That could also have been done through the central common nexus, but I wanted to show that there are different ways of doing things.
With a couple of clients connected, threads and sockets start looking like this:
client1 reader ----> client1 control nexus ------> client1 writer
         \                                           /
          ----------->\          /------------------>
                       chat nexus
          ----------->/          \------------------>
         /                                           \
client2 reader ----> client2 control nexus ------> client2 writer
And the listener thread still stays on the side.
Thursday, March 28, 2013
Triceps Multithreading Concepts
The multithreading support has solidified to the point where I can start documenting it. The full description will have to wait until I finalize the Perl API but the concepts are pretty much settled.
The idea of the multithreading support in Triceps is to make writing the multithreaded model easier: to make writing the good code easy and writing the bad code hard. But of course you don't have to use it, you can always make your own if you wish (just as you could before now).
Without further ado, the diagram of a multithreaded Triceps application:
[Diagram: Fig. 1. Triceps application.]
The Triceps application is embodied in the class App. It's possible to have multiple Apps in one program.
Each thread has multiple parts to it. First, of course, there is the OS-level (or, technically, library-level, or Perl-level) thread where the code executes. And then there is a class that represents this thread and its place in the App. To reduce the naming conflict, this class is creatively named Triead (still pronounced "thread"). In the discussion I use the word "thread" for both concepts, the OS-level thread and the Triead, and it's usually clear from the context which one I mean. But sometimes it's particularly important to make the distinction, and then I name one or the other explicitly.
The class Triead itself is largely opaque, allowing only a few methods for introspection. But there is a control interface to it, called TrieadOwner. The Triead is visible from the outside, the TrieadOwner object is visible only in the OS thread that owns the Triead. The TrieadOwner manages the thread state and acts as the intermediary in the thread's communications with the App.
The data is passed between the threads through the Nexuses. A Nexus is unidirectional, with data going only one way, however it may have multiple writers and multiple readers. All the readers see the exact same data, with rowops going in the exact same order (well, there will be other policies in the future as well, but for now there is only one policy).
A Nexus passes through the data for multiple labels, very much like an FnReturn does (and indeed there is a special connection between them). A Nexus also allows exporting the row types and table types from one thread to another.
A Nexus gets connected to the Trieads through the Facets. A Facet is a connection point between the Nexus and the Triead. Each Facet is for either reading or writing. And there may be only one Facet between a given Nexus and a given Triead; you can't make multiple connections between them. As a consequence, a thread can't both write to and read from the same Nexus, it can do only one thing. This might actually be an overly restrictive limitation and might change in the future, but that's how things work now.
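The one-Facet-per-pair rule can be shown with a small bookkeeping model in plain Perl. This is a sketch only, not the Triceps API: the %facets hash and the connect_facet() helper are hypothetical names invented for the illustration.

```perl
use strict;
use warnings;

# Toy model, NOT the Triceps API: all names are hypothetical.
my %facets;    # "triead/nexus" => "reader" or "writer"

# Record a Facet between a Triead and a Nexus.
# Returns 1 on success, 0 if this Triead already has a Facet on this Nexus.
sub connect_facet {
    my ($triead, $nexus, $mode) = @_;
    die "mode must be 'reader' or 'writer'\n"
        unless $mode eq "reader" || $mode eq "writer";
    my $key = "$triead/$nexus";
    return 0 if exists $facets{$key};    # only one Facet per pair
    $facets{$key} = $mode;
    return 1;
}

my $first  = connect_facet("t1", "chat", "writer");    # accepted
my $second = connect_facet("t1", "chat", "reader");    # refused: same pair
my $third  = connect_facet("t2", "chat", "reader");    # accepted: other Triead
print "first=$first second=$second third=$third\n";
```

The second call is refused because the pair already has a Facet, which is exactly why one thread cannot be both a writer and a reader of the same Nexus.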
Each Nexus also has a direction: either direct ("downwards") or reverse ("upwards"). And yes, the reverse Nexuses allow building the models with loops. However the loops consisting of only the direct Nexuses are not allowed, nor of only reverse Nexuses. They would mess up the flow control. The proper loops must contain a mix of direct and reverse Nexuses.
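The loop rule can be restated as a graph check: a topology is acceptable when the direct Nexuses taken alone form no cycle, and the reverse Nexuses taken alone form no cycle either. The following plain Perl sketch shows that check on a toy model. It is not how Triceps itself validates the topology (the source doesn't describe that); the edge representation and the has_cycle_of_kind() and topology_ok() helpers are assumptions made for the illustration.

```perl
use strict;
use warnings;

# Toy model, NOT the Triceps API. Each nexus is modeled as an edge
# { from => writer thread, to => reader thread, kind => "direct"|"reverse" }.

# Returns 1 if the edges of the given kind alone contain a cycle, else 0.
sub has_cycle_of_kind {
    my ($kind, @edges) = @_;
    my %next;
    for my $e (@edges) {
        push @{ $next{ $e->{from} } }, $e->{to} if $e->{kind} eq $kind;
    }
    my %state;    # missing = unvisited, 1 = on the current path, 2 = finished
    my $visit;
    $visit = sub {
        my ($node) = @_;
        my $s = $state{$node} // 0;
        return 1 if $s == 1;    # came back to a node on the current path
        return 0 if $s == 2;    # already fully explored, no cycle through it
        $state{$node} = 1;
        for my $n (@{ $next{$node} || [] }) {
            return 1 if $visit->($n);
        }
        $state{$node} = 2;
        return 0;
    };
    for my $start (sort keys %next) {
        return 1 if $visit->($start);
    }
    return 0;
}

# A topology is OK if neither kind of nexus forms a loop by itself.
sub topology_ok {
    my (@edges) = @_;
    return 0 if has_cycle_of_kind("direct", @edges);
    return 0 if has_cycle_of_kind("reverse", @edges);
    return 1;
}

my @mixed_loop = (
    { from => "a", to => "b", kind => "direct" },
    { from => "b", to => "a", kind => "reverse" },
);
my @direct_loop = (
    { from => "a", to => "b", kind => "direct" },
    { from => "b", to => "a", kind => "direct" },
);
print "mixed loop ok: ",  topology_ok(@mixed_loop),  "\n";
print "direct loop ok: ", topology_ok(@direct_loop), "\n";
```

The mixed loop passes because its cycle contains both kinds of Nexus, while the loop built of direct Nexuses alone is rejected.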
The direct Nexuses have a limited queue size and stop the writers when the queue fills up, until the data gets consumed, thus providing the flow control. The reverse Nexuses have an unlimited queue size, which makes it possible to avoid the circular deadlocks.
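The difference between the two queue policies can be seen in a single-threaded toy model in plain Perl. It is not the Triceps API: make_queue(), try_write() and read_one() are hypothetical helpers, the limit of 2 is arbitrary, and where a real writer thread would block on a full queue, the model simply reports a refused write.

```perl
use strict;
use warnings;

# Toy single-threaded model, NOT the Triceps API: all names are hypothetical.
# A queue with a limit models a direct nexus, a queue without a limit
# models a reverse nexus.
sub make_queue {
    my ($limit) = @_;    # undef means unlimited
    return { limit => $limit, items => [] };
}

# Returns 1 if the item was queued, 0 if the queue is full
# (the point where a real writer thread would have to wait).
sub try_write {
    my ($q, $item) = @_;
    return 0 if defined $q->{limit} && @{ $q->{items} } >= $q->{limit};
    push @{ $q->{items} }, $item;
    return 1;
}

# Consume the oldest item, freeing a slot for the writers.
sub read_one {
    my ($q) = @_;
    return shift @{ $q->{items} };
}

my $direct  = make_queue(2);        # the limit of 2 is arbitrary
my $reverse = make_queue(undef);

my @direct_results  = map { try_write($direct,  $_) } 1 .. 4;
my @reverse_results = map { try_write($reverse, $_) } 1 .. 4;
print "direct:  @direct_results\n";
print "reverse: @reverse_results\n";

my $consumed = read_one($direct);
my $retry    = try_write($direct, 5);
print "after reading $consumed, direct accepts again: $retry\n";
```

The limited queue accepts writes only until it fills up and resumes after a read, which is the flow control. The unlimited queue never refuses, so a writer on a reverse Nexus never gets stuck waiting, and that is what breaks the circular wait in a loop.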
Normally an App is built once and keeps running in this configuration until it stops. But there is a strong need to have the threads dynamically added and deleted too. For example, if the App is running as a server and clients connect to it, each client needs to have its thread(s) added on connection and then deleted when the client disconnects. This is handled through the concept of fragments. There is no Fragment class, but when you create a Triead, you can specify a fragment name for it. Then it becomes possible to shut down and dispose of the threads in a fragment after the fragment's work is done.