Showing posts with label socket. Show all posts

Tuesday, July 12, 2022

small servers

Pretty regularly we (the programmers in general) make small servers that sit on some random port and execute commands from the clients. With no security. We rely on the obscurity of the randomly chosen port (not really random, since we usually choose it explicitly), and usually also on the relative privacy of the development machines. And ideally, on firewalls that forbid connections to random ports from the outside. But what if the machine is not very private (such as when the tool gets run on a production machine), and the firewall doesn't forbid the connection? Someone else might connect. Then of course there is the obscurity of the tool's protocol, but if the tool becomes popular, its protocol becomes widely known. And then if the tool can access files, it easily becomes a target for attacks. So a little obscure tool can outgrow its breeches and become a security hole.

But it seems to be not that hard to fix with a helper library that would check the password on connection. It can automatically generate the password and put it into a file, and a client would read it from the file and supply it on connection (like .Xauthority does). Maybe there even is already an existing library that does this?
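A minimal sketch of what such a helper could look like in C++ on a POSIX system. All the names here (generateToken(), writeTokenFile(), readTokenFile(), tokenMatches()) are hypothetical and not taken from any existing library. The server would generate a token at startup and write it into a file readable only by its owner, the client would read the file and send the token as the first thing on the connection, and the server would compare it before accepting any commands.

```cpp
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <fstream>
#include <random>
#include <stdexcept>
#include <string>

// Generate a random token of nbytes bytes, encoded as hex digits.
// Assumption: std::random_device is backed by the OS entropy source,
// as it is on the common Unix platforms.
std::string generateToken(std::size_t nbytes = 16)
{
    assert(nbytes > 0);
    static const char hex[] = "0123456789abcdef";
    std::random_device rd;
    std::string token;
    token.reserve(nbytes * 2);
    for (std::size_t i = 0; i < nbytes; i++) {
        unsigned byte = rd() & 0xFF;
        token.push_back(hex[byte >> 4]);
        token.push_back(hex[byte & 0x0F]);
    }
    return token;
}

// Server side: store the token in a file accessible only by the owner.
void writeTokenFile(const std::string &path, const std::string &token)
{
    int fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
    if (fd < 0)
        throw std::runtime_error("cannot create " + path);
    // The mode in open() applies only to a newly created file,
    // so reset the permissions of a pre-existing one explicitly.
    bool ok = (fchmod(fd, 0600) == 0);
    ssize_t n = ok ? write(fd, token.data(), token.size()) : -1;
    close(fd);
    if (n != static_cast<ssize_t>(token.size()))
        throw std::runtime_error("cannot write " + path);
}

// Client side: read the token back from the file.
std::string readTokenFile(const std::string &path)
{
    std::ifstream in(path);
    if (!in)
        throw std::runtime_error("cannot open " + path);
    std::string token;
    std::getline(in, token);
    return token;
}

// Server side: compare the supplied token with the expected one,
// looking at every character instead of stopping at the first mismatch.
bool tokenMatches(const std::string &expected, const std::string &supplied)
{
    if (expected.size() != supplied.size())
        return false;
    unsigned char diff = 0;
    for (std::size_t i = 0; i < expected.size(); i++)
        diff |= static_cast<unsigned char>(expected[i] ^ supplied[i]);
    return diff == 0;
}
```

This protects only against the other users of the same machine and against the outside connections. Anyone who can read the file can still connect, which is the same trade-off that .Xauthority makes.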

Wednesday, April 22, 2015

ThreadedClient improvement of timeout handling

The work on updating the tests for the new ordered index brought a nice side effect: the handling of the timeouts in the Perl class X::ThreadedClient has improved. Now it returns not only the error message but also the data received up to that point. This helps a lot with diagnosing the errors in the automated tests of TQL: the error messages get returned in a clear way.

Sunday, July 20, 2014

a minor TQL clean-up

I've been working on the documentation chapter about TQL, and noticed that its single-threaded version has not been updated to use the query name as the label in the response. So I've fixed it. The previous output like

query,{read table tSymbol}
lb1read OP_INSERT symbol="AAA" name="Absolute Auto Analytics Inc" eps="0.5"
+EOD,OP_NOP,lb1read

now becomes

query,{read table tSymbol}
query OP_INSERT symbol="AAA" name="Absolute Auto Analytics Inc" eps="0.5"
+EOD,OP_NOP,query

I've also added the optional argument for name override in SimpleServer::makeServerOutLabel().

Thursday, May 1, 2014

auto-detecting the file handle class when passing it through the App

I've been editing the documentation for the passing of the file descriptors between threads, and I've realized that there is no need to specify the class of the Perl file handle when it gets loaded from the App. Instead the name of the class can be easily stored along with the file descriptor and then extracted back. So I went and changed the code to do that. The modifications are:

In the App class the method storeFd() takes an extra argument:

$app->storeFd($name, $fd, $className);

The $className specifies the class of the file object. The empty string can be used as a synonym for "IO::Handle", since ref() returns an empty string for the globs of the old-fashioned file handles.

The methods loadFd() and loadDupFd() now return two values:

($fd, $fclass) = $app->loadFd($name);
($fd, $fclass) = $app->loadDupFd($name);

The second returned value is the class name, as it was stored by storeFd().

And the methods App::loadDupSocket(), TrieadOwner::trackDupSocket() and TrieadOwner::trackGetSocket() have been removed. Instead App::loadDupFile(), TrieadOwner::trackDupFile() and TrieadOwner::trackGetFile() have been updated to get the stored file handle class and use it transparently, so they just work correctly for the sockets now.

The methods App::loadDupFileClass(), TrieadOwner::trackDupClass() and TrieadOwner::trackGetClass() are still present, in case you want to override the class name, but now they should be pretty much never needed, since any class names should be handled automatically without the need for overrides.

And the C++ App class got changed a little bit as well, with the extended versions of storeFd() and loadFd():

void storeFd(const string &name, int fd);
void storeFd(const string &name, int fd, const string &className);
int loadFd(const string &name, string *className = NULL) const;

The new interface is backwards-compatible with the old one but also has the provision for storing and loading the file class name.
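To illustrate how the class name can travel along with the descriptor, here is a small self-contained sketch. It is not the Triceps App code: FdRegistry is a hypothetical stand-in that only mirrors the three signatures shown above, and its error handling (throwing on a duplicate name, returning -1 for an unknown one) is an assumption made for the example.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical registry that mirrors the storeFd()/loadFd() signatures.
class FdRegistry {
public:
    // The old two-argument form stores an empty class name.
    void storeFd(const std::string &name, int fd)
    {
        storeFd(name, fd, "");
    }

    void storeFd(const std::string &name, int fd, const std::string &className)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        // Assumption for this sketch: a duplicate name is an error.
        if (!fds_.emplace(name, Entry{fd, className}).second)
            throw std::runtime_error("duplicate file descriptor name '" + name + "'");
    }

    // The old callers pass no className pointer and get only the descriptor.
    // Assumption for this sketch: an unknown name returns -1.
    int loadFd(const std::string &name, std::string *className = nullptr) const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = fds_.find(name);
        if (it == fds_.end())
            return -1;
        if (className != nullptr)
            *className = it->second.className;
        return it->second.fd;
    }

private:
    struct Entry {
        int fd;
        std::string className;
    };

    mutable std::mutex mutex_; // the registry is shared between threads
    std::map<std::string, Entry> fds_;
};
```

The default argument and the two-argument overload are what keeps the old callers compiling unchanged, while the new callers can pass and receive the class name.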

Friday, June 28, 2013

TrieadOwner and TrackedFile reference, Perl, part 4

The file interruption part of the API deals with how the thread handles a request to die. It should stop its work and exit, and for the normal threads that read from nexuses, this is fairly straightforward. The difficulty is with the threads that read from the outside sources (sockets and such). They may be in the middle of a read call, with no way to tell when the next chunk of data will arrive. These long calls on the file descriptors need to be interrupted when the thread is requested to die.

The interruption is done by revoking the file descriptors (dupping a /dev/null into them) and sending the signal SIGUSR2 to the thread. Even if the dupping doesn't interrupt the file operation, SIGUSR2 does, and when the operation is restarted, it finds that the file descriptor now refers to /dev/null and returns immediately. Triceps defines a SIGUSR2 handler that does nothing, but you can override it with a custom one.
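The revocation trick itself is plain POSIX and can be sketched outside of Triceps. The functions below are hypothetical helpers written for this illustration, not the Triceps implementation: revokeFd() dups /dev/null over a descriptor, and installNoopHandler() installs a do-nothing handler, so that a signal interrupts a blocked call without killing the process.

```cpp
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>

#include <cassert>
#include <cstring>

// A handler that does nothing: the delivery of the signal is what matters.
extern "C" void noopHandler(int) { }

// Install the do-nothing handler for signo. No SA_RESTART is set here,
// so a blocked read() fails with EINTR and the caller decides whether
// to retry it.
bool installNoopHandler(int signo)
{
    struct sigaction sa;
    std::memset(&sa, 0, sizeof(sa));
    sa.sa_handler = noopHandler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(signo, &sa, nullptr) == 0;
}

// Make fd refer to /dev/null instead of its original file.
// Any later read() on it returns 0 (end of file) right away.
bool revokeFd(int fd)
{
    assert(fd >= 0);
    int nullfd = open("/dev/null", O_RDWR);
    if (nullfd < 0)
        return false;
    int res = dup2(nullfd, fd);
    if (nullfd != fd)
        close(nullfd);
    return res >= 0;
}
```

After revokeFd() the descriptor number stays valid, so the code that owns it can still close it normally. In a multithreaded program the signal would be directed at the specific thread, for example with pthread_kill().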

For this to work, Triceps needs to know which file descriptors are to be revoked, which is achieved by registering them for tracking in the TrieadOwner. To avoid revoking the unrelated descriptors (after a close, the same descriptor number may get reused for another file), they must be unregistered before closing. The normal sequence is:
  • open a file descriptor
  • register it for tracking
  • do the file operations
  • unregister the file descriptor
  • close it

The lowest-level calls deal with the raw tracking:

$to->trackFd($fd);

Register the file descriptor (obtained with a fileno() or such) for tracking. The repeated calls for the same descriptor have no effect.

$to->forgetFd($fd);

Unregister the file descriptor. If the descriptor is not registered, the call is ignored.

The next level of the API deals with the file handles, extracting the file descriptors from them as needed.

$to->track(FILE);

Get a file descriptor from the file handle and track it.

$to->forget(FILE);

Get a file descriptor from the file handle and unregister it.

$to->close(FILE);

Unregister the file handle's descriptor, then close the file handle. It's a convenience wrapper, to make the unregistering easier to remember.

The correct sequence might be hard to follow if the code involves some dying and evals catching these deaths. To handle that, the next level of the API provides the automatic tracking by scope. The scoping is done with the class Triceps::TrackedFile, which is probably easier to describe right here.

A TrackedFile object keeps a reference of a file handle and also knows of its file descriptor being tracked by the TrieadOwner (so yes, it has a reference to the TrieadOwner too). Until the TrackedFile is destroyed, the file handle in it will never have its reference count go to 0, so it will never be automatically closed and destroyed. And when the TrackedFile is destroyed, first it tells the TrieadOwner to forget the file descriptor and only then unreferences the file handle, preserving the correct sequence.

And the scope of the TrackedFile is controlled by the scope of a variable that keeps a reference to it. If it's a local/my variable, TrackedFile will be destroyed on leaving the block, if it's a field in an object, it will be destroyed when the object is destroyed or the field is reset. Basically, the usual Perl scope business in Triceps.

If you want to close the file handle before leaving the scope of TrackedFile, don't call close() on the file handle. Instead, call close on the TrackedFile:

$trf->close();

This will again properly untrack the file descriptor, then close the file handle, and remember that it has been closed, so no second attempt at that will be done when the TrackedFile gets destroyed.

There also are a couple of getter methods on the TrackedFile:

$fd = $trf->fd();

Get the tracked file descriptor. If the file handle has been already closed, will return -1.

$fh = $trf->get();

Get the tracked file handle. If the file handle had already been closed, will confess.
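The same scoping idea translates naturally into C++ as a scope guard. The sketch below is only an analogy to TrackedFile, with hypothetical class names (FdTracker, ScopedFd) and a raw descriptor in place of a Perl file handle. It shows the two properties described above: the descriptor is always forgotten before it is closed, and an explicit close() is remembered so that the destructor doesn't repeat it.

```cpp
#include <unistd.h>

#include <cassert>
#include <set>

// Hypothetical stand-in for the tracking part of the TrieadOwner.
// Not thread-safe: the sketch only shows the order of operations.
class FdTracker {
public:
    void track(int fd) { fds_.insert(fd); }
    void forget(int fd) { fds_.erase(fd); }
    bool isTracked(int fd) const { return fds_.count(fd) != 0; }

private:
    std::set<int> fds_;
};

// Scope guard: tracks the descriptor on construction; on close() or on
// destruction it unregisters the descriptor first and only then closes it.
class ScopedFd {
public:
    ScopedFd(FdTracker &tracker, int fd) : tracker_(tracker), fd_(fd)
    {
        assert(fd >= 0);
        tracker_.track(fd_);
    }

    ~ScopedFd() { close(); }

    ScopedFd(const ScopedFd &) = delete;
    ScopedFd &operator=(const ScopedFd &) = delete;

    // Safe to call more than once: the second call does nothing.
    void close()
    {
        if (fd_ < 0)
            return;
        tracker_.forget(fd_); // unregister first...
        ::close(fd_);         // ...then close
        fd_ = -1;
    }

    // Returns -1 after the descriptor has been closed.
    int fd() const { return fd_; }

private:
    FdTracker &tracker_;
    int fd_;
};
```

Because the order is fixed inside the guard, an exception (or, in Perl, a die caught by an eval) that unwinds the scope cannot leave a closed descriptor registered.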

Now we get back to the TrieadOwner.

$trf = $to->makeTrackedFile(FILE);

Create a TrackedFile object from a file handle.

$trf = $to->makeTrackedFileFd(FILE, $fd);

The more low-level way to construct a TrackedFile, with specifying the file descriptor as a separate explicit argument. makeTrackedFile() is pretty much a wrapper that calls fileno() on the file handle and then calls makeTrackedFileFd().

And the following methods combine the loading of a file descriptor from the App and tracking it by the TrieadOwner. They are the most typically used interface for passing around the file handles through the App.

($trf, $file) = $to->trackDupFile($name, $mode);

Load the dupped file handle from the App, create an IO::Handle file object from it according to the $mode (in either r/w/a/r+/w+/a+ or </>/>>/+</+>/+>> format), and make a TrackedFile from it. Returns a pair of the TrackedFile object and the created file handle. The App still keeps the original file descriptor. The $mode must be consistent with the original mode of the file stored into the App. $name is the name used to store the file descriptor into the App.

($trf, $file) = $to->trackGetFile($name, $mode);

Similar to trackDupFile() only the file descriptor is moved from the App, and the App forgets about it.

($trf, $file) = $to->trackDupSocket($name, $mode);
($trf, $file) = $to->trackGetSocket($name, $mode);

Similar to the File versions, only create a file handle object of the class IO::Socket::INET.

($trf, $file) = $to->trackDupClass($name, $mode, $class);
($trf, $file) = $to->trackGetClass($name, $mode, $class);

Similar to the File versions, only creates the file handle of an arbitrary subclass of IO::Handle (with the name specified in $class). So for example if you ever want to load an IPv6 or Unix domain socket, these are the methods to use.

Monday, May 27, 2013

TQL server with multithreading

The next big example I've been talking about is finally ready. It's the adaptation of the TQL to work with the multithreaded server framework. The big reason for this example is the export of the table types through a nexus and creation of tables from them. And we'll get to that, but first let's look at the new abilities of the TQL.

TQL is still not of a production quality, in either single- or multi-threaded variety, and contains a large number of simplifying assumptions in its code. As the single-threaded version works symbiotically with the SimpleServer, the multithreaded version works with the ThreadedServer.

One thread created by the programmer contains the "core logic" of the model. It doesn't technically have to be all in a single thread: the data can be forwarded to the other threads and then the results forwarded back from them. But a single core logic thread is a convenient simplification. This thread has some input labels, to receive data from the outside, and some tables with the computed results that can be read by TQL. Of course, it's entirely realistic to have also just the output labels without tables, sending a stream of computed rowops, but again for simplicity I've left this out for now.

This core logic thread creates a TQL instance, which listens on a socket, accepts the connections, forwards the input data to the core logic, performs queries on the tables from the core logic and sends the results back to the client. To this end, the TQL instance creates a few nexuses in the core logic thread and uses them to communicate between all the fragments. The input labels and tables in the core thread also get properly connected to these nexuses. The following figure shows the thread architecture; I'll use it for reference throughout the discussion:

Fig. 1. TQL application.

The core logic thread then goes into its main loop and performs, as its name says, the core logic computations.

Here is a very simple example of a TQL application:

sub appCoreT # (@opts)
{
  my $opts = {};
  &Triceps::Opt::parse("appCoreT", $opts, {@Triceps::Triead::opts,
    socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  undef @_; # avoids a leak in threads module
  my $owner = $opts->{owner};
  my $app = $owner->app();
  my $unit = $owner->unit();

  # build the core logic

  my $rtTrade = Triceps::RowType->new(
    id => "int32", # trade unique id
    symbol => "string", # symbol traded
    price => "float64",
    size => "float64", # number of shares traded
  ) or confess "$!";

  my $ttWindow = Triceps::TableType->new($rtTrade)
    ->addSubIndex("byId",
      Triceps::SimpleOrderedIndex->new(id => "ASC")
    )
    or confess "$!";
  $ttWindow->initialize() or confess "$!";

  # Represents the static information about a company.
  my $rtSymbol = Triceps::RowType->new(
    symbol => "string", # symbol name
    name => "string", # the official company name
    eps => "float64", # last quarter earnings per share
  ) or confess "$!";

  my $ttSymbol = Triceps::TableType->new($rtSymbol)
    ->addSubIndex("bySymbol",
      Triceps::SimpleOrderedIndex->new(symbol => "ASC")
    )
    or confess "$!";
  $ttSymbol->initialize() or confess "$!";

  my $tWindow = $unit->makeTable($ttWindow, "EM_CALL", "tWindow")
    or confess "$!";
  my $tSymbol = $unit->makeTable($ttSymbol, "EM_CALL", "tSymbol")
    or confess "$!";

  # export the endpoints for TQL (it starts the listener)
  my $tql = Triceps::X::Tql->new(
    name => "tql",
    trieadOwner => $owner,
    socketName => $opts->{socketName},
    tables => [
      $tWindow,
      $tSymbol,
    ],
    tableNames => [
      "window",
      "symbol",
    ],
    inputs => [
      $tWindow->getInputLabel(),
      $tSymbol->getInputLabel(),
    ],
    inputNames => [
      "window",
      "symbol",
    ],
  );

  $owner->readyReady();

  $owner->mainLoop();
}

{
  my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
      app => "appTql",
      main => \&appCoreT,
      port => 0,
      fork => -1, # create a thread, not a process
  );
}

This core logic is very simple: all it does is create two tables and then send the input data into them. The server gets started in a background thread (fork => -1) because this code is taken from a test that then goes and runs the expect with the SimpleClient.

The specification of inputs and tables for TQL is somewhat ugly but I kept it as it was historically (it was done this way to keep the parsing of the options simpler). The new options compared to the single-threaded TQL are "trieadOwner", "inputs" and "inputNames". The "trieadOwner" is how TQL knows that it must run in the multithreaded mode, and it's used to create the nexuses for communication between the core logic and the rest of TQL. The inputs are needed because the multithreaded TQL parses and forwards the input data, unlike the single-threaded version that relies on the SimpleServer to do that according to the user-defined dispatch table.

The names options don't have to be used: if you name your labels and tables nicely and suitably for the external viewing, the renaming-for-export can be skipped.

Similar to the single-threaded version, if any of the options "tables" or "inputs" is used, the TQL object gets initialized automatically, otherwise the tables and inputs can be added piecemeal with addTable(), addNamedTable(), addInput(), addNamedInput(), and then the whole thing initialized manually.

Then the clients can establish the connections with the TQL server, send in the data and the queries. To jump in, here is a trace of a simple session that sends some data, then does some table dumps and subscribes, not touching the queries yet. I'll go through it fragment by fragment and explain the meaning. The dumps and subscribes were the warm-up exercises before writing the full queries, but they're useful in their own right, and here they serve as the warm-up exercises for the making of the queries!


> connect c1
c1|ready

The "connect" is not an actual command sent but just the indication in the trace that the connection was set up by the client "c1" (it's a trace from the SimpleClient, so it follows the usual conventions). The "ready" response is sent when the connection is opened, similar to the chat server shown before.

> c1|subscribe,s1,symbol
c1|subscribe,s1,symbol

This is a subscription request. It means "I'm not interested in the current state of a table but send me all the updates". The response is the mirror of the request, so that the client knows that the request has been processed. "s1" is the unique identifier of the request, so that the client can match together the responses it received to the requests it sent (and keeping the uniqueness is up to the client, the server may refuse the requests with duplicate identifiers).  And "symbol" is the name of the table. Once a subscription is in place, there is no way to unsubscribe other than by disconnecting the client (it's doable but adds complications, and I wanted to skip over the nonessential parts). Subscribing multiple times to the same table will send a confirmation every time but the repeated confirmations will have no effect: only one copy of the data will be sent anyway.

> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1

This sends the data into the model. And since it propagates through the subscription, the data gets sent back too. The "symbol" here means two different things: on the input side it's the name of the label where the data is sent, on the output side it's the name of the table that has been subscribed to.

The data lines start with the command "d" (since the data is sent much more frequently than the commands, I've picked a short one-letter "command name" for it), then the label/table name, opcode and the row fields in CSV format.
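As an illustration of this line format, here is a small parser sketch. It is not the TQL code: the struct and function names are hypothetical, and it assumes the simplest reading of the format, a plain split by commas with no quoting or escaping.

```cpp
#include <cassert>
#include <string>
#include <vector>

// One parsed data line: "d,<label or table>,<opcode>,<field>,<field>,..."
struct DataLine {
    std::string label;               // label name on input, table name on output
    std::string opcode;              // such as OP_INSERT or OP_DELETE
    std::vector<std::string> fields; // the row fields, in the order of the row type
};

// Parse a data line. Returns false if the line is not a data line.
// Assumption for this sketch: fields never contain commas.
bool parseDataLine(const std::string &line, DataLine &out)
{
    std::vector<std::string> parts;
    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type comma = line.find(',', start);
        if (comma == std::string::npos) {
            parts.push_back(line.substr(start));
            break;
        }
        parts.push_back(line.substr(start, comma - start));
        start = comma + 1;
    }
    if (parts.size() < 3 || parts[0] != "d")
        return false;
    out.label = parts[1];
    out.opcode = parts[2];
    out.fields.assign(parts.begin() + 3, parts.end());
    return true;
}
```

For the line from the trace above, this would produce the label "symbol", the opcode "OP_INSERT" and three fields. The fields stay as strings here; converting them according to the row type is left out of the sketch.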

> c1|confirm,cf1
c1|confirm,cf1,,,

The "confirm" command provides a way for the client to check that the data it sent has propagated through the model, without having to subscribe back to the data and read it. Send some data lines, then send the "confirm" command and wait for it to come back (again, the unique id allows keeping multiple confirmations in flight if you please). This command doesn't guarantee that all the clients have seen the results from that data. It only guarantees that the core logic has seen the data, and more weakly guarantees that the data has been processed by the core logic, and that this particular client has already seen all the results from it.

Why weakly? It has to do with the way it works inside, and it depends on the core logic. If the core logic consists of one thread, the guarantee is quite strong. But if the core logic farms out the work from the main thread to the other threads and then collects the results back, the guarantee breaks.

In Fig. 1 you can see that unlike the chat server shown before, TQL doesn't have any private nexuses for communication between the reader and writer threads of a client. Instead it relies on the same input and output nexuses, adding a control label to them, to forward the commands from the reader to the writer. The TQL object in the core logic thread creates a short-circuit connection between the control labels in the input and output nexuses, forwarding the commands. And if the core logic all runs in one thread, this creates a natural pipeline: the data comes in, gets processed, comes out, the "confirm" command comes in, comes out after the data. But if the core logic farms out the work to more threads, the confirmation can "jump the line" because its path is a direct short circuit.

> c1|drain,dr1
c1|drain,dr1,,,

The "drain" is an analog of "confirm" but more reliable and slower:  the reader thread drains the whole model before sending the command on. This guarantees that all the processing is done, and all the output from it has been sent to all the clients.

> c1|dump,d2,symbol
c1|startdump,d2,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dump,d2,symbol

The "dump" command dumps the current contents of a table. Its result starts with "startdump", and the same id and table name as in the request, then goes the data (all with OP_INSERT), finishing with the completion confirmation echoing the original command. The dump is atomic: the contents of the table don't change in the middle of the dump. However if a subscription on this table is active, the data rows from that subscription may come before and after the dump.

I'm not going to describe the error reporting, but it's worth mentioning that if a command contains errors, its "confirmation" will be an error line with the same identifier.

> c1|dumpsub,ds3,symbol
c1|startdump,ds3,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dumpsub,ds3,symbol

The "dumpsub" command is a combination of a dump and subscribe: get the initial state and then get all the updates.  The confirmation of "dumpsub" marks the boundary between the original dump and the following updates.

> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,symbol,OP_INSERT,DEF,Defense Corp,2

Send some more data, and it comes back only once, even though the subscription was done twice: once in "subscribe" and once in "dumpsub". The repeated subscription requests simply get consumed into one subscription.

> c1|d,window,OP_INSERT,1,ABC,101,10

This sends a row to the other table but nothing comes back because there is no subscription to that table.

> c1|dumpsub,ds4,window
c1|startdump,ds4,window
c1|d,window,OP_INSERT,1,ABC,101,10
c1|dumpsub,ds4,window
> c1|d,window,OP_INSERT,2,ABC,102,12
c1|d,window,OP_INSERT,2,ABC,102,12

This demonstrates the pure dump-and-subscribe without any interventions.

> c1|shutdown
c1|shutdown,,,,
c1|__EOF__

And the shutdown command works the same as in the chat server, draining and then shutting down the whole server.

Now on to the queries.

> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0

Starts a client connection and sends some data.


> c1|querysub,q1,query1,{read table symbol}{print tokenized 0}
c1|d,query1,OP_INSERT,ABC,ABC Corp,1
c1|querysub,q1,query1

The "querysub" command does the "query-and-subscribe": reads the initial state of the table, processed through the query, and then subscribes to any future updates. The single-threaded variety of TQL doesn't do this, it does just the one-time queries. The multithreaded TQL could also do the one-time queries, and also just the subscribes without the initial state, but I've been cutting corners for this example and the only thing that's actually available is the combination of the two, the "querysub".

As in the other commands, "q1" is the command identifier. The next field "query1" is the name for the query, it's the name that will be shown for the data lines coming out of the query. And then goes the query in the brace-quoted format, same as the single-threaded TQL (and there is no further splitting by commas, so the commas can be used freely in the query).

The identifier and the name for the query sound kind of redundant. But the client may generate them in different ways and need both. The name has the more symbolic character. The identifier can be generated as a sequence of numbers, so that the client can keep track of its progress more easily. And the error reports include the identifier but not the query name in them.
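The "no further splitting by commas" rule for the query text amounts to a split with a limit on the number of parts. A sketch of it, with a hypothetical function name and no claim about how TQL does it internally:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Split the line by commas into at most `limit` parts. Once the limit is
// reached, the rest of the line goes into the last part as-is, with any
// commas in it left untouched.
std::vector<std::string> splitLimited(const std::string &line, std::size_t limit)
{
    assert(limit > 0);
    std::vector<std::string> parts;
    std::string::size_type start = 0;
    while (parts.size() + 1 < limit) {
        std::string::size_type comma = line.find(',', start);
        if (comma == std::string::npos)
            break;
        parts.push_back(line.substr(start, comma - start));
        start = comma + 1;
    }
    parts.push_back(line.substr(start));
    return parts;
}
```

Splitting a "querysub" line with the limit of 4 yields the command, the identifier, the query name, and the whole brace-quoted query as the fourth part. A shorter command such as "confirm,cf1" simply produces fewer parts.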

For the query, there is no special line coming out before the initial dump. Supposedly, there would not be more than one query in flight with the same name, so this could be easily told apart based on the name in the data lines. There is also an underlying consideration that when the query involves a join, in the future the initial dump might be happening in multiple chunks, requiring to either surround every chunk with the start-end lines or just let them go without the extra notifications, as they are now.
 
And the initial dump ends as usual with getting the echo of the command (without the query part) back.

This particular query is very simple and equivalent to a "dumpsub".


> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,query1,OP_INSERT,DEF,Defense Corp,2

Send more data and it will come out of the query.


> c1|querysub,q2,query2,{read table symbol}{where istrue {$%symbol =~ /^A/}}{project fields {symbol eps}}
c1|t,query2,query2 OP_INSERT symbol="ABC" eps="1"
c1|querysub,q2,query2

This query is more complicated, doing a selection (the "where" query command) and projection. It also prints the results in the tokenized format (the "print" command gets added automatically if it wasn't used explicitly, and the default options for it enable the tokenized format).


The tokenized lines come out with the command "t", query name and then the contents of the row. The query name happens to be sent twice, and I'm not sure yet if it's a feature or a bug.

> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
c1|d,query1,OP_INSERT,AAA,Absolute Auto Analytics Inc,3
c1|t,query2,query2 OP_INSERT symbol="AAA" eps="3"
> c1|d,symbol,OP_DELETE,DEF,Defense Corp,2.0
c1|d,query1,OP_DELETE,DEF,Defense Corp,2

More examples of the data sent, getting processed by both queries.  In the second case the "where" filters out the row from query2, so only query1 produces the result.

> c1|shutdown
c1|shutdown,,,,
c1|__EOF__

And the shutdown as usual.

Now the "pièce de résistance": queries with joins.

> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,2.0
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,window,OP_INSERT,1,AAA,12,100

Connect and send some starting data.

> c1|querysub,q1,query1,{read table window}{join table symbol byLeft {symbol} type left}
c1|t,query1,query1 OP_INSERT id="1" symbol="AAA" price="12" size="100" name="Absolute Auto Analytics Inc" eps="3"
c1|querysub,q1,query1

A left join of the tables "window" and "symbol", by the field "symbol" as join condition.

Note that unlike the previous single-threaded TQL examples, the index type path for the table "symbol" is not explicitly specified. It's the result of the new method TableType::findIndexPathForKeys() described before: now the index gets found automatically. And the single-threaded TQL now has this feature too. If you really want, you can still specify the index path but usually there is no need to.

The TQL joins, even in the multithreaded mode, are still implemented internally as LookupJoin, driven only by the main flow of the query. So the changes to the joined dimension tables will not update the query results, and will be visible only when a change on the main flow picks them up, potentially creating inconsistencies in the output. This is wrong, but fixing it presents complexities that I've left alone until some later time.

> c1|d,window,OP_INSERT,2,ABC,13,100
c1|t,query1,query1 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2"
> c1|d,window,OP_INSERT,3,AAA,11,200
c1|t,query1,query1 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Absolute Auto Analytics Inc" eps="3"

Sending data updates the results of the query.

> c1|d,symbol,OP_DELETE,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,symbol,OP_INSERT,AAA,Alcoholic Abstract Aliens,3.0

As described above, the modifications of the dimension table are not visible in the query directly.

> c1|d,window,OP_DELETE,1
c1|t,query1,query1 OP_DELETE id="1" symbol="AAA" price="12" size="100" name="Alcoholic Abstract Aliens" eps="3"

But an update on the main flow brings them up (and in this case inconsistently, the row getting deleted is not exactly the same as the row inserted before).

> c1|querysub,q2,query2,{read table window}{join table symbol byLeft {symbol} type left}{join table symbol byLeft {eps} type left rightFields {symbol/symbol2}}
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="ABC"
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="DEF"
c1|t,query2,query2 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Alcoholic Abstract Aliens" eps="3" symbol2="AAA"
c1|querysub,q2,query2

This is a more complicated query, involving two joins, with the same dimension table "symbol". The second join by "eps" makes no real-world sense whatsoever but it's interesting from the technical perspective: if you check the table type of this table at the start of the post, you'll find that it has no index on the field "eps". The join adds this index on demand!

The way it works, all the dimension tables are copied into the client's writer thread, created from the table types exported by the core logic through the output nexus. (And if a table is used in the same query twice, it's currently also copied twice). This provides a nice opportunity to amend the table type by adding any necessary secondary index before creating the table, and TQL makes good use of it.

The details are forthcoming in the next post.

Sunday, May 19, 2013

time and threads and ThreadedClient

I've been making the ThreadedClient more resilient, to handle the unexpected better. First, if it encounters a socket disconnect (__EOF__) while expecting data, it will immediately return and set the error message in $@. Second, there is now a way to specify a timeout for waiting. If the message doesn't get received in time, expect() with a timeout will also return immediately with a message in $@.

As an unrelated change, I've renamed the method protocol() to getTrace(). The word "protocol" creates too much confusion when used around the sockets, "trace" is a more distinct one.

And a mix thereof, the error messages get not only set in $@ but also included into the trace, so it can be detected all together. There is also a separate trace of the error messages only, that can be obtained with getErrorTrace().

There are three ways to specify the timeout. Two of them are in the new():

        my $client = Triceps::X::ThreadedClient->new(
            owner => $owner,
            totalTimeout => $timeout,
        );

        my $client = Triceps::X::ThreadedClient->new(
            owner => $owner,
            timeout => $timeout,
        );


The option "totalTimeout" gives a timeout for the whole run to complete. Once that timeout is reached, all future expect()s just fail immediately. The option "timeout" gives the default timeout for each expect(). It's possible to use both, and each call of expect() will be limited by the shorter time limit of the two (remember, "totalTimeout" starts counting from the call of new(), not from startClient(), while "timeout" starts counting from the call of expect()).

All the timeouts are specified in seconds with fractions, so for 0.1 seconds you just use 0.1.

The third way is to use an extra argument in expect():

$client->expect("c1", "expected text", $timeout);

This timeout completely overrides whatever was set in new(). The value of 0 disables the timeout for this call, and 0 overrides the timeout from new() too, so it can be used for the one-off calls without the timeout.
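The combination of the three sources can be sketched as a small function. This is only one reading of the rules described above, not the ThreadedClient code: the names are hypothetical, a value of 0 is taken to mean "not set" for the options from new(), and the returned limit is an absolute time with 0 meaning "no limit".

```cpp
#include <algorithm>
#include <cassert>

// The timeout settings remembered from new(). All times are in seconds
// with fractions; 0 means that the option was not set.
struct TimeoutConfig {
    double createdAt;    // the time when new() was called, since epoch
    double totalTimeout; // limit for the whole run, counted from createdAt
    double timeout;      // default limit for each expect(), counted from its call
};

// Compute the absolute time limit for one expect() called at the time `now`.
// perCall < 0 means that no explicit timeout argument was given.
// Returns 0 if the call should wait without a limit.
double computeLimit(const TimeoutConfig &cfg, double now, double perCall = -1.0)
{
    // An explicit argument overrides everything from new(); 0 disables the limit.
    if (perCall >= 0.)
        return perCall > 0. ? now + perCall : 0.;

    double limit = 0.;
    if (cfg.totalTimeout > 0.)
        limit = cfg.createdAt + cfg.totalTimeout;
    if (cfg.timeout > 0.) {
        double perExpect = now + cfg.timeout;
        // With both options set, the earlier limit wins.
        limit = (limit > 0.) ? std::min(limit, perExpect) : perExpect;
    }
    return limit;
}
```

For example, with new() called at the time 100, totalTimeout of 10 and timeout of 3, an expect() at the time 105 is limited by 108 (the per-call default), while an expect() at the time 109 is limited by 110 (the total limit).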

And now, how it works inside. First thing, the call Triceps::now() returns the current time in seconds since epoch as a floating-point value, including the fractions of the seconds. The Triceps Perl API deals with time in this format.

Then, how to do the timeouts. Remember, if you look for repeatability of computation, you should really use an external time source synchronized with your data. The interface described here is for quick-and-dirty things, like time-limiting the tests, so that the unexpected input would not get the test stuck.

The core part of expect(), after it computes the time limit from the three sources, is this:

  $self->{error} = undef;
  $self->{expectDone} = 0;

  $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
    cmd => "expect",
    client => $client,
    arg => $pattern,
  );
  $owner->flushWriters();

  if ($limit > 0.) {
    while(!$self->{expectDone} && $owner->nextXtrayTimeLimit($limit)) { }
    # on timeout reset the expect and have that confirmed
    if (!$self->{expectDone}) {
      $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
        cmd => "cancel",
        client => $client,
      );
      $owner->flushWriters();
      # wait for confirmation
      while(!$self->{expectDone} && $owner->nextXtray()) { }
    }
  } else {
    while(!$self->{expectDone} && $owner->nextXtray()) { }
  }

  $@ = $self->{error};

The expects and inputs get combined in the collector thread. The expect request gets forwarded to it and then the current thread waits for the response. The response gets processed with nextXtray(), which represents one step of the main loop. The main loop is literally implemented in C++ like this:

void TrieadOwner::mainLoop()
{
    while (nextXtray())
        { }
    markDead();
}

The nextXtray() takes the next tray from the Triead's read nexuses and processes it. "Xtray" is a special form of the trays used to pass the data across the nexus. It returns true until the thread is requested dead, and then it returns false.

The normal nextXtray() waits until there is more data to process or the thread is requested to die. But there are special forms of it:

nextXtrayNoWait()

Returns immediately if there is no data available at the moment.

nextXtrayTimeLimit($deadline)

Returns if no data becomes available before the absolute deadline.

nextXtrayTimeout($timeout)

Returns if no data becomes available before the timeout, starting at the time of the call. Just to reiterate, the difference is that nextXtrayTimeLimit() receives the absolute time since the epoch while nextXtrayTimeout() receives the length of the timeout starting from the time of the call, both as seconds in floating-point.

All the versions that may return early on no data return false if they have to do so.
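
To make the difference between a deadline and a timeout more concrete, here is a toy version in plain Perl. It's only a sketch: the functions step_time_limit() and step_timeout() are made up, and they poll a callback instead of reading a nexus.

```perl
use strict;
use warnings;
use Time::HiRes qw(time sleep);

# Hypothetical stand-ins for the two forms. $step is a code reference that
# returns true when it has processed some data and false if none is available.
sub step_time_limit {
    my ($step, $deadline) = @_;   # absolute time, seconds since epoch
    while (1) {
        return 1 if $step->();
        return 0 if time() >= $deadline;  # no data before the deadline
        sleep(0.01);
    }
}

sub step_timeout {
    my ($step, $timeout) = @_;    # length of the wait, counted from now
    return step_time_limit($step, time() + $timeout);
}

# Three items are available right away, then nothing.
my @queue = (1, 1, 1);
my $count = 0;
my $deadline = time() + 0.1;
# Every iteration shares the same deadline, so the whole loop takes
# no longer than 0.1 seconds no matter how many items get processed.
$count++ while step_time_limit(sub { shift @queue }, $deadline);
print "processed $count items before the deadline\n";
```

With the relative form, the same loop would restart its wait on every iteration, so a steady trickle of unrelated data could keep it going for much longer than intended.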

expect() uses the version with the absolute deadline. If the collector thread finds a match, it will send a rowop back to the expect thread. That rowop will get processed in nextXtrayTimeLimit(), calling a label that sets the flag $self->{expectDone}. Then nextXtrayTimeLimit() will return true, and the loop will find the flag and exit.

If the collector thread doesn't find a match, nextXtrayTimeLimit() will return false, and the loop will again exit. But then it will find the "done" flag not set, so it knows that the timeout has expired and it has to tell the collector thread that the call is being cancelled. So it sends another rowop for the cancel, and then waits for the confirmation with another nextXtray(), this time with no limit on it since the confirmation must arrive back quickly in any case.


It's the confirmation rowop processing that sets $self->{error}. But there is always a possibility that the match will arrive just after the timeout has expired but just before the cancellation. It's one of these things that you have to deal with when multiple threads exchange messages. What then? Then the normal confirmation will arrive back to the expecting thread. And when the cancel message arrives at the collector thread, it will find that this client doesn't have an outstanding expect request any more, and will just ignore the cancel. Thus, the second nextXtray() will receive either a confirmation of the cancel and set the error message, or it will receive the last-moment success message. Either way it will fall through and return (setting $@ if the cancel confirmation came back).

By the way, if the collector thread finds the socket closed, it will immediately return an error rowop, very similar to the confirmation of the cancel. And it will set both $self->{expectDone} and $self->{error} in the expect thread.
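
The handling of the race between the match and the cancel can be modeled with a few lines of single-threaded Perl. This is a toy model with made-up names (on_expect(), on_line(), on_cancel()), not the code of the collector thread; the returned strings stand for the rowops sent back to the expecting thread.

```perl
use strict;
use warnings;

# %pending maps a client name to the pattern of its outstanding expect.
my %pending;

sub on_expect {
    my ($client, $pattern) = @_;
    $pending{$client} = $pattern;
    return;
}

sub on_line {
    my ($client, $line) = @_;
    my $pattern = $pending{$client};
    return unless defined $pattern && $line =~ /$pattern/;
    delete $pending{$client};
    return "done $client";      # the normal confirmation
}

sub on_cancel {
    my ($client) = @_;
    # The match may have slipped in before the cancel: nothing to cancel then.
    return unless exists $pending{$client};
    delete $pending{$client};
    return "error $client: timed out";
}

# Scenario 1: the cancel arrives first, so it gets confirmed with an error.
on_expect("c1", 'ready');
my $r1 = on_cancel("c1");
print "scenario 1: $r1\n";

# Scenario 2: the match wins the race, and the late cancel is ignored.
on_expect("c1", 'ready');
my $r2 = on_line("c1", "!ready,cliconn1");
my $r3 = on_cancel("c1");
print "scenario 2: $r2, cancel ", (defined $r3 ? "confirmed" : "ignored"), "\n";
```

In both scenarios the expecting side receives exactly one reply, which is why one more unlimited wait after sending the cancel is enough.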

Wednesday, May 1, 2013

a little more of ThreadedClient

Forgot to mention one more method of ThreadedClient, it's used like this:

$client->sendClose("c4", "WR");

This allows closing the client socket (in socket terms, shutting it down). The first argument is the client name. The second argument determines which side of the socket gets closed: "WR" for the writing side, "RD" for the reading side, and "RDWR" for both. These are the same names as for the system call shutdown().
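
For those who haven't dealt with shutdown() directly, here is what closing the writing side looks like in plain Perl. It's a sketch with a socketpair standing in for the client and the server, and the assumption that "WR" corresponds to SHUT_WR; it doesn't show how ThreadedClient does it internally.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC SHUT_WR);
use IO::Handle;

# A connected pair of sockets stands in for the client and the server.
socketpair(my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";
$client->autoflush(1);
$server->autoflush(1);

print $client "publish,*,last words\n";
shutdown($client, SHUT_WR) or die "shutdown failed: $!";  # like "WR"

my $line = <$server>;   # the data sent before the shutdown still arrives
my $eof = <$server>;    # and then the server side sees the end of file
print "server got: $line";
print "server sees EOF\n" unless defined $eof;

# The other direction stays open, so the server can still reply.
print $server "!exiting\n";
my $reply = <$client>;
print "client got: $reply";
```

The half-closed socket is what lets the server send its last response after it has seen the end of the client's input.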

Tuesday, April 30, 2013

ThreadedClient: a Triceps expect

In case you're not familiar with it, "expect" is a program that allows connecting to interactive programs and pretending to be an interactive user. Obviously, that means the terminal programs, not the GUI ones. It was originally done as an extension for Tcl, and later ported as a library for Perl and other languages.

The class Triceps::X::ThreadedClient implements a variety of expect in the Triceps framework. I'm using it for the unit tests of the Triceps servers but it can have other uses as well. Why not just use expect? First, I don't like bringing in extra dependencies, especially just for tests; second, it was an interesting exercise; and third, I didn't realize that I was writing a simplified variety of expect until I had it mostly completed. The biggest simplification is that ThreadedClient works with complete lines.

It gets used in the unit tests like this: first the server gets started in a background process or thread, and then its port number is used to create the clients. The ThreadedClient gets embedded into a Triceps App, so you can start other things in the same App. Well, the names of the ThreadedClient threads are hardcoded at the moment, so you can start only one copy of it per App, and there could be name conflicts if you start your other threads.

But first you need to start an App. I'll do it in yet another way this time:

  Triceps::App::build "client", sub {
    my $appname = $Triceps::App::name;
    my $owner = $Triceps::App::global;

    my $client = Triceps::X::ThreadedClient->new(
      owner => $owner,
      port => $port,
      debug => 0,
    );

    $owner->readyReady();

    $client->startClient("c1");
    $client->expect("c1", '!ready');

    # this repetition
    $client->send("c1", "publish,*,zzzzzz\n");
    $client->send("c1", "publish,*,zzzzzz\n");
    $client->expect("c1", '\*,zzzzzz');
    $client->expect("c1", '\*,zzzzzz');

    $client->startClient("c2");
    $client->expect("c2", '!ready,cliconn2');

    $client->send("c1", "kill,cliconn2\n");
    $client->expect("c2", '__EOF__');

    $client->send("c1", "shutdown\n");
    $client->expect("c1", '__EOF__');
  };


Triceps::App::build() is kind of like Triceps::Triead::startHere() but saves the trouble of parsing the options. The app name is its first argument and the code for the main routine of the first thread is the second argument. That first Triead will run in the current Perl thread. After that the name of the app is placed into the global variable $Triceps::App::name, the App object into $Triceps::App::app, and the TrieadOwner into $Triceps::App::global. The name of the first Triead is hardcoded as "global". After the main function exits, Triceps::App::build() runs the harvester, very similar to startHere().

Triceps::X::ThreadedClient->new() is used to start an instance of a client. The option "owner" gives it the current TrieadOwner as a starting point but it will create its own threads starting from this point. The option "port" specifies the default port number for the connections.

The option "debug" is optional, with the default value of 0. In this mode it collects the protocol of the run but otherwise runs silently. Setting the debug to 1 makes it also print the protocol as it gets collected, so if something goes not the way you expected, you can see what it is. Setting the debug to 2 also adds the printouts from the socket-facing threads, so you can also see what goes in and out of the client socket.

After that the clients get started with startClient(). Its first argument is a symbolic name of the client that will be used in the further calls and also in the protocol. The second optional argument is the port number, if you want to use a different port than specified in new().

After that the data gets sent into the client socket with send(), and the returned lines get parsed with expect(). The usual procedure is that you send something, then expect some response to it. The sent lines are included into the protocol with the prefix "> ".

The second argument of expect() is actually a regexp placed inside a string. So to expect a literal special character like '*', prefix it with a backslash and put the string into single quotes, like:

    $client->expect("c1", '\*,zzzzzz');
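
The difference between the single and double quotes is easy to check in plain Perl, without any Triceps involved. The sketch below also shows quotemeta(), a standard Perl function that can do the escaping for you when the expected text is a literal string.

```perl
use strict;
use warnings;

my $line = "*,zzzzzz";

my $single = '\*,zzzzzz';   # the backslash survives: the regexp is \*,zzzzzz
my $double = "\*,zzzzzz";   # the backslash is eaten: the regexp is *,zzzzzz

print "single quotes: ", ($line =~ /$single/ ? "match" : "no match"), "\n";

# A bare "*" at the start is not a valid regexp, so compiling it dies.
my $ok = eval { my $matched = ($line =~ /$double/); 1 };
print "double quotes: ", ($ok ? "compiled" : "regexp error"), "\n";

# quotemeta() escapes every special character of a literal string.
my $quoted = quotemeta("*,zzzzzz");
print "quotemeta: ", ($line =~ /$quoted/ ? "match" : "no match"), "\n";
```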


The expect() keeps reading lines until it finds one matching the regexp. Then all the lines including that one are added to the protocol and the call returns.


There are a couple of special lines generated by the connection status itself rather than coming from the socket. When the socket gets connected, it generates the line "> connect name" in the protocol (but not as an expectable input). When the connection gets dropped, this generates an expectable line "__EOF__". And each line in the protocol gets prefixed with the client name and "|". The example of the chat server run in the previous post was created by the ThreadedClient.
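
The way the protocol text gets assembled can be imitated with a small single-threaded sketch. The functions log_send() and toy_expect() are made up for this illustration and work on a prepared array of lines; the real class does the same kind of bookkeeping across threads and sockets.

```perl
use strict;
use warnings;

my $protocol = "";

# The sent lines get the prefix "> " and the client name.
sub log_send {
    my ($client, $text) = @_;
    $protocol .= "> $client|$text";
}

# Consumes the lines from @$input until one matches $pattern; every
# consumed line goes into the protocol. Returns true on a match.
sub toy_expect {
    my ($client, $input, $pattern) = @_;
    while (defined(my $line = shift @$input)) {
        $protocol .= "$client|$line";
        return 1 if $line =~ /$pattern/;
    }
    return 0; # ran out of input without a match
}

my @from_socket = ("!ready,cliconn1\n", "*,zzzzzz\n", "__EOF__\n");
toy_expect("c1", \@from_socket, '!ready');
log_send("c1", "publish,*,zzzzzz\n");
toy_expect("c1", \@from_socket, '\*,zzzzzz');
toy_expect("c1", \@from_socket, '__EOF__');
print $protocol;
```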


The protocol can be extracted as $client->protocol(). So for unit tests you can check the protocol match as


ok($client->protocol(), $expected);


Internally the ThreadedClient is very much like the chat server, so there is not a whole lot of point in going over it in detail. But feel free to read it on your own.

ThreadedServer, part 2

The next part of the ThreadedServer is listen(), the function that gets called from the listener thread and takes care of accepting the connections and spawning the per-client threads.

sub listen # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::listen";
    my $opts = {};
    my @myOpts = (
        owner => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TrieadOwner") } ],
        socket => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "IO::Socket") } ],
        prefix => [ undef, \&Triceps::Opt::ck_mandatory ],
        handler => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        pass => [ undef, sub { &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $prefix = $opts->{prefix};
    my $sock = $opts->{socket};

The first part is traditional, the only thing to note is that it also saves the list of options in a variable for the future use (startServer() did that too but I forgot to mention it). And the option "*" is the pass-through: it gets all the unknown options collected in $opts->{"*"}.

    my $clid = 0; # client id

    while(!$owner->isRqDead()) {
        my $client = $sock->accept();
        if (!defined $client) {
            my $err = "$!"; # or the text message will be reset by isRqDead()
            if ($owner->isRqDead()) {
                last;
            } elsif($!{EAGAIN} || $!{EINTR}) { # numeric codes don't get reset
                next;
            } else {
                confess "$myname: accept failed: $err";
            }
        }

The accept loop starts. It runs until the listening socket gets revoked by shutdown. The revocation is done by dup2() of a file descriptor from /dev/null over it. Note that this function itself doesn't request to track the file descriptor for revocation. That's the caller's responsibility.

Oh, and by the way, if you've been tracing the possible ways of thread execution closely, you might be wondering: what if the shutdown happens after the socket is opened but before it is tracked? This can't really happen to the listening socket but can happen with the accepted client sockets. The answer is that the tracking enrollment will check whether the shutdown already happened, and if so, it will revoke the socket right away, before returning. So the reading loop will find the socket revoked right on its first iteration.

The revocation also sends the signal SIGUSR2 to the thread. This is done because the calls like accept() do not return on a simple revocation by dup2() but a signal does interrupt them. Triceps registers an empty handler for SIGUSR2 that just immediately returns, but the accept() system call gets interrupted, and the thread gets a chance to check for shutdown, and even if it calls accept() again, this will return an error and force it to check for shutdown again.

And by the way, the Perl's threads::kill doesn't send a real signal, it just sets a flag for the interpreter. If you try it on your own, it won't interrupt the system calls, and now you know why. Instead Triceps gets the POSIX thread identity from the Perl thread and calls the honest pthread_kill() from the C++ code.

So, the main loop goes by the shutdown condition, isRqDead() tells if this thread has been requested to die. After accept(), it checks for errors. The first thing to check for is again the isRqDead(), because the revocation will manifest as a socket operation error, and there is no point in reporting this spurious error. However, like other Triceps calls, isRqDead() will clear the error text, and the text has to be saved first. If the shutdown is found, the loop exits. Then the check for the spurious interruptions is done, and for them the loop continues. Funny enough, the $!{} uses the numeric part of $! that is independent from its text part and doesn't get cleared by the Triceps calls. And on any other errors the thread confesses. This will unroll the stack, eventually get caught by the Triceps threading code, abort the App, and propagate the error message to the harvester.
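
The order of these checks can be tried out separately from the sockets. The sketch below uses a made-up function after_failed_accept() that takes the result of the shutdown check as an argument; it only shows the decision logic with $! and %!, not any Triceps calls.

```perl
use strict;
use warnings;
use Errno qw(EINTR EAGAIN ECONNRESET);

# Hypothetical helper: decides what an accept loop should do after a
# failed accept(). $is_dead stands for the result of the shutdown check.
sub after_failed_accept {
    my ($is_dead) = @_;
    my $text = "$!";              # save the text before anything resets it
    return "exit" if $is_dead;    # a revoked socket, not a real error
    return "retry" if $!{EINTR} || $!{EAGAIN};  # a spurious interruption
    return "die: $text";
}

# $! gets set by hand here to imitate the failures of accept().
$! = EINTR;
print after_failed_accept(0), "\n";   # retry
$! = ECONNRESET;
print after_failed_accept(1), "\n";   # exit
$! = ECONNRESET;
print after_failed_accept(0), "\n";   # die: followed by the system's error text
```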

        $clid++;
        my $cliname = "$prefix$clid";
        $app->storeCloseFile($cliname, $client);

        Triceps::Triead::start(
            app => $app->getName(),
            thread => $cliname,
            fragment => $cliname,
            main => $opts->{handler},
            socketName => $cliname,
            &Triceps::Opt::drop({ @myOpts }, \@_),
        );

        # Doesn't wait for the new thread(s) to become ready.
    }
}

If a proper connection has been received, the socket gets stored into the App with a unique name, for later load by the per-client thread. And then the per-client thread gets started.

The drop passes through all the original options except for the ones handled by this thread. In retrospect, this is not the best solution for this case. It would be better to just use @{$opts->{"*"}} instead. The drop call is convenient when not all the explicitly recognized options but only a part of them has to be dropped.
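
Both approaches boil down to separating the options known to this function from the rest. Here is a simplified sketch of that idea with a made-up function split_opts(); it is not the Triceps::Opt implementation and skips all the checking that the real parse() does.

```perl
use strict;
use warnings;

# Splits the name => value pairs into the known ones (a hash) and the
# rest (a flat list in the original order, ready to be passed through).
sub split_opts {
    my ($known, @args) = @_;
    my (%mine, @rest);
    while (@args) {
        my ($name, $value) = splice(@args, 0, 2);
        if (exists $known->{$name}) {
            $mine{$name} = $value;
        } else {
            push @rest, $name, $value;
        }
    }
    return (\%mine, \@rest);
}

my %known = map { $_ => 1 } qw(owner socket prefix handler pass);
my ($mine, $rest) = split_opts(\%known,
    prefix => "cliconn", handler => "main", color => "blue", size => 3);

print join(",", sort keys %$mine), "\n";  # handler,prefix
print join(",", @$rest), "\n";            # color,blue,size,3
```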

After starting the thread, the loop doesn't call readyReady() but goes for the next iteration. This is basically because it doesn't care about the started thread and doesn't ever send anything to it. And waiting for the threads to start will make the loop slower, possibly overflowing the socket's listening queue and dropping the incoming connections if they arrive very fast.

And the last part of the ThreadedServer is the printOrShut:

sub printOrShut # ($app, $fragment, $sock, @text)
{
    my $app = shift;
    my $fragment = shift;
    my $sock = shift;

    undef $!;
    print $sock @_;
    $sock->flush();

    if ($!) { # can't write, so shutdown
        Triceps::App::shutdownFragment($app, $fragment);
    }
}

Nothing too complicated. Prints the text into the socket, flushes it and checks for errors. On errors shuts down the fragment. In this case there is no need for draining. After all, the socket leading to the client is dead and there is no way to send anything more through it, so there is no point in worrying about any unsent data. Just shut down as fast as it can, before the threads have generated more data that can't be sent any more. Any data queued in the nexuses for the shut down threads will be discarded.
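
If you want to see a write into a dead socket fail, here is a self-contained sketch. The function print_or_call() is a made-up variation of printOrShut(): instead of shutting down a fragment it calls a handler, and it checks the return values of print() and flush() rather than $!. The SIGPIPE has to be ignored, otherwise the process would be killed before it could notice the error.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Handle;

# Without this the write into a dead socket kills the process by SIGPIPE.
$SIG{PIPE} = 'IGNORE';

sub print_or_call {
    my ($sock, $on_error, @text) = @_;
    my $ok = print {$sock} @text;      # usually only fills the buffer
    $ok = $sock->flush() if $ok;       # the real write happens here
    $on_error->("$!") unless $ok;
    return $ok ? 1 : 0;
}

socketpair(my $near, my $far, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";

my @errors;
my $handler = sub { push @errors, $_[0] };

my $first = print_or_call($near, $handler, "hello\n");    # the peer is alive
close($far);                                               # the peer goes away
my $second = print_or_call($near, $handler, "anyone?\n");

print "first write: $first, second write: $second\n";
print "error: $errors[0]\n" if @errors;
```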


Monday, April 29, 2013

ThreadedServer, part 1

And now I want to show the internals of the ThreadedServer methods. It shows how to store the socket file handles into the App, how the threads are harvested, and how the connections get accepted.

sub startServer # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::startServer";
    my $opts = {};
    my @myOpts = (
        app => [ undef, \&Triceps::Opt::ck_mandatory ],
        thread => [ "global", undef ],
        main => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        port => [ undef, \&Triceps::Opt::ck_mandatory ],
        socketName => [ undef, undef ],
        fork => [ 1, undef ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);

    if (!defined $opts->{socketName}) {
        $opts->{socketName} = $opts->{thread} . ".listen";
    }

    my $srvsock = IO::Socket::INET->new(
        Proto => "tcp",
        LocalPort => $opts->{port},
        Listen => 10,
    ) or confess "$myname: socket creation failed: $!";
    my $port = $srvsock->sockport() or confess "$myname: sockport failed: $!";

So far it's pretty standard: get the options and open the socket for listening.
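
The same socket-opening step can be tried on its own. The sketch below makes two assumptions that are not in the code above: it uses the port 0 to let the OS pick any free port (which is why sockport() is useful for finding out what was picked), and it binds only to the loopback address.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# LocalPort => 0 asks the OS for any free port; LocalAddr restricts
# the listener to the loopback interface.
my $srvsock = IO::Socket::INET->new(
    Proto => "tcp",
    LocalAddr => "127.0.0.1",
    LocalPort => 0,
    Listen => 10,
) or die "socket creation failed: $!";
my $port = $srvsock->sockport() or die "sockport failed: $!";
print "listening on port $port\n";

# A client can now connect to that port.
my $clisock = IO::Socket::INET->new(
    Proto => "tcp",
    PeerAddr => "127.0.0.1",
    PeerPort => $port,
) or die "connect failed: $!";
my $accepted = $srvsock->accept() or die "accept failed: $!";

print $accepted "!ready\n";
$accepted->flush();
my $greeting = <$clisock>;
print "client got: $greeting";
```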

    if ($opts->{fork} > 0)  {
        my $pid = fork();
        confess "$myname: fork failed: $!" unless defined $pid;
        if ($pid) {
            # parent
            $srvsock->close();
            return ($port, $pid);
        }
        # for the child, fall through
    }

This handles the process forking option: if forking is requested, it executes and then the parent process returns the PID while the child process continues with the rest of the logic. By the way, your success with forking a process that has multiple running threads may vary. The resulting process usually has one running thread (continuing where the thread that called fork() was) but the synchronization primitives in the new process can be inherited in any state, so the attempts to continue the threaded processing are usually not such a good idea. It's usually best to fork first before there are more threads.

    # make the app explicitly, to put the socket into it first
    my $app = Triceps::App::make($opts->{app});
    $app->storeCloseFile($opts->{socketName}, $srvsock);
    Triceps::Triead::start(
        app => $opts->{app},
        thread => $opts->{thread},
        main => $opts->{main},
        socketName => $opts->{socketName},
        &Triceps::Opt::drop({ @myOpts }, \@_),
    );

Then the App gets created. Previously I was showing starting the App with startHere() that created the App and did a bunch of services implicitly. Here everything will be done manually. The listening socket has to be stored into the app before the listener thread gets started, so that the listener thread can find it.

Triceps keeps a global list of all its Apps in the process, and after an App is created, it's placed into that list and can be found by name from any thread. The App object will exist while there are references to it, including the reference from that global list. On the other hand, it's possible to remove the App from the list while it's still running but that's a bad practice because it will break any attempts from its threads to find it by name.
 
So the App is made, then the file handle gets stored. storeCloseFile() gets the file descriptor from the socket, dups it, stores it into the App, and then closes the original file handle. All this monkeying with dupping and closing is needed because there is no way to extract the file descriptor from a Perl file handle without the handle trying to close it afterwards. But as a result of this roundabout way, the file descriptor gets transferred into the App.
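
The general idea of dupping and closing can be shown in plain Perl with a pipe. This is only a sketch of the technique with the standard POSIX::dup() and the fdopen-style mode of open(); what exactly storeCloseFile() does inside is not shown here.

```perl
use strict;
use warnings;
use POSIX ();

pipe(my $rd, my $wr) or die "pipe failed: $!";

# Duplicate the descriptor, then close the original handle: the copy
# stays open and is now just a number that any code can pick up.
my $fd = POSIX::dup(fileno($wr));
defined $fd or die "dup failed: $!";
close($wr);

# Later (for example, in another thread) wrap the number in a new handle.
# The mode ">&=" reuses the descriptor itself instead of dupping it again.
open(my $wr2, ">&=", $fd) or die "fdopen failed: $!";
print $wr2 "still open\n";
close($wr2);   # this closes the descriptor $fd too

my $line = <$rd>;
print "read back: $line";
```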

Then the listener thread is started, and as shown before, it's responsible for starting all the other threads. In the meantime, the startServer() continues.

    my $tharvest;
    if ($opts->{fork} < 0) {
        @_ = (); # prevent the Perl object leaks
        $tharvest = threads->create(sub {
            # In case of errors, the Perl's join() will transmit the error
            # message through.
            Triceps::App::find($_[0])->harvester();
        }, $opts->{app}); # app has to be passed by name
    } else {
        $app->harvester();
    }

Then the harvester logic is started. Each App must have its harvester. startHere() runs the harvester implicitly, unless told otherwise, but here the harvester has to be run manually. It can be run either in this thread or in another thread, as determined by the option "fork". If it says to make another thread, $tharvest will contain that thread's identity. A special thing about starting threads with threads->create() is that it's sensitive to anything in @_. If @_ contains anything, it will be leaked (though the more recent versions of Perl should have it fixed). So @_ gets cleared before starting the thread.

And one way or the other, the harvester is started. What does it do? It joins the App's threads as they exit. After all of them exit, it removes the App from the global list of Apps, which will allow the App's memory to be collected when the last reference to it is gone, and then the harvester returns.

If any of the threads die, they cause the App to be aborted. The aborted App shuts down immediately and remembers the identity of the failed thread and its error message (only the first message is saved because the abort is likely to cause the other threads to die too, and there is no point in seeing these derivative messages). The harvester, in turn, collects this message from the App, and after all its clean-up work is done, dies propagating this message. Then if the harvester is running not in the first thread, that message will be propagated further by Perl's join().

A catch is that the errors are not reported until the harvester completes. Normally all the App's threads should exit immediately when shut down but if they don't, the program will be stuck without any indication of what happened.

It's also possible to disable this propagation of dying by using the option "die_on_abort":

$app->harvester(die_on_abort => 0);

Then the last part:

    if ($opts->{fork} > 0) {
        exit 0; # the forked child process
    }

    return ($port, $tharvest);
}



If this was the child process forked before, it exits at this point. Otherwise the port and the harvester's thread object are returned.

Multithreaded socket server, part 6, a run example

Here is a protocol from a run. It has been produced with the automated testing infrastructure that I'll describe later.

As usual, the lines sent from the clients to the socket server are preceded with a "> ". But since there are many clients, to tell them apart, both the sent and received lines are prefixed by the client's name and a "|". I've just picked arbitrary client names to tell them apart.

I've also marked the incoming connection as "> connect client_name", and the disconnections as "__EOF__" after the client name.

So, here we go.

> connect c1
c1|!ready,cliconn1
> connect c2
c2|!ready,cliconn2

Two clients connect.

> c1|publish,*,zzzzzz
c1|*,zzzzzz
c2|*,zzzzzz

A message published to the topic "*" gets forwarded to all the connected clients. In reality the messages may of course be received on separate sockets in any order, but I've ordered them here for the ease of reading.

> c2|garbage,trash
c2|!invalid command,garbage,trash

An invalid command gets detected in the writer thread and reported as such.

> c2|subscribe,A
c2|!subscribed,A
> c1|publish,A,xxx
c2|A,xxx

A subscription request gets acknowledged, and after that all the messages sent to this topic get received by the client.

> c1|subscribe,A
c1|!subscribed,A
> c1|publish,A,www
c1|A,www
c2|A,www

If more than one client is subscribed to a topic, all of them get the messages.

> c2|unsubscribe,A
c2|!unsubscribed,A
> c1|publish,A,vvv
c1|A,vvv

The unsubscription makes the client stop receiving messages from this topic.

> connect c3
c3|!ready,cliconn3
> c3|exit
c3|!exiting
c3|__EOF__

The third client connects, immediately requests an exit, gets the confirmation and gets disconnected.

> connect c4
c4|!ready,cliconn4
> close WR c4
c4|!exiting
c4|__EOF__

The fourth client connects and then closes its write side of the socket (that is the read side for the server). It produces the same effect as the exit command.

> c1|shutdown
c1|*,server shutting down
c1|__EOF__
c2|*,server shutting down
c2|__EOF__

And the shutdown command sends the notifications to all the remaining clients and closes the connections.

Multithreaded socket server, part 5, socket writer

The socket writer thread is the last part of the puzzle.

sub chatSockWriteT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockWriteT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
        ctlFrom => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_;
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $tname = $opts->{thread};

    my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, ">");

    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "reader",
    );

    my $faCtl = $owner->importNexus(
        from => $opts->{ctlFrom},
        import => "reader",
    );

The usual preamble. The trackGetSocket() consumes the socket from the App, and this time reopens it for writing. The previously created nexuses are imported.

    my %topics; # subscribed topics for this thread

    $faChat->getLabel("msg")->makeChained("lbMsg", undef, sub {
        my $row = $_[1]->getRow();
        my $topic = $row->get("topic");
        if ($topic eq "*" || exists $topics{$topic}) {
            printOrShut($app, $opts->{fragment}, $sock, $topic, ",", $row->get("msg"), "\n");
        }
    });

The logic is defined as the connected labels. The topic hash keeps the keys that this thread is subscribed to. When a message is received from the chat nexus and the topic is in the hash or is "*", the message gets sent into the socket in the CSV format:

topic,text

The function printOrShut() is imported from Triceps::X::ThreadedServer. Its first 3 arguments are fixed, and the rest are passed through to print(). It prints the message to the socket file handle, flushes the socket, and in case of any errors it shuts down the fragment specified in its second argument. This way if the socket gets closed from the other side, the threads handling it automatically shut down.

    $faCtl->getLabel("ctl")->makeChained("lbCtl", undef, sub {
        my $row = $_[1]->getRow();
        my ($cmd, $arg) = $row->toArray();
        if ($cmd eq "print") {
            printOrShut($app, $opts->{fragment}, $sock, $arg, "\n");
        } elsif ($cmd eq "subscribe") {
            $topics{$arg} = 1;
            printOrShut($app, $opts->{fragment}, $sock, "!subscribed,$arg\n");
        } elsif ($cmd eq "unsubscribe") {
            delete $topics{$arg};
            printOrShut($app, $opts->{fragment}, $sock, "!unsubscribed,$arg\n");
        } else {
            printOrShut($app, $opts->{fragment}, $sock, "!invalid command,$cmd,$arg\n");
        }
    });

The handling of the control commands is pretty straightforward.

    $owner->readyReady();

    $owner->mainLoop();

    $tsock->close(); # not strictly necessary
}

And the rest is taken care of by the mainLoop(). The thread's main loop runs until the thread gets shut down, by handling the incoming messages. So if say printOrShut() decides to shut down the fragment, the next iteration of the loop will detect it and exit.
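
If you'd like to play with the writer-side logic without building the whole server, it can be modeled in a few lines of single-threaded Perl. The function make_writer() and the closures it returns are made up for this sketch: they return the text that would be printed into the client's socket instead of calling printOrShut().

```perl
use strict;
use warnings;

sub make_writer {
    my %topics; # subscribed topics of this client

    # Stands in for the label chained from the chat nexus.
    my $on_msg = sub {
        my ($topic, $text) = @_;
        return ($topic eq "*" || exists $topics{$topic}) ? "$topic,$text\n" : "";
    };

    # Stands in for the label chained from the control nexus.
    my $on_ctl = sub {
        my ($cmd, $arg) = @_;
        if ($cmd eq "print") {
            return "$arg\n";
        } elsif ($cmd eq "subscribe") {
            $topics{$arg} = 1;
            return "!subscribed,$arg\n";
        } elsif ($cmd eq "unsubscribe") {
            delete $topics{$arg};
            return "!unsubscribed,$arg\n";
        }
        return "!invalid command,$cmd,$arg\n";
    };
    return ($on_msg, $on_ctl);
}

my ($msg, $ctl) = make_writer();
my $out = "";
$out .= $msg->("A", "xxx");          # not subscribed yet: nothing
$out .= $ctl->("subscribe", "A");
$out .= $msg->("A", "www");
$out .= $msg->("*", "zzzzzz");
$out .= $ctl->("unsubscribe", "A");
$out .= $msg->("A", "vvv");          # unsubscribed: nothing
$out .= $ctl->("garbage", "trash");
print $out;
```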

Multithreaded socket server, part 4, socket reader thread

As I've said before, each socket gets served by two threads: one sits reading from the socket and forwards the data into the model and another one sits getting data from the model and forwards it into the socket. The same thread can't wait for both a socket descriptor and a thread synchronization primitive, so they have to be separate.

The first thread started is the socket reader. Let's go through it bit by bit.

sub chatSockReadT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockReadT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    my $tname = $opts->{thread};

    # only dup the socket, the writer thread will consume it
    my ($tsock, $sock) = $owner->trackDupSocket($opts->{socketName}, "<");

The beginning is quite usual. Then it loads the socket from the App and gets it tracked with the TrieadOwner. The difference between trackDupSocket() here and the trackGetSocket() used before is that trackDupSocket() leaves the socket copy in the App, to be found by the writer-side thread.

The socket is reopened in this thread as read-only. The writing to the socket from all the threads has to be synchronized to avoid mixing the half-messages. And the easiest way to synchronize is to always write from one thread, and if the other thread wants to write something, it has to pass the data to the writer thread through the control nexus.

    # user messages will be sent here
    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "writer",
    );

    # control messages to the reader side will be sent here
    my $faCtl = $owner->makeNexus(
        name => "ctl",
        labels => [
            ctl => $faChat->impRowType("ctl"),
        ],
        reverse => 1, # gives this nexus a high priority
        import => "writer",
    );

Imports the chat nexus and creates the private control nexus for communication with the writer side. The name of the chat nexus is hardcoded here, since it's pretty much a solid part of the application. If this were a module, the name of the chat nexus could be passed through the options.

The control nexus is marked as reverse even though it really isn't. But the reverse option has a side effect of making this nexus high-priority. Even if the writer thread has a long queue of messages from the chat nexus, the messages from the control nexus will be read first. Which again isn't strictly necessary here, but I wanted to show how it's done.

The type of the control label is imported from the chat nexus, so it doesn't have to be defined from scratch.

    $owner->markConstructed();

    Triceps::Triead::start(
        app => $opts->{app},
        thread => "$tname.rd",
        fragment => $opts->{fragment},
        main => \&chatSockWriteT,
        socketName => $opts->{socketName},
        ctlFrom => "$tname/ctl",
    );

    $owner->readyReady();

Then the construction is done and the writer thread gets started. And then the thread is ready and waits for the writer thread to be ready too. The readyReady() works in the fragments just as it does at the start of the app. Whenever a new thread is started, the App becomes not ready, and stays this way until all the threads report that they are ready. The rest of the App keeps working like nothing happened, at least sort of. Whenever a nexus is imported, the messages from this nexus start collecting for this thread, and if there are many of them, the nexus will become backed up and the threads writing to it will block. The new threads have to call readyReady() as usual to synchronize between themselves, and then everything gets on its way.

Of course, if two connections are received in a quick succession, that would start two sets of threads, and readyReady() will continue only after all of them are ready.


    my $lbChat = $faChat->getLabel("msg");
    my $lbCtl = $faCtl->getLabel("ctl");

    $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!ready," . $opts->{fragment});
    $owner->flushWriters();

A couple of labels get remembered for future use, and the connection ready message gets sent to the writer thread through the control nexus. By convention of this application, the messages go in the CSV format, with the control messages starting with "!". If this is the first client, this would send

!ready,cliconn1

to the client. It's important to call flushWriters() every time to get the message(s) delivered.

    while(<$sock>) {
        s/[\r\n]+$//;
        my @data = split(/,/);
        if ($data[0] eq "exit") {
            last; # a special case, handle in this thread
        } elsif ($data[0] eq "kill") {
            eval {$app->shutdownFragment($data[1]);};
            if ($@) {
                $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!error,$@");
                $owner->flushWriters();
            }
        } elsif ($data[0] eq "shutdown") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            Triceps::AutoDrain::makeShared($owner);
            eval {$app->shutdown();};
        } elsif ($data[0] eq "publish") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => $data[1], msg => $data[2]);
            $owner->flushWriters();
        } else {
            # this is not something you want to do in a real chat application
            # but it's cute for a demonstration
            $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => $data[0], arg => $data[1]);
            $owner->flushWriters();
        }
    }

The main loop keeps reading lines from the socket and interpreting them. The lines are in CSV format, and the first field is the command and the rest are the arguments (if any).  The commands are:

publish - send a message with a topic to the chat nexus
exit - close the connection
kill - close another connection, by name
shutdown - shut down the server
subscribe - subscribe the client to a topic
unsubscribe - unsubscribe the client from a topic
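
The parsing step of this loop can be tried out in isolation, without Triceps. The following is only a sketch in plain Perl: parseCommand() is a made-up helper name, not a part of the chat example, and it does the same trimming and splitting as the loop above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Triceps or of the chat example):
# strips the line ending and splits a CSV line into the command
# and its arguments, the same way as the main loop does.
sub parseCommand
{
    my ($line) = @_;
    $line =~ s/[\r\n]+$//;
    my ($cmd, @args) = split(/,/, $line);
    return ($cmd, @args);
}

my ($cmd, @args) = parseCommand("publish,news,hello world\r\n");
print "command=$cmd args=@args\n";
```

A plain split like this has no quoting, so a comma inside a message starts a new field. Since the loop uses only the first two arguments of "publish", anything after such a comma would be lost.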

The exit just exits the loop, since it works the same if the socket just gets closed from the other side.

The kill shuts down by name the fragment to which the threads of the other connection belong. This is a simple application, so it doesn't check any permissions, such as whether this fragment should be allowed to be shut down. If there is no such fragment, the shutdown call will silently do nothing, so the error check and reporting is really redundant (if something goes grossly wrong in the thread interruption code, an error might still occur, but theoretically this should never happen).

The shutdown sends the notification to the common topic "*" (to which all the clients are subscribed by default), then drains the model and shuts it down. The drain makes sure that all the messages in the model get processed (and even written to the socket) without allowing any new messages to be injected. "Shared" means that there are no special exceptions for some threads.

makeShared() actually creates a drain object that keeps the drain active during its lifetime. Here this object is not assigned anywhere, so it gets immediately destroyed and lifts the drain. So potentially more messages can get squeezed in between this point and shutdown. Which doesn't matter a whole lot here.
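
The lifetime rule at work here is a general Perl one: an object that is not stored in any variable gets destroyed as soon as the statement that created it completes. Here is a sketch of it in plain Perl, where Guard is a made-up stand-in class and not the real Triceps::AutoDrain:

```perl
use strict;
use warnings;

# A made-up stand-in for a drain object (NOT the real Triceps::AutoDrain):
# it records when it gets created and destroyed.
package Guard;

our @log;

sub make
{
    my ($name) = @_;
    push @log, "start $name";
    return bless { name => $name }, "Guard";
}

sub DESTROY
{
    my ($self) = @_;
    push @log, "end $self->{name}";
}

package main;

Guard::make("temp"); # not assigned anywhere: destroyed right away
push @Guard::log, "after temp";

{
    my $g = Guard::make("held"); # lives until the end of the block
    push @Guard::log, "inside block";
}
push @Guard::log, "after block";

print join("\n", @Guard::log), "\n";
```

In the printed log the "end temp" line comes before "after temp", while "end held" comes only after "inside block". Keeping the object in a variable is what keeps its effect active.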

If it were really important that nothing get sent after the shutdown notification, it could be done like this (this is an untested fragment, so it might contain typos):

        } elsif ($data[0] eq "shutdown") {
            my $drain = Triceps::AutoDrain::makeExclusive($owner);
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            $drain->wait();
            eval {$app->shutdown();};
        }

This starts the drain, but this time the exclusive mode means that this thread is allowed to send more data. When the drain is created, it waits for success, so when the new message is inserted, it will be after all the other messages. $drain->wait() does another wait and makes sure that this last message propagates all the way. And then the app gets shut down, while the drain is still in effect, so no more messages can be sent for sure.

The publish sends the data to the chat nexus (note the flushWriters(), as usual!).

And the rest of the commands (that would be subscribe and unsubscribe but you can do any other commands like "print") simply get forwarded through the control nexus to the writer thread for execution. Sending through the commands like this without testing is not a good practice for a real application but it's cute for a demo.

    {
        # let the data drain through
        my $drain = Triceps::AutoDrain::makeExclusive($owner);

        # send the notification - can do it because the drain is excluding itself
        $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!exiting");
        $owner->flushWriters();

        $drain->wait(); # wait for the notification to drain

        $app->shutdownFragment($opts->{fragment});
    }

    $tsock->close(); # not strictly necessary
}

The last part is when the connection gets closed, either by the "exit" command or when the socket gets closed. Remember, the socket can get closed asymmetrically, in one direction, so even when the reading is closed, the writing may still work and needs to return the responses to any commands received from the socket. And of course the same is true for the "exit" command.

So here the full exclusive drain sequence is used, ending with the shutdown of this thread's own fragment, which will close the socket. Even though only one fragment needs to be shut down, the drain drains the whole model. Because of the potentially complex interdependencies, there is no way to reliably drain only a part, and all the drains are App-wide.

The last part, with $tsock->close(), is not technically necessary since the shutdown of the fragment will get the socket descriptor revoked anyway.  But other than that, it's a good practice that unregisters the socket from the TrieadOwner and then closes it.
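
The asymmetric close mentioned above can be seen with plain sockets. This is a single-process sketch that uses a socketpair() instead of a real TCP connection, with the two ends standing in for the client and the server. It has nothing Triceps-specific in it:

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

# A connected pair of sockets stands in for the client and server ends.
socketpair(my $client, my $server, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
$client->autoflush(1);
$server->autoflush(1);

print $client "publish,news,hello\n";
shutdown($client, 1); # 1 closes only the writing direction of the client

# The server sees the end of file after the last line...
my @lines = <$server>;
# ...but can still send the responses back.
print $server "!exiting\n";
close($server);

my $reply = <$client>;
print "server got ", scalar(@lines), " line(s), client got: $reply";
```

The server end reads the one line and then the end of file, and the client still receives the "!exiting" response after it has closed its writing side.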

Sunday, April 28, 2013

Multithreaded socket server, part 3, the listener thread

The listener thread is:

sub listenerT
{
  my $opts = {};
  &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
    socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  undef @_;
  my $owner = $opts->{owner};

  my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");

  # a chat text message
  my $rtMsg = Triceps::RowType->new(
    topic => "string",
    msg => "string",
  ) or confess "$!";

  # a control message between the reader and writer threads
  my $rtCtl = Triceps::RowType->new(
    cmd => "string", # the command to execute
    arg => "string", # the command argument
  ) or confess "$!";

  $owner->makeNexus(
    name => "chat",
    labels => [
      msg => $rtMsg,
    ],
    rowTypes => [
      ctl => $rtCtl,
    ],
    import => "none",
  );

  $owner->readyReady();

  Triceps::X::ThreadedServer::listen(
    owner => $owner,
    socket => $sock,
    prefix => "cliconn",
    handler => \&chatSockReadT,
  );
}

It gets the usual options of the thread start (and as usual you can pass more options to the startServer() and they will make their way to the listener thread). But there is also one more option added by startServer(): the socketName.

Since the socket objects can't be passed directly between the threads, a roundabout way is taken. After startServer() opens a socket, it stores the dupped file descriptor in the App and passes through the name used for the store, so that it can be used to load the socket back into another thread:

my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");


This does multiple things:
  • Loads the file descriptor from the App by name (with a dup()).
  • Opens a socket object from that file descriptor.
  • Registers that file descriptor for tracking with the TrieadOwner, so that if the thread needs to be shut down, that descriptor will be revoked and any further operations with it will fail.
  • Creates a TrackedFile object that will automatically unregister the file descriptor from TrieadOwner when the TrackedFile goes out of scope. This is important to avoid the races between the revocation and the normal close of the file.
  • Makes the App close and forget its file descriptor.
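
The first two steps, carrying a descriptor around as a plain number and building a file handle from it again, can be sketched in plain Perl. This is only an illustration of the general technique under my own simplifications (one dup() instead of two, no tracking or revocation), not the actual Triceps code:

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;
use POSIX ();

socketpair(my $near, my $far, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

# "Store": remember a dupped descriptor as a plain number.
my $storedFd = POSIX::dup(fileno($far));
defined($storedFd) or die "dup: $!";
close($far); # the original handle is not needed any more

# "Load": build a new file handle around the stored descriptor.
my $loaded = IO::Handle->new_from_fd($storedFd, "+<")
    or die "fdopen: $!";
$loaded->autoflush(1);
$near->autoflush(1);

print $near "hello\n";
my $line = <$loaded>;
print "received through the re-opened descriptor: $line";
```

The number in $storedFd is the only thing that has to travel between the "store" and "load" parts, which is what makes this approach usable where the handle objects themselves can't be passed.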

The $tsock returned is the TrackedFile object, and $sock is the socket filehandle. You can actually get the filehandle directly from the TrackedFile, as $tsock->get(), so why return the socket separately? As it turns out, Perl has issues with handling the Perl values stored inside the XS objects if they aren't referred to by any Perl variables. When the listener thread creates the client handler threads, that would scramble the reference counts. Keeping the socket in the $sock variable prevents that.

The listener thread then creates the row types for the data messages and for the control messages between the client's reader and writer threads, and makes a nexus. The listener is not interested in the data, so it doesn't even import this nexus itself. The nexus passes the data only, so it has no label for the control messages, only the row type.

Then the mandatory readyReady(), and then the control goes again to the library that accepts the connections and starts the client connection threads. The handler is the main function for the thread that gets started to handle the connection. The prefix is used to build the names for the new thread, for its fragment, and for the connection socket that also gets passed through by storing it in the App. The name is the same for all three and gets created by concatenating the prefix with a number that gets increased for every connection. The newly created thread will then get the option socketName with the name of the socket.
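
The naming scheme is simple enough to show as a sketch. makeNameGenerator() is a hypothetical helper written for this illustration, not the library's code. The numbering from 1 is inferred from the name "cliconn1" that the first client gets:

```perl
use strict;
use warnings;

# Hypothetical helper: returns a generator of names built as
# the prefix followed by an ever-increasing connection number.
sub makeNameGenerator
{
    my ($prefix) = @_;
    my $count = 0;
    return sub { return $prefix . ++$count; };
}

my $nextName = makeNameGenerator("cliconn");
my @names = map { $nextName->() } 1..3;
print "@names\n";
```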

How does the ThreadedServer::listen() know to return? It runs until the App gets shut down, and returns only when the thread is requested to die as a result of the shutdown.

Multithreaded socket server, part 2, starting the server

Let's start with the top-level: how the server gets started. It's really the last part of the code, that brings everything together.

It uses the ThreadedServer infrastructure:

use Triceps::X::ThreadedServer qw(printOrShut);

The X subdirectory is for the examples and experimental stuff, but the ThreadedServer is really of production quality, I just haven't written a whole set of tests for it yet.

The server gets started like this:

  my ($port, $pid) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 0,
      fork => 1,
  );

The port option of 0 means "pick any free port", and the chosen port will be returned as the result. If you know the fixed port number in advance, use it. "chat" will be the name of the App, and listenerT is the main function of the thread that will listen for the incoming connections and start the other threads. And it's also the first thread that gets started, so it's responsible for creating the core part of the App as well (though in this case there is not a whole lot of it).
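
The "pick any free port" behavior of the port 0 comes from the OS, and can be seen with the standard IO::Socket::INET module alone. This sketch only illustrates that general mechanism and is not taken from the startServer() code:

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Ask the OS for any free port by binding to port 0 on the loopback.
my $listener = IO::Socket::INET->new(
    Listen => 5,
    LocalAddr => "127.0.0.1",
    LocalPort => 0,
    Proto => "tcp",
) or die "listen: $@";

# The actual port number can be read back from the socket.
my $port = $listener->sockport();
print "listening on port $port\n";
```

The port number printed will differ from run to run, which is why it has to be returned to the caller.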

The option "fork" determines how and whether the server gets started in the background. The value 1 means that a new process will be forked, and then the threads will be created there. The returned PID can be used to wait for that process to complete:

waitpid($pid, 0);

Of course, if you're starting a daemon, you'd probably write this PID to a file and then just exit the parent process.
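
A sketch of that PID file idea in plain Perl follows. The helper writePidFile() and the file name are made up for the illustration, and a child process that exits right away stands in for the forked server:

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: saves the PID of the server process to a file.
sub writePidFile
{
    my ($path, $pid) = @_;
    open(my $fh, ">", $path) or die "can not write $path: $!";
    print $fh "$pid\n";
    close($fh) or die "can not close $path: $!";
}

my $pid = fork();
defined($pid) or die "fork: $!";
exit(0) if $pid == 0; # the child stands in for the forked server

my $dir = tempdir(CLEANUP => 1); # a made-up location for the sketch
my $pidFile = "$dir/chat.pid";
writePidFile($pidFile, $pid);

# Read the PID back, as a script stopping the daemon would do.
open(my $in, "<", $pidFile) or die "can not read $pidFile: $!";
chomp(my $saved = <$in>);
close($in);
print "saved PID $saved\n";

# A real launcher would just exit here; the sketch reaps the child instead.
waitpid($pid, 0);
```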

The fork value of 0 starts the server in the current process, and the current thread becomes the server's harvester thread (the one that joins the other threads when the App shuts down).

In this case the server doesn't return until it's done, so there is not much point in the returned port value: by that time the socket is already closed. So you really need to either use a fixed port or write the port number to a file from your listener thread. The PID also doesn't make sense, and it's returned as undef. Here is an example of this kind of call:

  my ($port, $pid) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 12345,
      fork => 0,
  );

Finally, the server can be started in the current process, with a new thread created as the App's harvester, using the fork option -1. The original thread can then continue and do other things in parallel. It's the way I use for the unit tests.

  my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 0,
      fork => -1,
  );

In this case the second value returned is not a PID but the thread object for the harvester thread. You should either detach it or eventually join it:

$thread->join();

Perl propagates the errors in the threads through the join(), so if the harvester thread dies, that would show only in the join() call. And since Triceps propagates the errors too, any other thread dying will cause the harvester thread to die after it joins all the App's threads.

Multithreaded socket server, part 1, dynamic threads overview

As a quick announcement, I've renamed some of the methods described in the last post, and updated the post.

Now, to the new stuff. The threads can be used to run a TCP server that accepts the connections and then starts the new client communication thread(s) for each connection.  This thread can then communicate with the rest of the model, feeding and receiving data, as usual, through the nexuses.

The challenge here is that there must be a way to create the threads dynamically, and later when the client closes connection, to dispose of them. There are two possible general approaches:

  • dynamically create and delete the threads in the same App;
  • create a new App per connection and connect it to the main App.

Both have their own advantages  and difficulties, but the approach with the dynamic creation and deletion of threads ended up looking easier, and that's what Triceps has. The second approach is not particularly well supported yet. You can create multiple Apps in the program, and you can connect them by making two Triceps Trieads run in the same OS thread and ferry the data around. But it's extremely cumbersome. This will be improved in the future, but for now the first approach is the ticket.

The dynamically created threads are grouped into the fragments. This is done by specifying the fragment name option when creating a thread. The threads in a fragment have a few special properties.

One, it's possible to shut down the whole fragment in one fell swoop. There is no user-accessible way to shut down the individual threads, you can shut down either the whole App or a fragment. Shutting down individual threads is dangerous, since it can mess up the application in many non-obvious ways. But shutting down a fragment is OK, since the fragment serves a single logical function, such as servicing one TCP connection, and it's OK to shut down the whole logical function.

Two, when a thread in the fragment exits, it's really gone, and takes all its nexuses with it. Well, technically, the nexuses continue to exist as long as there are threads connected to them, but no new connections can be created after this point. Since usually the whole fragment will be gone together, and since the nexuses defined by the fragment's thread are normally used only by the other threads of the same fragment, a fragment shutdown cleans up its state like the fragment had never existed. By contrast, when a normal thread exits, the nexuses defined by it stay present and accessible until the App shuts down.

Another, somewhat surprising, challenge is the interaction of the threads and sockets (or file descriptors in general) in Perl.  I've already touched upon it, but there is a lot more.

To show how all this stuff works, I've created an example of a "chat server". It's not really a human-oriented chat, it's more of a machine-oriented publish-subscribe, and specially tilted towards showing how to run a socket server with threads.

In this case the core logic is absolutely empty. All there is of it, is a nexus that passes messages through it. The clients read from this nexus to get the messages, and write to this nexus to send the messages.

When the App starts, it has only one thread, the listener thread that listens on a socket for the incoming connections. The listener doesn't even care about the common nexus and doesn't import it. When a connection comes in, the listener creates two threads to serve it: the reader reads the socket and sends to the nexus, and the writer receives from the nexus and writes to the socket. These two threads constitute a fragment for this client. They also create their own private nexus, allowing the reader to send control messages to the writer. That could also have been done through the central common nexus, but I wanted to show that there are different ways of doing things.

With a couple of clients connected, threads and sockets start looking like this:

client1 reader ----> client1 control nexus ------> client1 writer
         \                                           /
          ----------->\          /------------------>
                       chat nexus
          ----------->/          \------------------>
         /                                           \
client2 reader ----> client2 control nexus ------> client2 writer

And the listener thread still stays on the side.