Monday, April 29, 2013

Multithreaded socket server, part 4, socket reader thread

As I've said before, each socket gets served by two threads: one sits reading from the socket and forwards the data into the model, and another one sits getting data from the model and forwards it into the socket. The same thread can't wait for both a socket descriptor and a thread synchronization primitive, so they have to be separate.

The first thread started is the socket reader. Let's go through it bit by bit.

sub chatSockReadT
{
    my $opts = {};
    &Triceps::Opt::parse("chatSockReadT", $opts, {@Triceps::Triead::opts,
        socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
    }, @_);
    undef @_; # avoids a leak in threads module
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $unit = $owner->unit();
    my $tname = $opts->{thread};

    # only dup the socket, the writer thread will consume it
    my ($tsock, $sock) = $owner->trackDupSocket($opts->{socketName}, "<");

The beginning is quite usual. Then it loads the socket from the App and gets it tracked with the TrieadOwner. The difference between trackDupSocket() here and the trackGetSocket() used before is that trackDupSocket() leaves the socket copy in the App, to be found by the writer-side thread.

The socket is reopened in this thread as read-only. The writing to the socket from all the threads has to be synchronized to avoid interleaving partial messages. The easiest way to synchronize is to always write from one thread: if another thread wants to write something, it has to pass the data to the writer thread through the control nexus.
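The single-writer idea can be sketched outside of Triceps with plain Perl threads. This is only an illustration under stated assumptions: run_single_writer() is a hypothetical helper, Thread::Queue stands in for the control nexus, and the "writer" just collects lines instead of writing to a socket. It needs a Perl built with thread support.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical helper: several producer threads hand complete lines to one
# writer thread through a queue, so lines never interleave mid-message.
sub run_single_writer {
    my (@batches) = @_;             # each batch is an array ref of lines
    my $queue = Thread::Queue->new();

    # The only thread that "writes"; here it just collects the lines.
    my $writer = threads->create(sub {
        my @written;
        while (defined(my $line = $queue->dequeue())) {
            push @written, $line;
        }
        return \@written;
    });

    # Each producer enqueues whole lines and never touches the output.
    my @producers = map {
        my $batch = $_;
        threads->create(sub { $queue->enqueue($_) for @$batch; return; });
    } @batches;

    $_->join() for @producers;
    $queue->enqueue(undef);         # sentinel: tells the writer to stop
    return $writer->join();
}

my $out = run_single_writer(["a1", "a2"], ["b1", "b2"]);
print scalar(@$out), " lines written\n";
```

The order of lines from different producers is not deterministic, but every line arrives whole, which is the property the writer thread is there to provide.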

    # user messages will be sent here
    my $faChat = $owner->importNexus(
        from => "global/chat",
        import => "writer",
    );

    # control messages to the reader side will be sent here
    my $faCtl = $owner->makeNexus(
        name => "ctl",
        labels => [
            ctl => $faChat->impRowType("ctl"),
        ],
        reverse => 1, # gives this nexus a high priority
        import => "writer",
    );

This imports the chat nexus and creates the private control nexus for communication with the writer side. The name of the chat nexus is hardcoded here, since it's pretty much a solid part of the application. If this were a module, the name of the chat nexus could be passed through the options.

The control nexus is marked as reverse even though it really isn't. But the reverse option has a side effect of making this nexus high-priority. Even if the writer thread has a long queue of messages from the chat nexus, the messages from the control nexus will be read first. Which again isn't strictly necessary here, but I wanted to show how it's done.

The type of the control label is imported from the chat nexus, so it doesn't have to be defined from scratch.

    $owner->markConstructed();

    Triceps::Triead::start(
        app => $opts->{app},
        thread => "$tname.rd",
        fragment => $opts->{fragment},
        main => \&chatSockWriteT,
        socketName => $opts->{socketName},
        ctlFrom => "$tname/ctl",
    );

    $owner->readyReady();

Then the construction is done and the writer thread gets started. And then the thread is ready and waits for the writer thread to be ready too. The readyReady() works in the fragments just as it does at the start of the app. Whenever a new thread is started, the App becomes not ready, and stays this way until all the threads report that they are ready. The rest of the App keeps working as if nothing happened, at least sort of. Whenever a nexus is imported, the messages from this nexus start collecting for this thread, and if there are many of them, the nexus will become backed up and the threads writing to it will block. The new threads have to call readyReady() as usual to synchronize between themselves, and then everything gets on its way.

Of course, if two connections are received in quick succession, that would start two sets of threads, and readyReady() will continue only after all of them are ready.


    my $lbChat = $faChat->getLabel("msg");
    my $lbCtl = $faCtl->getLabel("ctl");

    $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!ready," . $opts->{fragment});
    $owner->flushWriters();

A couple of labels get remembered for future use, and the connection ready message gets sent to the writer thread through the control nexus. By convention of this application, the messages go in the CSV format, with the control messages starting with "!". If this is the first client, this would send

!ready,cliconn1

to the client. It's important to call flushWriters() every time to get the message(s) delivered.

    while(<$sock>) {
        s/[\r\n]+$//;
        my @data = split(/,/);
        if ($data[0] eq "exit") {
            last; # a special case, handle in this thread
        } elsif ($data[0] eq "kill") {
            eval {$app->shutdownFragment($data[1]);};
            if ($@) {
                $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!error,$@");
                $owner->flushWriters();
            }
        } elsif ($data[0] eq "shutdown") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            Triceps::AutoDrain::makeShared($owner);
            eval {$app->shutdown();};
        } elsif ($data[0] eq "publish") {
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => $data[1], msg => $data[2]);
            $owner->flushWriters();
        } else {
            # this is not something you want to do in a real chat application
            # but it's cute for a demonstration
            $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => $data[0], arg => $data[1]);
            $owner->flushWriters();
        }
    }

The main loop keeps reading lines from the socket and interpreting them. The lines are in CSV format: the first field is the command and the rest are the arguments (if any). The commands are:

publish - send a message with a topic to the chat nexus
exit - close the connection
kill - close another connection, by name
shutdown - shut down the server
subscribe - subscribe the client to a topic
unsubscribe - unsubscribe the client from a topic
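The line parsing can be tried in isolation with plain Perl. In this minimal sketch parse_command() is a hypothetical helper, not a part of the chat example; it only repeats the chomp-and-split steps of the main loop and shows one limitation of this simple CSV handling.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of the chat example: split one input line
# into a command and its arguments the same way the main loop does.
sub parse_command {
    my ($line) = @_;
    $line =~ s/[\r\n]+$//;          # strip the line terminator, CR LF or LF
    my @data = split(/,/, $line);   # naive CSV: no quoting or escaping
    return @data;
}

my @cmd = parse_command("publish,news,hello\r\n");
print join("|", @cmd), "\n";        # command, topic, message

# A comma inside the message produces an extra field, and the loop
# only uses $data[2], so the rest of the text would be lost.
my @long = parse_command("publish,news,hello, world\n");
print scalar(@long), " fields\n";
```

If keeping the commas in the message mattered, one option would be to give split() a limit of 3 fields, so that everything after the second comma stays in the last field.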

The exit just exits the loop, so it is handled exactly the same way as the socket getting closed from the other side.

The kill shuts down by name the fragment to which the threads of the other connection belong. This is a simple application, so it doesn't check any permissions, such as whether this fragment should be allowed to be shut down. If there is no such fragment, the shutdown call will silently do nothing, so the error check and reporting is really redundant (if something goes grossly wrong in the thread interruption code, an error might still occur, but theoretically this should never happen).

The shutdown sends the notification to the common topic "*" (to which all the clients are subscribed by default), then drains the model and shuts it down. The drain makes sure that all the messages in the model get processed (and even written to the socket) without allowing any new messages to be injected. "Shared" means that there are no special exceptions for any threads.

makeShared() actually creates a drain object that keeps the drain active during its lifetime. Here this object is not assigned anywhere, so it gets immediately destroyed and lifts the drain. So potentially more messages can get squeezed in between this point and shutdown. Which doesn't matter a whole lot here.
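The lifetime effect is ordinary Perl reference counting and can be seen without Triceps. In this sketch DemoGuard is a hypothetical stand-in for a drain object, not the Triceps class; it only logs when it gets created and destroyed.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a drain guard; not the Triceps class.
package DemoGuard;
our @log;
sub new {
    my ($class, $name) = @_;
    push @log, "start $name";
    return bless { name => $name }, $class;
}
sub DESTROY {
    my ($self) = @_;
    push @log, "end $self->{name}";
}

package main;

DemoGuard->new("unassigned");   # destroyed at the end of this statement
push @DemoGuard::log, "work 1";

{
    my $guard = DemoGuard->new("held");  # lives until the block ends
    push @DemoGuard::log, "work 2";
}

print join("\n", @DemoGuard::log), "\n";
```

The log shows "end unassigned" before "work 1", but "work 2" before "end held": only the object kept in a variable stays alive while the work is being done.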

If it were really important that nothing get sent after the shutdown notification, it could be done like this (this is an untested fragment, so it might contain typos):

        } elsif ($data[0] eq "shutdown") {
            my $drain = Triceps::AutoDrain::makeExclusive($owner);
            $unit->makeHashCall($lbChat, "OP_INSERT", topic => "*", msg => "server shutting down");
            $owner->flushWriters();
            $drain->wait();
            eval {$app->shutdown();};
        }

This starts the drain, but this time the exclusive mode means that this thread is allowed to send more data. When the drain is created, it waits for success, so when the new message is inserted, it will be after all the other messages. $drain->wait() does another wait and makes sure that this last message propagates all the way. And then the app gets shut down, while the drain is still in effect, so no more messages can be sent for sure.

The publish sends the data to the chat nexus (note the flushWriters(), as usual!).

And the rest of the commands (that would be subscribe and unsubscribe, but you can send any other commands like "print") simply get forwarded through the control nexus to the socket writer thread for execution. Passing the commands through like this without validation is not a good practice for a real application but it's cute for a demo.

    {
        # let the data drain through
        my $drain = Triceps::AutoDrain::makeExclusive($owner);

        # send the notification - can do it because the drain is excluding itself
        $unit->makeHashCall($lbCtl, "OP_INSERT", cmd => "print", arg => "!exiting");
        $owner->flushWriters();

        $drain->wait(); # wait for the notification to drain

        $app->shutdownFragment($opts->{fragment});
    }

    $tsock->close(); # not strictly necessary
}

The last part is when the connection gets closed, either by the "exit" command or when the socket gets closed. Remember, the socket can get closed asymmetrically, in one direction, so even when the reading is closed, the writing may still work and needs to return the responses to any commands received from the socket. And of course the same is true for the "exit" command.

So here the full exclusive drain sequence is used, ending with the shutdown of this thread's own fragment, which will close the socket. Even though only one fragment needs to be shut down, the drain drains the whole model. Because of the potentially complex interdependencies, there is no way to reliably drain only a part, and all the drains are App-wide.

The last part, with $tsock->close(), is not technically necessary since the shutdown of the fragment will get the socket descriptor revoked anyway. But it's still a good practice: it unregisters the socket from the TrieadOwner and then closes it.
