Sunday, April 28, 2013

Multithreaded socket server, part 3, the listener thread

The listener thread is:

sub listenerT
{
  my $opts = {};
  &Triceps::Opt::parse("listenerT", $opts, {@Triceps::Triead::opts,
    socketName => [ undef, \&Triceps::Opt::ck_mandatory ],
  }, @_);
  undef @_;
  my $owner = $opts->{owner};

  my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");

  # a chat text message
  my $rtMsg = Triceps::RowType->new(
    topic => "string",
    msg => "string",
  ) or confess "$!";

  # a control message between the reader and writer threads
  my $rtCtl = Triceps::RowType->new(
    cmd => "string", # the command to execute
    arg => "string", # the command argument
  ) or confess "$!";

  $owner->makeNexus(
    name => "chat",
    labels => [
      msg => $rtMsg,
    ],
    rowTypes => [
      ctl => $rtCtl,
    ],
    import => "none",
  );

  $owner->readyReady();

  Triceps::X::ThreadedServer::listen(
    owner => $owner,
    socket => $sock,
    prefix => "cliconn",
    handler => \&chatSockReadT,
  );
}

It gets the usual options of the thread start (and as usual you can pass more options to the startServer() and they will make their way to the listener thread. But there is also one more option added by startServer(): the socketName.

Since the the socket objects can't be passed directly between the threads, a roundabout way is taken. After startServer() opens a socket, it stores the dupped file descriptor in the App and passes the name used for store through, so that it can be used to load the socket back into another thread:

my ($tsock, $sock) = $owner->trackGetSocket($opts->{socketName}, "+<");


This does multiple things:
  • Loads the file descriptor from the App by name (with a dup()).
  • Opens a socket object from that file descriptor.
  • Registers that file descriptor for tracking with the TrieadOwner, so that if the thread needs to be shut down, that descriptor will be revoked and any further operations with it will fail.
  • Creates a TrackedFile object that will automatically unregister the file descriptor from TrieadOwner when the TrackedFile goes out of scope. This is important to avoid that races between the revocation and the normal close of the file.
  • Makes the App close and forget its file descriptor.

The $tsock returned  is the TrackedFile object, and $sock is the socket filehandle. You can actually get the filehandle directly from the TrackedFile, as $tsock->get(), so  why return the socked separately? As it turns out, Perl has issues with handling the Perl values stored inside the XS objects if they aren't referred by any Perl variables. When the listener thread creates the client handler threads, that would scramble the reference counts. Keeping the socket in the $socket variable prevents that.

The listener thread then creates the row types for the data messages and for the control messages between the client's reader and writer threads, and makes a nexus. The listener is not interested in the data, so it doesn't even import this nexus itself. The nexus passes the data only, so it has no label for the control messages, only the row type.

Then the mandatory readReady(), and then the control goes again to the library that accepts the connections and starts the client connection threads. The handler is the main function for the thread that gets started to handle the connection. The prefix is used to build the names for the new thread, for its fragment, and for the connection socked that gets also passed through by storing it in the App. The name is the same for all three and gets created by concatenating the prefix with a number that gets increased for every connection. The newly created thread will then get the option socketName with the name of the socket.

How does the ThreadedServer::listen() know to return? It runs until the App gets shut down, and returns only when the thread is requested to die as a result of the shutdown.

No comments:

Post a Comment