Monday, May 27, 2013

how to export a table, or the guts of TQL join exposed

Now to the point of why the multithreaded TQL example got written: the export of a table between two threads.

It all starts in the Tql initialization method. In the multithreaded mode it builds the nexuses for communication. I'll skip the input nexus and show the building of only the output and request-dump nexuses:

    # row type for dump requests and responses
    my $rtRequest = Triceps::RowType->new(
      client => "string", #requesting client
      id => "string", # request id
      name => "string", # the table name, for convenience of requestor
      cmd => "string", # for convenience of requestor, the command that it is executing
    ) or confess "$!";

The request row type is used by the client writer thread to request the table dumps from the core logic, and to get back the notifications about the dumps.

    # build the output side
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
      my $name = $self->{tableNames}[$i];
      my $table = $self->{tables}[$i];

      push @tabtypes, $name, $table->getType()->copyFundamental();
      push @labels, "t.out." . $name, $table->getOutputLabel();
      push @labels, "t.dump." . $name, $table->getDumpLabel();
    }
    push @labels, "control", $rtControl; # pass-through from in to out
    push @labels, "beginDump", $rtRequest; # framing for the table dumps
    push @labels, "endDump", $rtRequest;

    $self->{faOut} = $owner->makeNexus(
      name => $self->{nxprefix} . "out",
      labels => [ @labels ],
      tableTypes => [ @tabtypes ],
      import => "writer",
    );
    $self->{beginDump} = $self->{faOut}->getLabel("beginDump");
    $self->{endDump} = $self->{faOut}->getLabel("endDump");

On the output side each table is represented by 3 elements:
  • its fundamental table type (stripped down to the primary key);
  • its output label for normal updates;
  • its dump label for the responses to the dump requests.
There also are the "beginDump" and "endDump" labels that frame each response to a dump request.

The row type $rtControl and the label "control" are used to pass the commands from the client reader to the client writer, but their exact contents are not important here.

The dump request nexus is built in a similar way:

    # build the dump requests, will be coming from below
    undef @labels;
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
      my $name = $self->{tableNames}[$i];
      my $table = $self->{tables}[$i];

      push @labels, "t.rqdump." . $name, $rtRequest;
    }
    $self->{faRqDump} = $owner->makeNexus(
      name => $self->{nxprefix} . "rqdump",
      labels => [ @labels ],
      reverse => 1, # avoids making a loop, and gives priority
      import => "reader",
    );
    # tie together the labels
    for (my $i = 0; $i <= $#{$self->{tables}}; $i++) {
      my $name = $self->{tableNames}[$i];
      my $table = $self->{tables}[$i];

      $self->{faRqDump}->getLabel("t.rqdump." . $name)->makeChained(
        $self->{nxprefix} . "rqdump." . $name, undef,
        \&_dumpTable, $self, $table
      );
    }

The dumps are executed in the function _dumpTable:

sub _dumpTable # ($label, $rowop, $self, $table)
{
  my ($label, $rop, $self, $table) = @_;
  my $unit = $label->getUnit();
  # pass through the client id to the dump
  $unit->call($self->{beginDump}->adopt($rop));
  $table->dumpAll();
  $unit->call($self->{endDump}->adopt($rop));
  $self->{faOut}->flushWriter();
}

The data gets framed by the "beginDump" and "endDump" labels, each of which gets a copy of the original request. This helps the client writer thread keep track of its current spot. The flushing of the writer is not strictly needed. If multiple dump requests are ever received in a single tray, it breaks up the responses into a separate tray for each dump, keeping the size of the trays lower. Not that this situation could actually happen yet.

This part taken care of, let's jump around and see how the client writer thread processes a "querysub" request:

      } elsif ($cmd eq "querysub") {
        if ($id eq "" || exists $queries{$id}) {
          printOrShut($app, $fragment, $sock,
            "error,$id,Duplicate id '$id': query ids must be unique,bad_id,$id\n");
          next;
        }
        my $ctx = compileQuery(
          qid => $id,
          qname => $args[0],
          text => $args[1],
          subError => sub {
            chomp $_[2];
            $_[2] =~ s/\n/\\n/g; # no real newlines in the output
            $_[2] =~ s/,/;/g; # no confusing commas in the output
            printOrShut($app, $fragment, $sock, "error,", join(',', @_), "\n");
          },
          faOut => $faOut,
          faRqDump => $faRqDump,
          subPrint => sub {
            printOrShut($app, $fragment, $sock, @_);
          },
        );
        if ($ctx) { # otherwise the error is already reported
          $queries{$id} = $ctx;
          &$runNextRequest($ctx);
        }
      }

The query id is used to keep track of the outstanding queries, so the code makes sure that it's unique, and the error message here shows an example of the response format. The bulk of the work is done in the method compileQuery(). The arguments to it give the details of the query and also provide the closures for the functionality that differs between the single-threaded and multi-threaded versions. The option "subError" is used to send the errors to the client, and "subPrint" is used to send the output to the client; the latter gets used for building the labels in the "print" command of the query.
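The clean-up that the subError closure above does to the message can be tried in isolation. Here is a minimal sketch in plain Perl, with no Triceps involved. The helper name format_error_line and the sample arguments are made up for illustration; the only thing taken from the original is that the third argument is treated as the message text and gets the same three transformations.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Triceps or TQL): builds one CSV-style
# error line the way the subError closure does, but works on a copy
# of the arguments instead of modifying @_ in place.
sub format_error_line {
    my @fields = @_;
    chomp $fields[2];               # drop a trailing newline
    $fields[2] =~ s/\n/\\n/g;       # embedded newlines become a literal \n
    $fields[2] =~ s/,/;/g;          # commas would break the CSV framing
    return "error," . join(',', @fields) . "\n";
}

# Made-up sample arguments; the message has a comma and two newlines.
my $line = format_error_line(
    "q1", "myquery", "bad field 'x', line 2\nnear 'join'\n", "bad_arg");
print $line;
```

The result is always a single line, so a client that reads the socket line by line never sees a message split in the middle.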

compileQuery() returns the query context, which contains a compiled sub-model that executes the query and a set of requests that tell the writer how to connect the query to the incoming data. On error, it reports the error using subError and returns undef. If the compilation succeeded, the writer remembers the query and starts the asynchronous execution of the requests. More about the requests later; for now let's look at the query compilation and context.

The context is created in compileQuery() thusly:

  my $ctx = {};
  $ctx->{qid} = $opts->{qid};
  $ctx->{qname} = $opts->{qname};

  # .. skipped the parts related to single-threaded TQL

  $ctx->{faOut} = $opts->{faOut};
  $ctx->{faRqDump} = $opts->{faRqDump};
  $ctx->{subPrint} = $opts->{subPrint};
  $ctx->{requests} = []; # dump and subscribe requests that will run the pipeline
  $ctx->{copyTables} = []; # the tables created in this query
    # (have to keep references to the tables or they will disappear)

  # The query will be built in a separate unit
  $ctx->{u} = Triceps::Unit->new($opts->{nxprefix} . "${q}.unit");
  $ctx->{prev} = undef; # will contain the output of the previous command in the pipeline
  $ctx->{id} = 0; # a unique id for auto-generated objects
  # deletion of the context will cause the unit in it to clean
  $ctx->{cleaner} = $ctx->{u}->makeClearingTrigger();

It has some parts in common and some parts that differ between the single- and multi-threaded varieties; here I've skipped over the single-threaded parts.

One element that is left undefined here is $ctx->{prev}. It's the label created as the output of the previous stage of the query pipeline. As each command in the pipeline builds its piece of processing, it chains its logic from $ctx->{prev} and leaves its result label in $ctx->{next}. Then compileQuery() moves "next" to "prev" and calls the compilation of the next command in the pipeline. The only command that accepts an undefined "prev" (and it must be undefined for it) is "read", that reads the table at the start of the pipeline.
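The "prev"/"next" hand-off can be sketched without any Triceps objects. In this toy version the stages are plain closures rather than labels, and each stage pulls from the previous one instead of being chained after it, so it only illustrates the bookkeeping. The command names "double" and "increment" and the function compile_pipeline are made up for the illustration.

```perl
use strict;
use warnings;

# Hypothetical command compilers: each one reads $ctx->{prev}
# and leaves its own result in $ctx->{next}.
my %compilers = (
    read => sub {
        my ($ctx) = @_;
        die "read must be at the start of a pipeline\n" if defined $ctx->{prev};
        $ctx->{next} = sub { return @_ };   # source stage, passes the values through
    },
    double => sub {
        my ($ctx) = @_;
        die "double may not be at the start of a pipeline\n" unless defined $ctx->{prev};
        my $prev = $ctx->{prev};
        $ctx->{next} = sub { return map { $_ * 2 } $prev->(@_) };
    },
    increment => sub {
        my ($ctx) = @_;
        die "increment may not be at the start of a pipeline\n" unless defined $ctx->{prev};
        my $prev = $ctx->{prev};
        $ctx->{next} = sub { return map { $_ + 1 } $prev->(@_) };
    },
);

sub compile_pipeline {
    my @cmds = @_;
    my $ctx = { prev => undef };
    for my $cmd (@cmds) {
        my $compiler = $compilers{$cmd} or die "unknown command '$cmd'\n";
        $compiler->($ctx);
        # the "next" of this stage becomes the "prev" of the following one
        $ctx->{prev} = delete $ctx->{next};
    }
    return $ctx->{prev};
}

my $pipeline = compile_pipeline(qw(read double increment));
my @out = $pipeline->(1, 2, 3);

# a non-"read" command at the start is rejected
my $err = eval { compile_pipeline("double"); 1 } ? "" : $@;
```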

$ctx->{copyTables} also has an important point behind it. When you create a label, it's OK to discard the original reference after you chain the label into the logic: the chaining will keep a reference and the label will stay alive. Not so with a table: if you create a table, chain its input label and then drop the reference to the table, the table will be discarded. Then when the input label tries to send any data to the table, it will die (and until very recently it outright crashed). So it's important to keep the table reference alive, and that's what this array is for.
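The same effect can be seen in plain Perl with a weak reference. This is only an analogy under my own assumptions: the hash stands in for a table, the weakened variable stands in for the non-owning link from the input label, and the actual Triceps internals are in C++ and are not modeled here.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken);

my @copyTables;                     # plays the role of $ctx->{copyTables}

my $table = { name => "tab1" };     # stand-in for a freshly made table
my $seen_by_label = $table;
weaken($seen_by_label);             # non-owning: does not keep the object alive

push @copyTables, $table;           # the array holds a strong reference
undef $table;                       # the local variable is gone

# still alive, because @copyTables holds it
my $alive_while_kept = defined $seen_by_label;

@copyTables = ();                   # drop the last strong reference
# now the object is destroyed and the weak reference has become undef
my $alive_after_drop = defined $seen_by_label;
```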

$ctx->{id} is used to generate the unique names for the objects built in a query.

Each query is built in its own unit. This is convenient: after the query is done or the compilation encounters an error, the unit with its whole contents gets easily discarded. The clearing trigger placed in the context makes sure that the unit gets properly cleared and discarded.
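The idea of a trigger object whose destruction clears the unit can be sketched with a DESTROY method. ToyUnit and ToyTrigger below are made-up stand-ins, not the Triceps classes, and the real clearing does much more than write a log entry.

```perl
use strict;
use warnings;

my @log;

{
    package ToyUnit;        # hypothetical stand-in for a unit
    sub new   { my ($class, $name) = @_; return bless { name => $name }, $class }
    sub clear { my ($self) = @_; push @log, "cleared $self->{name}" }
}

{
    package ToyTrigger;     # hypothetical stand-in for a clearing trigger
    sub new     { my ($class, $unit) = @_; return bless { unit => $unit }, $class }
    # when the trigger is destroyed, it clears its unit
    sub DESTROY { my ($self) = @_; $self->{unit}->clear() }
}

{
    my $ctx = { u => ToyUnit->new("q1.unit") };
    $ctx->{cleaner} = ToyTrigger->new($ctx->{u});
    push @log, "query running";
}   # $ctx goes out of scope here, taking the trigger with it

push @log, "after scope";
```

Nobody has to remember to call clear() on the error paths: dropping the context is enough.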

Next goes the compilation of the join query command. I'll go through it in chunks.

sub _tqlJoin # ($ctx, @args)
{
  my $ctx = shift;
  die "The join command may not be used at the start of a pipeline.\n"
    unless (defined($ctx->{prev}));
  my $opts = {};
  &Triceps::Opt::parse("join", $opts, {
    table => [ undef, \&Triceps::Opt::ck_mandatory ],
    rightIdxPath => [ undef, undef ],
    by => [ undef, undef ],
    byLeft => [ undef, undef ],
    leftFields => [ undef, undef ],
    rightFields => [ undef, undef ],
    type => [ "inner", undef ],
  }, @_);

  my $tabname = bunquote($opts->{table});
  my $unit = $ctx->{u};
  my $table;

  &Triceps::Opt::checkMutuallyExclusive("join", 1, "by", $opts->{by}, "byLeft", $opts->{byLeft});
  my $by = split_braced_final($opts->{by});
  my $byLeft = split_braced_final($opts->{byLeft});

  my $rightIdxPath;
  if (defined $opts->{rightIdxPath}) { # propagate the undef
    $rightIdxPath = split_braced_final($opts->{rightIdxPath});
  }

It starts by parsing the options and converting them to the internal representation, removing the braced quotes.

  if ($ctx->{faOut}) {
    # Potentially, the tables might be reused between multiple joins
    # in the query if the required keys match. But for now keep things
    # simpler by creating a new table from scratch each time.

    my $tt = eval {
      # copy to avoid adding an index to the original type
      $ctx->{faOut}->impTableType($tabname)->copy();
    };
    die ("Join found no such table '$tabname'\n") unless ($tt);

    if (!defined $rightIdxPath) {
      # determine or add the index automatically
      my @workby;
      if (defined $byLeft) { # need to translate
        my @leftfld = $ctx->{prev}->getRowType()->getFieldNames();
        @workby = &Triceps::Fields::filterToPairs("Join option 'byLeft'",
          \@leftfld, [ @$byLeft, "!.*" ]);
      } else {
        @workby = @$by;
      }

      my @idxkeys; # extract the keys for the right side table
      for (my $i = 1; $i <= $#workby; $i+= 2) {
        push @idxkeys, $workby[$i];
      }
      $rightIdxPath = [ $tt->findOrAddIndex(@idxkeys) ];
    }

    # build the table from the type
    $tt->initialize() or confess "$!";
    $table = $ctx->{u}->makeTable($tt, "EM_CALL", "tab" . $ctx->{id} . $tabname);
    push @{$ctx->{copyTables}}, $table;

    # build the request that fills the table with data and then
    # keeps it up to date;
    # the table has to be filled before the query's main flow starts,
    # so put the request at the front
    &_makeQdumpsub($ctx, $tabname, 1, $table->getInputLabel());
  } else {
    die ("Join found no such table '$tabname'\n")
      unless (exists $ctx->{tables}{$tabname});
    $table = $ctx->{tables}{$tabname};
  }

The presence of $ctx->{faOut} means that the query is compiled in the multithreaded context.

The command handlers may freely die, and the error messages will be caught by compileQuery() and nicely (at least, sort-of) reported back to the user.

If an explicit rightIdxPath was not requested, it gets found or added automatically. On the way there the index fields need to be determined. They can be specified either as explicit pairs in the option "by" or in the name translation syntax in the option "byLeft". If we've got a "byLeft", first it gets translated to the same format as "by", and then the right-side fields are extracted from the format of "by". After that $tt->findOrAddIndex() takes care of all the heavy lifting. It either finds a matching index type in the table type or creates a new one from the specified fields, and either way returns the index path. (An invalid field will make it confess.)
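The extraction of the right-side fields is the only part of this that is plain list manipulation, so here it is on its own. The field names are made up; the loop is the same one as in _tqlJoin(), followed by an equivalent array slice for comparison.

```perl
use strict;
use warnings;

# "by" holds pairs: left field, right field, left field, right field, ...
my @workby = ("symbol", "sym", "date", "day");   # made-up field names

# the loop from _tqlJoin(): take every second element, starting at index 1
my @idxkeys;
for (my $i = 1; $i <= $#workby; $i += 2) {
    push @idxkeys, $workby[$i];
}

# the same thing written as a slice over the odd indexes
my @idxkeys_slice = @workby[ grep { $_ % 2 } 0 .. $#workby ];

print join(",", @idxkeys), "\n";
```

The resulting list of right-side field names is what gets handed to findOrAddIndex().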

It looks a bit anti-climactic, but the three lines of exporting with copyFundamental(), impTableType() and findOrAddIndex() are what this large example is all about.

You might wonder, how come the explicit rightIdxPath is not checked in any way? It will be checked later by LookupJoin(), so not much point in doing the check twice.

After that the table is created in a straightforward way, and remembered in copyTables. And the requests list gets prepended with a request to dump and subscribe to this table. I'll get back to that; for now let's finish up with _tqlJoin().

  my $isLeft = 0; # default for inner join
  my $type = $opts->{type};
  if ($type eq "inner") {
    # already default
  } elsif ($type eq "left") {
    $isLeft = 1;
  } else {
    die "Unsupported value '$type' of option 'type'.\n"
  }

  my $leftFields = split_braced_final($opts->{leftFields});
  my $rightFields = split_braced_final($opts->{rightFields});

  my $join = Triceps::LookupJoin->new(
    name => "join" . $ctx->{id},
    unit => $unit,
    leftFromLabel => $ctx->{prev},
    rightTable => $table,
    rightIdxPath => $rightIdxPath,
    leftFields => $leftFields,
    rightFields => $rightFields,
    by => $by,
    byLeft => $byLeft,
    isLeft => $isLeft,
    fieldsDropRightKey => 1,
  );

  $ctx->{next} = $join->getOutputLabel();
}

The rest of the options get parsed, and then all the collected data gets forwarded to the LookupJoin constructor. Finally the "next" label is assigned from the join's result.

Now jumping to _makeQdumpsub(). It's used by both the "read" and "join" query commands to initiate the dumps and subscriptions.

sub _makeQdumpsub # ($ctx, $tabname, [$front, $lbNext])
{
  my $ctx = shift;
  my $tabname = shift;
  my $front = shift;
  my $lbNext = shift;

  my $unit = $ctx->{u};

  my $lbrq = eval {
    $ctx->{faRqDump}->getLabel("t.rqdump.$tabname");
  };
  my $lbsrc = eval {
    $ctx->{faOut}->getLabel("t.out.$tabname");
  };
  die ("Found no such table '$tabname'\n") unless ($lbrq && $lbsrc);

  # compute the binding for the data dumps, that would be a cross-unit
  # binding to the original faOut but it's OK
  my $fretOut = $ctx->{faOut}->getFnReturn();
  my $dumpname = "t.dump.$tabname";
  # the dump and following subscription data will merge on this label
  if (!defined $lbNext) {
    $lbNext = $unit->makeDummyLabel(
      $lbsrc->getRowType(), "lb" . $ctx->{id} . "out_$tabname");
  }

  my $bindDump = Triceps::FnBinding->new(
    on => $fretOut,
    name => "bind" . $ctx->{id} . "dump",
    labels => [ $dumpname => $lbNext ],
  );

First it finds all the proper labels. The label $lbNext will accept the merged dump contents and the following subscription, and it might be either auto-generated or received as an argument. A join passes it as an argument, $table->getInputLabel(), so all the data goes to the copied table.

The binding is used to receive the dump. It's a bit of an optimization. Remember, the dump labels are shared between all the clients. Whenever any client requests a dump, all the clients will get the response. A client finds that the incoming dump is destined for it by processing the "beginDump" label. If it contains this client's name, the dump is destined here, and the client reacts by pushing the appropriate binding onto the facet's FnReturn, and the data flows. The matching "endDump" label then pops the binding and the data stops flowing. The binding makes it possible to avoid checking for every rowop whether it's supposed to be accepted, and if yes then where exactly it should go (remember, the same table may be dumped independently multiple times by multiple queries). Just check once at the start of the bundle and then let the data flow in bulk.
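Here is a toy model of that check-once-per-frame logic in plain Perl. It's a sketch under my own assumptions: the stream is a made-up list of events, and a code reference in $sink plays the role of the binding pushed onto the FnReturn. With no binding pushed the rows go nowhere, which is modeled by an empty sub.

```perl
use strict;
use warnings;

my $me = "c1";                                   # this client's name (made up)
my @received;

my $drop    = sub { };                           # no binding: the rows are ignored
my $collect = sub { push @received, $_[0] };     # the "binding" for this client's dump
my $sink    = $drop;

# Every client sees the whole stream, including the dumps for the others.
my @stream = (
    [ beginDump => "c2" ], [ row => "a" ],                 [ endDump => "c2" ],
    [ beginDump => "c1" ], [ row => "b" ], [ row => "c" ], [ endDump => "c1" ],
);

for my $ev (@stream) {
    my ($kind, $val) = @$ev;
    if ($kind eq "beginDump") {
        $sink = $collect if $val eq $me;         # "push the binding"
    } elsif ($kind eq "endDump") {
        $sink = $drop if $val eq $me;            # "pop the binding"
    } else {
        $sink->($val);                           # rows just flow to the current sink
    }
}

print join(",", @received), "\n";
```

The client name gets compared only on the framing events; the rows themselves carry no client information at all.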

  # qdumpsub:
  #   * label where to send the dump request to
  #   * source output label, from which a subscription will be set up
  #     at the end of the dump
  #   * target label in the query that will be tied to the source label
  #   * binding to be used during the dump, which also directs the data
  #     to the same target label
  my $request = [ "qdumpsub", $lbrq, $lbsrc, $lbNext, $bindDump ];
  if ($front) {
    unshift @{$ctx->{requests}}, $request;
  } else {
    push @{$ctx->{requests}}, $request;
  }
  return $lbNext;
}

Finally, the created bits and pieces get packaged into a request and added to the list of requests in the query context. The last tricky part is that the request can be added at the back or the front of the list. The "normal" way is to add to the back, however the dimension tables for the joins have to be populated before the main data flow of the query starts. So for them the argument $front is set to 1, and they get added in the front.
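The ordering effect of $front is easy to see on a toy list. The request names here are made up and add_request is a hypothetical helper; only the unshift/push choice mirrors _makeQdumpsub().

```perl
use strict;
use warnings;

my @requests;

# Hypothetical helper mirroring the tail of _makeQdumpsub()
sub add_request {
    my ($request, $front) = @_;
    if ($front) {
        unshift @requests, $request;    # dimension tables go first
    } else {
        push @requests, $request;       # the main flow goes to the back
    }
}

add_request("read trades");             # the start of the pipeline
add_request("join symbols", 1);         # first join in the pipeline
add_request("join accounts", 1);        # second join in the pipeline

print join(" | ", @requests), "\n";
```

Note that the joins end up in the reverse order of their appearance in the query. That's fine: all that matters is that every join table is filled before the "read" request starts the main flow.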

Now jumping back to the writer thread logic: after it has called compileQuery(), it starts the query execution by calling &$runNextRequest(). This is a closure defined inside the client writer function, and it knows how to process the "qdumpsub"s we've just seen created.

  my $runNextRequest = sub { # ($ctx)
    my $ctx = shift;
    my $requests = $ctx->{requests};
    undef $ctx->{curRequest}; # clear the info of the previous request
    my $r = shift @$requests;
    if (!defined $r) {
      # all done, now just need to pump the data through
      printOrShut($app, $fragment, $sock,
        "querysub,$ctx->{qid},$ctx->{qname}\n");
      return;
    }

First it clears the information about the previous request, if any. This function will be called after each request, to send on the next one, so on all its calls except the first one for a query it will have something to clear.

Then it checks if all the requests are already done. If so, it sends the query confirmation to the client and returns. The subscription part of the query will continue running on its own.

    $ctx->{curRequest} = $r; # remember until completed
    my $cmd = $$r[0];
    if ($cmd eq "qdumpsub") {
      # qdumpsub:
      #   * label where to send the dump request to
      #   * source output label, from which a subscription will be set up
      #     at the end of the dump
      #   * target label in the query that will be tied to the source label
      #   * binding to be used during the dump, which also directs the data
      #     to the same target label
      my $lbrq = $$r[1];
      $unit->makeHashCall($lbrq, "OP_INSERT",
        client => $fragment, id => $ctx->{qid}, name => $ctx->{qname}, cmd => $cmd);

The "qdumpsub" gets forwarded to the core logic. The responses will be processed in the handlers of "beginDump" and "endDump". One of the great pains of this "actor" architecture is that the linear logic gets broken up into many disjointed pieces in the separate handlers.

    } else {
      printOrShut($app, $fragment, $sock,
        "error,", $ctx->{qid}, ",Internal error: unknown request '$cmd',internal,", $cmd, "\n");
      $ctx->{requests} = [];
      undef $ctx->{curRequest};
      # and this will leave the query partially initialized,
      # but it should never happen
      return;
    }
  };

And a catch-all, just in case the query compiler ever decides to produce an invalid request.
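To see how the linear sequence of requests gets driven by the handlers, here is a stripped-down model in plain Perl. The names @pending and on_end_dump are hypothetical: the array stands in for the requests that were sent to the core logic and are waiting for their "endDump", and the requests are plain strings instead of arrays with labels and bindings.

```perl
use strict;
use warnings;

my @log;
my @pending;    # contexts with a request in flight, awaiting the "endDump"

my $runNextRequest = sub {
    my ($ctx) = @_;
    undef $ctx->{curRequest};           # clear the info of the previous request
    my $r = shift @{$ctx->{requests}};
    if (!defined $r) {
        push @log, "querysub,$ctx->{qid} confirmed";
        return;
    }
    $ctx->{curRequest} = $r;            # remember until completed
    push @log, "sent $r";
    push @pending, $ctx;                # the response will arrive later
};

# Hypothetical stand-in for the "endDump" handler
sub on_end_dump {
    my ($ctx) = @_;
    push @log, "done $ctx->{curRequest}";
    $runNextRequest->($ctx);            # the handler starts the next request
}

my $ctx = { qid => "q1", requests => [ "dump symbols", "dump trades" ] };
$runNextRequest->($ctx);

# deliver the responses one by one, as the core logic would
while (my $c = shift @pending) {
    on_end_dump($c);
}

print "$_\n" for @log;
```

There is no loop over the requests anywhere: each completed request kicks off the next one, and the confirmation goes out when the list runs dry.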

Next goes the handling of the dump labels (again, this gets set up during the build of the client writer thread, and then nature is left to run its course, reacting to the rowops as they come in).

  $faOut->getLabel("beginDump")->makeChained("lbBeginDump", undef, sub {
    my $row = $_[1]->getRow();
    my ($client, $id, $name, $cmd) = $row->toArray();
    return unless ($client eq $fragment);
    if ($cmd eq "qdumpsub") {
      return unless(exists $queries{$id});
      my $ctx = $queries{$id};
      $fretOut->push($ctx->{curRequest}[4]); # the binding for the dump
    } else {
      # .. skipped the handling of dump/dumpsub
    }
  });

All it does is check whether this is the destination client and whether there is an active request with this id; if so, it pushes the appropriate binding.

  $faOut->getLabel("endDump")->makeChained("lbEndDump", undef, sub {
    my $row = $_[1]->getRow();
    my ($client, $id, $name, $cmd) = $row->toArray();
    return unless ($client eq $fragment);

    if ($cmd eq "qdumpsub") {
      return unless(exists $queries{$id});
      my $ctx = $queries{$id};
      $fretOut->pop($ctx->{curRequest}[4]); # the binding for the dump
      # and chain together all the following updates
      $ctx->{curRequest}[2]->makeChained(
        "qsub$id." . $ctx->{curRequest}[3]->getName(), undef,
        sub {
          # a cross-unit call
          $_[2]->call($_[3]->adopt($_[1]));
        },
        $ctx->{u}, $ctx->{curRequest}[3]
      );

      &$runNextRequest($ctx);
    } else {
      # .. skipped the handling of dump/dumpsub
    }
  });

It does the same things as the "beginDump" handler: it checks whether this is the right client and whether it has an outstanding dump request, and if so, it pops the binding. After the dump is completed, the subscription has to be set up, so it sets up a label that forwards the normal output of this table to the label specified in the request. Since each query is defined in its own unit, this forwarding is done as a cross-unit call.

And then the next request of this query can be started.

By the way, the cross-unit adopt() didn't work in Perl until I wrote this example. There was a check against it (the C++ API never bothered with this check). But the adoption between the units has turned out to be quite convenient, so I've removed that check.

And that's it. Long and winding but finally completed. It all comes down to only about three lines of code, but I think the rest of it also shows some useful techniques for working with threads.
