Tuesday, April 30, 2013

ThreadedServer, part 2

The next part of the ThreadedServer is listen(), the function that gets called from the listener thread and takes care of accepting the connections and spawning the per-client threads.

sub listen # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::listen";
    my $opts = {};
    my @myOpts = (
        owner => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TrieadOwner") } ],
        socket => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "IO::Socket") } ],
        prefix => [ undef, \&Triceps::Opt::ck_mandatory ],
        handler => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        pass => [ undef, sub { &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);
    my $owner = $opts->{owner};
    my $app = $owner->app();
    my $prefix = $opts->{prefix};
    my $sock = $opts->{socket};

The first part is traditional, the only thing to note is that it also saves the list of options in a variable for the future use (startServer() did that too but I forgot to mention it). And the option "*" is the pass-through: it gets all the unknown options collected in $opts->{"*"}.

    my $clid = 0; # client id

    while(!$owner->isRqDead()) {
        my $client = $sock->accept();
        if (!defined $client) {
            my $err = "$!"; # or th etext message will be reset by isRqDead()
            if ($owner->isRqDead()) {
                last;
            } elsif($!{EAGAIN} || $!{EINTR}) { # numeric codes don't get reset
                next;
            } else {
                confess "$myname: accept failed: $err";
            }
        }

The accept loop starts. It runs until the listening socket gets revoked by shutdown, the revocation is done by dup2() of a file descriptor from /dev/null over it. Note that this function itself doesn't request to track the file descriptor for revocation. That's the caller's responsibility.

Oh, and by the way, if you've been tracing the possible ways of thread execution closely, you might be wondering: what if the shutdown happens after the socket is opened but before it is tracked? This can't really happen to the listening socket but can happen with the accepted client sockets. The answer is that the tracking enrollment will check whether the shutdown already happened, and if so, it will revoke the socket right away, before returning. So the reading loop will find the socket revoked right on its first iteration.

The revocation also sends the signal SIGUSR2 to the thread. This is done because the calls like accept() do not return on a simple revocation by dup2() but a signal does interrupt them. Triceps registers an empty handler for SIGUSR2 that just immediately returns, but the accept() system call gets interrupted, and the thread gets a chance to check for shutdown, and even if it calls accept() again, this will return an error and force it check for shutdown again.

And by the way, the Perl's threads::kill doesn't send a real signal, it just sets a flag for the interpreter. If you try it on your own, it won't interrupt the system calls, and now you know why. Instead Triceps gets the POSIX thread identity from the Perl thread and calls the honest ptherad_kill() from the C++ code.

So, the main loop goes by the shutdown condition, isRdDead() tells if this thread has been requested to die. After accept(), it checks for errors. The first thing to check for is again the isRdDead(), because the revocation will manifest as a socket operation error, and there is no point in reporting this spurious error. However, like other Triceps calls, isRdDead() will clear the error text, and the text has to be saved first. If the shutdown is found, the loop exits. Then the check for the spurious interruptions is done, and for them the loop continues. Funny enough, the $!{} uses the numeric part of $! that is independent from its text part and doesn't get cleared by the Triceps calls. And on any other errors the thread confesses. This will unroll the stack, eventually get caught by the Triceps threading code, abort the App, and propagate the error message to the harvester.

        $clid++;
        my $cliname = "$prefix$clid";
        $app->storeCloseFile($cliname, $client);

        Triceps::Triead::start(
            app => $app->getName(),
            thread => $cliname,
            fragment => $cliname,
            main => $opts->{handler},
            socketName => $cliname,
            &Triceps::Opt::drop({ @myOpts }, \@_),
        );

        # Doesn't wait for the new thread(s) to become ready.
    }
}

If a proper connection has been received, the socket gets stored into the App with a unique name, for later load by the per-client thread. And then the per-client thread gets started.

The drop passes through all the original options except for the ones handled by this thread. In retrospect, this is not the best solution for this case. It would be better to just use @{$opts->{"*"}} instead. The drop call is convenient when not all the explicitly recognized options but only a part of them has to be dropped.

After starting the thread, the loop doesn't call readyReady() but goes for the next iteration. This is basically because it doesn't care about the started thread and doesn't ever send anything to it. And waiting for the threads to start will make the loop slower, possibly overflowing the socket's listening queue and dropping the incoming connections if they arrive very fast.

And the last part of the ThreadedServer is the printOrShut:

sub printOrShut # ($app, $fragment, $sock, @text)
{
    my $app = shift;
    my $fragment = shift;
    my $sock = shift;

    undef $!;
    print $sock @_;
    $sock->flush();

    if ($!) { # can't write, so shutdown
        Triceps::App::shutdownFragment($app, $fragment);
    }
}

Nothing too complicated. Prints the text into the socket, flushes it and checks for errors. On errors shuts down the fragment. In this case there is no need for draining. After all, the socket leading to the client is dead and there is no way to send anything more through it, so there is no point in worrying about any unsent data. Just shut down as fast as it can, before the threads have generated more data that can't be sent any more. Any data queued in the nexuses for the shut down threads will be discarded.


No comments:

Post a Comment