Monday, April 29, 2013

ThreadedServer, part 1

And now I want to show the internals of the ThreadedServer methods. It shows how to store the socket file handles into the App, how the threads are harvested, and how the connections get accepted.

sub startServer # ($optName => $optValue, ...)
{
    my $myname = "Triceps::X::ThreadedServer::startServer";
    my $opts = {};
    my @myOpts = (
        app => [ undef, \&Triceps::Opt::ck_mandatory ],
        thread => [ "global", undef ],
        main => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "CODE") } ],
        port => [ undef, \&Triceps::Opt::ck_mandatory ],
        socketName => [ undef, undef ],
        fork => [ 1, undef ],
    );
    &Triceps::Opt::parse($myname, $opts, {
        @myOpts,
        '*' => [],
    }, @_);

    if (!defined $opts->{socketName}) {
        $opts->{socketName} = $opts->{thread} . ".listen";
    }

    my $srvsock = IO::Socket::INET->new(
        Proto => "tcp",
        LocalPort => $opts->{port},
        Listen => 10,
    ) or confess "$myname: socket creation failed: $!";
    my $port = $srvsock->sockport() or confess "$myname: sockport failed: $!";

So far it's pretty standard: get the options and open the socket for listening.

    if ($opts->{fork} > 0)  {
        my $pid = fork();
        confess "$myname: fork failed: $!" unless defined $pid;
        if ($pid) {
            # parent
            $srvsock->close();
            return ($port, $pid);
        }
        # for the child, fall through
    }

This handles the process forking option: if forking is requested, it executes and then the parent process returns the PID while the child process continues with the rest of the logic. By the way, your success with forking a process that has multiple running threads may vary. The resulting process usually has one running thread (continuing where the thread start called fork() was) but the synchronization primitives in the new process can be inherited in any state, so the attempts to continue the threaded processing are usually not such a good idea. It's usually best to fork first before there are more threads.

    # make the app explicitly, to put the socket into it first
    my $app = Triceps::App::make($opts->{app});
    $app->storeCloseFile($opts->{socketName}, $srvsock);
    Triceps::Triead::start(
        app => $opts->{app},
        thread => $opts->{thread},
        main => $opts->{main},
        socketName => $opts->{socketName},
        &Triceps::Opt::drop({ @myOpts }, \@_),
    );

Then the App gets created. Previously I was showing starting the App with startHere() that created the App and did a bunch of services implicitly. Here everything will be done manually. The listening socket has to be stored into the app before the listener thread gets started, so that the listener thread can find it.

Triceps keeps a global list of all its Apps in the process, and after an App is created, it's placed into that list and can be found by name from any thread. The App object will exist while there are references to it, including the reference from that global list. On the other hand, it's possible to remove the App from the list while it's still running but that's a bad practice because it will break any attempts from its threads to find it by name.
 
So the App is made, then the file handle gets stored. storeCloseFile() gets the file descriptor from the socket, dups it, stores into the App, and then closes the original file handle. All this monkeying with dupping and closing is needed because there is no way to extract the file descriptor from a Perl file handle without the handle trying to close it afterwards. But in result of this roundabout way, the file descriptor gets transferred into the App.

Then the listener thread is started, and as shown before, it's responsible for starting all the other threads. In the meantime, the startServer() continues.

    my $tharvest;
    if ($opts->{fork} < 0) {
        @_ = (); # prevent the Perl object leaks
        $tharvest = threads->create(sub {
            # In case of errors, the Perl's join() will transmit the error
            # message through.
            Triceps::App::find($_[0])->harvester();
        }, $opts->{app}); # app has to be passed by name
    } else {
        $app->harvester();
    }

Then the harvester logic is started. Each App must have its harvester. startHere() runs the harvester implicitly, unless told otherwise, but here the harvester has to be run manually. It can be run either in this thread or in another thread, as determined by the option "fork". If it says to make another thread, $tharvest will contain that thread's identity. A special thing about starting threads with threads->create() is that it's sensitive to anything in @_. If @_ contains anything, it will be leaked (though the more recent versions of Perl should have it fixed). So @_ gets cleared before starting the thread.

And one way or the other, the harvester is started. What does it do? It joins the App's threads as they exit. After all of them exit, it removes the App from the global list of Apps, which will allow to collect the App's memory when the last reference to it is gone, and then the harvester returns.

If any of the threads die, they cause the App to be aborted. The aborted App shuts down immediately and remembers the identity of the failed thread and its error message (only the first message is saved because the abort is likely to cause the other threads to die too, and there is no point in seeing these derivative messages). The harvester, in turn, collects this message from the App, and after all it cleaning-up work is done, dies propagating this message. Then if the harvester is running not in the first thread, that message will be propagated further by Perl's join().

A catch is that the errors are not reported until the harvester completes. Normally all the App's threads should exit immediately when shut down but if they don't, the program will be stuck without any indication of what happened.

It's also possible to disable this propagation of dying by using the option "die_on_abort":

$app->harvester(die_on_abort => 0);

Then the last part:

    if ($opts->{fork} > 0) {
        exit 0; # the forked child process
    }

    return ($port, $tharvest);
}



 If this was the child process forked before, it exits at this point. Otherwise the port and the harvester's thread object are returned.

No comments:

Post a Comment