Sunday, February 17, 2013

Aggregator in C++, Part 3

Doing a proper custom aggregator is more involved, and requires making subclasses of both Aggregator and AggregatorType. The test case aggSum shows an example of aggregator that can sum any 64-bit field.

The subclass of Aggregator contains only one method that is very similar to the BasicAggregator handler shown before:

class MySumAggregator: public Aggregator
{
public:
  virtual void handle(Table *table, AggregatorGadget *gadget, Index *index,
      const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
      Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh);
};

void MySumAggregator::handle(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
{
  // don't send the NULL record after the group becomes empty
  if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)
    return;

  int fidx = gadget->typeAs<MySumAggregatorType>()->fieldIdx();

  int64_t sum = 0;
  for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
    sum += table->getRowType()->getInt64(rhi->getRow(), fidx, 0);
  }

  // pick the rest of fields from the last row of the group
  RowHandle *lastrh = index->last();

  // build the result row; relies on the aggregator result being of the
  // same type as the rows in the table
  FdataVec fields;
  table->getRowType()->splitInto(lastrh->getRow(), fields);
  fields[fidx].setPtr(true, &sum, sizeof(sum));

  gadget->sendDelayed(dest, fields, opcode);
}

The difference is that the field index is not hardcoded but taken from the aggregator type. The aggregator type is found with

gadget->typeAs<MySumAggregatorType>()

The typeAs template is a very recent addition, I've added it when writing this example. The previous (and still available) equivalent way to do it was to call gadget->getType() and then cast it to the pointer to the proper subclass. The method fieldIdx() is a custom addition to the MySumAggregatorType, not inherited from any base class.

The version of AggregatorGadget::sendDelayed() used here is different:

void sendDelayed(Tray *dest, FdataVec &data, Rowop::Opcode opcode) const;

I've actually added it after writing the first version of this post, then edited the post. It conveniently handles the construction of the row from fields and sending it.

Then the aggregator type needs to be defined with a fixed set of inherited virtual methods plus any needed custom parts.

class MySumAggregatorType: public AggregatorType
{
public:
  // @param fname - field name to sum on
  MySumAggregatorType(const string &name, const string &fname):
    AggregatorType(name, NULL),
    fname_(fname),
    fidx_(-1)
  { }
  // the copy constructor works fine
  // (might set the non-NULL row type, but it will be overwritten
  // during initialization)

  // constructor for deep copy
  // (might set the non-NULL row type, but it will be overwritten
  // during initialization)
  MySumAggregatorType(const MySumAggregatorType &agg, HoldRowTypes *holder):
    AggregatorType(agg, holder),
    fname_(agg.fname_),
    fidx_(agg.fidx_)
  { }

  virtual AggregatorType *copy() const
  {
    return new MySumAggregatorType(*this);
  }

  virtual AggregatorType *deepCopy(HoldRowTypes *holder) const
  {
    return new MySumAggregatorType(*this, holder);
  }

  virtual bool equals(const Type *t) const
  {
    if (this == t)
      return true; // self-comparison, shortcut

    if (!AggregatorType::equals(t))
      return false;

    const MySumAggregatorType *sumt = static_cast<const MySumAggregatorType *>(t);

    if (fname_ != sumt->fname_)
      return false;

    return true;
  }

  virtual bool match(const Type *t) const
  {
    if (this == t)
      return true; // self-comparison, shortcut

    if (!AggregatorType::match(t))
      return false;

    const MySumAggregatorType *sumt = static_cast<const MySumAggregatorType *>(t);

    if (fname_ != sumt->fname_)
      return false;

    return true;
  }

  virtual AggregatorGadget *makeGadget(Table *table, IndexType *intype) const
  {
    return new AggregatorGadget(this, table, intype);
  }

  virtual Aggregator *makeAggregator(Table *table, AggregatorGadget *gadget) const
  {
    return new MySumAggregator;
  }

  virtual void initialize(TableType *tabtype, IndexType *intype)
  {
    const RowType *rt = tabtype->rowType();
    setRowType(rt); // the result has the same type as the argument
    fidx_ = rt->findIdx(fname_);
    if (fidx_ < 0)
      errors_.fAppend(new Errors(rt->print()), "Unknown field '%s' in the row type:", fname_.c_str());
    else {
      if (rt->fields()[fidx_].arsz_ != RowType::Field::AR_SCALAR
      || rt->fields()[fidx_].type_->getTypeId() != Type::TT_INT64)
        errors_.fAppend(new Errors(rt->print()),
          "Field '%s' is not an int64 scalar in the row type:", fname_.c_str());
    }
    AggregatorType::initialize(tabtype, intype);
  }

  // called from the handler
  int fieldIdx() const
  {
    return fidx_;
  }

protected:
  string fname_; // name of the field to sum, must be an int64
  int fidx_; // index of field named fname_
};

The constructor accepts the aggregator name and the name of the field on which it will sum. The field name will be translated to field index during initialization, and made available to the MySumAggregator::handler() via the method fieldIdx(). The aggregator type starts with the result row type of NULL, with the actual row type set during initialization. This ability to set the row type later had the latent existence all along, but it's the first time I've made it explicit. It hasn't propagated to the Perl API yet. The idea here is that the result row type of this aggregator is always equal to the input row type, so rather than specifying the result type explicitly and then having to check it for compatibility, why not just take the table's row type when it becomes available? And it works beautifully.

The copy constructor and the constructor with HoldRowTypes are the implementations of the virtual methods copy() and deepCopy(). The deepCopy() is yet another recent addition used in the multithreaded support. I'll describe it in detail later, for now just Do It Like This.

The methods match() and equals() follow the same general shape as everywhere else. makeGadget() creates a generic gadget, and makeAggregator() creates an instance of aggregator for each group.

The interesting stuff starts happening in the initialization. The row type gets found from the table and set as the result type. Then the aggregation field is found in the row type and checked for being of the proper type. Its index is remembered for the later use.

errors_.fAppend() is a new part of the Erref API that makes the error construction more convenient. It is smart enough to check the reference for NULL and allocate a new Errors if so, then append a printf-formatted message and nested errors to it. I've been adding the direct printf-formatting for both Errors and Exceptions, this part of the API is to be documented yet.

No comments:

Post a Comment