Tuesday, April 30, 2013

ThreadedClient: a Triceps expect

In case you're not familiar with it, "expect" is a program that connects to interactive programs and pretends to be an interactive user. Obviously, the terminal programs, not the GUI ones. It was originally done as an extension for Tcl, and later ported as a library for Perl and other languages.

The class Triceps::X::ThreadedClient implements a variety of expect in the Triceps framework. I'm using it for the unit tests of the Triceps servers but it can have other uses as well. Why not just use expect? First, I don't like bringing in extra dependencies, especially just for tests; second, it was an interesting exercise; and third, I didn't realize that I was writing a simplified variety of expect until I had it mostly completed. The biggest simplification is that ThreadedClient works with complete lines.

It gets used in the unit tests like this: first the server gets started in a background process or thread, and then the port number it returns is used to create the clients. The ThreadedClient gets embedded into a Triceps App, so you can start other things in the same App. Well, the names of the ThreadedClient threads are hardcoded at the moment, so you can start only one copy of it per App, and there could be name conflicts if you start other threads of your own.

But first you need to start an App. I'll do it in yet another way this time:

  Triceps::App::build "client", sub {
    my $appname = $Triceps::App::name;
    my $owner = $Triceps::App::global;

    my $client = Triceps::X::ThreadedClient->new(
      owner => $owner,
      port => $port,
      debug => 0,
    );

    $owner->readyReady();

    $client->startClient("c1");
    $client->expect("c1", '!ready');

    # send the same message twice and expect to see it back twice
    $client->send("c1", "publish,*,zzzzzz\n");
    $client->send("c1", "publish,*,zzzzzz\n");
    $client->expect("c1", '\*,zzzzzz');
    $client->expect("c1", '\*,zzzzzz');

    $client->startClient("c2");
    $client->expect("c2", '!ready,cliconn2');

    $client->send("c1", "kill,cliconn2\n");
    $client->expect("c2", '__EOF__');

    $client->send("c1", "shutdown\n");
    $client->expect("c1", '__EOF__');
  };


Triceps::App::build() is kind of like Triceps::Triead::startHere() but saves the trouble of parsing the options. The app name is its first argument and the code for the main routine of the first thread is the second argument. That first Triead will run in the current Perl thread. The name of the app is placed into the global variable $Triceps::App::name, the App object into $Triceps::App::app, and the TrieadOwner into $Triceps::App::global. The name of the first Triead is hardcoded as "global".  After the main function exits, Triceps::App::build() runs the harvester, very similarly to startHere().
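
For comparison, here is a rough sketch of the same kind of start written through startHere(); this is not from the original example, and the option-parsing preamble in the main function is just the usual one used by the other threads in these posts:

Triceps::Triead::startHere(
    app => "client",
    thread => "global",
    main => sub {
        my $opts = {};
        &Triceps::Opt::parse("main", $opts, {@Triceps::Triead::opts}, @_);
        my $owner = $opts->{owner};
        # ... the client logic from the example above ...
    },
);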

Triceps::X::ThreadedClient->new() is used to start an instance of a client. The option "owner" gives it the current TrieadOwner as a starting point but it will create its own threads starting from this point. The option "port" specifies the default port number for the connections.

The option "debug" is optional, with the default value of 0. In this mode it collects the protocol of the run but otherwise runs silently. Setting the debug to 1 makes it also print the protocol as it gets collected, so if something goes not the way you expected, you can see what it is. Setting the debug to 1 also adds the printouts from the socket-facing threads, so you can also see what goes in and out the client socket.

After that the clients get started with startClient(). Its first argument is a symbolic name of the client that will be used in the further calls and also in the protocol. The second optional argument is the port number, if you want to use a different port than specified in new().
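
For example, to connect a client to some other port (the name and port number here are arbitrary):

    $client->startClient("c3", 12345);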

After that the data gets sent into the client socket with send(), and the returned lines get parsed with expect(). The usual procedure is that you send something, then expect some response to it. The send lines are included into the protocol with the prefix "> ".

The second argument of expect() is actually a regexp placed inside a string. So to expect a literal special character like '*', prefix it with a backslash and put the string into single quotes, like:

    $client->expect("c1", '\*,zzzzzz');


The expect() keeps reading lines until it finds one matching the regexp. Then all the lines including that one are added to the protocol and the call returns.


There are a couple of special lines generated by the connection status itself rather than coming from the socket. When the socket gets connected, it generates the line "> connected name" in the protocol (but not as an expectable input). When the connection gets dropped, this generates an expectable line "__EOF__". And each line in the protocol gets prefixed with the client name and "|". The example of the chat server run in the previous post was created by the ThreadedClient.


The protocol can be extracted as $client->protocol(). So for unit tests you can check the protocol match as


ok($client->protocol(), $expected);


Internally the ThreadedClient is very much like the chat server, so there is not a whole lot of point in going over it in detail. But feel free to read it on your own.

ThreadedServer, part 2

The next part of the ThreadedServer is listen(), the function that gets called from the listener thread and takes care of accepting the connections and spawning the per-client threads.

sub listen # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::listen";
    my $opts = {};
    my @myOpts = (
        owner => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TrieadOwner") } ],
        socket => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "IO::Socket") } ],
        prefix => [ undef, \&Triceps::Opt::ck_mandatory ],
        handler => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        pass => [ undef, sub { &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $prefix = $opts->{prefix};
    my $sock = $opts->{socket};

The first part is traditional, the only thing to note is that it also saves the list of options in a variable for future use (startServer() did that too but I forgot to mention it). And the option "*" is the pass-through: all the unknown options get collected in $opts->{"*"}.
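
For example, suppose the caller adds an extra option (the option name here is made up for illustration):

Triceps::X::ThreadedServer::listen(
    owner => $owner,
    socket => $sock,
    prefix => "cliconn",
    handler => \&chatSockReadT,
    someOption => "x",
);

The unrecognized option ends up in $opts->{"*"} as [ someOption => "x" ], from where it can be forwarded to the per-client threads.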

    my $clid = 0; # client id

    while(!$owner->isRqDead()) {
        my $client = $sock->accept();
        if (!defined $client) {
            my $err = "$!"; # or the text message will be reset by isRqDead()
            if ($owner->isRqDead()) {
                last;
            } elsif($!{EAGAIN} || $!{EINTR}) { # numeric codes don't get reset
                next;
            } else {
                confess "$myname: accept failed: $err";
            }
        }

The accept loop starts. It runs until the listening socket gets revoked by the shutdown; the revocation is done by dup2() of a file descriptor from /dev/null over it. Note that this function itself doesn't request to track the file descriptor for revocation. That's the caller's responsibility.

Oh, and by the way, if you've been tracing the possible ways of thread execution closely, you might be wondering: what if the shutdown happens after the socket is opened but before it is tracked? This can't really happen to the listening socket but can happen with the accepted client sockets. The answer is that the tracking enrollment will check whether the shutdown already happened, and if so, it will revoke the socket right away, before returning. So the reading loop will find the socket revoked right on its first iteration.

The revocation also sends the signal SIGUSR2 to the thread. This is done because the calls like accept() do not return on a simple revocation by dup2(), but a signal does interrupt them. Triceps registers an empty handler for SIGUSR2 that just immediately returns, but the accept() system call gets interrupted, and the thread gets a chance to check for the shutdown; even if it calls accept() again, this will return an error and force it to check for the shutdown again.

And by the way, Perl's threads->kill() doesn't send a real signal, it just sets a flag for the interpreter. If you try it on your own, it won't interrupt the system calls, and now you know why. Instead Triceps gets the POSIX thread identity from the Perl thread and calls the honest pthread_kill() from the C++ code.

So, the main loop goes by the shutdown condition: isRqDead() tells if this thread has been requested to die. After accept(), it checks for errors. The first thing to check for is again isRqDead(), because the revocation will manifest as a socket operation error, and there is no point in reporting this spurious error. However, like other Triceps calls, isRqDead() will clear the error text, so the text has to be saved first. If the shutdown is found, the loop exits. Then the check for the spurious interruptions is done, and for them the loop continues. Funny enough, the $!{} uses the numeric part of $! that is independent from its text part and doesn't get cleared by the Triceps calls. And on any other error the thread confesses. This will unroll the stack, eventually get caught by the Triceps threading code, abort the App, and propagate the error message to the harvester.

        $clid++;
        my $cliname = "$prefix$clid";
        $app->storeCloseFile($cliname, $client);

        Triceps::Triead::start(
            app => $app->getName(),
            thread => $cliname,
            fragment => $cliname,
            main => $opts->{handler},
            socketName => $cliname,
            &Triceps::Opt::drop({ @myOpts }, \@_),
        );

        # Doesn't wait for the new thread(s) to become ready.
    }
}

If a proper connection has been received, the socket gets stored into the App with a unique name, for later load by the per-client thread. And then the per-client thread gets started.

The drop passes through all the original options except for the ones handled by this thread. In retrospect, this is not the best solution for this case. It would be better to just use @{$opts->{"*"}} instead. The drop call is convenient when only a part of the explicitly recognized options has to be dropped, not all of them.
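
Here is a sketch of what that alternative would look like, with the pass-through options used instead of the drop (not the actual code in the module):

        Triceps::Triead::start(
            app => $app->getName(),
            thread => $cliname,
            fragment => $cliname,
            main => $opts->{handler},
            socketName => $cliname,
            @{$opts->{"*"}},
        );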

After starting the thread, the loop doesn't call readyReady() but goes for the next iteration. This is basically because it doesn't care about the started thread and doesn't ever send anything to it. And waiting for the threads to start will make the loop slower, possibly overflowing the socket's listening queue and dropping the incoming connections if they arrive very fast.

And the last part of the ThreadedServer is the printOrShut:

sub printOrShut # ($app, $fragment, $sock, @text)
{
    my $app = shift;
    my $fragment = shift;
    my $sock = shift;

    undef $!;
    print $sock @_;
    $sock->flush();

    if ($!) { # can't write, so shutdown
        Triceps::App::shutdownFragment($app, $fragment);
    }
}

Nothing too complicated. Prints the text into the socket, flushes it and checks for errors. On errors shuts down the fragment. In this case there is no need for draining. After all, the socket leading to the client is dead and there is no way to send anything more through it, so there is no point in worrying about any unsent data. Just shut down as fast as it can, before the threads have generated more data that can't be sent any more. Any data queued in the nexuses for the shut down threads will be discarded.


Monday, April 29, 2013

ThreadedServer, part 1

And now I want to show the internals of the ThreadedServer methods. They show how to store the socket file handles into the App, how the threads are harvested, and how the connections get accepted.

sub startServer # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::startServer";
    my $opts = {};
    my @myOpts = (
        app => [ undef, \&Triceps::Opt::ck_mandatory ],
        thread => [ "global", undef ],
        main => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        port => [ undef, \&Triceps::Opt::ck_mandatory ],
        socketName => [ undef, undef ],
        fork => [ 1, undef ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);

    if (!defined $opts->{socketName}) {
        $opts->{socketName} = $opts->{thread} . ".listen";
    }

    my $srvsock = IO::Socket::INET->new(
        Proto => "tcp",
        LocalPort => $opts->{port},
        Listen => 10,
    ) or confess "$myname: socket creation failed: $!";
    my $port = $srvsock->sockport() or confess "$myname: sockport failed: $!";

So far it's pretty standard: get the options and open the socket for listening.

    if ($opts->{fork} > 0)  {
        my $pid = fork();
        confess "$myname: fork failed: $!" unless defined $pid;
        if ($pid) {
            # parent
            $srvsock->close();
            return ($port, $pid);
        }
        # for the child, fall through
    }

This handles the process forking option: if forking is requested, it executes, and then the parent process returns the PID while the child process continues with the rest of the logic. By the way, your success with forking a process that has multiple running threads may vary. The resulting process usually has one running thread (continuing from the point where the thread that called fork() was), but the synchronization primitives in the new process can be inherited in any state, so the attempts to continue the threaded processing are usually not such a good idea. It's best to fork first, before there are more threads.

    # make the app explicitly, to put the socket into it first
    my $app = Triceps::App::make($opts->{app});
    $app->storeCloseFile($opts->{socketName}, $srvsock);
    Triceps::Triead::start(
        app => $opts->{app},
        thread => $opts->{thread},
        main => $opts->{main},
        socketName => $opts->{socketName},
        &Triceps::Opt::drop({ @myOpts }, \@_),
    );

Then the App gets created. Previously I was showing how to start the App with startHere(), which created the App and did a bunch of services implicitly. Here everything will be done manually. The listening socket has to be stored into the app before the listener thread gets started, so that the listener thread can find it.

Triceps keeps a global list of all its Apps in the process, and after an App is created, it's placed into that list and can be found by name from any thread. The App object will exist while there are references to it, including the reference from that global list. On the other hand, it's possible to remove the App from the list while it's still running but that's a bad practice because it will break any attempts from its threads to find it by name.
 
So the App is made, then the file handle gets stored. storeCloseFile() gets the file descriptor from the socket, dups it, stores it into the App, and then closes the original file handle. All this monkeying with dupping and closing is needed because there is no way to extract the file descriptor from a Perl file handle without the handle trying to close it afterwards. But as a result of this roundabout way, the file descriptor gets transferred into the App.
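
As described in the "Perl, threads and sockets" post below, storeCloseFile() is just a convenience for the two-step form:

    $app->storeFile($opts->{socketName}, $srvsock); # dups the descriptor into the App
    close($srvsock); # close the original handle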

Then the listener thread is started, and as shown before, it's responsible for starting all the other threads. In the meantime, the startServer() continues.

    my $tharvest;
    if ($opts->{fork} < 0) {
        @_ = (); # prevent the Perl object leaks
        $tharvest = threads->create(sub {
            # In case of errors, the Perl's join() will transmit the error
            # message through.
            Triceps::App::find($_[0])->harvester();
        }, $opts->{app}); # app has to be passed by name
    } else {
        $app->harvester();
    }

Then the harvester logic is started. Each App must have its harvester. startHere() runs the harvester implicitly, unless told otherwise, but here the harvester has to be run manually. It can be run either in this thread or in another thread, as determined by the option "fork". If it says to make another thread, $tharvest will contain that thread's identity. A special thing about starting threads with threads->create() is that it's sensitive to anything in @_. If @_ contains anything, it will be leaked (though the more recent versions of Perl should have it fixed). So @_ gets cleared before starting the thread.

And one way or the other, the harvester is started. What does it do? It joins the App's threads as they exit. After all of them exit, it removes the App from the global list of Apps, which allows the App's memory to be collected once the last reference to it is gone, and then the harvester returns.

If any of the threads die, they cause the App to be aborted. The aborted App shuts down immediately and remembers the identity of the failed thread and its error message (only the first message is saved because the abort is likely to cause the other threads to die too, and there is no point in seeing these derivative messages). The harvester, in turn, collects this message from the App, and after all its cleaning-up work is done, dies propagating this message. Then if the harvester is running not in the first thread, that message will be propagated further by Perl's join().

A catch is that the errors are not reported until the harvester completes. Normally all the App's threads should exit immediately when shut down but if they don't, the program will be stuck without any indication of what happened.

It's also possible to disable this propagation of dying by using the option "die_on_abort":

$app->harvester(die_on_abort => 0);

Then the last part:

    if ($opts->{fork} > 0) {
        exit 0; # the forked child process
    }

    return ($port, $tharvest);
}



If this was the child process forked before, it exits at this point. Otherwise the port and the harvester's thread object are returned.

Multithreaded socket server, part 6, a run example

Here is a protocol from a run. It has been produced with the automated testing infrastructure that I'll describe later.

As usual, the lines sent from the clients to the socket server are preceded with a "> ". But since there are many clients, to tell them apart, both the sent and received lines are prefixed by the client's name and a "|". I've just picked arbitrary client names to tell them apart.

I've also marked the incoming connection as "> connect client_name", and the disconnections as "__EOF__" after the client name.

So, here we go.

 > connect c1
c1|!ready,cliconn1
> connect c2
c2|!ready,cliconn2

Two clients connect.

 > c1|publish,*,zzzzzz
c1|*,zzzzzz
c2|*,zzzzzz

A message published to the topic "*" gets forwarded to all the connected clients. In reality the messages may of course be received on separate sockets in any order, but I've ordered them here for the ease of reading.

> c2|garbage,trash
c2|!invalid command,garbage,trash

An invalid command gets detected in the writer thread and responded to as such.

> c2|subscribe,A
c2|!subscribed,A
> c1|publish,A,xxx
c2|A,xxx

A subscription request gets acknowledged, and after that all the messages sent to this topic get received by the client.

> c1|subscribe,A
c1|!subscribed,A
> c1|publish,A,www
c1|A,www
c2|A,www

If more than one client is subscribed to a topic, all of them get the messages.

> c2|unsubscribe,A
c2|!unsubscribed,A
> c1|publish,A,vvv
c1|A,vvv

The unsubscription makes the client stop receiving messages from this topic.

> connect c3
c3|!ready,cliconn3
> c3|exit
c3|!exiting
c3|__EOF__

The third client connects, immediately requests an exit, gets the confirmation and gets disconnected.

> connect c4
c4|!ready,cliconn4
> close WR c4
c4|!exiting
c4|__EOF__

The fourth client connects and then closes its write side of the socket (that is the read side for the server). It produces the same effect as the exit command.

> c1|shutdown
c1|*,server shutting down
c1|__EOF__
c2|*,server shutting down
c2|__EOF__

And the shutdown command sends the notifications to all the remaining clients and closes the connections.

Multithreaded socket server, part 5, socket writer

The socket writer thread is the last part of the puzzle.

sub chatSockWriteT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockWriteT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
        ctlFrom => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_;
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $tname = $opts->{thread};

    my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, ">");

    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "reader",
    );

    my $faCtl = $owner->importNexus(
        from => $opts->{ctlFrom},
        import => "reader",
    );

The usual preamble. The trackGetSocket() consumes the socket from the App, and this time reopens it for writing. The previously created nexuses are imported.

    my %topics; # subscribed topics for this thread

    $faChat->getLabel("msg")->makeChained("lbMsg", undef, sub {
        my $row = $_[1]->getRow();
        my $topic = $row->get("topic");
        if ($topic eq "*" || exists $topics{$topic}) {
            printOrShut($app, $opts->{fragment}, $sock, $topic, ",", $row->get("msg"), "\n");
        }
    });

The logic is defined as the connected labels. The %topics hash keeps the topics that this client is subscribed to. When a message is received from the chat nexus and its topic is either in the hash or is "*", the message gets sent into the socket in the CSV format:

topic,text

The function printOrShut() is imported from Triceps::X::ThreadedServer. Its first 3 arguments are fixed, and the rest are passed through to print(). It prints the message to the socket file handle, flushes the socket, and in case of any errors it shuts down the fragment specified in its second argument. This way if the socket gets closed from the other side, the threads handling it automatically shut down.

    $faCtl->getLabel("ctl")->makeChained("lbCtl", undef, sub {
        my $row = $_[1]->getRow();
        my ($cmd, $arg) = $row->toArray();
        if ($cmd eq "print") {
            printOrShut($app, $opts->{fragment}, $sock, $arg, "\n");
        } elsif ($cmd eq "subscribe") {
            $topics{$arg} = 1;
            printOrShut($app, $opts->{fragment}, $sock, "!subscribed,$arg\n");
        } elsif ($cmd eq "unsubscribe") {
            delete $topics{$arg};
            printOrShut($app, $opts->{fragment}, $sock, "!unsubscribed,$arg\n");
        } else {
            printOrShut($app, $opts->{fragment}, $sock, "!invalid command,$cmd,$arg\n");
        }
    });

The handling of the control commands is pretty straightforward.

    $owner->readyReady();

    $owner->mainLoop();

    $tsock->close(); # not strictly necessary
}

And the rest is taken care of by the mainLoop(). The thread's main loop runs until the thread gets shut down, by handling the incoming messages. So if say printOrShut() decides to shut down the fragment, the next iteration of the loop will detect it and exit.
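
Roughly, mainLoop() amounts to this sketch (the exact details may differ; the assumption here is that nextXtray() processes one incoming tray and returns false when it's time for the thread to exit):

    while ($owner->nextXtray()) { }
    $owner->markDead();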

Multithreaded socket server, part 4, socket reader thread

As I've said before, each socket gets served by two threads: one sits reading from the socket and forwards the data into the model, and another one sits getting data from the model and forwards it into the socket. Since the same thread can't wait for both a socket descriptor and a thread synchronization primitive, they have to be separate.

The first thread started is the socket reader. Let's go through it bit by bit.

sub chatSockReadT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockReadT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    my $tname = $opts->{thread};

    # only dup the socket, the writer thread will consume it
    my ($tsock, $sock) = $owner->trackDupSocket($opts->{socketName}, "<");

The beginning is quite usual. Then it loads the socket from the App and gets it tracked with the TrieadOwner. The difference between trackDupSocket() here and the trackGetSocket() used before is that trackDupSocket() leaves the socket copy in the App, to be found by the writer-side thread.

The socket is reopened in this thread as read-only. The writing to the socket from all the threads has to be synchronized to avoid mixing the half-messages. And the easiest way to synchronize is to always write from one thread, and if the other thread wants to write something, it has to pass the data to the writer thread through the control nexus.

    # user messages will be sent here
    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "writer",
    );

    # control messages to the writer side will be sent here
    my $faCtl = $owner->makeNexus(
        name => "ctl",
        labels => [
            ctl => $faChat->impRowType("ctl"),
        ],
        reverse => 1, # gives this nexus a high priority
        import => "writer",
    );

Imports the chat nexus and creates the private control nexus for communication with the writer side. The name of the chat nexus is hardcoded here, since it's pretty much a solid part of the application. If this were a module, the name of the chat nexus could be passed through the options.

The control nexus is marked as reverse even though it really isn't. But the reverse option has a side effect of making this nexus high-priority. Even if the writer thread has a long queue of messages from the chat nexus, the messages from the control nexus will be read first. Which again isn't strictly necessary here, but I wanted to show how it's done.

The type of the control label is imported from the chat nexus, so it doesn't have to be defined from scratch.

    $owner->markConstructed();

    Triceps::Triead::start(
        app => $opts->{app},
        thread => "$tname.rd",
        fragment => $opts->{fragment},
        main => \&chatSockWriteT,
        socketName => $opts->{socketName},
        ctlFrom => "$tname/ctl",
    );

    $owner->readyReady();

Then the construction is done and the writer thread gets started. And then the thread is ready and waits for the writer thread to be ready too. The readyReady() works in the fragments just as it does at the start of the app. Whenever a new thread is started, the App becomes not ready, and stays this way until all the threads report that they are ready. The rest of the App keeps working like nothing happened, at least sort of. Whenever a nexus is imported, the messages from this nexus start collecting for this thread, and if there are many of them, the nexus will become backed up and the threads writing to it will block. The new threads have to call readyReady() as usual to synchronize between themselves, and then everything gets on its way.

Of course, if two connections are received in a quick succession, that would start two sets of threads, and readyReady() will continue only after all of them are ready.


    my $lbChat = $faChat->getLabel("msg");
    my $lbCtl = $faCtl->getLabel("ctl");

    $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!ready," . $opts->{fragment});
    $owner->flushWriters();

A couple of labels get remembered for future use, and the connection-ready message gets sent to the writer thread through the control nexus. By convention of this application, the messages go in the CSV format, with the control messages starting with "!". If this is the first client, this would send

!ready,cliconn1

to the client. It's important to call flushWriters() every time to get the message(s) delivered.

    while(<$sock>) {
        s/[\r\n]+$//;
        my @data = split(/,/);
        if ($data[0] eq "exit") {
            last; # a special case, handle in this thread
        } elsif ($data[0] eq "kill") {
            eval {$app->shutdownFragment($data[1]);};
            if ($@) {
                $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!error,$@");
                $owner->flushWriters();
            }
        } elsif ($data[0] eq "shutdown") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            Triceps::AutoDrain::makeShared($owner);
            eval {$app->shutdown();};
        } elsif ($data[0] eq "publish") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => $data[1], msg => $data[2]);
            $owner->flushWriters();
        } else {
            # this is not something you want to do in a real chat application
            # but it's cute for a demonstration
            $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => $data[0], arg => $data[1]);
            $owner->flushWriters();
        }
    }

The main loop keeps reading lines from the socket and interpreting them. The lines are in CSV format, and the first field is the command and the rest are the arguments (if any).  The commands are:

publish - send a message with a topic to the chat nexus
exit - close the connection
kill - close another connection, by name
shutdown - shut down the server
subscribe - subscribe the client to a topic
unsubscribe - unsubscribe the client from a topic

The exit just exits the loop, since it works the same if the socket just gets closed from the other side.

The kill shuts down, by name, the fragment to which the threads of the other connection belong. This is a simple application, so it doesn't check any permissions, whether this fragment should be allowed to be shut down. If there is no such fragment, the shutdown call will silently do nothing, so the error check and reporting is really redundant (if something goes grossly wrong in the thread interruption code, an error might still occur, but theoretically this should never happen).

The shutdown sends the notification to the common topic "*" (to which all the clients are subscribed by default), then drains the model and shuts it down. The drain makes sure that all the messages in the model get processed (and even written to the socket) without allowing any new messages to be injected. "Shared" means that there are no special exceptions for some threads.

makeShared() actually creates a drain object that keeps the drain active during its lifetime. Here this object is not assigned anywhere, so it gets immediately destroyed and lifts the drain. So potentially more messages can get squeezed in between this point and shutdown. Which doesn't matter a whole lot here.
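
If the drain needs to stay in effect for longer, the returned object can simply be held in a variable; a small sketch:

{
    my $drain = Triceps::AutoDrain::makeShared($owner);
    # ... everything in this block runs with the drain in effect ...
} # the drain gets lifted when $drain goes out of scope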

If it were really important that nothing get sent after the shutdown notification, it could be done like this (this is an untested fragment, so it might contain typos):

        } elsif ($data[0] eq "shutdown") {
            my $drain = Triceps::AutoDrain::makeExclusive($owner);
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            $drain->wait();
            eval {$app->shutdown();};
        }

This starts the drain, but this time the exclusive mode means that this thread is allowed to send more data. When the drain is created, it waits for success, so when the new message is inserted, it will be after all the other messages. $drain->wait() does another wait and makes sure that this last message propagates all the way. And then the app gets shut down, while the drain is still in effect, so no more messages can be sent for sure.

The publish sends the data to the chat nexus (note the flushWriters(), as usual!).

And the rest of the commands (that would be subscribe and unsubscribe, but you can do any other commands like "print") get simply forwarded to the writer thread for execution. Sending the commands through like this without checking is not a good practice for a real application but it's cute for a demo.

    {
        # let the data drain through
        my $drain = Triceps::AutoDrain::makeExclusive($owner);

        # send the notification - can do it because the drain is excluding itself
        $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!exiting");
        $owner->flushWriters();

        $drain->wait(); # wait for the notification to drain

        $app->shutdownFragment($opts->{fragment});
    }

    $tsock->close(); # not strictly necessary
}

The last part is when the connection gets closed, either by the "exit" command or when the socket gets closed. Remember, the socket can get closed asymmetrically, in one direction, so even when the reading is closed, the writing may still work and needs to return the responses to any commands received from the socket. And of course the same is true for the "exit" command.

So here the full exclusive drain sequence is used, ending with the shutdown of this thread's own fragment, which will close the socket. Even though only one fragment needs to be shut down, the drain drains the whole model. Because of the potentially complex interdependencies, there is no way to reliably drain only a part, and all the drains are App-wide.

The last part, with $tsock->close(), is not technically necessary since the shutdown of the fragment will get the socket descriptor revoked anyway.  But other than that, it's a good practice that unregisters the socket from the TrieadOwner and then closes it.

Sunday, April 28, 2013

Multithreaded socket server, part 3, the listener thread

The listener thread is:

sub listenerT
{
  my $opts = {};
  &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
    socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  undef @_;
  my $owner = $opts->{owner};

  my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");

  # a chat text message
  my $rtMsg = Triceps::RowType->new(
    topic => "string",
    msg => "string",
  ) or confess "$!";

  # a control message between the reader and writer threads
  my $rtCtl = Triceps::RowType->new(
    cmd => "string", # the command to execute
    arg => "string", # the command argument
  ) or confess "$!";

  $owner->makeNexus(
    name => "chat",
    labels => [
      msg => $rtMsg,
    ],
    rowTypes => [
      ctl => $rtCtl,
    ],
    import => "none",
  );

  $owner->readyReady();

  Triceps::X::ThreadedServer::listen(
    owner => $owner,
    socket => $sock,
    prefix => "cliconn",
    handler => \&chatSockReadT,
  );
}

It gets the usual options of the thread start (and as usual you can pass more options to startServer() and they will make their way to the listener thread). But there is also one more option added by startServer(): the socketName.

Since the socket objects can't be passed directly between the threads, a roundabout way is taken. After startServer() opens a socket, it stores the dupped file descriptor in the App and passes through the name used for the store, so that this name can be used to load the socket back in another thread:

my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");


This does multiple things:
  • Loads the file descriptor from the App by name (with a dup()).
  • Opens a socket object from that file descriptor.
  • Registers that file descriptor for tracking with the TrieadOwner, so that if the thread needs to be shut down, that descriptor will be revoked and any further operations with it will fail.
  • Creates a TrackedFile object that will automatically unregister the file descriptor from the TrieadOwner when the TrackedFile goes out of scope. This is important to avoid races between the revocation and the normal close of the file.
  • Makes the App close and forget its file descriptor.

The $tsock returned is the TrackedFile object, and $sock is the socket filehandle. You can actually get the filehandle directly from the TrackedFile, as $tsock->get(), so why return the socket separately? As it turns out, Perl has issues with handling the Perl values stored inside the XS objects if they aren't referred to by any Perl variables. When the listener thread creates the client handler threads, that would scramble the reference counts. Keeping the socket in the $sock variable prevents that.

The listener thread then creates the row types for the data messages and for the control messages between the client's reader and writer threads, and makes a nexus. The listener is not interested in the data, so it doesn't even import this nexus itself. The nexus passes the data only, so it has no label for the control messages, only the row type.

Then the mandatory readyReady(), and then the control goes again to the library that accepts the connections and starts the client connection threads. The handler is the main function for the thread that gets started to handle the connection. The prefix is used to build the names for the new thread, for its fragment, and for the connection socket that also gets passed through by storing it in the App. The name is the same for all three and gets created by concatenating the prefix with a number that gets increased for every connection. The newly created thread will then get the option socketName with the name of the socket.

How does the ThreadedServer::listen() know to return? It runs until the App gets shut down, and returns only when the thread is requested to die as a result of the shutdown.

Multithreaded socket server, part 2, starting the server

Let's start with the top-level: how the server gets started. It's really the last part of the code, that brings everything together.

It uses the ThreadedServer infrastructure:

use Triceps::X::ThreadedServer qw(printOrShut);

The X subdirectory is for the examples and experimental stuff, but the ThreadedServer is really of production quality, I just haven't written a whole set of tests for it yet.

The server gets started like this:

  my ($port, $pid) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 0,
      fork => 1,
  );

The port option of 0 means "pick any free port"; the actual port number will be returned as the result. If you know the fixed port number in advance, use it. "chat" will be the name of the App, and listenerT is the main function of the thread that will listen for the incoming connections and start the other threads. And it's also the first thread that gets started, so it's responsible for creating the core part of the App as well (though in this case there is not a whole lot of it).

The option "fork" determines how and whether the server gets started in the background. The value 1 means that a new process will be forked, and then the threads will be created there. The returned PID can be used to wait for that process to complete:

waitpid($pid, 0);

Of course, if you're starting a daemon, you'd probably write this PID to a file and then just exit the parent process.
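
For example, a daemon-style parent might do something like this after the call (the file name here is arbitrary):

open(my $pf, ">", "/var/run/chat.pid") or die "$!";
print $pf "$pid\n";
close($pf);
exit(0); # the parent exits, the forked server keeps running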

The fork value of 0 starts the server in the current process, and the current thread becomes the server's harvester thread (the one that joins the other threads when the App shuts down).

In this case the server doesn't return until it's done, so there is not much point in the returned port value: by the time it returns, the socket is already closed. You really need to either use a fixed port or write the port number to a file from your listener thread. The PID also doesn't make sense here, and it's returned as undef. Here is an example of this kind of call:

  my ($port, $pid) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 12345,
      fork => 0,
  );

Finally, the server can be started in the current process, with a new thread created as the App's harvester, using the fork option -1. The original thread can then continue and do other things in parallel. It's the way I use for the unit tests.

  my ($port, $thread) = Triceps::X::ThreadedServer::startServer(
      app => "chat",
      main => \&listenerT,
      port => 0,
      fork => -1,
  );

In this case the second value returned is not a PID but the thread object for the harvester thread. You should either detach it or eventually join it:

$thread->join();

Perl propagates the errors in the threads through the join(), so if the harvester thread dies, that would show only in the join() call. And since Triceps propagates the errors too, any other thread dying will cause the harvester thread to die after it joins all the App's threads.

Multithreaded socket server, part 1, dynamic threads overview

As a quick announcement, I've renamed some of the methods described in the last post, and updated the post.

Now, to the new stuff. The threads can be used to run a TCP server that accepts the connections and then starts the new client communication thread(s) for each connection.  This thread can then communicate with the rest of the model, feeding and receiving data, as usual, through the nexuses.

The challenge here is that there must be a way to create the threads dynamically, and later when the client closes connection, to dispose of them. There are two possible general approaches:

  • dynamically create and delete the threads in the same App;
  • create a new App per connection and connect it to the main App.

Both have their own advantages and difficulties, but the approach with the dynamic creation and deletion of threads ended up looking easier, and that's what Triceps has. The second approach is not particularly well supported yet. You can create multiple Apps in the program, and you can connect them by making two Triceps Trieads run in the same OS thread and ferry the data around. But it's extremely cumbersome. This will be improved in the future, but for now the first approach is the ticket.

The dynamically created threads are grouped into the fragments. This is done by specifying the fragment name option when creating a thread. The threads in a fragment have a few special properties.

One, it's possible to shut down the whole fragment in one fell swoop. There is no user-accessible way to shut down the individual threads, you can shut down either the whole App or a fragment. Shutting down individual threads is dangerous, since it can mess up the application in many non-obvious ways. But shutting down a fragment is OK, since the fragment serves a single logical function, such as service one TCP connection, and it's OK to shut down the whole logical function.

Two, when a thread in the fragment exits, it's really gone, and takes all its nexuses with it. Well, technically, the nexuses continue to exist as long as there are threads connected to them, but no new connections can be created after this point. Since usually the whole fragment will be gone together, and since the nexuses defined by the fragment's threads are normally used only by the other threads of the same fragment, a fragment shutdown cleans up its state as if the fragment had never existed. By contrast, when a normal thread exits, the nexuses defined by it stay present and accessible until the App shuts down.

Another, somewhat surprising, challenge is the interaction of the threads and sockets (or file descriptors in general) in Perl.  I've already touched upon it, but there is a lot more.

To show how all this stuff works, I've created an example of a "chat server". It's not really a human-oriented chat, it's more of a machine-oriented publish-subscribe, and specially tilted to work through the running of a socket server with threads.

In this case the core logic is absolutely empty. All there is of it, is a nexus that passes messages through it. The clients read from this nexus to get the messages, and write to this nexus to send the messages.

When the App starts, it has only one thread, the listener thread that listens on a socket for the incoming connections. The listener doesn't even care about the common nexus and doesn't import it. When a connection comes in, the listener creates two threads to serve it: the reader reads the socket and sends to the nexus, and the writer receives from the nexus and writes to the socket. These two threads constitute a fragment for this client. They also create their own private nexus, allowing the reader to send control messages to the writer. That could also have been done through the central common nexus, but I wanted to show that there are different ways of doing things.

With a couple of clients connected, threads and sockets start looking like this:

client1 reader ----> client1 control nexus ------> client1 writer
         \                                           /
          ----------->\          /------------------>
                       chat nexus
          ----------->/          \------------------>
         /                                           \
client2 reader ----> client2 control nexus ------> client2 writer

And the listener thread still stays on the side.

Tuesday, April 23, 2013

Perl, threads and sockets

I've started writing an example that does a TCP server with threads and found that Perl doesn't allow passing the file descriptors between the threads. Well, you sort of can pass them as arguments to another thread but then it ends up printing the error messages like these and corrupting the reference counts:

Unbalanced string table refcount: (1) for "GEN1" during global destruction.
Unbalanced string table refcount: (1) for "/usr/lib/perl5/5.10.0/Symbol.pm" during global destruction.
Scalars leaked: 1

If you try to pass a file descriptor through threads::shared, it honestly won't allow you, while the thread arguments pretend that they can and then fail.

So, to get around this issue I've added the file descriptor passing service to Triceps::App. The easy way to pass a socket around is:

# in one thread
$app->storeFile('name', $socket);
close($socket);
# or both in one call
$app->storeCloseFile('name', $socket);

# in another thread
my $socket = $app->loadDupSocket('name', 'r+');
$app->closeFd('name');

The file descriptor gets extracted from the socket file handle, and its dup is stored in the App. Pick a unique name by which you can get it back. Then you can get it back and create an IO::Socket::INET object with it. When you get it back, it gets dupped again, and you can get it back multiple times, from the same or different threads. Then to avoid leaking sockets, you tell the App to close the file descriptor and forget it.

A limitation is that you can't really share the same descriptor between two threads, Perl is not happy about that. So you have to dup it and have two separate file descriptors for the same socket. You still need to be careful about writing to the same socket from two threads; the best idea is to write from one thread only (and the same applies to reading, though it might be the same or a different thread as the writing one).

The other ways to create the file handle objects are:

my $fd = $app->loadDupFile('name', 'r+');

Gets the basic IO::Handle.

my $fd = $app->loadDupFileClass('name', 'r+', $className);

Get a file handle of any named subclass of the IO::Handle, for example "IO::Socket::UNIX".

There is also the API for the raw file descriptors, that you can get with fileno():

$app->storeFd('name', $fd);

Store a dup of file descriptor. Same thing as the file handle, the original descriptor stays open and needs to be closed separately.

my $fd = $app->loadDupFd('name');

Get back a dup of the stored file descriptor. You can use it with IO::Handle or to open a named file descriptor in an old-fashioned way:

open(MYFILE, '+<&=' . $app->loadDupFd('name'));


Note the "=" in the opening mode, which tells open() to directly adopt the descriptor from the argument and not do another dup of it.


my $fd = $app->loadFd('name');

Get back the original file descriptor.  This is dangerous because now you'd have the same file descriptor in two places: in your thread and in the App. There are two ways out of this situation. One is to have it dupped on open:

open(MYFILE, '+<&' . $app->loadFd('name'));


Note, no "=" in this snippet's open mode, unlike the previous one.


The other way is to open without dupping and tell the App to simply forget this descriptor:


open(MYFILE, '+<&=' . $app->loadFd('name'));
$app->forgetFd('name');

In general, it's safer and easier to stay with the dupping, and use the file handle interface instead of messing with the file descriptors.

P.S. The names of the load methods used to be different, I changed them to be shorter, and added storeCloseFile().

Friday, April 19, 2013

Object passing between threads, and Perl code snippets

A limitation of the Perl threads is that no variables can be shared between them. When a new thread gets created, it gets a copy of all the variables of the parent. Well, of all the plain Perl variables. With the XS extensions your luck may vary: the variables might get copied, might become undef, or just become broken (if the XS module is not threads-aware). Copying the XS variables would require quite a high overhead at all the other times, so Triceps doesn't do it, and all the Triceps objects become undefined in the new thread.

However there is a way to pass around certain objects through the Nexuses.

First, obviously, the Nexuses are intended to pass through the Rowops. These Rowops coming out of a nexus are not the same Rowop objects that went in. Rowop is a single-threaded object and can not be shared by two threads. Instead it gets converted to an internal form while in the nexus, and gets re-created, pointing to the same Row object and to the correct Label in the local facet.

Then, again obviously, the Facets get imported through the Nexus, together with their row types.

And two more types of objects can be exported through a Nexus: the RowTypes and TableTypes. They get exported through the options as in this example:

$fa = $owner->makeNexus(
    name => "nx1",
    labels => [
        one => $rt1,
        two => $lb,
    ], 
    rowTypes => [
        one => $rt2,
        two => $rt1,
    ], 
    tableTypes => [
        one => $tt1,
        two => $tt2,
    ], 
    import => "writer",
); 

As you can see, the namespaces for the labels, row types and table types are completely independent, and the same names can be reused in each of them with different meanings. All three sections are optional, so if you want, you can export only the types in the nexus, without any labels.

They can then be extracted from the imported facet as:

$rt1 = $fa->impRowType("one");
$tt1 = $fa->impTableType("one");


Or the whole set of name-value pairs can be obtained with:


@rtset = $fa->impRowTypesHash();
@ttset = $fa->impTableTypesHash();


The table types and the row types (by themselves, or in the table types or labels) get copied when imported into the new thread. It's technically possible to share the references to the same row type in the C++ code, but it's more efficient to make a separate copy for each thread, and thus the Perl API goes along the more efficient way.

The import is smart in the sense that it preserves the sameness of the row types: if in the exporting thread the same row type was referred from multiple places in the labels, row types and table types sections, in the imported facet that would again be the same row type (even though of course not the one that has been exported but its copy). This again helps with the efficiency when various objects decide if the rows created by this and that type are compatible.
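
For example, in the makeNexus() call above both the label "one" and the row type "two" were made from $rt1, so on the importing side they refer to the same copied object; a small sketch, assuming the usual row type comparison method same():

my $rtLabelOne = $fa->getLabel("one")->getRowType();
my $rtTypeTwo = $fa->impRowType("two");
# $rtLabelOne->same($rtTypeTwo) returns true: both are copies of the same $rt1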

This is all well until you want to export a table type that has an index with a Perl sort condition in it, or an aggregator with the Perl code. The Perl code objects are tricky: they get copied OK when a new thread is created, but the attempts to import them through a nexus later cause a terrible memory corruption. So Triceps doesn't allow exporting the table types with the function references in them. But it provides an alternative solution: the code snippets can be specified as the source code. It gets compiled when the table type gets initialized. When a table type gets imported through a nexus, it brings the source code with it. The imported table types are always uninitialized, so at initialization time the source code gets compiled in the new thread and works.

It all works transparently: just specify a string instead of a function reference when creating the index, and it will be recognized and processed. For example:

$it= Triceps::IndexType->newPerlSorted("b_c", undef, '
    my $res = ($_[0]->get("b") <=> $_[1]->get("b")
        || $_[0]->get("c") <=> $_[1]->get("c"));
    return $res;
    '
);

Before the code gets compiled, it gets wrapped into a 'sub { ... }', so don't write your own sub in the code string, that would be an error.

There is also the issue of arguments that can be specified for these functions. Triceps is now smart enough to handle the arguments that are one of:

  • undef
  • integer
  • floating-point
  • string
  • Triceps::RowType object
  • Triceps::Row object
  • reference to an array or hash thereof

It converts the data to an internal C++ representation in the nexus and then converts it back on import. So, if a TableType has all the code in it in the source form, and the arguments for this code within the limits of this format, it can be exported through the nexus. Otherwise an attempt to export it will fail.
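
For example, here is a sketch of the comparator shown earlier rewritten with the field names passed as plain string arguments, which keeps the whole index type exportable; the convention of the extra arguments being appended after the two row arguments is an assumption carried over from the function-reference form of newPerlSorted():

$it = Triceps::IndexType->newPerlSorted("by_b_c", undef, '
    my ($r1, $r2, @fields) = @_;
    for my $f (@fields) {
        my $res = ($r1->get($f) <=> $r2->get($f));
        return $res if $res;
    }
    return 0;
    ',
    "b", "c"
);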

I've modified the SimpleOrderedIndex to use the source code format, and it will pass through the nexuses as well.

The Aggregators have a similar problem, and I'm working on converting them to the source code format too.

A little more about the differences between the code references and the source code format:

When you compile a function, it carries with it the lexical context. So you can make the closures that refer to the "my" variables in their lexical scope. With the source code you can't do this. The table type compiles them at initialization time in the context of the main package, and that's all they can see. Remember also that the global variables are not shared between the threads, so if you refer to a global variable in the code snippet and rely on a value in that variable, it won't be present in the other threads (unless the other threads are direct descendants and the value was set before their creation).
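
A small sketch of the difference (the variable and field names here are made up):

my $divisor = 1000;

# a function reference can capture the lexical $divisor...
my $cmpRef = sub {
    int($_[0]->get("v") / $divisor) <=> int($_[1]->get("v") / $divisor);
};

# ...but the equivalent source-code string cannot: it gets compiled at the
# table type initialization in the context of the main package, where
# $divisor is not visible, so the value would have to be hardcoded or
# passed through the explicit arguments.
my $cmpSrc = '
    int($_[0]->get("v") / 1000) <=> int($_[1]->get("v") / 1000);
';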

While working with the custom sorted indexes, I've also fixed the way the errors are reported in their Perl handlers. The errors used to be just printed on stderr. Now they propagate properly through the table, and the table operations die with the Perl handler's error message. Since an error in the sorting function means that things are going very, very wrong, after that the table becomes inoperative and will die on all the subsequent operations as well.

Thursday, April 11, 2013

Multithreaded pipeline, part 4

Let's look at the aggregation by the hour. First, the short version that skips over the actual logic and concentrates on how the nexuses are connected.

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # ... create the table and aggregation ...

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  # ... connect the input nexus to the table ...
  # ... create the table dump logic ...

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

This function inherits the options from Triead::start() as usual and adds its own option "from". This option's value is then used as the name of the nexus to import for reading. The row types of the labels from that imported facet are then used to create the table and aggregation. But they aren't connected to the input labels yet.

First, the output nexus is created. The creation passes through all the incoming data, short-circuiting the input and output, and adds the extra label for the aggregated output. After that the rest of the logic can be connected to the inputs (and to the outputs too).

The reason why this connection order is important is that the labels get called in the order they are chained from the input label. And when this thread reacts to some event, we want the original event to pass through to the output first and then send the reaction to it.

And after that it's all usual readyReady() and mainLoop().

The full text of the function follows. The logic is based on the previous example from Chapter 13, and the only big change is the use of SimpleAggregator instead of a manually-built aggregator. The HourlyToDailyMain() is very similar, so I won't even show it; you can find the full text in SVN.

# compute an hour-rounded timestamp (in microseconds)
sub hourStamp # (time)
{
  return $_[0]  - ($_[0] % (1000*1000*3600));
}

sub RawToHourlyMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  # The current hour stamp that keeps being updated;
  # any aggregated data will be propagated when it is in the
  # current hour (to avoid the propagation of the aggregator clearing).
  my $currentHour;

  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );

  # the full stats for the recent time
  my $ttPackets = Triceps::TableType->new($faIn->getLabel("packet")->getRowType())
    ->addSubIndex("byHour",
      Triceps::IndexType->newPerlSorted("byHour", undef, sub {
        return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
      })
      ->addSubIndex("byIP",
        Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
        ->addSubIndex("group",
          Triceps::IndexType->newFifo()
        )
      )
    )
  or confess "$!";

  # type for a periodic summary, used for hourly, daily etc. updates
  my $rtSummary;
  Triceps::SimpleAggregator::make(
    tabType => $ttPackets,
    name => "hourly",
    idxPath => [ "byHour", "byIP", "group" ],
    result => [
      # time period's (here hour's) start timestamp, microseconds
      time => "int64", "last", sub {&hourStamp($_[0]->get("time"));},
      local_ip => "string", "last", sub {$_[0]->get("local_ip");},
      remote_ip => "string", "last", sub {$_[0]->get("remote_ip");},
      # bytes sent in a time period, here an hour
      bytes => "int64", "sum", sub {$_[0]->get("bytes");},
    ],
    saveRowTypeTo => \$rtSummary,
  );

  $ttPackets->initialize() or confess "$!";
  my $tPackets = $unit->makeTable($ttPackets,
    &Triceps::EM_CALL, "tPackets") or confess "$!";

  # Filter the aggregator output to match the current hour.
  my $lbHourlyFiltered = $unit->makeDummyLabel($rtSummary, "hourlyFiltered");
  $tPackets->getAggregatorLabel("hourly")->makeChained("hourlyFilter", undef, sub {
    if ($_[1]->getRow()->get("time") == $currentHour) {
      $unit->call($lbHourlyFiltered->adopt($_[1]));
    }
  });

  # It's important to connect the pass-through data first,
  # before chaining anything to the labels of the faIn, to
  # make sure that any requests and raw inputs get through before
  # our reactions to them.
  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      $faIn->getFnReturn()->getLabelHash(),
      hourly => $lbHourlyFiltered,
    ],
    import => "writer",
  );

  my $lbPrint = $faOut->getLabel("print");

  # update the notion of the current hour before the table
  $faIn->getLabel("packet")->makeChained("processPackets", undef, sub {
    my $row = $_[1]->getRow();
    $currentHour = &hourStamp($row->get("time"));
    # skip the timestamp updates without data
    if (defined $row->get("bytes")) {
      $unit->call($tPackets->getInputLabel()->adopt($_[1]));
    }
  });

  # the dump request processing
  $tPackets->getDumpLabel()->makeChained("printDump", undef, sub {
    $unit->makeArrayCall($lbPrint, "OP_INSERT", $_[1]->getRow()->printP() . "\n");
  });
  $faIn->getLabel("dumprq")->makeChained("dump", undef, sub {
    if ($_[1]->getRow()->get("what") eq "packets") {
      $tPackets->dumpAll();
    }
  });

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

Multithreaded pipeline, part 3

The rest of this example might be easier to understand by looking at an example of a run first. The lines starting with a "!" are the copies of the input lines that ReaderMain() sends and PrintMain() faithfully prints.

input.packet are the rows that reach PrintMain on the "packet" label of its imported facet (remember, "input" is the name with which it imports its input nexus). input.hourly is the data aggregated by the hour intervals (and also by the IP addresses, dropping the port information), and input.daily further aggregates it per day (and again per the IP addresses). The timestamps in the hourly and daily rows are rounded down to the start of the hour or day.

And the lines without any prefixes are the dumps of the table contents that again reach the PrintMain() through the "print" label:

! new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
input.packet OP_INSERT time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
! new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
input.packet OP_INSERT time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
input.hourly OP_DELETE time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100"
input.hourly OP_INSERT time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
! new,OP_INSERT,1330889612000000,1.2.3.4,5.6.7.8,2000,80,150
input.packet OP_INSERT time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
! new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
input.packet OP_INSERT time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
input.hourly OP_DELETE time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.hourly OP_INSERT time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
input.daily OP_DELETE time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
input.daily OP_INSERT time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
! new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
input.packet OP_INSERT time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
input.hourly OP_INSERT time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
input.daily OP_INSERT time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! new,OP_INSERT,1331058811000000
input.packet OP_INSERT time="1331058811000000"
! new,OP_INSERT,1331145211000000
input.packet OP_INSERT time="1331145211000000"
! dump,packets
time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100"
time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50"
time="1330889612000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="150"
time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300"
time="1330972411000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200"
! dump,hourly
time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150"
time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="450"
time="1330970400000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"
! dump,daily
time="1330819200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="600"
time="1330905600000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200"

Note that the order of the lines is completely predictable, nothing goes out of order. Each nexus preserves the order of the rows put into it, and the fact that there is only one writer per nexus and that every thread is fed from only one nexus avoids the races.

Multithreaded pipeline, part 2

The reader thread drives the pipeline:

sub ReaderMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  my $rtPacket = Triceps::RowType->new(
    time => "int64", # packet's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    local_port => "int32",
    remote_port => "int32",
    bytes => "int32", # size of the packet
  ) or confess "$!";

  my $rtPrint = Triceps::RowType->new(
    text => "string", # the text to print (including \n)
  ) or confess "$!";

  my $rtDumprq = Triceps::RowType->new(
    what => "string", # identifies, what to dump
  ) or confess "$!";

  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [
      packet => $rtPacket,
      print => $rtPrint,
      dumprq => $rtDumprq,
    ],
    import => "writer",
  );

  my $lbPacket = $faOut->getLabel("packet");
  my $lbPrint = $faOut->getLabel("print");
  my $lbDumprq = $faOut->getLabel("dumprq");

  $owner->readyReady();

  while(<STDIN>) {
    chomp;
    # print the input line, as a debugging exercise
    $unit->makeArrayCall($lbPrint, "OP_INSERT", "! $_\n");

    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
      $unit->makeArrayCall($lbPacket, @data);
    } elsif ($type eq "dump") {
      $unit->makeArrayCall($lbDumprq, "OP_INSERT", $data[0]);
    } else {
      $unit->makeArrayCall($lbPrint, "OP_INSERT", "Unknown command '$type'\n");
    }
    $owner->flushWriters();
  }

  {
    # drain the pipeline before shutting down
    my $ad = Triceps::AutoDrain::makeShared($owner);
    $owner->app()->shutdown();
  }
}

It starts by creating the nexus with the initial set of the labels: for the data about the network packets, for the lines to be printed at the end of the pipeline and for the dump requests to the tables in the other threads. It gets exported for the other threads to import, and also imported right back into this thread, for writing. And then the setup is done, readyReady() is called, and the processing starts.

It reads the CSV lines, splits them, decides whether each is a data line or a dump request, and either way sends it into the nexus. The data sent to a facet doesn't get immediately forwarded to the nexus. It's collected internally in a tray, and then flushWriters() sends it on. The mainLoop() shown in the last post calls flushWriters() automatically after every tray it processes from the input. But when reading from a file you've got to do it yourself. Of course, it's more efficient to send through multiple rows at once, so a smarter implementation would check if multiple lines are available from the file and send them on in larger bundles.
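A minimal sketch of such batching (my own variation of the loop above, not part of the original example) would flush after every bundle of lines rather than after each one:

  # a sketch of batching: flush after every $BATCH lines instead of after each
  my $BATCH = 100;
  my $count = 0;
  while(<STDIN>) {
    chomp;
    $unit->makeArrayCall($lbPrint, "OP_INSERT", "! $_\n");

    my @data = split(/,/);
    my $type = shift @data;
    if ($type eq "new") {
      $unit->makeArrayCall($lbPacket, @data);
    } elsif ($type eq "dump") {
      $unit->makeArrayCall($lbDumprq, "OP_INSERT", $data[0]);
    } else {
      $unit->makeArrayCall($lbPrint, "OP_INSERT", "Unknown command '$type'\n");
    }
    if (++$count >= $BATCH) {
      $owner->flushWriters();
      $count = 0;
    }
  }
  $owner->flushWriters(); # send the partial last bundle, if any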

The last part is the shutdown. After the end of file is reached, it's time to shut down the application. You can't just shut it down right away because there still might be data in the pipeline, and if you shut it down now, that data will be lost. The right way is to drain the pipeline first, and then do the shutdown when the app is drained. AutoDrain::makeShared() creates a scoped drain: the drain request for all the threads is started when this object is created, and the object construction completes when the drain succeeds. When the object is destroyed, that lifts the drain. So in this case the drain succeeds and then the app gets shut down.

The shutdown causes the mainLoop() calls in all the other threads to return, and the threads to exit. Then startHere() in the first thread has the special logic in it that joins all the started threads after its own main function returns and before it completes. After that the script continues on its way and is free to exit.

Multithreaded pipeline, part 1

The Perl API for the threads is pretty much done now, time for the examples.

The first one re-does a variation of an already shown example, the traffic data aggregation from Chapter 13. The short recap is that it gets the data for each network packet going through and keeps it for some time, aggregates the data by the hour and keeps it for a longer time, and aggregates it by the day and keeps it for a longer time yet. This multi-stage computation naturally matches the pipeline approach.

Since this new example highlights different features than the one in Chapter 13, I've changed its logic a little too: it updates both the hourly and daily summaries on every packet received. And I didn't bother to implement the part with the automatic cleaning of the old data, it doesn't add anything to how the pipeline works.

The pipeline topologies are quite convenient for working with the threads. The parallel computations create a possibility of things happening in an unpredictable order and producing unpredictable results. The pipeline topology allows the parallelism and at the same time keeps the data in a predictable order, with no possibility of rows overtaking each other.

The computation can be split into the following threads:

  • read the input, convert and send the data into the model
  • store the recent data and aggregate it by the hour
  • store the hourly data and aggregate it by the day
  • store the daily data
  • get the data at the end of the pipeline and print it

Technically, each next stage only needs the data from the previous stage, but to get the updates to the printing stage, they all go all the way through.

Dumping the contents of the tables also requires some special support. Each table is local to its thread and can't be accessed from the other threads. To dump its contents, the dump request needs to be sent to its thread, which then extracts the data and sends it through. There are multiple ways to deal with the dump results. One is to have a special label for each table's dump and propagate it to the last stage to print. But if all that is needed is text, one label that allows sending strings is good enough: all the dumps can send their data converted to text into it, and it would go all the way through the pipeline. For this example I've picked the latter approach.

And now is time to show some code. The main part goes like this:

Triceps::Triead::startHere(
  app => "traffic",
  thread => "print",
  main => \&PrintMain,
);


The startHere() creates an App and starts a Triead in the current OS thread. "traffic" is the app name, "print" the thread name. This thread will be the end of the pipeline, and it will create the rest of the threads. This is a convenient pattern when the results of the model need to be fed back to the current thread, and it works out very conveniently for the unit tests. PrintMain() is the body function of this printing thread:

sub PrintMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("traffic main", $opts, {@Triceps::Triead::opts}, @_);
  my $owner = $opts->{owner};
  my $unit = $owner->unit();

  Triceps::Triead::start(
    app => $opts->{app},
    thread => "read",
    main => \&ReaderMain,
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "raw_hour",
    main => \&RawToHourlyMain,
    from => "read/data",
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "hour_day",
    main => \&HourlyToDailyMain,
    from => "raw_hour/data",
  );
  Triceps::Triead::start(
    app => $opts->{app},
    thread => "day",
    main => \&StoreDailyMain,
    from => "hour_day/data",
  );

  my $faIn = $owner->importNexus(
    from => "day/data",
    as => "input",
    import => "reader",
  );

  $faIn->getLabel("print")->makeChained("print", undef, sub {
    print($_[1]->getRow()->get("text"));
  });
  for my $tag ("packet", "hourly", "daily") {
    makePrintLabel($tag, $faIn->getLabel($tag));
  }

  $owner->readyReady();
  $owner->mainLoop(); # all driven by the reader
}

startHere() accepts a number of fixed options plus arbitrary options that it doesn't care about itself but passes through to the thread's main function, which is then responsible for parsing them. To reiterate, the main function gets all the options from the call of startHere(), both those that startHere() parses and those that it simply passes through. startHere() also adds one more option of its own: "owner", containing the TrieadOwner object that the thread uses to communicate with the rest of the App.

In this case PrintMain() doesn't have any extra options of its own, it's just happy to get startHere()'s standard set, which it takes wholesale from @Triceps::Triead::opts.

It gets the TrieadOwner object $owner from the option appended by startHere(). Each TrieadOwner is created with its own Unit, so the unit is obtained from it and the thread's model is created in that unit. Incidentally, the TrieadOwner also acts as a clearing trigger object for the Unit, so when the TrieadOwner is destroyed, it properly clears the Unit.

Then it goes and creates all the threads of the pipeline. The start() works very much like startHere(), only it actually creates a new thread and starts the main function in it. The main function can be the same whether it runs through start() or startHere(). The special catch is that the options to start() must contain only plain Perl values, not Triceps objects. It has to do with how Perl works with threads: it makes a copy of every value for the new thread, and it can't copy the XS objects, so they simply become undefined in the new thread.
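So, for example, passing a nexus name as a string is fine, but passing a Triceps object is not (the commented-out rowType option below is made up purely to show what not to do):

  Triceps::Triead::start(
    app => $opts->{app},
    thread => "raw_hour",
    main => \&RawToHourlyMain,
    from => "read/data",     # a plain string, copies into the new thread just fine
    # rowType => $rtPacket,  # DON'T: a Triceps (XS) object would arrive as undef
  );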

All but the first thread in the pipeline have the extra option from: it names the input nexus for this thread, and each thread creates an output nexus "data". As mentioned before, the nexus namespaces are per the thread that created them, so when the option from says "read/data", it means the nexus "data" created by the thread "read".

So, the pipeline gets all connected sequentially until eventually PrintMain() imports the nexus at its tail. importNexus() returns a facet, which is the thread's API to the nexus. A facet looks very much like an FnReturn for most purposes, with a few additions. It even has a real FnReturn in it, and you work with the labels of that FnReturn. The option "as" of importNexus() gives the name to the facet and to its same-named FnReturn (without it the facet would be named with the short name of the nexus, in this case "data"). The option "import" tells whether this thread will be reading from or writing to the nexus, and in this case it's reading.

By the time the pipeline gets to the last stage, it connects a few labels:

  • print - carries the direct text lines to print in its field "text", and its contents get printed
  • dumprq - carries the dump requests to the tables, so the printing thread doesn't care about it
  • packet - carries the raw data about the packets
  • hourly - carries the hourly summaries
  • daily - carries the daily summaries

The last three get also printed but this time as whole rows.

And after everything is connected, the thread both tells that it's ready and waits for all the other threads to be ready by calling readyReady(). Then it's run time, and mainLoop() takes care of it: until it's told to shut down, it keeps reading data from the nexus and processing it. The shutdown will be controlled by the file reading thread at the start of the pipeline. The processing is done by getting the rowops from the nexus and calling them on the appropriate label in the facet, which then calls the labels chained from it, and that gets all the rest of the thread's model running.
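As far as I remember the TrieadOwner API, mainLoop() amounts to roughly this loop (treat the method name as my recollection, not a definitive reference):

  # keep processing the incoming trays until the thread gets requested to die;
  # the library then marks the thread dead when the main function returns
  while ($owner->nextXtray()) { }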

Monday, April 8, 2013

file descriptor revocation

In my book "The practice of parallel programming" I went through a few ways to interrupt a thread that is sleeping trying to read a file descriptor. And as it shows, the system call dup2() copying a descriptor of /dev/null over the target file descriptor is the best solution. Only it doesn't always work. I've thought that on Linux it works reliably but now I've found that it works on the sockets but not on the pipes, and even with sockets the accept() seems to ignore it.

Well, I think I've found a better solution: use dup2() but then also send a signal (SIGUSR1 or SIGUSR2) to the target thread (of course, this requires some handler registered for this signal, at least a dummy one, to avoid killing the process). Even if dup2() gets ignored by the current system call, the signal will get through and either return EINTR to make the user code retry or cause a system call restart in the OS. In either case the new file descriptor copied by dup2() will be discovered on the next attempt and cause the desired interruption. And unlike the signal used by itself, dup2() closes the race window around the signal.
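Just the descriptor-revocation half of it, sketched in Perl for illustration (in the real thing the signal would be sent to the specific thread with pthread_kill() in C, which plain Perl has no direct equivalent for; $sock here is assumed to be the socket handle that the target thread is blocked reading):

  use POSIX qw(dup2);

  # a dummy handler, so that the signal nudges the blocked call
  # instead of killing the process
  $SIG{USR2} = sub { };

  # revoke the descriptor: copy /dev/null over it, after which any
  # further read attempts on that descriptor return EOF
  open(my $devnull, "<", "/dev/null") or die "open /dev/null: $!";
  defined(dup2(fileno($devnull), fileno($sock))) or die "dup2: $!";
  close($devnull);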

Thursday, April 4, 2013

the Triead lifecycle

Each Triead goes through a few stages in its life:
  • declared
  • defined
  • constructed
  • ready
  • waited ready
  • requested dead
  • dead

Note, by the way, that these are the stages of the Triead object; the OS-level thread as such doesn't know much about them, even though these stages do have some connections to its state.

These stages always go in order and cannot be skipped. However, for convenience you can move directly to a later stage; this will just automatically pass through all the intermediate stages. Well, there is one exception: the "waited ready" and "requested dead" stages can get skipped on the way to "dead". Other than that, the sequence always holds, so if you find out that a Triead is dead, you can be sure that it's also declared, defined, constructed and ready. The attempts to go back to a previous stage are silently ignored.

Now, what do these stages mean?

Declared: The App knows the name of the thread and that this thread will eventually exist. When the App is asked to find the resources from this thread (such as Nexuses; by the way, the nexuses are associated with the threads that created them), it will know to wait until this thread becomes constructed, and then look for the resources. This closes an important race: the code that defines the Triead normally runs in a new OS thread, and there is no way to tell when exactly it will run and do its work. If you spawned a new thread and then attempted to get a nexus from it before it actually ran, the App would tell you that there is no such thread and fail. To get around it, you declare the thread first and then start it. Most of the time there is no need to declare explicitly, the library code that wraps the thread creation does it for you.

Defined: The Triead object has been created and connected to the App. Since this is normally done from the new OS thread, it also implies that the thread is running and is busy constructing its nexuses and whatever internal resources of its own.

Constructed: The Triead has constructed and exported all the nexuses that it planned to. This means that these nexuses can now be imported by the other threads (i.e. connected to the other threads). After this point the thread cannot construct any more nexuses. However it can keep importing the nexuses from the other threads. It's actually a good idea to do all your exports, mark the thread constructed, and only then start importing. This order guarantees the absence of deadlocks (a deadlock would be detected and cause the App to be aborted). There are some special cases when you need to import a nexus from a thread that is not fully constructed yet; it's possible but requires more attention and a special override. I'll talk about it in more detail later.

Ready: The thread has imported all the nexuses it wanted and fully initialized all its internals (for example, if it needs to load data from a file, it would do that before reporting itself ready). After this point no more nexuses can be imported. A fine point is that the other threads may still be created, and they may do their exporting and importing, but once a thread is marked as ready, it's cast in bronze. And in the simple cases you don't need to worry about separating the constructed and ready stages, just initialize everything and mark the thread as ready.

Waited ready: Before proceeding further, the thread has to wait for all the threads in the App to be ready, or it would lose data when it tries to communicate with them. It's essentially a barrier. Normally both the stages "ready" and "waited ready" are advanced to with a single call, readyReady(): the thread says "I'm ready, and let me continue when everyone is ready". After that the actual work can begin. It's still possible to create more threads after that (normally as parts of the transient fragments), and until they all become ready, the App may temporarily become unready again, but that's a whole separate advanced topic.
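A skeleton of a thread's main function that follows this order might look like the following sketch (markConstructed() is the TrieadOwner call for the "constructed" stage as far as I remember the API, and the row type is made up for the illustration):

sub WorkerMain # (@opts)
{
  my $opts = {};
  Triceps::Opt::parse("worker", $opts, {
    @Triceps::Triead::opts,
    from => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  my $owner = $opts->{owner};

  # 1. create and export the nexuses this thread provides
  my $rtData = Triceps::RowType->new(
    text => "string",
  ) or confess "$!";
  my $faOut = $owner->makeNexus(
    name => "data",
    labels => [ text => $rtData ],
    import => "writer",
  );
  $owner->markConstructed(); # now others may import "data" from this thread

  # 2. only then import from the other threads
  my $faIn = $owner->importNexus(
    from => $opts->{from},
    as => "input",
    import => "reader",
  );
  # ... chain this thread's logic to the labels of $faIn ...

  # 3. declare readiness and wait for everyone else at the barrier
  $owner->readyReady();
  $owner->mainLoop();
}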

Requested dead: This is a way to request a thread to exit. Normally some control thread will decide that the App needs to exit and will request all its threads to die. The threads will get these requests, perform their last rites and exit. The threads don't have to wait for this request either, they can also decide to exit on their own at any time. When a thread is requested to die, all the data communication with it stops. No more data will get to it through the nexuses and any data it sends will be discarded. It might churn a little bit through the data in its input buffers but any results produced will be discarded. The good practice is to make sure that all the data is drained before requesting a thread to die. Note that the nexuses created by this thread aren't affected at all, they keep working as usual. It's the data connections between this thread and any nexuses that get broken.

Dead: The thread has completed its execution and exited. Normally you don't need to mark this explicitly: when the thread's main function exits, the library will do it for you. Marking the thread dead also drives the harvesting of the OS threads: the harvesting logic will perform a join() (not to be confused with the SQL join) of the thread and thus free the OS resources. The dead Trieads are still visible in the App (except for some special cases with the fragments), and their nexuses continue working as usual (even including the special cases with the fragments); the other threads can keep communicating through them for as long as they want.