Monday, May 27, 2013

TQL server with multithreading

The next big example I've been talking about is finally ready. It's the adaptation of the TQL to work with the multithreaded server framework. The big reason for this example is the export of the table types through a nexus and creation of tables from them. And we'll get to that, but first let's look at the new abilities of the TQL.

TQL is still not of a production quality, in either single- or multi-threaded variety, and contains a large number of simplifying assumptions in its code. As the single-threaded version works symbiotically with the SimpleServer, the multithreaded version works with the ThreadedServer.

One thread created by the programmer contains the "core logic" of the model. It doesn't technically have to be all in a single thread: the data can be forwarded to the other threads and then the results forwarded back from them. But a single core logic thread is a convenient simplification. This thread has some input labels, to receive data from the outside, and some tables with the computed results that can be read by TQL. Of course, it's entirely realistic to have also just the output labels without tables, sending a stream or computed rowops, but again for simplicity I've left this out for now.

This core logic thread creates a TQL instance, which listens on a socket, accepts the connections, forwards the input data to the core logic, performs queries on the tables from the core logic and sends the results back to the client. To this end, the TQL instance creates a few nexuses in the core logic thread and uses them to communicate between all the fragments. The input labels and tables in the core thread also get properly connected to these nexuses. The following figure shows the thread architecture, I'll use it for the reference throughout the discussion:

Fig. 1. TQL application.

The core logic thread then goes into its main loop and performs as its name says the core logic computations.

Here is a very simple example of a TQL application:

sub appCoreT # (@opts)
{
  my $opts = {};
  &Triceps::Opt::parse("appCoreT", $opts, {@Triceps::Triead::opts,
    socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  undef @_; # avoids a leak in threads module
  my $owner = $opts->{owner};
  my $app = $owner->app();
  my $unit = $owner->unit();

  # build the core logic

  my $rtTrade = Triceps::RowType->new(
    id => "int32", # trade unique id
    symbol => "string", # symbol traded
    price => "float64",
    size => "float64", # number of shares traded
  ) or confess "$!";

  my $ttWindow = Triceps::TableType->new($rtTrade)
    ->addSubIndex("byId",
      Triceps::SimpleOrderedIndex->new(id => "ASC")
    )
    or confess "$!";
  $ttWindow->initialize() or confess "$!";

  # Represents the static information about a company.
  my $rtSymbol = Triceps::RowType->new(
    symbol => "string", # symbol name
    name => "string", # the official company name
    eps => "float64", # last quarter earnings per share
  ) or confess "$!";

  my $ttSymbol = Triceps::TableType->new($rtSymbol)
    ->addSubIndex("bySymbol",
      Triceps::SimpleOrderedIndex->new(symbol => "ASC")
    )
    or confess "$!";
  $ttSymbol->initialize() or confess "$!";

  my $tWindow = $unit->makeTable($ttWindow, "EM_CALL", "tWindow")
    or confess "$!";
  my $tSymbol = $unit->makeTable($ttSymbol, "EM_CALL", "tSymbol")
    or confess "$!";

  # export the endpoints for TQL (it starts the listener)
  my $tql = Triceps::X::Tql->new(
    name => "tql",
    trieadOwner => $owner,
    socketName => $opts->{socketName},
    tables => [
      $tWindow,
      $tSymbol,
    ],
    tableNames => [
      "window",
      "symbol",
    ],
    inputs => [
      $tWindow->getInputLabel(),
      $tSymbol->getInputLabel(),
    ],
    inputNames => [
      "window",
      "symbol",
    ],
  );

  $owner->readyReady();

  $owner->mainLoop();
}

{
  my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
      app => "appTql",
      main => \&appCoreT,
      port => 0,
      fork => -1, # create a thread, not a process
  );
}

This core logic is very simple: all it does is create two tables and then send the input data into them. The server gets started in a background thread (fork => -1) because this code is taken from a test that then goes and runs the expect with the SimpleClient.

The specification of inputs and tables for TQL is somewhat ugly but I kept it as it was historic (it was done this way to keep the parsing of the options simpler).  The new options compared to the single-threaded TQL are the "threadOwner", "inputs" and "inputNames". The "threadOwner" is how TQL knows that it must run in the multithreaded mode, and it's used to create the nexuses for communication between the core logic and the rest of TQL. The inputs are needed because the multithreaded TQL parses and forwards the input data, unlike the single-threaded version that relies on the SimpleServer to do that according to the user-defined dispatch table.

The names options don't have to be used: if you name your labels and tables nicely and suitable for the external vieweing, the renaming-for-export can be skipped.

Similar to the single-threaded version, if any of the options "tables" or "inputs" is used, the TQL object gets initialized automatically, otherwise the tables and inputs can be added piecemeal with addTable(), addNamedTable(), addInput(), addNamedInput(), and then the whole thing initialized manually.

Then the clients can establish the connections with the TQL server, send in the data and the queries. To jump in, here is a trace of a simple session that sends some data, then does some table dumps and subscribes, not touching the queries yet. I'll go through it fragment by fragment and explain the meaning. The dumps and subscribes were the warm-up exercises before writing the full queries, but they're useful in their own right, and here they serve as the warm-up exercises for the making of the queries!


> connect c1
c1|ready

The "connect" is not an actual command send but just the indication in the trace that the connection was set up by the client "c1" (it's a trace from the SimpleClient, so it follows the usual conventions). The "ready" response is set when the connection is opened, similar to the chat server shown before.

> c1|subscribe,s1,symbol
c1|subscribe,s1,symbol

This is a subscription request. It means "I'm not interested in the current state of a table but send me all the updates". The response is the mirror of the request, so that the client knows that the request has been processed. "s1" is the unique identifier of the request, so that the client can match together the responses it received to the requests it sent (and keeping the uniqueness is up to the client, the server may refuse the requests with duplicate identifiers).  And "symbol" is the name of the table. Once a subscription is in place, there is no way to unsubscribe other than by disconnecting the client (it's doable but adds complications, and I wanted to skip over the nonessential parts). Subscribing multiple times to the same table will send a confirmation every time but the repeated confirmations will have no effect: only one copy of the data will be sent anyway.

> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1

This sends the data into the model. And since it propagates through the subscription, the data gets sent back too. The "symbol" here means two different things: on the input side it's the name of the label where the data is sent, on the output side it's the name of the table that has been subscribed to.

The data lines start with the command "d" (since the data is sent much more frequently than the commands, I've picked a short one-letter "command name" for it), then the label/table name, opcode and the row fields in CSV format.

> c1|confirm,cf1
c1|confirm,cf1,,,

The "confirm" command provides a way for the client to check that the data it send had propagated through the model. And it doesn't have to subscribe back to the data and read them. Send some data lines, then send the "confirm" command and wait for it to come back (again, the unique id allows to keep multiple confirmations in flight if you please). This command doesn't guarantee that all the clients have seen the results from that data. It only guarantees that the core logic had seen the data, and more weakly guarantees that the data has been processed by the core logic, and this particular client had already seen all the results from it.

Why weakly? It has to do with the way it works inside, and it depends on the core logic. If the core logic consists of one thread, the guarantee is quite strong. But if the core logic farms out the work from the main thread to the other threads and then collects the results back, the guarantee breaks.

On the Fig. 1 you can see that unlike the chat server shown before, TQL doesn't have any private nexuses for communication between the reader and writer threads of a client. Instead it relies on the same input and output nexuses, adding a control label to them, to forward the commands from the reader to the writer. The TQL object in the core logic thread creates a short-circuit connection between the control labels in the input and output nexuses, forwarding the commands. And if the core logic all runs in one thread, this creates a natural pipeline: the data comes in, gets processed, comes out, the "confirm" command comes in, comes out after the data. But if the core logic farms out the work to more threads, the confirmation can "jump the line" because its path is a direct short circuit.

> c1|drain,dr1
c1|drain,dr1,,,

The "drain" is an analog of "confirm" but more reliable and slower:  the reader thread drains the whole model before sending the command on. This guarantees that all the processing is done, and all the output from it has been sent to all the clients.

> c1|dump,d2,symbol
c1|startdump,d2,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dump,d2,symbol

The "dump" command dumps the current contents of a table. Its result starts with "startdump", and the same id and table name as in the request, then goes the data (all with OP_INSERT), finishing with the completion confirmation echoing the original command. The dump is atomic, the contents of the table doesn't change in the middle of the dump. However if a subscription on this table is active, the data rows from that subscription may come before and after the dump.

I'm not going to describe the error reporting, but it's worth mentioning that if a command contains errors, its "confirmation" will be an error line with the same identifier.

> c1|dumpsub,ds3,symbol
c1|startdump,ds3,symbol
c1|d,symbol,OP_INSERT,ABC,ABC Corp,1
c1|dumpsub,ds3,symbol

The "dumpsub" command is a combination of a dump and subscribe: get the initial state and then get all the updates.  The confirmation of "dumpsub" marks the boundary between the original dump and the following updates.

> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,symbol,OP_INSERT,DEF,Defense Corp,2

Send some more data, and it comes back only once, even though the subscription was done twice: once in "subscribe" and once in "dumpsub". The repeated subscription requests simply get consumed into one subscription.

> c1|d,window,OP_INSERT,1,ABC,101,10

This sends a row to the other table but nothing comes back because there is no subscription to that table.

> c1|dumpsub,ds4,window
c1|startdump,ds4,window
c1|d,window,OP_INSERT,1,ABC,101,10
c1|dumpsub,ds4,window
> c1|d,window,OP_INSERT,2,ABC,102,12
c1|d,window,OP_INSERT,2,ABC,102,12

This demonstrates the pure dump-and-subscribe without any interventions.

> c1|shutdown
c1|shutdown,,,,
c1|__EOF__

And the shutdown command works the same as in the chat server, draning and then shutting down the whole server.

Now on to the queries.

 > connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,1.0

Starts a client connection and sends some data.


> c1|querysub,q1,query1,{read table symbol}{print tokenized 0}
c1|d,query1,OP_INSERT,ABC,ABC Corp,1
c1|querysub,q1,query1

The "querysub" command does the "query-and-subscribe": reads the initial state of the table, processed through the query, and then subscribes to any future updates. The single-threaded variety of TQL doesn't do this, it does just the one-time queries. The multithreaded TQL could also do the one-time queries, and also just the subscribes without the initial state, but I've been cutting corners for this example and the only thing that's actually available is the combination of two, the "querysub".

"q1" is similar to the other command, the command identifier. The next field "query1" is the name for the query, it's the name that will be shown for the data lines coming out of the query. And then goes the query in the brace-quoted format, same as the single-threaded TQL (and there is no further splitting by commas, so the commas can be used freely in the query).

The identified and the name for the query sound kind of redundant. But the client may generate them in different ways and need both. The name has the more symbolic character. The identifier can be generated as a sequence of numbers,  so that the client can keep track of its progress more easily. And the error reports include the identifier but not the query name in them.

For the query, there is no special line coming out before the initial dump. Supposedly, there would not be more than one query in flight with the same name, so this could be easily told apart based on the name in the data lines. There is also an underlying consideration that when the query involves a join, in the future the initial dump might be happening in multiple chunks, requiring to either surround every chunk with the start-end lines or just let them go without the extra notifications, as they are now.
 
And the initial dump ends as usual with getting the echo of the command (without the query part) back.

This particular query is very simple and equivalent to a "dumpsub".


> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
c1|d,query1,OP_INSERT,DEF,Defense Corp,2

Send more data and it will come out of the query.


> c1|querysub,q2,query2,{read table symbol}{where istrue {$%symbol =~ /^A/}}{project fields {symbol eps}}
c1|t,query2,query2 OP_INSERT symbol="ABC" eps="1"
c1|querysub,q2,query2

This query is more complicated, doing a selection (the "where" query command) and projection. It also prints the results in the tokenized format (the "print" command gets added automatically if it wasn't used explicitly, and the default options for it enable the tokenized format).


The tokenized lines come out with the command "t", query name and then the contents of the row. The query name happens to be sent twice, and I'm not sure yet if it's a feature or a bug.

> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
c1|d,query1,OP_INSERT,AAA,Absolute Auto Analytics Inc,3
c1|t,query2,query2 OP_INSERT symbol="AAA" eps="3"
> c1|d,symbol,OP_DELETE,DEF,Defense Corp,2.0
c1|d,query1,OP_DELETE,DEF,Defense Corp,2

More examples of the data sent, getting processed by both queries.  In the second case the "where" filters out the row from query2, so only query1 produces the result.

> c1|shutdown
c1|shutdown,,,,
c1|__EOF__

And the shutdown as usual.

Now the "piece de resistance": queries with joins.

> connect c1
c1|ready
> c1|d,symbol,OP_INSERT,ABC,ABC Corp,2.0
> c1|d,symbol,OP_INSERT,DEF,Defense Corp,2.0
> c1|d,symbol,OP_INSERT,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,window,OP_INSERT,1,AAA,12,100

Connect and send some starting data.

> c1|querysub,q1,query1,{read table window}{join table symbol byLeft {symbol} type left}
c1|t,query1,query1 OP_INSERT id="1" symbol="AAA" price="12" size="100" name="Absolute Auto Analytics Inc" eps="3"
c1|querysub,q1,query1

A left join of the tables "window" and "symbol", by the field "symbol" as join condition.

Note that unlike the previous single-threaded TQL examples, the index type path for the table "symbol" is not explicitly specified. It's the result of the new method TableType::findIndexPathForKeys() described before, now the index gets found automatically. And the single-threaded TQL now has this feature too. If you really want, you can still specify the index path but usually there is no need to.

The TQL joins, even in the multithreaded mode, are still implemented internally as LookupJoin, driven only by the main flow of the query. So the changes to the joined dimension tables will not update the query results, and will be visible only when a change on the main flow picks them up, potentially creating inconsistencies in the output. This is wrong, but fixing it presents complexities that I've left alone until some later time.

> c1|d,window,OP_INSERT,2,ABC,13,100
c1|t,query1,query1 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2"
> c1|d,window,OP_INSERT,3,AAA,11,200
c1|t,query1,query1 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Absolute Auto Analytics Inc" eps="3"

Sending data updates the results of the query.

> c1|d,symbol,OP_DELETE,AAA,Absolute Auto Analytics Inc,3.0
> c1|d,symbol,OP_INSERT,AAA,Alcoholic Abstract Aliens,3.0

As described above, the modifications of the dimension table are mot visible in the query directly.

> c1|d,window,OP_DELETE,1
c1|t,query1,query1 OP_DELETE id="1" symbol="AAA" price="12" size="100" name="Alcoholic Abstract Aliens" eps="3"

But an update on the main flow brings them up (an in this case inconsistently, the row getting deleted is not exactly the same as the row inserted before).

> c1|querysub,q2,query2,{read table window}{join table symbol byLeft {symbol} type left}{join table symbol byLeft {eps} type left rightFields {symbol/symbol2}}
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="ABC"
c1|t,query2,query2 OP_INSERT id="2" symbol="ABC" price="13" size="100" name="ABC Corp" eps="2" symbol2="DEF"
c1|t,query2,query2 OP_INSERT id="3" symbol="AAA" price="11" size="200" name="Alcoholic Abstract Aliens" eps="3" symbol2="AAA"
c1|querysub,q2,query2

This is a more complicated query, involving two joins, with the same dimension table "symbol". The second join by "eps" makes no real-world sense whatsoever but it's interesting from the technical perspective: if you check the table type of this table at the start of the post, you'll find that it has no index on the field "eps". The join adds this index on demand!

The way it works, all the dimension tables are copied into the client's writer thread, created from the table types exported by the core logic throuhg the output nexus. (And if a table is used in the same query twice, it's currently also copied twice). This provides a nice opportunity to amend the table type by adding any necessary secondary index before creating the table, and TQL makes a good use of it.

The details are forthcoming in the next post.

No comments:

Post a Comment