Saturday, March 31, 2012

The guts of Collapse

The Collapse implementation is fairly small, and is another worthy example for the docs. It's a template, and a "normal" one too: no code generation whatsoever, just a combination of ready components. As with SimpleAggregator, the current Collapse is quite simple and will grow more features over time, so I've copied the original simple version into t/xCollapse.t to stay there unchanged.

The most notable thing about Collapse is that it took just about an hour to write the first version of it and another three or so hours to test it. That's a lot less time than the similar code in the Aleri or Coral8 code base took. The reason for this is that Triceps provides fairly flexible base data structures that can be combined easily, directly in a scripting language. There is no need to redo a lot from scratch every time, just take something and add a little bit on top.

So here it is, with the interspersed comments.

package Triceps::Collapse;
use Carp;
use strict;

sub new # ($class, $optName => $optValue, ...)
{
  my $class = shift;
  my $self = {};

  &Triceps::Opt::parse($class, $self, { 
    unit => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::Unit") } ],
    name => [ undef, \&Triceps::Opt::ck_mandatory ],
    data => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
  }, @_);

  # parse the data element
  my $dataref = $self->{data};
  my $dataset = {};
  # dataref->[1] is the best guess for the dataset name, in case the option "name" goes first
  &Triceps::Opt::parse("$class data set (" . $dataref->[1] . ")", $dataset, { 
    name => [ undef, \&Triceps::Opt::ck_mandatory ],
    key => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY", "") } ],
    rowType => [ undef, sub { &Triceps::Opt::ck_ref(@_, "Triceps::RowType"); } ],
    fromLabel => [ undef, sub { &Triceps::Opt::ck_ref(@_, "Triceps::Label"); } ],
  }, @$dataref);

The options parsing goes as usual. The option "data" is parsed again for the options inside it, and those are placed into the hash %$dataset.

  # save the dataset for the future
  $self->{datasets}{$dataset->{name}} = $dataset;
  # check the options
  confess "The data set (" . $dataset->{name} . ") must have only one of options rowType or fromLabel"
    if (defined $dataset->{rowType} && defined $dataset->{fromLabel});
  confess "The data set (" . $dataset->{name} . ") must have exactly one of options rowType or fromLabel"
    if (!defined $dataset->{rowType} && !defined $dataset->{fromLabel});

The dataset options "rowType" and "fromLabel" are both optional but exactly one of them must be present, to be sufficient and non-conflicting. So the code makes sure of it.

  my $lbFrom = $dataset->{fromLabel};
  if (defined $lbFrom) {
    confess "The unit of the Collapse and the unit of its data set (" . $dataset->{name} . ") fromLabel must be the same"
      unless ($self->{unit}->same($lbFrom->getUnit()));
    $dataset->{rowType} = $lbFrom->getType();
  }

If "fromLabel" is used, the row type is found from it. This looks like a pretty good pattern that I plan to spread to the other elements in the future. The unit could also be found from it.

  # create the tables
  $dataset->{tt} = Triceps::TableType->new($dataset->{rowType})
    ->addSubIndex("primary",
      Triceps::IndexType->newHashed(key => $dataset->{key})
    );
  $dataset->{tt}->initialize()
    or confess "Collapse table type creation error for dataset '" . $dataset->{name} . "':\n$! ";

  $dataset->{tbInsert} = $self->{unit}->makeTable($dataset->{tt}, "EM_CALL", $self->{name} . "." . $dataset->{name} . ".tbInsert")
    or confess "Collapse internal error: insert table creation for dataset '" . $dataset->{name} . "':\n$! ";
  $dataset->{tbDelete} = $self->{unit}->makeTable($dataset->{tt}, "EM_CALL", $self->{name} . "." . $dataset->{name} . ".tbDelete")
    or confess "Collapse internal error: delete table creation for dataset '" . $dataset->{name} . "':\n$! ";

The state is kept in two tables, and here is the reason for them: after collapsing, the Collapse may send for each key either a single INSERT rowop (if the row was not there before and became inserted), a single DELETE rowop (if the row was there before and then became deleted), or a DELETE followed by an INSERT (if the row was there but then changed its value). Accordingly, the state is kept in two tables: one contains the DELETE part, the other the INSERT part for each key, and either part may be empty (or both, if the row at that key has not been changed). After each flush both tables become empty, and then start collecting the modifications again.
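To make this concrete, here is a sketch with an invented key field "symbol". Suppose that since the last flush the input has seen:

  INSERT (symbol="AAA", price=10)   - a brand new row
  DELETE (symbol="BBB", price=20)   - a row that was there before the last flush
  DELETE (symbol="CCC", price=30)   - an old row ...
  INSERT (symbol="CCC", price=31)   - ... replaced with a new value

The next flush would send the DELETEs for "BBB" and "CCC" followed by the INSERTs for "AAA" and "CCC" (the order of keys within each group is not guaranteed, since it comes from iterating a hashed index).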

  # create the labels
  $dataset->{lbIn} = $self->{unit}->makeLabel($dataset->{rowType}, $self->{name} . "." . $dataset->{name} . ".in",
    undef, \&_handleInput, $self, $dataset)
      or confess "Collapse internal error: input label creation for dataset '" . $dataset->{name} . "':\n$! ";
  $dataset->{lbOut} = $self->{unit}->makeDummyLabel($dataset->{rowType}, $self->{name} . "." . $dataset->{name} . ".out")
    or confess "Collapse internal error: output label creation for dataset '" . $dataset->{name} . "':\n$! ";

The input and output labels get created. The input label gets the function with the processing logic as its handler. The output label is just a dummy. Note that the tables don't get connected anywhere; they are just used as storage, without any immediate reactions to their modifications.

  # chain the input label, if any
  if (defined $lbFrom) {
    $lbFrom->chain($dataset->{lbIn})
      or confess "Collapse internal error: input label chaining for dataset '" . $dataset->{name} . "' to '" . $lbFrom->getName() . "' failed:\n$! ";
    delete $dataset->{fromLabel}; # no need to keep the reference any more
  }

And if the fromLabel is used, the Collapse gets connected to it. After that there is no good reason to keep a separate reference to that label, especially considering that it creates a reference loop and would mess with the memory management. So it gets deleted.

  bless $self, $class;
  return $self;
}

The final blessing is boilerplate. The constructor creates the data structures but doesn't implement any logic. The logic goes next:

sub _handleInput # ($label, $rop, $self, $dataset)
{
  my $label = shift;
  my $rop = shift;
  my $self = shift;
  my $dataset = shift;

  if ($rop->isInsert()) {
    $dataset->{tbInsert}->insert($rop->getRow())
      or confess "Collapse " . $self->{name} . " internal error: dataset '" . $dataset->{name} . "' failed an insert-table-insert:\n$! ";

The Collapse element knows nothing about the data that went through it before. After each flush it starts again from scratch. It expects that the stream of rows is self-consistent, and makes the conclusions about the previous data based on the new data it sees. An INSERT rowop may mean one of two things: either there was no previous record with this key, or there was a previous record with this key and then it got deleted. The Delete table could be used to differentiate between these situations: if there was a row that was then deleted, the Delete table would contain that row. But for an INSERT it doesn't matter: in either case the new row just gets inserted into the Insert table. If there was no such row before, it will become a new INSERT. If there was such a row before, it will become an INSERT following a DELETE.

Incidentally, this logic happens to work for the insert-only streams of data too, when the rows get replaced by simply sending another row with the same key. Then if there was a previous row in the Insert table, it would simply get replaced by a new one, and eventually at the flush time the last row would go through.
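For example, with the same invented key field "symbol", an insert-only sequence

  INSERT (symbol="AAA", price=10)
  INSERT (symbol="AAA", price=11)
  INSERT (symbol="AAA", price=12)

keeps replacing the row for "AAA" in the Insert table, and the next flush sends only the last INSERT, with price=12.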

  } elsif($rop->isDelete()) {
    if (! $dataset->{tbInsert}->deleteRow($rop->getRow())) {
      confess "Collapse " . $self->{name} . " internal error: dataset '" . $dataset->{name} . "' failed an insert-table-delete:\n$! "
        if ($! ne "");
      $dataset->{tbDelete}->insert($rop->getRow())
        or confess "Collapse " . $self->{name} . " internal error: dataset '" . $dataset->{name} . "' failed a delete-table-insert:\n$! ";
    }
  }
}

The DELETE case is more interesting. If we see a DELETE rowop, this means that either there was an INSERT sent before the last flush and now that INSERT becomes undone, or that there was an INSERT after the flush, which also becomes undone. The actions for these cases are different: if the INSERT was before the flush, this row should go into the Delete table, and eventually propagate as a DELETE during the next flush. If the last INSERT was after the flush, then its row would be stored in the Insert table, and now we just need to delete that row and pretend that it never was.

That's what the logic does: first it tries to remove the row from the Insert table. If that succeeded, then it was an INSERT after the flush that has now become undone, and there is nothing more to do. If there was no row to delete, the INSERT must have happened before the last flush, so the row gets remembered in the Delete table and passed on in the next flush.

Note that this logic is not resistant to incorrect data sequences. If there ever are two DELETEs for the same key in a row (which should never happen in a correct sequence), the second DELETE will end up in the Delete table.
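Spelled out with the same invented key field "symbol", the two internal tables behave like this:

  INSERT (symbol="AAA", price=10)   - goes into the Insert table
  DELETE (symbol="AAA", price=10)   - found in the Insert table and removed; nothing is remembered
  DELETE (symbol="BBB", price=20)   - not in the Insert table, so it gets remembered in the Delete table

The next flush then sends only the DELETE for "BBB".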

sub flush # ($self)
{
  my $self = shift;
  my $unit = $self->{unit};
  my $OP_INSERT = &Triceps::OP_INSERT;
  my $OP_DELETE = &Triceps::OP_DELETE;
  foreach my $dataset (values %{$self->{datasets}}) {
    my $tbIns = $dataset->{tbInsert};
    my $tbDel = $dataset->{tbDelete};
    my $lbOut = $dataset->{lbOut};
    my $next;
    # send the deletes always before the inserts
    for (my $rh = $tbDel->begin(); !$rh->isNull(); $rh = $next) {
      $next = $rh->next(); # advance the iterator before removing
      $tbDel->remove($rh);
      $unit->call($lbOut->makeRowop($OP_DELETE, $rh->getRow()));
    }
    for (my $rh = $tbIns->begin(); !$rh->isNull(); $rh = $next) {
      $next = $rh->next(); # advance the iterator before removing
      $tbIns->remove($rh);
      $unit->call($lbOut->makeRowop($OP_INSERT, $rh->getRow()));
    }
  }
}

The flushing is fairly straightforward: first it sends on all the DELETEs, then all the INSERTs, clearing the tables along the way. At first I've thought of matching the DELETEs and INSERTs together, sending them next to each other in case both are available for some key. It's not that difficult to do. But then I've realized that it doesn't matter and just did it the simple way.

sub getInputLabel($$) # ($self, $dsetname)
{
  my ($self, $dsetname) = @_;
  confess "Unknown dataset '$dsetname'"
    unless exists $self->{datasets}{$dsetname};
  return $self->{datasets}{$dsetname}{lbIn};
}

sub getOutputLabel($$) # ($self, $dsetname)
{
  my ($self, $dsetname) = @_;
  confess "Unknown dataset '$dsetname'"
    unless exists $self->{datasets}{$dsetname};
  return $self->{datasets}{$dsetname}{lbOut};
}

sub getDatasets($) # ($self)
{
  my $self = shift;
  return keys %{$self->{datasets}};
}

The getter functions are fairly simple. The only catch is that the code has to check with exists before it reads the value of $self->{datasets}{$dsetname}{lbOut}. Otherwise, if an incorrect $dsetname is used, the read would return an undef, but along the way Perl's autovivification would create an empty $self->{datasets}{$dsetname}, which would then cause a crash when flush() tries to iterate through it and finds the dataset options missing.
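To tie it all together, here is a rough sketch of how this version of Collapse could be used (the row type, the field names and the data are invented for the example, and the error handling follows the same "$!" convention as the code above):

use Triceps;
use Carp;

my $unit = Triceps::Unit->new("unit") or confess "$!";
my $rtData = Triceps::RowType->new(
  symbol => "string",
  price => "float64",
) or confess "$!";

my $collapse = Triceps::Collapse->new(
  unit => $unit,
  name => "collapse",
  data => [
    name => "idata",
    rowType => $rtData,
    key => [ "symbol" ],
  ],
);

# a label that just prints whatever the Collapse sends out
my $lbPrint = $unit->makeLabel($rtData, "lbPrint", undef, sub {
  print($_[1]->printP(), "\n");
}) or confess "$!";
$collapse->getOutputLabel("idata")->chain($lbPrint) or confess "$!";

# feed a few updates, then flush the accumulated state
my $lbIn = $collapse->getInputLabel("idata");
$unit->call($lbIn->makeRowop(&Triceps::OP_INSERT,
  $rtData->makeRowHash(symbol => "AAA", price => 10)));
$unit->call($lbIn->makeRowop(&Triceps::OP_DELETE,
  $rtData->makeRowHash(symbol => "AAA", price => 10)));
$unit->call($lbIn->makeRowop(&Triceps::OP_INSERT,
  $rtData->makeRowHash(symbol => "AAA", price => 11)));
$collapse->flush(); # sends a single INSERT for "AAA" with price=11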

That's it, Collapse in a nutshell!
