Sunday, March 18, 2012

The guts of SimpleAggregator, part 1

The implementation of the SimpleAggregator has turned out to be surprisingly small. Maybe a little biggish for a single post, but still small. I've liked it so much that I've even saved the original small version in the file xSimpleAggregator.t. As more features get added, the "official" version of the SimpleAggregator will grow (and already has), but that example file will stay small and simple.

I'll put the commentary interlaced with the code. So, here we go.

package MySimpleAggregator;
use Carp;

use strict;

sub make # (optName => optValue, ...)
{
  my $opts = {}; # the parsed options
  my $myname = "MySimpleAggregator::make";

  &Triceps::Opt::parse("MySimpleAggregator", $opts, { 
      tabType => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TableType") } ],
      name => [ undef, \&Triceps::Opt::ck_mandatory ],
      idxPath => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY", "") } ],
      result => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
      saveRowTypeTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
      saveInitTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
      saveComputeTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
    }, @_);

  # reset the saved source code
  ${$opts->{saveInitTo}} = undef if (defined($opts->{saveInitTo}));
  ${$opts->{saveComputeTo}} = undef if (defined($opts->{saveComputeTo}));
  ${$opts->{saveRowTypeTo}} = undef if (defined($opts->{saveRowTypeTo}));

Triceps::Opt is a class that deals with parsing and doing the basic checks on the options. I'll describe it in detail in a separate post. For now, the important part is that the checked options are copied into the hash pointed to by $opts. If this were a proper object constructor, it would have been $self instead of $opts.

  # find the index type, on which to build the aggregator
  my $idx;
  { 
    my @path = @{$opts->{idxPath}};
    confess "$myname: idxPath must be an array of non-zero length"
      unless ($#path >= 0);
    my $cur = $opts->{tabType}; # the root of the tree
    my $progress = ''; 
    foreach my $p (@path) {
      $progress .= $p;
      $cur = $cur->findSubIndex($p) 
        or confess("$myname: unable to find the index type at path '$progress', table type is:\n" . $opts->{tabType}->print() . " "); 
      $progress .= '.';
    }   
    $idx = $cur;
  } 
  confess "$myname: the index type is already initialized, can not add an aggregator on it"
    if ($idx->isInitialized());

Since the SimpleAggregator uses an existing table with an existing index, it doesn't require the aggregation key: it just takes an index that forms the group, and whatever key leads to this index becomes the aggregation key. The lookup of an index by path should probably become a standard method on a table type, but here it's implemented directly. Obviously, an aggregator can not be added on an already initialized index.
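The walk along the path can be tried out without Triceps. Here is a minimal sketch where nested hashes stand in for the index types; the helper find_by_path() and the "kids" key are my inventions for illustration, not a part of the Triceps API. It shows how $progress lets the error message point at the exact element of the path that failed:

```perl
use strict;
use warnings;
use Carp;

# A stand-in for the tree of index types: each node keeps its nested
# nodes in a hash by name. This is NOT the Triceps API.
my $tree = {
  kids => {
    bySymbol => {
      kids => {
        last2 => { kids => {} },
      },
    },
  },
};

# Walk the path one name at a time, remembering how far we've got,
# so that the error message can show the failing element.
sub find_by_path # ($root, @path)
{
  my ($root, @path) = @_;
  confess "find_by_path: the path must be an array of non-zero length"
    unless (@path);
  my $cur = $root;
  my $progress = '';
  foreach my $p (@path) {
    $progress .= $p;
    $cur = $cur->{kids}{$p}
      or confess "find_by_path: unable to find the index at path '$progress'";
    $progress .= '.';
  }
  return $cur;
}

my $found = find_by_path($tree, "bySymbol", "last2");

# a wrong path gets reported with the part that was walked so far
my $err = '';
eval { find_by_path($tree, "bySymbol", "nosuch"); 1 } or $err = $@;
print $err;
```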

  # check the result definition and build the result row type and code snippets for the computation
  my $rtRes;
  my $needIter = 0; # flag: some of the functions require iteration
  my $needfirst = 0; # the result needs the first row of the group
  my $needlast = 0; # the result needs the last row of the group
  my $codeInit = ''; # code for function initialization
  my $codeStep = ''; # code for iteration
  my $codeResult = ''; # code to compute the intermediate values for the result
  my $codeBuild = ''; # code to build the result row
  my @compArgs; # the field functions are passed as args to the computation
  {
    my $grpstep = 4; # definition grouped by 4 items per result field
    my @resopt = @{$opts->{result}};
    my @rtdefRes; # field definition for the result
    my $id = 0; # numeric id of the field

    while ($#resopt >= 0) {
      confess "$myname: the values in the result definition must go in groups of 4"
        unless ($#resopt >= 3);
      my $fld = shift @resopt;
      my $type = shift @resopt;
      my $func = shift @resopt;
      my $funcarg = shift @resopt;

      confess("$myname: the result field name must be a string, got a " . ref($fld) . " ")
        unless (ref($fld) eq '');
      confess("$myname: the result field type must be a string, got a " . ref($type) . " for field '$fld'")
        unless (ref($type) eq '');
      confess("$myname: the result field function must be a string, got a " . ref($func) . " for field '$fld'")
        unless (ref($func) eq '');

This starts the loop that goes over the result fields and builds the code to create them. The code will be built in multiple snippets that will eventually be combined to produce the compute function. Since the arguments go in groups of 4, it becomes fairly easy to miss one element somewhere, and then everything gets real confusing. So the code attempts to check the types of the arguments, in hopes of catching these off-by-ones as early as possible. The variable $id will be used to produce the unique prefixes for the function's variables.
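To see how these checks catch an off-by-one, here is a small stand-alone sketch. The helper check_groups() is hypothetical: it repeats the same checks but returns the message instead of confessing. The function names "last" and "count_star" and the plain hash used as a row are assumptions made for the example:

```perl
use strict;
use warnings;

# Hypothetical helper: go through a flat result definition in groups
# of 4 (field name, field type, function name, argument closure or undef)
# and return the first problem found, or '' if everything looks fine.
sub check_groups # (@def)
{
  my @def = @_;
  while (@def) {
    return "the values in the result definition must go in groups of 4"
      unless (@def >= 4);
    my ($fld, $type, $func, $funcarg) = splice(@def, 0, 4);
    return "the result field name must be a string, got a " . ref($fld)
      unless (ref($fld) eq '');
    return "the result field type must be a string, got a " . ref($type) . " for field '$fld'"
      unless (ref($type) eq '');
    return "the result field function must be a string, got a " . ref($func) . " for field '$fld'"
      unless (ref($func) eq '');
  }
  return '';
}

my @good = (
  symbol => "string", "last", sub { $_[0]->{symbol} },
  count => "int32", "count_star", undef,
);
# the same definition with the type of "symbol" forgotten
my @bad = (
  symbol => "last", sub { $_[0]->{symbol} },
  count => "int32", "count_star", undef,
);

my $goodMsg = check_groups(@good);
my $badMsg = check_groups(@bad);
print "good: '$goodMsg'\n";
print "bad: '$badMsg'\n";
```

With the type missing, the closure slides into the position of the function name, and the type check reports it right in the first group, instead of producing some strange error much later.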

      my $funcDef = $FUNCTIONS->{$func}
        or confess("$myname: function '" . $func . "' is unknown");

      my $argCount = $funcDef->{argcount};
      $argCount = 1 # 1 is the default value
        unless defined($argCount);
      confess("$myname: in field '$fld' function '$func' requires an argument computation that must be a Perl sub reference")
        unless ($argCount == 0 || ref $funcarg eq 'CODE');
      confess("$myname: in field '$fld' function '$func' requires no argument, use undef as a placeholder")
        unless ($argCount != 0 || !defined $funcarg);

      push(@rtdefRes, $fld, $type);

      push(@compArgs, $funcarg)
        if (defined $funcarg);

The definitions of the "standard" aggregation functions are kept in $FUNCTIONS (it will be shown later). They are defined in exactly the same way as the vwap function shown before. The types of the fields get collected for the row definition, and the closures (or, technically, functions) that compute the aggregation arguments also get collected, to be passed later as the arguments of the compute function.

      # add to the code snippets

      ### initialization
      my $vars = $funcDef->{vars};
      if (defined $vars) {
        foreach my $v (keys %$vars) {
          # the variable names are given a unique prefix;
          # the initialization values are constants, no substitutions
          $codeInit .= "  my \$v${id}_${v} = " . $vars->{$v} . ";\n";
        }       
      } else {
        $vars = { }; # a dummy
      }     

      ### iteration
      my $step = $funcDef->{step};
      if (defined $step) {
        $needIter = 1;
        $codeStep .= "    # field $fld=$func\n";
        if (defined $funcarg) {
          # compute the function argument from the current row
          $codeStep .= "    my \$a${id} = \$args[" . $#compArgs ."](\$row);\n";
        }       
        # substitute the variables in $step
        $step =~ s/\$\%(\w+)/&replaceStep($1, $func, $vars, $id, $argCount)/ge;
        $codeStep .= "    { $step; }\n";
      }     

The initialization and iteration snippets are produced only if the function defines them. The flag $needIter remembers whether any of the functions involved needs iteration. The iteration step is placed into a block of its own. An extra ";" is added just in case: it doesn't hurt, and it helps if it was forgotten in the function definition.
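The substitution of the $%name references can be sketched in isolation. The definition of the sum function and the helper replace_step() below are my guesses for the sake of illustration, inferred from the variable names that the code above generates ($v<id>_<name> for the function's variables, $a<id> for the per-row argument); the real replaceStep() takes more arguments and is not shown in this post:

```perl
use strict;
use warnings;
use Carp;

# A guess at the shape of one function definition, for illustration only.
my $sumDef = {
  vars => { sum => 0 },
  step => '$%sum += $%argiter',
  result => '$%sum',
};

# Hypothetical stand-in for replaceStep(): maps a $%name reference
# to the name of a real variable in the generated code.
sub replace_step # ($varname, $vars, $id)
{
  my ($varname, $vars, $id) = @_;
  return "\$a${id}" if ($varname eq 'argiter'); # the per-row argument
  return "\$v${id}_${varname}" if (exists $vars->{$varname});
  confess "unknown variable '$varname' in the step definition";
}

my $id = 2; # pretend that this is the third result field
my $step = $sumDef->{step};
$step =~ s/\$\%(\w+)/replace_step($1, $sumDef->{vars}, $id)/ge;
my $code = "    { $step; }\n";
print $code; # prints "    { $v2_sum += $a2; }"
```

The prefix with the field id is what keeps the variables of two sum fields in the same aggregator from colliding with each other.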

      ### result building
      my $result = $funcDef->{result};
      confess "MySimpleAggregator: internal error in definition of aggregation function '$func', missing result computation"
        unless (defined $result);
      # substitute the variables in $result
      if ($result =~ /\$\%argfirst/) {
        $needfirst = 1;
        $codeResult .= "  my \$f${id} = \$args[" . $#compArgs ."](\$rowFirst);\n";
      }
      if ($result =~ /\$\%arglast/) {
        $needlast = 1;
        $codeResult .= "  my \$l${id} = \$args[" . $#compArgs ."](\$rowLast);\n";
      }
      $result =~ s/\$\%(\w+)/&replaceResult($1, $func, $vars, $id, $argCount)/ge;
      $codeBuild .= "    ($result), # $fld\n";

      $id++;
    }
    $rtRes = Triceps::RowType->new(@rtdefRes)
      or confess "$myname: invalid result row type definition: $!";
  }
  ${$opts->{saveRowTypeTo}} = $rtRes if (defined($opts->{saveRowTypeTo}));

The result computation is created in the same way, remembering whether any function wanted the fields from the first or last row. Eventually the result row type is created. Next the compute function gets assembled:

  # build the computation function
  my $compText = "sub {\n";
  $compText .= "  use strict;\n";
  $compText .= "  my (\$table, \$context, \$aggop, \$opcode, \$rh, \$state, \@args) = \@_;\n";
  $compText .= "  return if (\$context->groupSize()==0 || \$opcode == &Triceps::OP_NOP);\n";
  $compText .= $codeInit;
  if ($needIter) {
    $compText .= "  my \$npos = 0;\n";
    $compText .= "  for (my \$rhi = \$context->begin(); !\$rhi->isNull(); \$rhi = \$context->next(\$rhi)) {\n";
    $compText .= "    my \$row = \$rhi->getRow();\n";
    $compText .= $codeStep;
    $compText .= "    \$npos++;\n";
    $compText .= "  }\n";
  }
  if ($needfirst) {
    $compText .= "  my \$rowFirst = \$context->begin()->getRow();\n";
  }
  if ($needlast) {
    $compText .= "  my \$rowLast = \$context->last()->getRow();\n";
  }
  $compText .= $codeResult;
  $compText .= "  \$context->makeArraySend(\$opcode,\n";
  $compText .= $codeBuild;
  $compText .= "  );\n";
  $compText .= "}\n";

  ${$opts->{saveComputeTo}} = $compText if (defined($opts->{saveComputeTo}));

The optional parts get included only if some of the functions needed them.

  # compile the computation function
  my $compFun = eval $compText
    or confess "$myname: error in compilation of the aggregation computation:\n  $@\nfunction text:\n$compText ";

  # build and add the aggregator
  my $agg = Triceps::AggregatorType->new($rtRes, $opts->{name}, undef, $compFun, @compArgs)
    or confess "$myname: internal error: failed to build an aggregator type: $! ";

  $idx->setAggregator($agg)
    or confess "$myname: failed to set the aggregator in the index type: $! ";

  return $opts->{tabType};
}

Then the compute function is compiled. If the compilation fails, the error message will include both the compilation error and the text of the auto-generated function. Otherwise there would be no way to know what exactly went wrong. Well, since no user code is included into the auto-generated function, it should never fail, except if there is some bad code in the aggregation function definitions. The compiled function and the collected closures are then used to create the aggregator.
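The compile-and-report pattern works the same way outside of Triceps. Here is a minimal sketch of it; the helper compile_text() and the simplified generated function (which gets an array of plain hashes instead of a table and context) are assumptions made to keep the example self-contained:

```perl
use strict;
use warnings;

# Compile a piece of generated code; on failure return the error
# together with the source text, like the aggregator builder does.
sub compile_text # ($text)
{
  my ($text) = @_;
  my $fun = eval $text;
  return ($fun, '') if ($fun);
  return (undef, "error in compilation:\n  $@\nfunction text:\n$text");
}

# generated text: a sum over the values returned by the closure in $args[0]
my $goodText = "sub {\n"
  . "  use strict;\n"
  . "  my (\$rows, \@args) = \@_;\n"
  . "  my \$v0_sum = 0;\n"
  . "  foreach my \$row (\@\$rows) {\n"
  . "    my \$a0 = \$args[0](\$row);\n"
  . "    { \$v0_sum += \$a0; }\n"
  . "  }\n"
  . "  return (\$v0_sum);\n"
  . "}\n";

my ($fun, $err) = compile_text($goodText);
# the closure is passed as an argument, not pasted into the text
my $sum = $fun->(
  [ { price => 10 }, { price => 20 }, { price => 12 } ],
  sub { $_[0]->{price} });
print "sum: $sum\n"; # prints "sum: 42"

# a broken text: an undeclared variable under "use strict"
my ($badFun, $badErr) = compile_text("sub {\n  use strict;\n  return \$nosuch;\n}\n");
print $badErr;
```

Since the closures travel as arguments, a mistake in the user's code shows up as a normal Perl error at its own location, and only the mistakes in the function definitions can break the compilation of the generated text.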
