Friday, May 24, 2013

how to find an index

While working on the next example (still in progress but getting closer to completion), I've solved one more nagging little problem: finding an index type in the table type by its key fields. It makes the joins more convenient: if you know by which fields you want to join, it's nice to have the correct index found automatically. My previous solution went in the opposite direction: specify the indexes for the join, and find the list of fields automatically from there. But that created the problem of deciding which field on the left to pair with which field on the right, so either the indexes had to be carefully controlled to have the key fields in the same order, or a separate option still had to be used to specify the pairing of the fields.

Now this problem is solved with the method:

@idxPath = $tableType->findIndexPathForKeys(@keyFields);

It returns the array that represents the path to an index type that matches these key fields (the index type and all the types in the path still have to be of the Hashed variety). If the correct index can not be found, an empty array is returned. If you specify the fields that aren't present in the row type in the first place, this is simply treated the same as being unable to find an index for these fields.

If more than one index would match, the first one found in the direct order of the index tree walk is returned.
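To illustrate the search, here is a minimal sketch in plain Perl. It does not use the Triceps classes: the index tree is a hypothetical nested array of name => definition pairs, and it assumes that the key of a nested index is the union of the key fields along its path. The function name findPathForKeys() is made up for the example.

```perl
use strict;
use warnings;

# Hypothetical index tree: [ name => { key => [...], sub => [...] }, ... ],
# kept in definition order. This is NOT the Triceps data structure.
sub findPathForKeys {
    my ($tree, $wanted, $seen, $path) = @_;
    $seen ||= {};
    $path ||= [];
    for (my $i = 0; $i < @$tree; $i += 2) {
        my ($name, $idx) = @$tree[$i, $i + 1];
        # key fields accumulated from the root down to this index
        my %keys = (%$seen, map { $_ => 1 } @{ $idx->{key} || [] });
        my @here = (@$path, $name);
        # a match: the accumulated set is exactly the wanted set
        return @here
            if keys(%keys) == keys(%$wanted)
            && !grep { !$wanted->{$_} } keys %keys;
        # otherwise walk the nested indexes, first one found wins
        my @found = findPathForKeys($idx->{sub} || [], $wanted, \%keys, \@here);
        return @found if @found;
    }
    return (); # nothing matched
}

my $tree = [
    byDate => { key => ["date"], sub => [
        byAddress  => { key => ["address"] },
        byPriority => { key => ["priority"] },
    ] },
    byId => { key => ["id"] },
];
my @idxPath = findPathForKeys($tree, { map { $_ => 1 } qw(address date) });
# @idxPath is ("byDate", "byAddress")
```

The order of the fields in the request doesn't matter in this sketch, only the set of names does, and an unknown field name simply produces an empty result.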

This allowed me to change the LookupJoin to make the option "rightIdxPath" optional: if it's undefined, the index that matches the key fields specified in the option "by" or "byLeft" will be found automatically. Of course, if such an index doesn't exist, it's still an error; the join can not create it for you yet.

The same way, in the JoinTwo now the options "leftIdxPath" and "rightIdxPath" have become optional if either "by" or "byLeft" is specified. The old way of specifying "leftIdxPath" and "rightIdxPath" without any of the "by" varieties still works too.

And this propagated into the TQL command "join": no more need for the option "rightIdxPath". You can still use it if you want but there is rarely any point to it, only if there are multiple matching indexes and you strongly prefer one of them.

One more indirect fall-out from these changes is the new option to LookupJoin:

  fieldsDropRightKey => 0/1

The default is 0, and if you set it to 1, the logic will automatically exclude from the result row type the key fields from the right side. This is convenient because these fields contain the values that are duplicates of the key fields from the left side anyway. It's what the TQL join was doing all along, and the implementation of this logic becomes simpler when it gets moved directly into LookupJoin.
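As a rough illustration of what this option does to the result row type, here is a minimal sketch in plain Perl. It works on lists of field names only, and the function resultFields() is hypothetical; the real LookupJoin has more logic around the field selection than shown here.

```perl
use strict;
use warnings;

# Builds the list of result field names: all the left fields, then the
# right fields, optionally without the right-side key fields.
sub resultFields {
    my ($left, $right, $rightKeys, $dropRightKey) = @_;
    my %drop = $dropRightKey ? (map { $_ => 1 } @$rightKeys) : ();
    return (@$left, grep { !$drop{$_} } @$right);
}

my @fields = resultFields(
    [qw(acctSrc amount)],      # left row type
    [qw(source internal)],     # right row type
    [qw(source)],              # key fields on the right side
    1,                         # fieldsDropRightKey => 1
);
# @fields is ("acctSrc", "amount", "internal")
```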

Sunday, May 19, 2013

time and threads and ThreadedClient

I've been making the ThreadedClient more resilient, to handle the unexpected better. First, if it encounters a socket disconnect (__EOF__) while expecting data, it will immediately return and set the error message in $@. Second, there is now a way to specify a timeout for waiting. If the message doesn't get received in time, expect() with a timeout will also return immediately with a message in $@.

As an unrelated change, I've renamed the method protocol() to getTrace(). The word "protocol" creates too much confusion when used around the sockets, "trace" is a more distinct one.

And as a mix thereof, the error messages not only get set in $@ but also get included into the trace, so they can all be checked together. There is also a separate trace of only the error messages, which can be obtained with getErrorTrace().

There are three ways to specify the timeout. Two of them are in the new():

        my $client = Triceps::X::ThreadedClient->new(
            owner => $owner,
            totalTimeout => $timeout,
        );

        my $client = Triceps::X::ThreadedClient->new(
            owner => $owner,
            timeout => $timeout,
        );


The option "totalTimeout" gives a timeout for the whole run to complete. Once that timeout is reached, all future expect()s just fail immediately. The option "timeout" gives the default timeout for each expect(). It's possible to use both, and each call of expect() will be limited by the shorter time limit of the two (remember, "totalTimeout" starts counting from the call of new(), not from startClient(), while "timeout" starts counting from the call of expect()).

All the timeouts are specified in seconds with fractions, so for 0.1 seconds you just use 0.1.

The third way is to use an extra argument in expect():

$client->expect("c1", "expected text", $timeout);

This timeout completely overrides whatever was set in new(). The value of 0 disables the timeout for this call, and 0 overrides the timeout from new() too, so it can be used for the one-off calls without the timeout.
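To make the precedence of the three sources concrete, here is a minimal sketch of how the time limit could be computed. It is a reading of the rules above, not the actual ThreadedClient code; the function name computeLimit() and its argument list are made up for the illustration.

```perl
use strict;
use warnings;

# Returns the absolute deadline (seconds since epoch, with fractions)
# for one expect() call, or 0 if the call must wait without a limit.
#   $created      - time when new() was called
#   $totalTimeout - option "totalTimeout" of new(), undef or 0 if not set
#   $timeout      - option "timeout" of new(), undef or 0 if not set
#   $callTimeout  - third argument of expect(), undef if not given
#   $now          - time when expect() was called
sub computeLimit {
    my ($created, $totalTimeout, $timeout, $callTimeout, $now) = @_;

    # the explicit argument overrides everything from new(), 0 means "no limit"
    if (defined $callTimeout) {
        return $callTimeout > 0 ? $now + $callTimeout : 0;
    }

    my $limit = 0;
    $limit = $created + $totalTimeout if $totalTimeout;
    if ($timeout) {
        my $perCall = $now + $timeout;
        # the shorter of the two limits wins
        $limit = $perCall if !$limit || $perCall < $limit;
    }
    return $limit;
}
```

For example, with new() called at time 100, totalTimeout of 10 and timeout of 3, an expect() called at time 105 gets the limit of 108, and an expect() called at time 109 gets the limit of 110.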

And now, how it works inside. First thing, the call Triceps::now() returns the current time in seconds since epoch as a floating-point value, including the fractions of the seconds. The Triceps Perl API deals with time in this format.

Then, how to do the timeouts. Remember, if you look for repeatability of computation, you should really use an external time source synchronized with your data. The interface described here is for quick-and-dirty things, like time-limiting the tests, so that the unexpected input would not get the test stuck.

The core part of expect(), after it computes the time limit from the three sources, is this:

  $self->{error} = undef;
  $self->{expectDone} = 0;

  $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
    cmd => "expect",
    client => $client,
    arg => $pattern,
  );
  $owner->flushWriters();

  if ($limit > 0.) {
    while(!$self->{expectDone} && $owner->nextXtrayTimeLimit($limit)) { }
    # on timeout reset the expect and have that confirmed
    if (!$self->{expectDone}) {
      $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
        cmd => "cancel",
        client => $client,
      );
      $owner->flushWriters();
      # wait for confirmation
      while(!$self->{expectDone} && $owner->nextXtray()) { }
    }
  } else {
    while(!$self->{expectDone} && $owner->nextXtray()) { }
  }

  $@ = $self->{error};

The expects and inputs get combined in the collector thread. The expect request gets forwarded to it and then the current thread waits for the response. The response gets processed with nextXtray(), which represents one step of the main loop. The main loop is literally implemented in C++ like this:

void TrieadOwner::mainLoop()
{
    while (nextXtray())
        { }
    markDead();
}

The nextXtray() takes the next tray from the Triead's read nexuses and processes it. "Xtray" is a special form of the trays used to pass the data across the nexus. It returns true until the thread is requested dead, and then it returns false.

The normal nextXtray() waits until there is more data to process or the thread is requested to die. But there are special forms of it:

nextXtrayNoWait()

Returns immediately if there is no data available at the moment.

nextXtrayTimeLimit($deadline)

Returns if no data becomes available before the absolute deadline.

nextXtrayTimeout($timeout)

Returns if no data becomes available before the timeout, starting at the time of the call. Just to reiterate, the difference is that nextXtrayTimeLimit() receives the absolute time since the epoch while nextXtrayTimeout() receives the length of the timeout starting from the time of the call, both as seconds in floating-point.

All the versions that may return early on no data return false if they have to do so.
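The relation between the absolute and relative forms can be shown with a small stand-alone sketch. It doesn't touch the Triceps nexuses at all: a polling callback stands in for "data became available", and the names waitTimeLimit() and waitTimeout() are hypothetical.

```perl
use strict;
use warnings;
use Time::HiRes qw(time sleep);

# Waits until $poll->() returns true or the absolute deadline passes.
# Returns 1 if the data has arrived, 0 if it had to return early.
sub waitTimeLimit {
    my ($deadline, $poll) = @_;
    until ($poll->()) {
        return 0 if time() >= $deadline;
        sleep(0.01); # polling is only a stand-in for a real blocking wait
    }
    return 1;
}

# The relative form is the absolute form with the deadline
# computed from the time of the call.
sub waitTimeout {
    my ($timeout, $poll) = @_;
    return waitTimeLimit(time() + $timeout, $poll);
}
```

The absolute form is handy when several waits in a row have to fit under one common deadline: the deadline gets computed once and passed unchanged to each wait.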

expect() uses the version with the absolute deadline. If the collector thread finds a match, it sends a rowop back to the expecting thread. That rowop gets processed inside nextXtrayTimeLimit(), calling a label that sets the flag $self->{expectDone}. Then nextXtrayTimeLimit() returns true, and the loop finds the flag set and exits.

If the collector thread doesn't find a match in time, nextXtrayTimeLimit() will return false, and the loop will again exit. But then it will find the "done" flag not set, so it knows that the timeout has expired and that it has to tell the collector thread that the call is being cancelled. So it sends another rowop for the cancel, and then waits for the confirmation with another nextXtray(), this time with no limit on it since the confirmation must arrive back quickly in any case.


It's the confirmation rowop processing that sets $self->{error}. But there is always a possibility that the match will arrive just after the timeout has expired but just before the cancellation. It's one of those things that you have to deal with when multiple threads exchange messages. What then? Then the normal confirmation will arrive back to the expecting thread. And when the cancel message arrives at the collector thread, it will find that this client doesn't have an outstanding expect request any more, and will just ignore the cancel. Thus, the second nextXtray() will receive either a confirmation of the cancel and set the error message, or it will receive the last-moment success message. Either way it will fall through and return (setting $@ if the cancel confirmation came back).

By the way, if the collector thread finds the socket closed, it will immediately return an error rowop, very similar to the confirmation of the cancel. And it will set both $self->{expectDone} and $self->{error} in the expect thread.

Saturday, May 11, 2013

on the export of table types

I've made an example for the export of the table types between threads but it didn't come out well. It has turned out to not particularly need this feature, and came out contrived and ugly. I'm working on a better example now, so in the meantime I want to tell more about the subject matter.

As I've said before, the limitation of exporting the table types between the threads is in keeping the involved Perl code snippets as source code. Their support for the sorted and ordered indexes has been already described, and I've also mentioned the aggregators. I've done this support for the basic aggregators too, with a side effect that the fatal errors in both the index and aggregator code snippets are now propagated much more nicely and come out as the table operations confessing. However when it came to the SimpleAggregator, I've found that I can't just do it as-is, the missing piece of the puzzle is the aggregator initialization routine that would run at the table type initialization time. It's a nice feature to have overall but I'm trying to cut a release here and push everything non-essential past it.

Fortunately, some thinking has shown that this feature is not really needed. There usually just isn't any need to export a table type with aggregators. Moreover, there is a need to export the table types with many elements stripped. What is to be stripped and why?

The most central part of the table type is its primary index. It defines how the data gets organized. And then the secondary indexes and aggregators perform the computations from the data in the table. The tables can not be shared between threads, and thus the way to copy a table between the threads is to export the table type and send the data, and let the other thread construct a copy of the table from that. But the table created in another thread really needs only the base data organization. If it does any computations on that data, that would be its own computations, different than the ones in the exporting thread. So all it needs to get is the basic table type with the primary index, very rarely some secondary indexes, and pretty much never the aggregators.

The way to get such a stripped table type with only the fundamentally important parts is:

$tabtype_fundamental = $tabtype->copyFundamental();

That copies the row type and the primary index (the whole path to the first leaf index type) and leaves alone the rest. All the aggregators on all the indexes, even on the primary one, are not included in the copy. In the context of making the full nexus it can look like this:

$facet = $owner->makeNexus(
    name => "data",
    labels => [ @labels ],
    tableTypes => [
         mytable => $mytable->getType()->copyFundamental(),
    ],
    import => "writer",
);

If more index types need to be included, they can be specified by path in the arguments of copyFundamental():

$tabtype_fundamental = $tabtype->copyFundamental(
    [ "byDate", "byAddress", "fifo" ],
    [ "byDate", "byPriority", "fifo" ],
);

The paths may overlap, as shown here, and the matching subtrees will be copied correctly, still properly overlapping in the result. There is also a special syntax:

$tabtype_fundamental = $tabtype->copyFundamental(
    [ "secondary", "+" ],
);

The "+" in the path means "do the path to the first leaf index of that subtree" and saves the necessity to write out the whole path.


Finally, what if you don't want to include the original primary index at all? You can use the string "NO_FIRST_LEAF" as the first argument. That would skip it. You can still include it by using its explicit path, possibly at the other position.


For example, suppose that you have a table type with two top-level indexes, "first" as the primary index and "second" as the secondary one, and make a copy:


$tabtype_fundamental = $tabtype->copyFundamental(
    "NO_FIRST_LEAF",
    [ "second", "+" ],
    [ "first", "+" ],
);

In the copied table type the index "second" becomes primary and "first" secondary.
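The meaning of "+" in a path can be illustrated with a small stand-alone sketch. It is only a model: the index tree is a hypothetical nested array of name => definition pairs rather than a Triceps table type, and the function expandPath() is made up for the example.

```perl
use strict;
use warnings;

# Expands a path that may end with "+" into the full list of index names.
# The tree is [ name => { sub => [...] }, ... ] in definition order.
sub expandPath {
    my ($tree, @path) = @_;
    my @result;
    while (@path) {
        my $name = shift @path;
        if ($name eq "+") {
            # follow the first nested index all the way down to a leaf
            while (@$tree) {
                push @result, $tree->[0];
                $tree = $tree->[1]{sub} || [];
            }
            last;
        }
        my $found;
        for (my $i = 0; $i < @$tree; $i += 2) {
            if ($tree->[$i] eq $name) {
                $found = $tree->[$i + 1];
                last;
            }
        }
        die "no index '$name' at this level\n" unless $found;
        push @result, $name;
        $tree = $found->{sub} || [];
    }
    return @result;
}

my $indexes = [
    first  => { sub => [ fifo => {} ] },
    second => { sub => [ byPriority => { sub => [ fifo => {} ] } ] },
];
my @primary   = expandPath($indexes, "+");           # ("first", "fifo")
my @secondary = expandPath($indexes, "second", "+"); # ("second", "byPriority", "fifo")
```

In this model the default behavior of copying the primary index corresponds to the path consisting of a lone "+".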

Tuesday, May 7, 2013

Label chaining at the front, and label method confessions.

It has been bothering me how sensitive the threaded pipeline example was to the order of chaining the output facet and the internal logic to the input facet. To get the input data to pass through first and only then have the processed data come out, the output facet had to be connected (and thus defined) first, and only then the internal logic could be connected to the input facet. Things would be much easier if the output facet could be connected at the end, but still put at the front of the chain of the input facet's label. And the FnReturn in general suffers from this problem as well.

So I've added this feature: a way to chain a label, placing it at the front of the chain.

Along the way I've also changed all the Label methods to use the new way of error reporting, confessing on errors. No more need to add "or confess" after that manually (and I've been forgetting to do that properly all over the place).

The new method is:

$label->chainFront($otherLabel);

In C++ this is done slightly differently, by adding an extra argument to chain:

err = label->chain(otherLabel, true);

The second argument has the default value of false, so the method is still backwards-compatible, and you can call it either way:

err = label->chain(otherLabel);
err = label->chain(otherLabel, false);


to chain a label normally, at the end of the chain. The return value in C++ is still the Erref (though hm, maybe it could use an exception as well).
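The effect of the chaining order can be seen in a toy model. The class ToyLabel below is a hypothetical stand-in and not Triceps::Label: it only keeps a list of chained labels and records the order of the calls.

```perl
use strict;
use warnings;

package ToyLabel; # hypothetical stand-in, not Triceps::Label

sub new {
    my ($class, $name, $log) = @_;
    return bless { name => $name, log => $log, chained => [] }, $class;
}

# normal chaining goes to the end, chainFront() goes to the front
sub chain      { my ($self, $other) = @_; push    @{ $self->{chained} }, $other; }
sub chainFront { my ($self, $other) = @_; unshift @{ $self->{chained} }, $other; }

sub call {
    my ($self, $row) = @_;
    push @{ $self->{log} }, "$self->{name}: $row";
    $_->call($row) for @{ $self->{chained} };
}

package main;

my @log;
my $input = ToyLabel->new("input", \@log);
$input->chain(ToyLabel->new("logic", \@log));       # connected first
$input->chainFront(ToyLabel->new("output", \@log)); # connected last, runs first
$input->call("row1");
# @log is ("input: row1", "output: row1", "logic: row1")
```

Even though "output" was connected after "logic", it sees the row first, which is exactly what the pipeline example needs.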

Having done this, I went and changed the TrieadOwner::makeNexus() and FnReturn::new to chain their labels at the front by default. This can be switched to the old behavior by using a new option:

  chainFront => 0

The default value of this option is 1.

In C++ this is also expressed by an extra argument to FnReturn::addFromLabel(), which has the default value of true, matching the Perl code. Now when you call

ret = initialize(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX)
);

or

ret = initialize(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX, true)
);


you add the FnReturn's label to the front of the lbX's chain. To get the old behavior, use:

ret = initialize(FnReturn::make(unit, name)
    ->addLabel("lb1", rt1)
    ->addFromLabel("lb2", lbX, false)
);


I've changed the default behavior because there would not be many uses for the old one.

I haven't described yet, how the nexuses are created in C++, but they are created from an FnReturn, and thus this change to FnReturn covers them both.

Wednesday, May 1, 2013

a little more of ThreadedClient

Forgot to mention one more method of ThreadedClient, it's used like this:

$client->sendClose("c4", "WR");

This lets you close the client socket (in socket terms, shut it down). The first argument is the client name. The second argument determines which side of the socket gets closed: "WR" for the writing side, "RD" for the reading side, and "RDWR" for both. These are the same names as in the modes of the system call shutdown() (SHUT_WR, SHUT_RD, SHUT_RDWR).
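For reference, here is a minimal sketch of how such names can be mapped to a plain Perl shutdown() call. The function closeSide() is hypothetical and is not how ThreadedClient is necessarily implemented inside; it only shows the correspondence of the names to the standard constants from the Socket module.

```perl
use strict;
use warnings;
use Socket qw(SHUT_RD SHUT_WR SHUT_RDWR);

my %SHUT_HOW = (RD => SHUT_RD, WR => SHUT_WR, RDWR => SHUT_RDWR);

# Shuts down one or both sides of a socket by symbolic name.
# Returns the result of shutdown(): true on success.
sub closeSide {
    my ($sock, $side) = @_;
    defined(my $how = $SHUT_HOW{$side})
        or die "unknown side '$side', expected RD, WR or RDWR\n";
    return shutdown($sock, $how);
}
```

After the writing side is shut down, the other end of the connection reads an end-of-file, while the data can still flow in the opposite direction.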

Tuesday, April 30, 2013

ThreadedClient: a Triceps expect

In case you're not familiar with it, "expect" is a program that lets you connect to the interactive programs and pretend to be an interactive user. Obviously, the terminal programs, not the GUI ones. It was originally done as an extension for Tcl, and later ported as a library for Perl and other languages.

The class Triceps::X::ThreadedClient implements a variety of expect in the Triceps framework. I'm using it for the unit tests of the Triceps servers but it can have other uses as well. Why not just use expect? One reason, I don't like bringing in extra dependencies, especially just for tests, second, it was an interesting exercise, and third, I didn't realize that I'm writing a simplified variety of expect until I had it mostly completed. The biggest simplification is that ThreadedClient works with the complete lines.

It gets used in the unit tests like this: first the server gets started in a background process or thread, and then the server's port number is used to create the clients. The ThreadedClient gets embedded into a Triceps App, so you can start other things in the same App. Well, the names of the ThreadedClient threads are hardcoded at the moment, so you can start only one copy of it per App, and there could be name conflicts if you start your other threads.

But first you need to start an App. I'll do it in yet another way this time:

  Triceps::App::build "client", sub {
    my $appname = $Triceps::App::name;
    my $owner = $Triceps::App::global;

    my $client = Triceps::X::ThreadedClient->new(
      owner => $owner,
      port => $port,
      debug => 0,
    );

    $owner->readyReady();

    $client->startClient("c1");
    $client->expect("c1", '!ready');

    # this repetition
    $client->send("c1", "publish,*,zzzzzz\n");
    $client->send("c1", "publish,*,zzzzzz\n");
    $client->expect("c1", '\*,zzzzzz');
    $client->expect("c1", '\*,zzzzzz');

    $client->startClient("c2");
    $client->expect("c2", '!ready,cliconn2');

    $client->send("c1", "kill,cliconn2\n");
    $client->expect("c2", '__EOF__');

    $client->send("c1", "shutdown\n");
    $client->expect("c1", '__EOF__');
  };


Triceps::App::build() is kind of like Triceps::Triead::startHere() but saves the trouble of parsing the options. The app name is its first argument and the code for the main routine of the first thread is the second argument. That first Triead will run in the current Perl thread. The name of the app is placed into the global variable $Triceps::App::name, the App object into $Triceps::App::app, and the TrieadOwner into $Triceps::App::global. The name of the first Triead is hardcoded as "global". After the main function exits, Triceps::App::build runs the harvester, very similar to startHere().

Triceps::X::ThreadedClient->new() is used to start an instance of a client. The option "owner" gives it the current TrieadOwner as a starting point but it will create its own threads starting from this point. The option "port" specifies the default port number for the connections.

The option "debug" is optional, with the default value of 0. In this mode it collects the protocol of the run but otherwise runs silently. Setting the debug to 1 makes it also print the protocol as it gets collected, so if something goes not the way you expected, you can see what it is. Setting the debug to 2 also adds the printouts from the socket-facing threads, so you can also see what goes in and out of the client socket.

After that the clients get started with startClient(). Its first argument is a symbolic name of the client that will be used in the further calls and also in the protocol. The second optional argument is the port number, if you want to use a different port than the one specified in new().

After that the data gets sent into the client socket with send(), and the returned lines get parsed with expect(). The usual procedure is that you send something, then expect some response to it. The send lines are included into the protocol with the prefix "> ".

The second argument of expect() is actually a regexp placed inside a string. So to expect a literal special character like '*', prefix it with a backslash and put the string into single quotes, like:

    $client->expect("c1", '\*,zzzzzz');


The expect() keeps reading lines until it finds one matching the regexp. Then all the lines including that one are added to the protocol and the call returns.
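The matching logic can be modeled in a few lines of plain Perl. This is only a sketch with a made-up function expectLines() that reads from an array instead of a socket; the real ThreadedClient does this work in its collector thread.

```perl
use strict;
use warnings;

# Consumes lines from @$input until one matches $pattern (a regexp placed
# in a string). Every consumed line goes into @$trace with the "name|" prefix.
# Returns 1 on a match, 0 if the input ran out first.
sub expectLines {
    my ($name, $pattern, $input, $trace) = @_;
    while (defined(my $line = shift @$input)) {
        push @$trace, "$name|$line";
        return 1 if $line =~ /$pattern/;
    }
    return 0;
}

my @input = ("!ready,cliconn1\n", "*,zzzzzz\n", "*,zzzzzz\n");
my @trace;
expectLines("c1", '!ready', \@input, \@trace);
expectLines("c1", '\*,zzzzzz', \@input, \@trace);
# @trace holds the first two lines, the second "*,zzzzzz" is still in @input
```

This also shows why the example above repeats the expect() twice after sending two identical lines: each call stops at the first matching line.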


There are a couple of special lines generated by the connection status itself rather than coming from the socket. When the socket gets connected, it generates the line "> connected name" in the protocol (but not as an expectable input). When the connection gets dropped, this generates an expectable line "__EOF__". And each line in the protocol gets prefixed with the client name and "|". The example of the chat server run in the previous post was created by the ThreadedClient.


The protocol can be extracted as $client->protocol(). So for unit tests you can check the protocol match as


ok($client->protocol(), $expected);


Internally the ThreadedClient is very much like the chat server, so there is not a whole lot of point in going over it in detail. But feel free to read it on your own.

ThreadedServer, part 2

The next part of the ThreadedServer is listen(), the function that gets called from the listener thread and takes care of accepting the connections and spawning the per-client threads.

sub listen # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::listen";
    my $opts = {};
    my @myOpts = (
        owner => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TrieadOwner") } ],
        socket => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "IO::Socket") } ],
        prefix => [ undef, \&Triceps::Opt::ck_mandatory ],
        handler => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        pass => [ undef, sub { &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $prefix = $opts->{prefix};
    my $sock = $opts->{socket};

The first part is traditional, the only thing to note is that it also saves the list of options in a variable for the future use (startServer() did that too but I forgot to mention it). And the option "*" is the pass-through: it gets all the unknown options collected in $opts->{"*"}.

    my $clid = 0; # client id

    while(!$owner->isRqDead()) {
        my $client = $sock->accept();
        if (!defined $client) {
            my $err = "$!"; # or the text message will be reset by isRqDead()
            if ($owner->isRqDead()) {
                last;
            } elsif($!{EAGAIN} || $!{EINTR}) { # numeric codes don't get reset
                next;
            } else {
                confess "$myname: accept failed: $err";
            }
        }

The accept loop starts. It runs until the listening socket gets revoked by shutdown. The revocation is done by dup2() of a file descriptor from /dev/null over it. Note that this function itself doesn't request to track the file descriptor for revocation. That's the caller's responsibility.

Oh, and by the way, if you've been tracing the possible ways of thread execution closely, you might be wondering: what if the shutdown happens after the socket is opened but before it is tracked? This can't really happen to the listening socket but can happen with the accepted client sockets. The answer is that the tracking enrollment will check whether the shutdown already happened, and if so, it will revoke the socket right away, before returning. So the reading loop will find the socket revoked right on its first iteration.

The revocation also sends the signal SIGUSR2 to the thread. This is done because the calls like accept() do not return on a simple revocation by dup2() but a signal does interrupt them. Triceps registers an empty handler for SIGUSR2 that just immediately returns, but the accept() system call gets interrupted, and the thread gets a chance to check for shutdown. And even if it calls accept() again, this will return an error and force it to check for shutdown again.

And by the way, the Perl's threads::kill doesn't send a real signal, it just sets a flag for the interpreter. If you try it on your own, it won't interrupt the system calls, and now you know why. Instead Triceps gets the POSIX thread identity from the Perl thread and calls the honest pthread_kill() from the C++ code.

So, the main loop goes by the shutdown condition, isRqDead() tells if this thread has been requested to die. After accept(), it checks for errors. The first thing to check for is again the isRqDead(), because the revocation will manifest as a socket operation error, and there is no point in reporting this spurious error. However, like other Triceps calls, isRqDead() will clear the error text, so the text has to be saved first. If the shutdown is found, the loop exits. Then the check for the spurious interruptions is done, and for them the loop continues. Funny enough, the $!{} uses the numeric part of $! that is independent from its text part and doesn't get cleared by the Triceps calls. And on any other errors the thread confesses. This will unroll the stack, eventually get caught by the Triceps threading code, abort the App, and propagate the error message to the harvester.

        $clid++;
        my $cliname = "$prefix$clid";
        $app->storeCloseFile($cliname, $client);

        Triceps::Triead::start(
            app => $app->getName(),
            thread => $cliname,
            fragment => $cliname,
            main => $opts->{handler},
            socketName => $cliname,
            &Triceps::Opt::drop({ @myOpts }, \@_),
        );

        # Doesn't wait for the new thread(s) to become ready.
    }
}

If a proper connection has been received, the socket gets stored into the App with a unique name, for later load by the per-client thread. And then the per-client thread gets started.

The drop passes through all the original options except for the ones handled by this thread. In retrospect, this is not the best solution for this case. It would be better to just use @{$opts->{"*"}} instead. The drop call is convenient when not all the explicitly recognized options but only a part of them has to be dropped.

After starting the thread, the loop doesn't call readyReady() but goes for the next iteration. This is basically because it doesn't care about the started thread and doesn't ever send anything to it. And waiting for the threads to start will make the loop slower, possibly overflowing the socket's listening queue and dropping the incoming connections if they arrive very fast.
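The error handling of the accept loop above can be tried in isolation. The following sketch uses only the standard $! and %! variables; the function classifyAcceptFailure() is hypothetical, and the "is dead" flag is passed as an argument instead of calling isRqDead().

```perl
use strict;
use warnings;
use Errno qw(EINTR EAGAIN EBADF);

# Decides what an accept loop should do after a failed accept().
#   $errno  - the numeric value of $! right after the failure
#   $isDead - whether the thread has been requested to die
sub classifyAcceptFailure {
    my ($errno, $isDead) = @_;
    local $! = $errno;
    my $text = "$!";            # save the text before anything can reset it
    return "exit" if $isDead;   # shutdown: the error is spurious
    return "retry" if $!{EAGAIN} || $!{EINTR}; # %! checks the numeric code
    return "fail: $text";
}

print classifyAcceptFailure(EINTR, 0), "\n"; # retry
print classifyAcceptFailure(EINTR, 1), "\n"; # exit
print classifyAcceptFailure(EBADF, 0), "\n"; # fail: with the system's message text
```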

And the last part of the ThreadedServer is the printOrShut:

sub printOrShut # ($app, $fragment, $sock, @text)
{
    my $app = shift;
    my $fragment = shift;
    my $sock = shift;

    undef $!;
    print $sock @_;
    $sock->flush();

    if ($!) { # can't write, so shutdown
        Triceps::App::shutdownFragment($app, $fragment);
    }
}

Nothing too complicated. Prints the text into the socket, flushes it and checks for errors. On errors shuts down the fragment. In this case there is no need for draining. After all, the socket leading to the client is dead and there is no way to send anything more through it, so there is no point in worrying about any unsent data. Just shut down as fast as it can, before the threads have generated more data that can't be sent any more. Any data queued in the nexuses for the shut down threads will be discarded.


Monday, April 29, 2013

ThreadedServer, part 1

And now I want to show the internals of the ThreadedServer methods. It shows how to store the socket file handles into the App, how the threads are harvested, and how the connections get accepted.

sub startServer # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::startServer";
    my $opts = {};
    my @myOpts = (
        app => [ undef, \&Triceps::Opt::ck_mandatory ],
        thread => [ "global", undef ],
        main => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        port => [ undef, \&Triceps::Opt::ck_mandatory ],
        socketName => [ undef, undef ],
        fork => [ 1, undef ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);

    if (!defined $opts->{socketName}) {
        $opts->{socketName} = $opts->{thread} . ".listen";
    }

    my $srvsock = IO::Socket::INET->new(
        Proto => "tcp",
        LocalPort => $opts->{port},
        Listen => 10,
    ) or confess "$myname: socket creation failed: $!";
    my $port = $srvsock->sockport() or confess "$myname: sockport failed: $!";

So far it's pretty standard: get the options and open the socket for listening.

    if ($opts->{fork} > 0)  {
        my $pid = fork();
        confess "$myname: fork failed: $!" unless defined $pid;
        if ($pid) {
            # parent
            $srvsock->close();
            return ($port, $pid);
        }
        # for the child, fall through
    }

This handles the process forking option: if forking is requested, it executes and then the parent process returns the PID while the child process continues with the rest of the logic. By the way, your success with forking a process that has multiple running threads may vary. The resulting process usually has one running thread (continuing where the thread that called fork() was) but the synchronization primitives in the new process can be inherited in any state, so the attempts to continue the threaded processing are usually not such a good idea. It's usually best to fork first, before there are more threads.

    # make the app explicitly, to put the socket into it first
    my $app = Triceps::App::make($opts->{app});
    $app->storeCloseFile($opts->{socketName}, $srvsock);
    Triceps::Triead::start(
        app => $opts->{app},
        thread => $opts->{thread},
        main => $opts->{main},
        socketName => $opts->{socketName},
        &Triceps::Opt::drop({ @myOpts }, \@_),
    );

Then the App gets created. Previously I was showing starting the App with startHere() that created the App and did a bunch of services implicitly. Here everything will be done manually. The listening socket has to be stored into the app before the listener thread gets started, so that the listener thread can find it.

Triceps keeps a global list of all its Apps in the process, and after an App is created, it's placed into that list and can be found by name from any thread. The App object will exist while there are references to it, including the reference from that global list. On the other hand, it's possible to remove the App from the list while it's still running but that's a bad practice because it will break any attempts from its threads to find it by name.
 
So the App is made, then the file handle gets stored. storeCloseFile() gets the file descriptor from the socket, dups it, stores it into the App, and then closes the original file handle. All this monkeying with dupping and closing is needed because there is no way to extract the file descriptor from a Perl file handle without the handle trying to close it afterwards. But as a result of this roundabout way, the file descriptor gets transferred into the App.

Then the listener thread is started, and as shown before, it's responsible for starting all the other threads. In the meantime, the startServer() continues.

    my $tharvest;
    if ($opts->{fork} < 0) {
        @_ = (); # prevent the Perl object leaks
        $tharvest = threads->create(sub {
            # In case of errors, the Perl's join() will transmit the error
            # message through.
            Triceps::App::find($_[0])->harvester();
        }, $opts->{app}); # app has to be passed by name
    } else {
        $app->harvester();
    }

Then the harvester logic is started. Each App must have its harvester. startHere() runs the harvester implicitly, unless told otherwise, but here the harvester has to be run manually. It can be run either in this thread or in another thread, as determined by the option "fork". If it says to make another thread, $tharvest will contain that thread's identity. A special thing about starting threads with threads->create() is that it's sensitive to anything in @_. If @_ contains anything, it will be leaked (though the more recent versions of Perl should have it fixed). So @_ gets cleared before starting the thread.

And one way or the other, the harvester is started. What does it do? It joins the App's threads as they exit. After all of them exit, it removes the App from the global list of Apps, which allows the App's memory to be collected when the last reference to it is gone, and then the harvester returns.

If any of the threads die, they cause the App to be aborted. The aborted App shuts down immediately and remembers the identity of the failed thread and its error message (only the first message is saved because the abort is likely to cause the other threads to die too, and there is no point in seeing these derivative messages). The harvester, in turn, collects this message from the App, and after all its clean-up work is done, dies propagating this message. Then if the harvester is not running in the first thread, that message will be propagated further by Perl's join().

A catch is that the errors are not reported until the harvester completes. Normally all the App's threads should exit immediately when shut down but if they don't, the program will be stuck without any indication of what happened.

It's also possible to disable this propagation of dying by using the option "die_on_abort":

$app->harvester(die_on_abort => 0);

Then the last part:

    if ($opts->{fork} > 0) {
        exit 0; # the forked child process
    }

    return ($port, $tharvest);
}



If this was the child process forked before, it exits at this point. Otherwise the port and the harvester's thread object are returned.

Multithreaded socket server, part 6, a run example

Here is a protocol from a run. It has been produced with the automated testing infrastructure that I'll describe later.

As usual, the lines sent from the clients to the socket server are preceded with a "> ". But since there are many clients, to tell them apart, both the sent and received lines are prefixed by the client's name and a "|". I've just picked arbitrary client names to tell them apart.

I've also marked the incoming connection as "> connect client_name", and the disconnections as "__EOF__" after the client name.

So, here we go.

> connect c1
c1|!ready,cliconn1
> connect c2
c2|!ready,cliconn2

Two clients connect.

> c1|publish,*,zzzzzz
c1|*,zzzzzz
c2|*,zzzzzz

A message published to the topic "*" gets forwarded to all the connected clients. In reality the messages may of course be received on separate sockets in any order, but I've ordered them here for the ease of reading.

> c2|garbage,trash
c2|!invalid command,garbage,trash

An invalid command gets detected in the writer thread and reported as such.

> c2|subscribe,A
c2|!subscribed,A
> c1|publish,A,xxx
c2|A,xxx

A subscription request gets acknowledged, and after that all the messages sent to this topic get received by the client.

> c1|subscribe,A
c1|!subscribed,A
> c1|publish,A,www
c1|A,www
c2|A,www

If more than one client is subscribed to a topic, all of them get the messages.

> c2|unsubscribe,A
c2|!unsubscribed,A
> c1|publish,A,vvv
c1|A,vvv

The unsubscription makes the client stop receiving messages from this topic.

> connect c3
c3|!ready,cliconn3
> c3|exit
c3|!exiting
c3|__EOF__

The third client connects, immediately requests an exit, gets the confirmation and gets disconnected.

> connect c4
c4|!ready,cliconn4
> close WR c4
c4|!exiting
c4|__EOF__

The fourth client connects and then closes its write side of the socket (that is the read side for the server). It produces the same effect as the exit command.

> c1|shutdown
c1|*,server shutting down
c1|__EOF__
c2|*,server shutting down
c2|__EOF__

And the shutdown command sends the notifications to all the remaining clients and closes the connections.

Multithreaded socket server, part 5, socket writer

The socket writer thread is the last part of the puzzle.

sub chatSockWriteT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockWriteT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
        ctlFrom => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_;
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $tname = $opts->{thread};

    my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, ">");

    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "reader",
    );

    my $faCtl = $owner->importNexus(
        from => $opts->{ctlFrom},
        import => "reader",
    );

The usual preamble. The trackGetSocket() consumes the socket from the App, and this time reopens it for writing. The previously created nexuses are imported.

    my %topics; # subscribed topics for this thread

    $faChat->getLabel("msg")->makeChained("lbMsg", undef, sub {
        my $row = $_[1]->getRow();
        my $topic = $row->get("topic");
        if ($topic eq "*" || exists $topics{$topic}) {
            printOrShut($app, $opts->{fragment}, $sock, $topic, ",", $row->get("msg"), "\n");
        }
    });

The logic is defined as the connected labels. The topic hash keeps the keys that this thread is subscribed to. When a message is received from the chat nexus and the topic is in the hash or is "*", the message gets sent into the socket in the CSV format:

topic,text

The function printOrShut() is imported from Triceps::X::ThreadedServer. Its first 3 arguments are fixed, and the rest are passed through to print(). It prints the message to the socket file handle, flushes the socket, and in case of any errors it shuts down the fragment specified in its second argument. This way if the socket gets closed from the other side, the threads handling it automatically shut down.
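The filtering rule can be tried out without any sockets or threads. This is only a sketch in plain Perl: the function name format_for_client() is invented for the illustration, and instead of printing to a socket it returns the CSV line (or undef when the message should be skipped).

```perl
use strict;
use warnings;

# Returns the CSV line to send, or undef if this client is not subscribed.
# %$topics holds the subscribed topics as keys; "*" always goes through.
sub format_for_client {
    my ($topics, $topic, $msg) = @_;
    return undef unless $topic eq "*" || exists $topics->{$topic};
    return "$topic,$msg\n";
}

my %topics = (A => 1);    # this client is subscribed to "A" only
my @sent = grep { defined } (
    format_for_client(\%topics, "A", "xxx"),
    format_for_client(\%topics, "B", "yyy"),       # filtered out
    format_for_client(\%topics, "*", "zzzzzz"),
);
print @sent;
```

With the subscription to "A" only, the message for topic "B" is dropped while the ones for "A" and "*" go through.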

    $faCtl->getLabel("ctl")->makeChained("lbCtl", undef, sub {
        my $row = $_[1]->getRow();
        my ($cmd, $arg) = $row->toArray();
        if ($cmd eq "print") {
            printOrShut($app, $opts->{fragment}, $sock, $arg, "\n");
        } elsif ($cmd eq "subscribe") {
            $topics{$arg} = 1;
            printOrShut($app, $opts->{fragment}, $sock, "!subscribed,$arg\n");
        } elsif ($cmd eq "unsubscribe") {
            delete $topics{$arg};
            printOrShut($app, $opts->{fragment}, $sock, "!unsubscribed,$arg\n");
        } else {
            printOrShut($app, $opts->{fragment}, $sock, "!invalid command,$cmd,$arg\n");
        }
    });

The handling of the control commands is pretty straightforward.

    $owner->readyReady();

    $owner->mainLoop();

    $tsock->close(); # not strictly necessary
}

And the rest is taken care of by the mainLoop(). The thread's main loop runs until the thread gets shut down, handling the incoming messages. So if, say, printOrShut() decides to shut down the fragment, the next iteration of the loop will detect it and exit.

Multithreaded socket server, part4, socket reader thread

As I've said before, each socket gets served by two threads: one sits reading from the socket and forwards the data into the model, and another one sits getting data from the model and forwards it into the socket. The same thread can't wait for both a socket descriptor and a thread synchronization primitive, so they have to be separate.

The first thread started is the socket reader. Let's go through it bit by bit.

sub chatSockReadT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockReadT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    my $tname = $opts->{thread};

    # only dup the socket, the writer thread will consume it
    my ($tsock, $sock) = $owner->trackDupSocket($opts->{socketName}, "<");

The beginning is quite usual. Then it loads the socket from the App and gets it tracked with the TrieadOwner. The difference between trackDupSocket() here and the trackGetSocket() used before is that trackDupSocket() leaves the socket copy in the App, to be found by the writer-side thread.

The socket is reopened in this thread as read-only. The writing to the socket from all the threads has to be synchronized to avoid mixing the half-messages. And the easiest way to synchronize is to always write from one thread, and if the other thread wants to write something, it has to pass the data to the writer thread through the control nexus.

    # user messages will be sent here
    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "writer",
    );

    # control messages to the reader side will be sent here
    my $faCtl = $owner->makeNexus(
        name => "ctl",
        labels => [
            ctl => $faChat->impRowType("ctl"),
        ],
        reverse => 1, # gives this nexus a high priority
        import => "writer",
    );

Imports the chat nexus and creates the private control nexus for communication with the writer side. The name of the chat nexus is hardcoded here, since it's pretty much a solid part of the application. If this were a module, the name of the chat nexus could be passed through the options.

The control nexus is marked as reverse even though it really isn't. But the reverse option has a side effect of making this nexus high-priority. Even if the writer thread has a long queue of messages from the chat nexus, the messages from the control nexus will be read first. Which again isn't strictly necessary here, but I wanted to show how it's done.

The type of the control label is imported from the chat nexus, so it doesn't have to be defined from scratch.

    $owner->markConstructed();

    Triceps::Triead::start(
        app => $opts->{app},
        thread => "$tname.rd",
        fragment => $opts->{fragment},
        main => \&chatSockWriteT,
        socketName => $opts->{socketName},
        ctlFrom => "$tname/ctl",
    );

    $owner->readyReady();

Then the construction is done and the writer thread gets started. And then the thread is ready and waits for the writer thread to be ready too. The readyReady() works in the fragments just as it does at the start of the app. Whenever a new thread is started, the App becomes not ready, and stays this way until all the threads report that they are ready. The rest of the App keeps working like nothing happened, at least sort of. Whenever a nexus is imported, the messages from this nexus start collecting for this thread, and if there are many of them, the nexus will become backed up and the threads writing to it will block. The new threads have to call readyReady() as usual to synchronize between themselves, and then everything gets on its way.

Of course, if two connections are received in a quick succession, that would start two sets of threads, and readyReady() will continue only after all of them are ready.


    my $lbChat = $faChat->getLabel("msg");
    my $lbCtl = $faCtl->getLabel("ctl");

    $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!ready," . $opts->{fragment});
    $owner->flushWriters();

A couple of labels get remembered for future use, and the connection ready message gets sent to the writer thread through the control nexus. By convention of this application, the messages go in the CSV format, with the control messages starting with "!". If this is the first client, this would send

!ready,cliconn1

to the client. It's important to call flushWriters() every time to get the message(s) delivered.

    while(<$sock>) {
        s/[\r\n]+$//;
        my @data = split(/,/);
        if ($data[0] eq "exit") {
            last; # a special case, handle in this thread
        } elsif ($data[0] eq "kill") {
            eval {$app->shutdownFragment($data[1]);};
            if ($@) {
                $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!error,$@");
                $owner->flushWriters();
            }
        } elsif ($data[0] eq "shutdown") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            Triceps::AutoDrain::makeShared($owner);
            eval {$app->shutdown();};
        } elsif ($data[0] eq "publish") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => $data[1], msg => $data[2]);
            $owner->flushWriters();
        } else {
            # this is not something you want to do in a real chat application
            # but it's cute for a demonstration
            $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => $data[0], arg => $data[1]);
            $owner->flushWriters();
        }
    }

The main loop keeps reading lines from the socket and interpreting them. The lines are in CSV format, and the first field is the command and the rest are the arguments (if any).  The commands are:

publish - send a message with a topic to the chat nexus
exit - close the connection
kill - close another connection, by name
shutdown - shut down the server
subscribe - subscribe the client to a topic
unsubscribe - unsubscribe the client from a topic
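The parsing step on its own can be sketched in plain Perl like this. The function name parse_command() is made up for the illustration; it does the same two steps as the loop above (strip the line ending, split on commas) and returns the command with its arguments.

```perl
use strict;
use warnings;

# Splits one input line into the command and its arguments:
# strip the trailing CR/LF, then split on commas.
sub parse_command {
    my ($line) = @_;
    $line =~ s/[\r\n]+$//;
    my ($cmd, @args) = split(/,/, $line);
    return ($cmd, @args);
}

my ($cmd, @args) = parse_command("publish,A,xxx\r\n");
print "command=$cmd args=@args\n";
```

Note that with this simple split a message text that itself contains commas gets broken into extra fields, and the loop above uses only the first one of them as the message.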

The exit just exits the loop, since it works the same if the socket just gets closed from the other side.

The kill shuts down by name the fragment to which the threads of the other connection belong. This is a simple application, so it doesn't check any permissions, whether this fragment should be allowed to be shut down. If there is no such fragment, the shutdown call will silently do nothing, so the error check and reporting is really redundant (if something goes grossly wrong in the thread interruption code, an error might still occur, but theoretically this should never happen).

The shutdown sends the notification to the common topic "*" (to which all the clients are subscribed by default), then drains the model and shuts it down. The drain makes sure that all the messages in the model get processed (and even written to the socket) without allowing any new messages to be injected. "Shared" means that there are no special exceptions for some threads.

makeShared() actually creates a drain object that keeps the drain active during its lifetime. Here this object is not assigned anywhere, so it gets immediately destroyed and lifts the drain. So potentially more messages can get squeezed in between this point and shutdown. Which doesn't matter a whole lot here.
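The lifetime effect is plain Perl reference counting and can be seen without Triceps. In the sketch below DrainGuard is a hypothetical stand-in class invented for the illustration (it is not the AutoDrain implementation): it sets a flag when created and clears it when destroyed.

```perl
use strict;
use warnings;

{
    # Hypothetical stand-in for a drain object: sets a flag while it lives.
    package DrainGuard;
    sub new {
        my ($class, $flag) = @_;
        $$flag = 1;                      # the "drain" starts
        return bless { flag => $flag }, $class;
    }
    sub DESTROY {
        my ($self) = @_;
        ${ $self->{flag} } = 0;          # the "drain" is lifted
    }
}

my $draining = 0;

# Not assigned anywhere: the object is destroyed as soon as the statement ends.
DrainGuard->new(\$draining);
my $after_unassigned = $draining;

my $inside_scope;
{
    my $guard = DrainGuard->new(\$draining);
    $inside_scope = $draining;           # the object is still held by $guard
}
my $after_scope = $draining;             # $guard went out of scope

print "unassigned: $after_unassigned, in scope: $inside_scope, after scope: $after_scope\n";
```

Keeping the object in a variable is what keeps the flag set until the end of the enclosing block.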

If it were really important that nothing get sent after the shutdown notification, it could be done like this (this is an untested fragment, so it might contain typos):

        } elsif ($data[0] eq "shutdown") {
            my $drain = Triceps::AutoDrain::makeExclusive($owner);
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            $drain->wait();
            eval {$app->shutdown();};
        }

This starts the drain, but this time the exclusive mode means that this thread is allowed to send more data. When the drain is created, it waits for success, so when the new message is inserted, it will be after all the other messages. $drain->wait() does another wait and makes sure that this last message propagates all the way. And then the app gets shut down, while the drain is still in effect, so no more messages can be sent for sure.

The publish sends the data to the chat nexus (note the flushWriters(), as usual!).

And the rest of the commands (that would be subscribe and unsubscribe, but you can do any other commands like "print") simply get forwarded to the writer thread for execution. Sending the commands through like this without checking them is not a good practice for a real application but it's cute for a demo.

    {
        # let the data drain through
        my $drain = Triceps::AutoDrain::makeExclusive($owner);

        # send the notification - can do it because the drain is excluding itself
        $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!exiting");
        $owner->flushWriters();

        $drain->wait(); # wait for the notification to drain

        $app->shutdownFragment($opts->{fragment});
    }

    $tsock->close(); # not strictly necessary
}

The last part is when the connection gets closed, either by the "exit" command or when the socket gets closed. Remember, the socket can get closed asymmetrically, in one direction, so even when the reading is closed, the writing may still work and needs to return the responses to any commands received from the socket. And of course the same is true for the "exit" command.

So here the full exclusive drain sequence is used, ending with the shutdown of this thread's own fragment, which will close the socket. Even though only one fragment needs to be shut down, the drain drains the whole model. Because of the potentially complex interdependencies, there is no way to reliably drain only a part, and all the drains are App-wide.

The last part, with $tsock->close(), is not technically necessary since the shutdown of the fragment will get the socket descriptor revoked anyway.  But other than that, it's a good practice that unregisters the socket from the TrieadOwner and then closes it.

Sunday, April 28, 2013

Multithreaded socket server, part 3, the listener thread

The listener thread is:

sub listenerT
{
  my $opts = {};
  &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
    socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  undef @_;
  my $owner = $opts->{owner};

  my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");

  # a chat text message
  my $rtMsg = Triceps::RowType->new(
    topic => "string",
    msg => "string",
  ) or confess "$!";

  # a control message between the reader and writer threads
  my $rtCtl = Triceps::RowType->new(
    cmd => "string", # the command to execute
    arg => "string", # the command argument
  ) or confess "$!";

  $owner->makeNexus(
    name => "chat",
    labels => [
      msg => $rtMsg,
    ],
    rowTypes => [
      ctl => $rtCtl,
    ],
    import => "none",
  );

  $owner->readyReady();

  Triceps::X::ThreadedServer::listen(
    owner => $owner,
    socket => $sock,
    prefix => "cliconn",
    handler => \&chatSockReadT,
  );
}

It gets the usual options of the thread start (and as usual you can pass more options to the startServer() and they will make their way to the listener thread). But there is also one more option added by startServer(): the socketName.

Since the socket objects can't be passed directly between the threads, a roundabout way is taken. After startServer() opens a socket, it stores the dupped file descriptor in the App and passes through the name used to store it, so that it can be used to load the socket back into another thread:

my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");


This does multiple things:
  • Loads the file descriptor from the App by name (with a dup()).
  • Opens a socket object from that file descriptor.
  • Registers that file descriptor for tracking with the TrieadOwner, so that if the thread needs to be shut down, that descriptor will be revoked and any further operations with it will fail.
  • Creates a TrackedFile object that will automatically unregister the file descriptor from TrieadOwner when the TrackedFile goes out of scope. This is important to avoid races between the revocation and the normal close of the file.
  • Makes the App close and forget its file descriptor.

The $tsock returned is the TrackedFile object, and $sock is the socket filehandle. You can actually get the filehandle directly from the TrackedFile, as $tsock->get(), so why return the socket separately? As it turns out, Perl has issues with handling the Perl values stored inside the XS objects if they aren't referred to by any Perl variables. When the listener thread creates the client handler threads, that would scramble the reference counts. Keeping the socket in the $sock variable prevents that.

The listener thread then creates the row types for the data messages and for the control messages between the client's reader and writer threads, and makes a nexus. The listener is not interested in the data, so it doesn't even import this nexus itself. The nexus passes the data only, so it has no label for the control messages, only the row type.

Then the mandatory readyReady(), and then the control goes again to the library that accepts the connections and starts the client connection threads. The handler is the main function for the thread that gets started to handle the connection. The prefix is used to build the names for the new thread, for its fragment, and for the connection socket that also gets passed through by storing it in the App. The name is the same for all three and gets created by concatenating the prefix with a number that gets increased for every connection. The newly created thread will then get the option socketName with the name of the socket.
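The naming scheme is easy to picture with a small sketch in plain Perl. The helper make_namer() is invented for the illustration, and the assumption that the numbering starts from 1 is taken from the "cliconn1" name seen for the first client in this example, not from the library code.

```perl
use strict;
use warnings;

# Returns a generator of names built from a prefix and a growing number.
sub make_namer {
    my ($prefix) = @_;
    my $n = 0;    # assumption: the first connection gets the number 1
    return sub { return $prefix . ++$n; };
}

my $next_name = make_namer("cliconn");
my @names = map { $next_name->() } 1 .. 3;
print "@names\n";
```

Each generated name would then serve at once as the thread name, the fragment name and the socket name for one connection.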

How does the ThreadedServer::listen() know to return? It runs until the App gets shut down, and returns only when the thread is requested to die as a result of the shutdown.

Multithreaded socket server, part2, starting the server

Let's start with the top-level: how the server gets started. It's really the last part of the code, that brings everything together.

It uses the ThreadedServer infrastructure:

use Triceps::X::ThreadedServer qw(printOrShut);

The X subdirectory is for the examples and experimental stuff, but the ThreadedServer is really of production quality, I just haven't written a whole set of tests for it yet.

The server gets started like this:

  my ($port, $pid) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 0,
      fork => 1,
  );

The port option of 0 means "pick any free port", it will be returned as the result.  If you know the fixed port number in advance, use it. "chat" will be the name of the App, and listenerT is the main function of the thread that will listen for the incoming connections and start the other threads. And it's also the first thread that gets started, so it's responsible for creating the core part of the App as well (though in this case there is not a whole lot of it).

The option "fork" determines how and whether the server gets started in the background. The value 1 means that a new process will be forked, and then the threads will be created there. The returned PID can be used to wait for that process to complete:

waitpid($pid, 0);

Of course, if you're starting a daemon, you'd probably write this PID to a file and then just exit the parent process.

The fork value of 0 starts the server in the current process, and the current thread becomes the server's harvester thread (the one that joins the other threads when the App shuts down).

In this case the server doesn't return until it's done, so there is not much point in the returned port value: by that time the socket is already closed. In this case you really need to either use a fixed port or write the port number to a file from your listener thread. The PID also doesn't make sense, and it's returned as undef. Here is an example of this kind of call:

  my ($port, $pid) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 12345,
      fork => 0,
  );

Finally, the server can be started in the current process, with a new thread created as the App's harvester, using the fork option -1. The original thread can then continue and do other things in parallel. It's the way I use for the unit tests.

  my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 0,
      fork => -1,
  );

In this case the second value returned is not a PID but the thread object for the harvester thread. You should either detach it or eventually join it:

$thread->join();

Perl propagates the errors in the threads through the join(), so if the harvester thread dies, that would show only in the join() call. And since Triceps propagates the errors too, any other thread dying will cause the harvester thread to die after it joins all the App's threads.

Multithreaded socket server, part 1, dynamic threads overview

As a quick announcement, I've renamed some of the methods described in the last post, and updated the post.

Now, to the new stuff. The threads can be used to run a TCP server that accepts the connections and then starts the new client communication thread(s) for each connection.  This thread can then communicate with the rest of the model, feeding and receiving data, as usual, through the nexuses.

The challenge here is that there must be a way to create the threads dynamically, and later when the client closes connection, to dispose of them. There are two possible general approaches:

  • dynamically create and delete the threads in the same App;
  • create a new App per connection and connect it to the main App.

Both have their own advantages  and difficulties, but the approach with the dynamic creation and deletion of threads ended up looking easier, and that's what Triceps has. The second approach is not particularly well supported yet. You can create multiple Apps in the program, and you can connect them by making two Triceps Trieads run in the same OS thread and ferry the data around. But it's extremely cumbersome. This will be improved in the future, but for now the first approach is the ticket.

The dynamically created threads are grouped into the fragments. This is done by specifying the fragment name option when creating a thread. The threads in a fragment have a few special properties.

One, it's possible to shut down the whole fragment in one fell swoop. There is no user-accessible way to shut down the individual threads, you can shut down either the whole App or a fragment. Shutting down individual threads is dangerous, since it can mess up the application in many non-obvious ways. But shutting down a fragment is OK, since the fragment serves a single logical function, such as servicing one TCP connection, and it's OK to shut down the whole logical function.

Two, when a thread in the fragment exits, it's really gone, and takes all its nexuses with it. Well, technically, the nexuses continue to exist as long as there are threads connected to them, but no new connections can be created after this point. Since usually the whole fragment will be gone together, and since the nexuses defined by the fragment's thread are normally used only by the other threads of the same fragment, a fragment shutdown cleans up its state like the fragment had never existed. By contrast, when a normal thread exits, the nexuses defined by it stay present and accessible until the App shuts down.

Another, somewhat surprising, challenge is the interaction of the threads and sockets (or file descriptors in general) in Perl.  I've already touched upon it, but there is a lot more.

To show how all this stuff works, I've created an example of a "chat server". It's not really a human-oriented chat, it's more of a machine-oriented publish-subscribe, and specially tilted to work through the running of a socket server with threads.

In this case the core logic is absolutely empty. All there is of it, is a nexus that passes messages through it. The clients read from this nexus to get the messages, and write to this nexus to send the messages.

When the App starts, it has only one thread, the listener thread that listens on a socket for the incoming connections. The listener doesn't even care about the common nexus and doesn't import it. When a connection comes in, the listener creates two threads to serve it: the reader reads the socket and sends to the nexus, and the writer receives from the nexus and writes to the socket. These two threads constitute a fragment for this client. They also create their own private nexus, allowing the reader to send control messages to the writer. That could also have been done through the central common nexus, but I wanted to show that there are different ways of doing things.

With a couple of clients connected, threads and sockets start looking like this:

client1 reader ----> client1 control nexus ------> client1 writer
         \                                           /
          ----------->\          /------------------>
                       chat nexus
          ----------->/          \------------------>
         /                                           \
client2 reader ----> client2 control nexus ------> client2 writer

And the listener thread still stays on the side.

Tuesday, April 23, 2013

Perl, threads and sockets

I've started writing an example that does a TCP server with threads and found that Perl doesn't allow passing the file descriptors between the threads. Well, you sort of can pass them as arguments to another thread but then it ends up printing error messages like these and corrupting the reference counts:

Unbalanced string table refcount: (1) for "GEN1" during global destruction.
Unbalanced string table refcount: (1) for "/usr/lib/perl5/5.10.0/Symbol.pm" during global destruction.
Scalars leaked: 1

If you try to pass a file descriptor through threads::shared, it honestly won't allow you, while the thread arguments pretend that they can and then fail.

So, to get around this issue I've added the file descriptor passing service to Triceps::App. The easy way to pass a socket around is:

# in one thread
$app->storeFile('name', $socket);
close($socket);
# or both in one call
$app->storeCloseFile('name', $socket);

# in another thread
my $socket = $app->loadDupSocket('name', 'r+');
$app->closeFd('name');

The file descriptor gets extracted from the socket file handle, and its dup is stored in the App. Pick a unique name by which you can get it back. Then you can get it back and create an IO::Socket::INET object with it. When you get it back, it gets dupped again, and you can get it back multiple times, from the same or different threads. Then to avoid leaking sockets, you tell the App to close the file descriptor and forget it.

A limitation is that you can't really share the same descriptor between two threads, Perl is not happy about that. So you have to dup it and have two separate file descriptors for the same socket. You still need to be careful about writing to the same socket from two threads, the best idea is to write from one thread only (and the same applies to reading, though it might be the same or different thread as the writing one).
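What "two separate file descriptors for the same socket" means can be seen in a single-process sketch in plain Perl, with no threads and no Triceps. A socketpair stands in for a client connection here, and the variable names are made up for the example.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

# A connected pair of sockets stands in for a client connection.
socketpair(my $server_end, my $client_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

# Dup the server end: a new descriptor that refers to the same socket.
open(my $write_side, '>&', $server_end) or die "dup: $!";
$write_side->autoflush(1);

my $separate_fds = fileno($write_side) != fileno($server_end) ? 1 : 0;

# Write only through the dup; the peer sees the data as usual.
print $write_side "hello\n";
my $line = <$client_end>;
print "separate descriptors: $separate_fds, peer received: $line";
```

The original handle could be kept for reading only and the dup for writing only, which is the kind of split between the reading and writing sides recommended above.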

The other ways to create the file handle objects are:

my $fd = $app->loadDupFile('name', 'r+');

Gets the basic IO::Handle.

my $fd = $app->loadDupFileClass('name', 'r+', $className);

Get a file handle of any named subclass of the IO::Handle, for example "IO::Socket::UNIX".

There is also the API for the raw file descriptors, that you can get with fileno():

$app->storeFd('name', $fd);

Store a dup of the file descriptor. Same as with the file handle, the original descriptor stays open and needs to be closed separately.

my $fd = $app->loadDupFd('name');

Get back a dup of the stored file descriptor. You can use it with IO::Handle or to open a named file descriptor in an old-fashioned way:

open(MYFILE, '+<&=' . $app->loadDupFd('name'));


Note the "=" in the opening mode, which tells open() to directly adopt the descriptor from the argument and not do another dup of it.


my $fd = $app->loadFd('name');

Get back the original file descriptor.  This is dangerous because now you'd have the same file descriptor in two places: in your thread and in the App. There are two ways out of this situation. One is to have it dupped on open:

open(MYFILE, '+<&' . $app->loadFd('name'));


Note, no "=" in this snippet's open mode, unlike the previous one.


The other way is to open without dupping and tell the App to simply forget this descriptor:


open(MYFILE, '+<&=' . $app->loadFd('name'));
$app->forgetFd('name');

In general, it's safer and easier to stay with the dupping, and to use the file handle interface instead of messing with the file descriptors.
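The difference between the two open() modes can be checked in plain Perl, without Triceps. The sketch below is only an illustration: the write end of a pipe stands in for a descriptor obtained from the App, and the write-only mode '>' replaces the '+<' used in the snippets above because a pipe end is one-directional.

```perl
use strict;
use warnings;

# The write end of a pipe stands in for a descriptor received from elsewhere.
pipe(my $reader, my $writer) or die "pipe failed: $!";
my $fd = fileno($writer);

# '>&' . $fd makes open() dup the descriptor:
# the new handle gets its own descriptor number.
open(my $dupped, '>&' . $fd) or die "dup open failed: $!";

# '>&=' . $fd makes open() adopt the descriptor without dupping it:
# the new handle reports the very same descriptor number.
open(my $adopted, '>&=' . $fd) or die "adopt open failed: $!";

my $dup_fd = fileno($dupped);
my $adopted_fd = fileno($adopted);
print "original=$fd dupped=$dup_fd adopted=$adopted_fd\n";
```

The dupped handle reports a new descriptor number, while the adopted one reports the original number.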

P.S. The names of the load methods used to be different; I changed them to be shorter, and added storeCloseFile().

Friday, April 19, 2013

Object passing between threads, and Perl code snippets

A limitation of the Perl threads is that no variables can be shared between them. When a new thread gets created, it gets a copy of all the variables of the parent. Well, of all the plain Perl variables. With the XS extensions your luck may vary: the variables might get copied, might become undef, or just become broken (if the XS module is not threads-aware). Copying the XS variables requires quite a high overhead at all the other times, so Triceps doesn't do it and all the Triceps objects become undefined in the new thread.

However there is a way to pass around certain objects through the Nexuses.

First, obviously, the Nexuses are intended to pass through the Rowops. These Rowops coming out of a nexus are not the same Rowop objects that went in. Rowop is a single-threaded object and can not be shared by two threads. Instead it gets converted to an internal form while in the nexus, and gets re-created, pointing to the same Row object and to the correct Label in the local facet.

Then, again obviously, the Facets get imported through the Nexus, together with their row types.

And two more types of objects can be exported through a Nexus: the RowTypes and TableTypes. They get exported through the options as in this example:

$fa = $owner->makeNexus(
    name => "nx1",
    labels => [
        one => $rt1,
        two => $lb,
    ], 
    rowTypes => [
        one => $rt2,
        two => $rt1,
    ], 
    tableTypes => [
        one => $tt1,
        two => $tt2,
    ], 
    import => "writer",
); 

As you can see, the namespaces for the labels, row types and table types are completely independent, and the same names can be reused in each of them with a different meaning. All three sections are optional, so if you want, you can export only the types in the nexus, without any labels.

They can then be extracted from the imported facet as:

$rt1 = $fa->impRowType("one");
$tt1 = $fa->impTableType("one");


Or the whole set of name-value pairs can be obtained with:


@rtset = $fa->impRowTypesHash();
@ttset = $fa->impTableTypesHash();


The exact table types and row types (by themselves or in the table types or labels) in the importing thread will be copied. It's technically possible to share the references to the same row type in the C++ code but it's more efficient to make a separate copy for each thread, and thus the Perl API goes along the more efficient way.

The import is smart in the sense that it preserves the sameness of the row types: if in the exporting thread the same row type was referred from multiple places in the labels, row types and table types sections, in the imported facet that would again be the same row type (even though of course not the one that has been exported but its copy). This again helps with the efficiency when various objects decide if the rows created by this and that type are compatible.

This is all well until you want to export a table type that has an index with a Perl sort condition in it, or an aggregator with the Perl code. The Perl code objects are tricky: they get copied OK when a new thread is created, but the attempts to import them through a nexus later cause a terrible memory corruption. So Triceps doesn't allow exporting the table types with function references in them. But it provides an alternative solution: the code snippets can be specified as the source code. It gets compiled when the table type gets initialized. When a table type gets imported through a nexus, it brings the source code with it. The imported table types are always uninitialized, so at initialization time the source code gets compiled in the new thread and works.

It all works transparently: just specify a string instead of a function reference when creating the index, and it will be recognized and processed. For example:

$it = Triceps::IndexType->newPerlSorted("b_c", undef, '
    my $res = ($_[0]->get("b") <=> $_[1]->get("b")
        || $_[0]->get("c") <=> $_[1]->get("c"));
    return $res;
    '
);

Before the code gets compiled, it gets wrapped into a 'sub { ... }', so don't write your own sub in the code string, that would be an error.
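A rough plain-Perl model of this wrapping might look like the sketch below. It is not the actual Triceps implementation: the helper compile_snippet() is hypothetical, and plain hashes stand in for the rows.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Triceps: wrap a snippet into
# 'sub { ... }' and compile it with a string eval.
sub compile_snippet # ($source)
{
    my ($source) = @_;
    my $code = eval "sub {\n$source\n}";
    die "snippet failed to compile: $@" unless ref($code) eq 'CODE';
    return $code;
}

# The snippet is only the body of a function, its arguments are in @_.
my $cmp = compile_snippet('
    my $res = ($_[0]->{b} <=> $_[1]->{b}
        || $_[0]->{c} <=> $_[1]->{c});
    return $res;
');

# Plain hashes stand in for the rows here.
my @rows = ({ b => 2, c => 1 }, { b => 1, c => 5 }, { b => 1, c => 3 });
my @sorted = sort { $cmp->($a, $b) } @rows;
print join(" ", map { "$_->{b}/$_->{c}" } @sorted), "\n"; # 1/3 1/5 2/1
```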

There is also the issue of arguments that can be specified for these functions. Triceps is now smart enough to handle the arguments that are one of:

  • undef
  • integer
  • floating-point
  • string
  • Triceps::RowType object
  • Triceps::Row object
  • reference to an array or hash thereof

It converts the data to an internal C++ representation in the nexus and then converts it back on import. So, if a TableType has all the code in it in the source form, and the arguments for this code are within the limits of this format, it can be exported through the nexus. Otherwise an attempt to export it will fail.

I've modified the SimpleOrderedIndex to use the source code format, and it will pass through the nexuses as well.

The Aggregators have a similar problem, and I'm working on converting them to the source code format too.

A little more about the differences between the code references and the source code format:

When you compile a function, it carries with it the lexical context. So you can make the closures that refer to the "my" variables in their lexical scope. With the source code you can't do this. The table type compiles them at initialization time in the context of the main package, and that's all they can see. Remember also that the global variables are not shared between the threads, so if you refer to a global variable in the code snippet and rely on a value in that variable, it won't be present in the other threads (unless the other threads are direct descendants and the value was set before their creation).
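The difference can be seen in plain Perl, without Triceps. In this sketch the helper compile_elsewhere() is a hypothetical stand-in for the place where a snippet gets compiled later, away from the code that wrote it; Triceps itself compiles in the main package, as described above.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a compilation that happens later,
# in a scope that knows nothing about the caller's "my" variables.
sub compile_elsewhere # ($source)
{
    my ($source) = @_;
    return eval "sub { $source }";
}

sub make_filters
{
    my $limit = 10; # lexical, visible only inside make_filters()

    # A code reference is a closure, it captures $limit.
    my $closure = sub { return $_[0] > $limit ? 1 : 0; };

    # The same body as a string gets compiled where $limit does not
    # exist, so under "use strict" the compilation fails.
    my $from_source = compile_elsewhere('return $_[0] > $limit ? 1 : 0;');
    my $error = $@;

    # A snippet that carries its own constant compiles fine.
    my $self_contained = compile_elsewhere('return $_[0] > 10 ? 1 : 0;');

    return ($closure, $from_source, $error, $self_contained);
}

my ($closure, $from_source, $error, $self_contained) = make_filters();
print "closure(15) = ", $closure->(15), "\n";
print "snippet with \$limit compiled: ", (defined $from_source ? "yes" : "no"), "\n";
print "self-contained(15) = ", $self_contained->(15), "\n";
```

The practical consequence is that any value a snippet needs has to be written into the source text itself or passed through the function arguments.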

While working with the custom sorted indexes, I've also fixed the way the errors are reported in their Perl handlers. The errors used to be just printed on stderr. Now they propagate properly through the table, and the table operations die with the Perl handler's error message. Since an error in the sorting function means that things are going very, very wrong, after that the table becomes inoperative and will die on all the subsequent operations as well.

Thursday, April 11, 2013

Multithreaded pipeline, part 4

Let's look at the aggregation by the hour. First, the short version that skips over the actual logic and concentrates on how the nexuses are connected.

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # ... create the table and aggregation ...

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  # ... connect the input nexus to the table ...
  # ... create the table dump logic ...

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

This function inherits the options from Triead::start() as usual and adds the option "from" of its own. This option's value is then used as the name of the nexus to import for reading. The row types of the labels from that imported facet are then used to create the table and aggregation. But they aren't connected to the input labels yet.

First, the output nexus is created. The creation passes through all the incoming data, short-circuiting the input and output, and adds the extra label for the aggregated output. After that the rest of the logic can be connected to the inputs (and to the outputs too).

The reason why this connection order is important is that the labels get called in the order they are chained from the input label. And when this thread reacts to some event, we want the original event to pass through to the output first and then send the reaction to it.
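The effect of the chaining order can be modeled with a toy in plain Perl. This is not the Triceps API: the "label" here is just a list of handlers, and chain_handler() and call_label() are made-up names for the illustration.

```perl
use strict;
use warnings;

# Toy model, not the Triceps API: a "label" is just a list of handlers
# that get called in the order they were chained.
my @chain;
my @output; # stands in for the output nexus

sub chain_handler { push @chain, $_[0]; }
sub call_label { my ($row) = @_; $_->($row) for @chain; }

# Chained first: pass the original event through to the output.
chain_handler(sub { push @output, "event: $_[0]"; });
# Chained second: the reaction computed from the event.
chain_handler(sub { push @output, "reaction to: $_[0]"; });

call_label("packet 1");
print "$_\n" for @output;
```

Swapping the two chain_handler() calls would put the reaction into the output before the event that caused it.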

And after that it's all usual readyReady() and mainLoop().

The full text of the function follows. The logic is based on the previous example from chapter 13, and the only big change is the use of SimpleAggregator instead of a manually-built one. The HourlyToDailyMain() is very similar, so I won't even show it; you can find the full text in SVN.

# compute an hour-rounded timestamp (in microseconds)
sub hourStamp # (time)
{
  return $_[0]  - ($_[0] % (1000*1000*3600));
}

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  # The current hour stamp that keeps being updated;
  # any aggregated data will be propagated when it is in the
  # current hour (to avoid the propagation of the aggregator clearing).
  my $currentHour;

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # the full stats for the recent time
  my $ttPackets = Triceps::TableType->new($faIn->getLabel("packet")->getRowType())
    ->addSubIndex("byHour",
      Triceps::IndexType->newPerlSorted("byHour", undef, sub {
        return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
      })
      ->addSubIndex("byIP",
        Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
        ->addSubIndex("group",
          Triceps::IndexType->newFifo()
        )
      )
    )
  or confess "$!";

  # type for a periodic summary, used for hourly, daily etc. updates
  my $rtSummary;
  Triceps::SimpleAggregator::make(
    tabType => $ttPackets,
    name => "hourly",
    idxPath => [ "byHour", "byIP", "group" ],
    result => [
      # time period's (here hour's) start timestamp, microseconds
      time => "int64", "last", sub {&hourStamp($_[0]->get("time"));},
      local_ip => "string", "last", sub {$_[0]->get("local_ip");},
      remote_ip => "string", "last", sub {$_[0]->get("remote_ip");},
      # bytes sent in a time period, here an hour
      bytes => "int64", "sum", sub {$_[0]->get("bytes");},
    ],
    saveRowTypeTo => \$rtSummary,
  );

  $ttPackets->initialize() or confess "$!";
  my $tPackets = $unit->makeTable($ttPackets,
    &Triceps::EM_CALL, "tPackets") or confess "$!";

  # Filter the aggregator output to match the current hour.
  my $lbHourlyFiltered = $unit->makeDummyLabel($rtSummary, "hourlyFiltered");
  $tPackets->getAggregatorLabel("hourly")->makeChained("hourlyFilter", undef, sub {
    if ($_[1]->getRow()->get("time") == $currentHour) {
      $unit->call($lbHourlyFiltered->adopt($_[1]));
    }
  });

  # It's important to connect the pass-through data first,
  # before chaining anything to the labels of the faIn, to
  # make sure that any requests and raw inputs get through before
  # our reactions to them.
  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  my $lbPrint = $faOut->getLabel("print");

  # update the notion of the current hour before the table
  $faIn->getLabel("packet")->makeChained("processPackets", undef, sub {
    my $row = $_[1]->getRow();
    $currentHour = &hourStamp($row->get("time"));
    # skip the timestamp updates without data
    if (defined $row->get("bytes")) {
      $unit->call($tPackets->getInputLabel()->adopt($_[1]));
    }
  });

  # the dump request processing
  $tPackets->getDumpLabel()->makeChained("printDump", undef, sub {
    $unit->makeArrayCall($lbPrint, "OP_INSERT", $_[1]->getRow()->printP() . "\n");
  });
  $faIn->getLabel("dumprq")->makeChained("dump", undef, sub {
    if ($_[1]->getRow()->get("what") eq "packets") {
      $tPackets->dumpAll();
    }
  });

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

Multithreaded pipeline, part 3

The rest of this example might be easier to understand by looking at an example of a run first. The lines starting with a "!" are the copies of the input lines that ReaderMain() sends and PrintMain() faithfully prints.

input.packet are the rows that reach the PrintMain on the "print" label (remember, "input" is the name with which it imports its input nexus). input.hourly is the data aggregated by the hour intervals (and also by the IP addresses, dropping the port information), and input.daily further aggregates it per day (and again per the IP addresses). The timestamps in the hourly and daily rows are rounded down to the start of the hour or day.
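The rounding can be checked by hand on the first packet of the sample run. The sketch below generalizes hourStamp() into a hypothetical roundStamp() with the period as an argument; it assumes that the day rounding (not shown in this post) works the same way as the hour rounding, and that Perl is built with 64-bit integers.

```perl
use strict;
use warnings;

use constant USEC_PER_HOUR => 1000 * 1000 * 3600;
use constant USEC_PER_DAY  => 24 * USEC_PER_HOUR;

# Round a microsecond timestamp down to the start of its period.
# Hypothetical generalization of hourStamp(), assumes 64-bit integers.
sub roundStamp # ($time, $period)
{
    my ($time, $period) = @_;
    return $time - ($time % $period);
}

my $time = 1330886011000000; # timestamp of the first packet in the sample run
print "hourly: ", roundStamp($time, USEC_PER_HOUR), "\n"; # 1330884000000000
print "daily:  ", roundStamp($time, USEC_PER_DAY), "\n";  # 1330819200000000
```

These are the same values that show up in the "time" field of the first input.hourly and input.daily rows.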

And the lines without any prefixes are the dumps of the table contents that again reach the PrintMain() through the "print" label:

! new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
input.packet OP_INSERT time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
! new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
input.packet OP_INSERT time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
input.hourly OP_DELETE time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
! new,OP_INSERT,1330889612000000,1.2.3.4,5.6.7.8,2000,80,150
input.packet OP_INSERT time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
! new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
input.packet OP_INSERT time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
input.hourly OP_DELETE time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
! new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
input.packet OP_INSERT time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
input.hourly OP_INSERT time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
input.daily OP_INSERT time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! new,OP_INSERT,1331058811000000
input.packet OP_INSERT time="1331058811000000"
! new,OP_INSERT,1331145211000000
input.packet OP_INSERT time="1331145211000000"
! dump,packets
time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
! dump,hourly
time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! dump,daily
time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"

Note that the order of the lines is completely nice and predictable, nothing goes out of order. Each nexus preserves the order of the rows put into it, and the races are avoided because there is only one writer per nexus and every thread is fed from only one nexus.