Sunday, May 19, 2013

time and threads and ThreadedClient

I've been making the ThreadedClient more resilient, so that it handles the unexpected better. First, if it encounters a socket disconnect (__EOF__) while expecting data, it will immediately return and set the error message in $@. Second, there is now a way to specify a timeout for waiting. If the message is not received in time, expect() with a timeout will also return immediately with a message in $@.

As an unrelated change, I've renamed the method protocol() to getTrace(). The word "protocol" creates too much confusion when used around the sockets; "trace" is a more distinct one.

And as a mix thereof, the error messages are not only set in $@ but also included into the trace, so that everything can be checked together. There is also a separate trace of only the error messages, which can be obtained with getErrorTrace().

There are three ways to specify the timeout. Two of them are in the new():

        my $client = Triceps::X::ThreadedClient->new(
            owner => $owner,
            totalTimeout => $timeout,
        );

        my $client = Triceps::X::ThreadedClient->new(
            owner => $owner,
            timeout => $timeout,
        );


The option "totalTimeout" gives a timeout for the whole run to complete. Once that timeout is reached, all future expect()s just fail immediately. The option "timeout" gives the default timeout for each expect(). It's possible to use both, and each call of expect() will be limited by the shorter time limit of the two (remember, "totalTimeout" starts counting from the call of new(), not from startClient(), while "timeout" starts counting from the call of expect()).

All the timeouts are specified in seconds with fractions, so for 0.1 seconds you just use 0.1.

The third way is to use an extra argument in expect():

$client->expect("c1", "expected text", $timeout);

This timeout completely overrides whatever was set in new(). The value of 0 disables the timeout for this call, overriding the timeouts from new() as well, so it can be used for one-off calls without a timeout.
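The way the three sources combine can be sketched in plain Perl. This is only an illustration of the rules described above, not the code from ThreadedClient: the function effective_deadline() and its argument names "now", "created" and "callTimeout" are made up for the example. It also assumes that a 0 or absent option of new() means "not set", and it reads "completely overrides" as replacing both options of new().

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Triceps: combines the three timeout
# sources into one absolute deadline (seconds since epoch, with fractions).
# Returns 0 when the call should wait without a limit.
sub effective_deadline {
    my (%arg) = @_;
    my $now = $arg{now};    # the time of the expect() call

    # An explicit argument of expect() overrides the options of new();
    # an explicit 0 means "no timeout for this call".
    if (defined $arg{callTimeout}) {
        return $arg{callTimeout} > 0 ? $now + $arg{callTimeout} : 0;
    }

    my $limit = 0;
    # "timeout" counts from the call of expect()
    if ($arg{timeout}) {
        $limit = $now + $arg{timeout};
    }
    # "totalTimeout" counts from the call of new()
    if ($arg{totalTimeout}) {
        my $total = $arg{created} + $arg{totalTimeout};
        $limit = $total if $limit == 0 || $total < $limit;
    }
    return $limit;
}

# new() was called at t=100, expect() is called at t=103
my %base = (created => 100, now => 103, totalTimeout => 10, timeout => 5);
printf "%.1f\n", effective_deadline(%base);                     # 108.0
printf "%.1f\n", effective_deadline(%base, timeout => 20);      # 110.0
printf "%.1f\n", effective_deadline(%base, callTimeout => 0.5); # 103.5
printf "%.1f\n", effective_deadline(%base, callTimeout => 0);   # 0.0
```

The result is an absolute time rather than a remaining interval, which matches the absolute $limit that the core of expect() works with.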

And now, how it works inside. First thing, the call Triceps::now() returns the current time in seconds since epoch as a floating-point value, including the fractions of the seconds. The Triceps Perl API deals with time in this format.

Then, how to do the timeouts. Remember, if you look for repeatability of computation, you should really use an external time source synchronized with your data. The interface described here is for quick-and-dirty things, like time-limiting the tests, so that the unexpected input would not get the test stuck.

The core part of expect(), after it computes the time limit from the three sources, is this:

  $self->{error} = undef;
  $self->{expectDone} = 0;

  $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
    cmd => "expect",
    client => $client,
    arg => $pattern,
  );
  $owner->flushWriters();

  if ($limit > 0.) {
    while(!$self->{expectDone} && $owner->nextXtrayTimeLimit($limit)) { }
    # on timeout reset the expect and have that confirmed
    if (!$self->{expectDone}) {
      $owner->unit()->makeHashCall($self->{faCtl}->getLabel("msg"), "OP_INSERT",
        cmd => "cancel",
        client => $client,
      );
      $owner->flushWriters();
      # wait for confirmation
      while(!$self->{expectDone} && $owner->nextXtray()) { }
    }
  } else {
    while(!$self->{expectDone} && $owner->nextXtray()) { }
  }

  $@ = $self->{error};

The expects and inputs get combined in the collector thread. The expect request gets forwarded to it, and then the current thread waits for the response. The response gets processed with nextXtray(), which represents one step of the main loop. The main loop is literally implemented in C++ like this:

void TrieadOwner::mainLoop()
{
    while (nextXtray())
        { }
    markDead();
}

The nextXtray() takes the next tray from the Triead's read nexuses and processes it. "Xtray" is a special form of the trays used to pass the data across the nexus. nextXtray() returns true until the thread is requested dead, and then it returns false.

The normal nextXtray() waits until there is more data to process or the thread is requested to die. But there are special forms of it:

nextXtrayNoWait()

Returns immediately if there is no data available at the moment.

nextXtrayTimeLimit($deadline)

Returns if no data becomes available before the absolute deadline.

nextXtrayTimeout($timeout)

Returns if no data becomes available before the timeout, starting at the time of the call. Just to reiterate, the difference is that nextXtrayTimeLimit() receives the absolute time since the epoch, while nextXtrayTimeout() receives the length of the timeout starting from the time of the call, both as seconds in floating-point.

All the versions that may return early on no data return false if they have to do so.
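The difference between the three forms can be modelled in plain Perl without any threads. The sketch below is not the Triceps implementation: it is a toy queue with a simulated clock, where every item carries the time at which it "arrives", and the names ToyQueue, next_no_wait(), next_time_limit() and next_timeout() are invented for the illustration. It also leaves out the case of the thread being requested to die.

```perl
use strict;
use warnings;

package ToyQueue;

# $now is the starting value of the simulated clock;
# @items is a list of [arrival_time, payload], sorted by arrival time.
sub new {
    my ($class, $now, @items) = @_;
    return bless { now => $now, items => [@items], seen => [] }, $class;
}

# Wait until the absolute $deadline; process one item if it arrives in time.
# Returns 1 if an item was processed, 0 if the deadline came first.
sub next_time_limit {
    my ($self, $deadline) = @_;
    my $item = $self->{items}[0];
    if (defined $item && $item->[0] <= $deadline) {
        shift @{$self->{items}};
        # the simulated clock never goes back
        $self->{now} = $item->[0] if $item->[0] > $self->{now};
        push @{$self->{seen}}, $item->[1];
        return 1;
    }
    $self->{now} = $deadline if $deadline > $self->{now};
    return 0;
}

# The relative timeout is the same thing, with the deadline computed from "now".
sub next_timeout {
    my ($self, $timeout) = @_;
    return $self->next_time_limit($self->{now} + $timeout);
}

# No waiting at all is a deadline of "right now".
sub next_no_wait {
    my ($self) = @_;
    return $self->next_time_limit($self->{now});
}

package main;

my $q = ToyQueue->new(10.0, [10.2, "a"], [12.0, "b"]);
print $q->next_no_wait(), "\n";        # 0: nothing has arrived yet at 10.0
print $q->next_timeout(0.5), "\n";     # 1: "a" arrives at 10.2, before 10.5
print $q->next_time_limit(11.0), "\n"; # 0: "b" arrives only at 12.0
print $q->next_timeout(1.0), "\n";     # 1: the clock is at 11.0, so the limit is 12.0
```

The last two calls show why a loop that may go around many times is easier to write with the absolute deadline: the limit stays the same on every iteration, while a relative timeout would have to be recomputed from the current time.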

expect() uses the version with the absolute deadline. If the collector thread finds a match, it will send a rowop back to the expect thread. That rowop will get processed in nextXtrayTimeLimit(), calling a label that sets the flag $self->{expectDone}. Then nextXtrayTimeLimit() will return true, and the loop will find the flag set and exit.

If the collector thread doesn't find a match in time, nextXtrayTimeLimit() will return false, and the loop will again exit. But then the code will find the "done" flag not set, so it knows that the timeout has expired and it has to tell the collector thread that the call is being cancelled. So it sends another rowop for the cancel, and then waits for the confirmation with another nextXtray() loop, this time with no limit on it since the confirmation must arrive back quickly in any case.


It's the confirmation rowop processing that sets $self->{error}. But there is always a possibility that the match will arrive just after the timeout has expired but just before the cancellation. It's one of those things that you have to deal with when multiple threads exchange messages. What then? Then the normal confirmation will arrive back to the expecting thread. And when the cancel message arrives at the collector thread, it will find that this client doesn't have an outstanding expect request any more, and will just ignore the cancel. Thus, the second nextXtray() loop will receive either a confirmation of the cancel and set the error message, or it will receive the last-moment success message. Either way it will fall through and return (setting $@ if the cancel confirmation came back).
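The handling of this race can be illustrated with a small single-threaded model of the collector's bookkeeping. This is a sketch under assumptions, not the real ThreadedClient code: the functions collector_step() and run_sequence(), the "data" command, the literal substring matching and the text of the error message are all invented for the example. Only the "expect" and "cancel" commands come from the code shown above.

```perl
use strict;
use warnings;

# Toy model of the collector: %$state maps a client name to its
# outstanding expect pattern. Returns the list of replies that would be
# sent back to the expecting thread (each reply sets the "done" flag there).
sub collector_step {
    my ($state, $cmd, $client, $arg) = @_;
    if ($cmd eq "expect") {
        $state->{$client} = $arg;
        return ();
    }
    if ($cmd eq "data") {
        my $pattern = $state->{$client};
        # assumption: a plain substring match stands in for the real matching
        if (defined $pattern && $arg =~ /\Q$pattern\E/) {
            delete $state->{$client};
            return ({ done => 1, error => undef });
        }
        return ();
    }
    if ($cmd eq "cancel") {
        # no outstanding expect: the match has won the race, ignore the cancel
        return () unless exists $state->{$client};
        delete $state->{$client};
        return ({ done => 1, error => "timed out" });    # placeholder text
    }
    die "unknown command '$cmd'";
}

# Feed a sequence of messages to a fresh collector, collect all the replies.
sub run_sequence {
    my @msgs = @_;
    my %state;
    return map { collector_step(\%state, @$_) } @msgs;
}

# The cancel arrives first: one reply, with an error.
my @r1 = run_sequence(
    ["expect", "c1", "hello"],
    ["cancel", "c1"],
    ["data", "c1", "hello world"],
);
# The match sneaks in before the cancel: still one reply, without an error.
my @r2 = run_sequence(
    ["expect", "c1", "hello"],
    ["data", "c1", "hello world"],
    ["cancel", "c1"],
);
printf "%d reply, error=%s\n", scalar(@r1), $r1[0]{error} // "none";
printf "%d reply, error=%s\n", scalar(@r2), $r2[0]{error} // "none";
```

In both orderings the expecting side gets exactly one reply per expect, which is what lets the second wait loop run without a time limit.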

By the way, if the collector thread finds the socket closed, it will immediately return an error rowop, very similar to the confirmation of the cancel. And it will set both $self->{expectDone} and $self->{error} in the expect thread.
