Sunday, February 17, 2013

Aggregator in C++, part 2

Building an aggregator is a fairly complex subject, so next I want to show a working example. It turns out that I've never actually written an aggregator in C++ before, other than some tiny fragments; I was always working through Perl. So I've written some more interesting examples just now. The full text can be found in the unit test file cpp/table/t_Agg.cpp.

First, if your aggregator is truly stateless and fully hardcoded, the easier way to do it as by defining a plain function with the same handler arguments and building a BasicAggregatorType with it. And here is one that sums the values of an int64 field (the test case aggBasicSum):

void sumC(Table *table, AggregatorGadget *gadget, Index *index,
        const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
  // don't send the NULL record after the group becomes empty
  if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)

  int64_t sum = 0;
  for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
    sum += table->getRowType()->getInt64(rhi->getRow(), 2, 0); // field c at idx 2

  // pick the rest of fields from the last row of the group
  RowHandle *lastrh = index->last();

  // build the result row; relies on the aggregator result being of the
  // same type as the rows in the table
  FdataVec fields;
  table->getRowType()->splitInto(lastrh->getRow(), fields);
  fields[2].setPtr(true, &sum, sizeof(sum)); // set the field c from the sum

  Rowref res(gadget->getLabel()->getType(), fields);
  gadget->sendDelayed(dest, res, opcode);

  Autoref<TableType> tt = TableType::make(rt1)
    ->addSubIndex("Hashed", HashedIndexType::make( // will be the default index
        (new NameSet())->add("e")
      )->addSubIndex("Fifo", FifoIndexType::make()
        ->setAggregator(new BasicAggregatorType("aggr", rt1, sumC))

As I've told before, you create the BasicAggregatorType by giving it the aggregator name, aggregator result row type, and the handler function.

In this case the handler function is completely hardcoded. It works on the int64 field at index 2. The row type I used in this example is:

row {
  uint8[10] a,
  int32[] b,
  int64 c,
  float64 d,
  string e,

So the field is actually named "c", and that's why the aggregator function is named "sumC".  But since in this case everything is known in advance, to make it more efficient, the look-up of the field by name has been skipped, and the field index has been pre-computed and placed into the function.

The general outline of the aggregator is exactly the same as in Perl: check for an empty group, then iterate through all the rows in the group and compute a the sum, fill the rest of the fields from the last row, and send the result. The difference is that there is no AggregatorContext, and the calls are done directly on the bits and pieces received as arguments.

The group size is computed as


This is pretty much the reason for parentIndexType and gh to be passed as arguments to the handler. Why it's done like this is a long story. It allows an individual index (as in instance, not type!) to not care about the number of rows in it. Remember, a group may have multiple "parallel" indexes defined on it, with all of them containing the same rows, and thus the same number of rows. The group handle (gh) contains the common information about the group, including the group size. And the parent index type is the one that manages the group handles inside it, and knows how to extract the information from them.

The iteration is done as usual with the begin() and next(),  only called directly on the index. It will return NULL after the last row. The last row can also be found directly with index->last(); on an empty group it would return NULL, so be careful with that.

The input row type for reading the rows from the group is found as:


The result row is built in this example by copying the last row and replacing one field. The data from the last row is split into FdataVec (the data itself is not copied at this point but the data descriptors in the construction vector are made to point to the data in the original row). Then the descriptor for the field "c" is changed to point to the computed sum. Then the new row is built from the descriptor.

In this particular case the type of the input rows and of the result rows is the same, so either could have been used to construct the result rows.  There are two ways to find the result type:


They are exactly the same, just there are two paths leading to the same object.

Finally, the constructed row is sent. sendDelayed() takes care of constructing the rowop from the components.

Potentially, you could even call directly

sendDelayed(dest, gadget->getLabel()->getType()->makeRow(fields), opcode);

 However the caveat is that if the table's enqueuing mode is set to EM_IGNORE (which is an old idea, nowadays everyone should use EM_CALL and eventually there will be no other way than EM_CALL, but for now it could happen), the rows would leak. The reason for that leak is that sendDelayed() will skip the rowop creation with EM_IGNORE, and thus will skip the reference creation, and the row would never have a reference to it created and would never be freed. Creating an explicit reference makes sure that the row will always be cleaned up when it becomes unused.

No comments:

Post a Comment