Showing posts with label aggregation. Show all posts

Sunday, February 9, 2025

Asynchronous programming 8 - loops and a semaphore

The two basic varieties of loops are where you either execute everything sequentially or schedule all the iterations in parallel. Suppose a loop consists of three parts A, B, C, connected by futures, and suppose the loop has 100 iterations. In the sequential form it unrolls into:

-> A1 -> B1 -> C1 -> A2 -> B2 -> C2 -> ... -> A100 -> B100 -> C100 ->

In the parallel form it becomes:

   /->A1 -> B1 -> C1 ---\
->          ...          ->
   \->A100->B100->C100 -/

An important thing to keep in mind for both forms is to prevent inlining, at least at the start of each iteration. In the parallel form the inlining would kill the parallelism, and in either form it will likely overflow the stack with the recursive calls (300 of them in this example). This is a very good reason to always avoid the inlining here, and the tail scheduling slot from the previous installment is a good solution.
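To illustrate the stack depth point, here is a minimal sketch of a sequential loop where every step queues its successor instead of calling it inline. The RunQueue class and runSequentialLoop() are hypothetical names made up for this illustration (they are not the tail scheduling slot from the previous installment, just a plain single-threaded queue), but they show the same effect: the stack stays flat no matter how many iterations there are.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded run queue: continuations are queued
// instead of being called inline, so the call stack stays shallow.
class RunQueue {
public:
  void schedule(std::function<void()> task) {
    tasks_.push_back(std::move(task));
  }

  // Runs tasks until the queue drains; returns how many tasks ran.
  std::size_t drain() {
    std::size_t ran = 0;
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      task(); // may schedule more tasks but never nests another drain()
      ++ran;
    }
    return ran;
  }

private:
  std::deque<std::function<void()>> tasks_;
};

// Sequential loop with three parts per iteration (A=0, B=1, C=2).
// Every part schedules its successor rather than calling it.
// Returns the total number of parts executed.
inline std::size_t runSequentialLoop(std::size_t iterations) {
  RunQueue queue;
  std::size_t steps = 0;
  std::function<void(std::size_t, int)> step =
    [&](std::size_t iter, int part) {
      assert(part >= 0 && part <= 2);
      ++steps;
      if (part < 2) {
        queue.schedule([&step, iter, part] { step(iter, part + 1); });
      } else if (iter + 1 < iterations) {
        queue.schedule([&step, iter] { step(iter + 1, 0); });
      }
    };
  if (iterations > 0) {
    queue.schedule([&step] { step(0, 0); });
  }
  queue.drain();
  return steps;
}
```

With 100 iterations this executes 300 parts, each of them started from drain() with a stack depth of one call, where the inlined version would nest 300 calls deep.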

In the parallel form all the futures at the end of each iteration are joined into one (usually ignoring the return value and becoming a void future) with an AllOf primitive, where the final future gets completed only after all the inputs get completed. It can be easily implemented by counting the completed futures, with pseudo-code like:

class AllOf {
public:
  AllOf(int count):
    ctx_(make_shared<Context>(count))
  {}

  // inputs - a container of input future references
  template<typename Container>
  AllOf(const Container &inputs):
    ctx_(make_shared<Context>(inputs.size()))
  {
    for (const auto &inf : inputs) {
      addInput(inf);
    }
  }

  void addInput(shared_ptr<FutureBase> input)
  {
    // Note that the result promise is ignored
    input->chain(advance, ctx_);
  }

  shared_ptr<Future<void>> getResult() const
  {
    return ctx_->result_->to_future();
  }

  static void advance(
    shared_ptr<FutureBase> input,
    shared_ptr<Context> ctx,
    shared_ptr<Promise<void>> result /*ignored*/)
  {
    // atomic_decrease() is assumed to return the new value
    if (atomic_decrease(ctx->count_) == 0) {
      ctx->result_->complete();
    }
  }

protected:
  struct Context : public ContextBase {
    Context(int count): count_(count) {}

    atomic_int count_;
    shared_ptr<Promise<void>> result_ = make_promise<void>();
  };
  shared_ptr<Context> ctx_;
};

// used like this:
  AllOf res(100);
  for (int i = 0; i < 100; i++) {
    res.addInput(schedule_iteration(ctx));
  }
  res.getResult()->chain(...)

Fundamentally, though, this doesn't have to be just a counter. It can just as well run a map-reduce aggregation pattern, where the "map" part is done in parallel as loop iterations, and the "reduce" part combines the results under a mutex in a class similar to AllOf here, and completes the output promise after all the inputs are processed.
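As a rough sketch of that "reduce" side, here is a self-contained toy version that uses plain callbacks in place of futures and promises. SumReducer, addResult() and sumOfSquares() are hypothetical names for this illustration, and the reduction is hardcoded as a sum to keep it short.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical reducer in the spirit of AllOf: it combines the
// per-iteration results under a mutex and fires the completion
// callback after the last expected input has arrived.
class SumReducer {
public:
  SumReducer(std::size_t count, std::function<void(long)> onDone)
    : remaining_(count), onDone_(std::move(onDone)) {
    assert(count > 0);
  }

  // Called by each "map" iteration when its result is ready.
  void addResult(long value) {
    bool last = false;
    long total = 0;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(remaining_ > 0);
      sum_ += value;
      last = (--remaining_ == 0);
      total = sum_;
    }
    if (last) {
      onDone_(total); // completes the output outside of the lock
    }
  }

private:
  std::mutex mutex_;
  std::size_t remaining_;
  long sum_ = 0;
  std::function<void(long)> onDone_;
};

// Toy "map" part: the iterations produce squares of 1..n. They run
// sequentially here, real iterations would run in parallel.
inline long sumOfSquares(int n) {
  long result = -1;
  SumReducer reducer(static_cast<std::size_t>(n),
    [&result](long total) { result = total; });
  for (int i = 1; i <= n; i++) {
    reducer.addResult(static_cast<long>(i) * i);
  }
  return result;
}
```

The callback gets invoked exactly once, by whichever iteration happens to deliver the last result.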

Note that there is no need to even collect all the inputs in an array as some libraries do, the constructor from a container is just a convenience. Once they're properly chained together, only the count matters. And it's easy to make a variation of the implementation where the count would be accumulated dynamically as the inputs are connected, and then finalized (it's important to not send out the result before finalization).
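A minimal sketch of such a dynamically counted variation could look like this, again with plain callbacks standing in for the futures. DynamicAllOf, addInput() and finalize() are hypothetical names for this illustration. The point to notice is that the completion fires only when the count has dropped to zero and finalize() has been called, whichever happens last.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical AllOf variation that accumulates the count as the
// inputs get connected, and gets finalized afterwards.
class DynamicAllOf {
public:
  explicit DynamicAllOf(std::function<void()> onDone)
    : onDone_(std::move(onDone)) {}

  // Registers one more input and returns the callback that this
  // input must invoke when it completes.
  std::function<void()> addInput() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(!finalized_); // no new inputs after finalization
    ++pending_;
    return [this] { inputDone(); };
  }

  // Declares that no more inputs will be added.
  void finalize() {
    bool fire = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!finalized_);
      finalized_ = true;
      fire = (pending_ == 0);
    }
    if (fire) {
      onDone_();
    }
  }

private:
  void inputDone() {
    bool fire = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(pending_ > 0);
      --pending_;
      // Without the finalized_ check the result could be sent out
      // while more inputs are still being connected.
      fire = finalized_ && pending_ == 0;
    }
    if (fire) {
      onDone_();
    }
  }

  std::mutex mutex_;
  std::size_t pending_ = 0;
  bool finalized_ = false;
  std::function<void()> onDone_;
};
```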

Now, what if you want to do something in between, a parallel loop with a limited parallelism? An executor with a limited number of threads won't work here because it limits the computational parallelism but here we want to limit the resource parallelism. Think for example of each iteration opening, processing, and closing a file descriptor, with a limited number of descriptors to be open at a time. Here we need a semaphore implemented with futures. A semaphore is kind of a mix of the AllOf pattern shown here and the asynchronous mutex pattern shown before. Of course, the easiest way is to just partition the loop into K parts, each part fully serial. But that's not the same thing as the semaphore, where the next iteration gets started when any of the running iterations completes. A semaphore is better at balancing the load.

A path to implementing a semaphore would be to keep two lists, one of the completed futures, another one of waiting promises, with at most one of the lists being non-empty at any time. If a new promise gets chained when there is a spare completed future on the list, they can be "annihilated" by using the future to complete the promise and forgetting both (but pre-rigging the chain of execution off that promise so that it will return a completed future to the semaphore at the end). When a completed future gets added and there is a promise waiting, they can also be "annihilated" in the same way. Otherwise the future or promise gets added to the appropriate list. If we start by adding all the promises first, and telling the semaphore the degree of parallelism K, then the semaphore can also signal the completion of work when it collects all K futures on its list, completing another special promise.
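Here is a minimal sketch of this two-list logic under some simplifying assumptions. The waiting promises are modeled as plain callbacks, and since the completed futures carry no value, their list is reduced to a counter of spare permits. AsyncSemaphore, acquire() and release() are hypothetical names. The tasks are also started inline to keep the sketch short, while a real implementation would schedule them, for the reasons discussed at the start of this post.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical future-style semaphore with K permits.
class AsyncSemaphore {
public:
  AsyncSemaphore(std::size_t k, std::function<void()> onIdle)
    : total_(k), free_(k), onIdle_(std::move(onIdle)) {
    assert(k > 0);
  }

  // Starts the task right away if a permit is spare, otherwise
  // queues it. The task's chain must end with a call to release().
  void acquire(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (free_ == 0) {
        waiting_.push_back(std::move(task));
        return;
      }
      --free_; // "annihilate" a spare permit with this task
    }
    task();
  }

  // Returns a permit: hands it to the oldest waiting task, or puts
  // it on the spare list. Collecting all K permits means the work
  // is done (provided that all the tasks were queued up front).
  void release() {
    std::function<void()> next;
    bool idle = false;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!waiting_.empty()) {
        next = std::move(waiting_.front());
        waiting_.pop_front();
      } else {
        ++free_;
        assert(free_ <= total_);
        idle = (free_ == total_);
      }
    }
    if (next) {
      next();
    } else if (idle && onIdle_) {
      onIdle_();
    }
  }

private:
  std::mutex mutex_;
  const std::size_t total_;
  std::size_t free_;
  std::deque<std::function<void()>> waiting_;
  std::function<void()> onIdle_;
};
```

For example, with K = 2 and five tasks queued, two tasks start immediately, each of the first three release() calls starts one more task, and the fifth release() call collects the last permit and fires the completion callback.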

Finally, there is a retry loop pattern for manual programming that allows reducing the number of small functions. Sometimes you need to prepare multiple resources before starting an operation:

if (resource_a == nullptr) {
  resource_a = acquire_a(); // may sleep
}
if (resource_b == nullptr) {
  resource_b = acquire_b(); // may sleep
}
operation(resource_a, resource_b);

An interesting property of this preparation is that it can be restarted from scratch with little cost. So instead of writing little fragment functions to do one check and one assignment per function, we can write one function and schedule it recursively after each acquisition, making a state machine to remember where we left off last time:

void startOperation(
  shared_ptr<FutureBase> input,
  shared_ptr<Context> ctx,
  shared_ptr<Promise<Sometype>> result)
{
  switch(ctx->state) {
  case AFTER_A:
    ctx->resource_a =
      static_cast<Future<ResourceA> *>(input.get())->value();
    break;
  case AFTER_B:
    ctx->resource_b =
      static_cast<Future<ResourceB> *>(input.get())->value();
    break;
  }
  ctx->state = INIT;

  if (ctx->resource_a == nullptr) {
    ctx->state = AFTER_A;
    acquire_a()->chain(startOperation, ctx);
    return;
  }
  if (ctx->resource_b == nullptr) {
    ctx->state = AFTER_B;
    acquire_b()->chain(startOperation, ctx);
    return;
  }
  operation(ctx->resource_a, ctx->resource_b)->chain(result);
}

It doesn't look like the most efficient way of execution but surprisingly, if the resources are typically already acquired, it is: the whole check and the start of the operation happen in a single function call. It's also a very efficient way to make the program more readable by humans, without a myriad of tiny functions.

P.S. See more about the loops in the installment on error handling.

Wednesday, May 15, 2024

unusual aggregations

I've been doing a sizable amount of SQL querying, and it turns out that the modern systems have some new and interesting aggregation functions. For example, MAX_BY and MIN_BY return the value of one expression from the row where another expression is maximal or minimal. And there are more creative functions if your system supports nested fields, array fields, or map fields.
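To spell out the semantics of MAX_BY outside of SQL, here is a small sketch of it as a plain loop. The Sale row type and the function maxByAmount() are hypothetical, made up for this illustration. On ties this sketch keeps the first row it has seen, which is an arbitrary choice of mine and not necessarily what any particular SQL system does.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical row type for the illustration.
struct Sale {
  std::string product;
  double amount;
};

// Analog of MAX_BY(product, amount): returns the product from the
// row where the amount is maximal.
inline std::string maxByAmount(const std::vector<Sale> &rows) {
  assert(!rows.empty());
  const Sale *best = &rows.front();
  for (const Sale &row : rows) {
    if (row.amount > best->amount) {
      best = &row;
    }
  }
  return best->product;
}
```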

Another really nice piece of syntax is using the indexes of the fields in the GROUP BY clause instead of the actual expressions. This avoids the need to write the same expressions twice. It could be made even better by specifying the names of the output fields instead of indexes.

There are a few more aggregation functions that would be nice to have.

One is being able to specify aggregation in a group vs aggregation across all records. A frequent problem is to find the percentage of record count in a group relative to the whole set. This would be easy to implement as (COUNT(*) * 100 / COUNT_GLOBAL(*)). And this could be generalized to nested aggregations (similarly to how Triceps allows the nested indexes) by specifying the nesting level. For example, if we have 4 fields in a GROUP BY clause, the generalized form COUNT_G(4, *) would be equivalent to COUNT(*), COUNT_G(0, *) will be the global count, and the intermediate values would do the grouping in the larger groups by fewer fields.
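The semantics of the proposed COUNT_G can be sketched outside of SQL as counting the rows per every prefix of the GROUP BY key. Keep in mind that COUNT_G is only my proposal and not an existing SQL function, and that NestedCounts, countG() and groupPercent() are hypothetical names for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// The values of the GROUP BY fields of one row, in order.
using Key = std::vector<std::string>;

// Counts the rows for every key prefix of length 0..N, where the
// empty prefix (level 0) represents the whole data set.
class NestedCounts {
public:
  explicit NestedCounts(const std::vector<Key> &rows) {
    for (const Key &row : rows) {
      Key prefix;
      ++counts_[prefix]; // level 0: the global count
      for (const std::string &field : row) {
        prefix.push_back(field);
        ++counts_[prefix]; // level = prefix.size()
      }
    }
  }

  // Analog of COUNT_G(level, *) computed for the group of this row.
  std::size_t countG(std::size_t level, const Key &row) const {
    assert(level <= row.size());
    Key prefix(row.begin(),
      row.begin() + static_cast<std::ptrdiff_t>(level));
    auto it = counts_.find(prefix);
    return it == counts_.end() ? 0 : it->second;
  }

private:
  std::map<Key, std::size_t> counts_;
};

// Percentage of the row's innermost group relative to the whole set,
// the analog of COUNT(*) * 100 / COUNT_GLOBAL(*).
inline double groupPercent(const NestedCounts &nc, const Key &row) {
  std::size_t total = nc.countG(0, row);
  return total == 0 ? 0.0 : 100.0 * nc.countG(row.size(), row) / total;
}
```

With the rows (us, ca), (us, ca), (us, ny), (de, by), the group of (us, ca) gets the count of 4 at level 0, 3 at level 1, and 2 at level 2.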

It would also be nice to have some simpler syntax to specify the two-pass querying. It can be done with joins but it's very annoying to write those joins manually in plain SQL. I'm not sure yet what a good syntax would be, I just have an idea of the semantics for now.

One example of that would be the grouping by percentiles. On the first pass you'd compute the percentiles of some expression, on the second pass you'd group the records by where they fit between these percentiles. So if you do the percentiles with a step of 10%, you'd get ten records with the summaries of these percentiles. Well, if you store the records you work on (as in a CEP system), it doesn't even have to be two passes, it can easily be a single pass where you decide in which bucket to fit a record, and then adjust all the buckets accordingly.
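The two passes can be sketched in plain code like this. The first pass finds the boundaries between the percentile buckets, the second pass places each record into its bucket, here with a simple count as the aggregation. The function names are hypothetical, and the way the boundaries are picked (an element of the sorted copy by rank) is my assumption for this sketch, since the SQL systems differ in how they define percentiles.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// First pass: the inner boundaries between `buckets` equal-sized
// percentile buckets (buckets - 1 of them), taken by rank from a
// sorted copy of the values.
inline std::vector<double> percentileBoundaries(
    std::vector<double> values, std::size_t buckets) {
  assert(!values.empty() && buckets > 0);
  std::sort(values.begin(), values.end());
  std::vector<double> bounds;
  for (std::size_t i = 1; i < buckets; i++) {
    bounds.push_back(values[i * values.size() / buckets]);
  }
  return bounds;
}

// Second pass: count the records in each bucket. A value v lands in
// the bucket with low <= v < high, the same condition as in a join
// on the boundaries.
inline std::vector<std::size_t> bucketCounts(
    const std::vector<double> &values,
    const std::vector<double> &bounds) {
  std::vector<std::size_t> counts(bounds.size() + 1, 0);
  for (double v : values) {
    // the number of boundaries <= v is the index of the bucket
    std::size_t bucket = static_cast<std::size_t>(
      std::upper_bound(bounds.begin(), bounds.end(), v) - bounds.begin());
    ++counts[bucket];
  }
  return counts;
}
```

For the values 1 through 10 and five buckets, the boundaries come out as 3, 5, 7, 9, and each bucket gets two records.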

Another example of two-pass querying would be bringing the CEP windows into the plain SQL, in case you want to identify some interesting records, and then do an aggregation on the other related records that preceded them within a time interval. Again, if you have the records come in the time order and cached for that time interval, it can be done with a single pass, but even if that is not available, two passes would do it in any circumstances. Very, very useful for time series analysis.

Or if you think about two-pass aggregation as being done with a map-reduce aggregator, the first pass would seed the mapping function and instantiate the reducers, where the second pass would feed the records into these reducers.

Note also that if you write such a join manually, you end up doing essentially a transposition of a normal query, where instead of specifying an expression per record, you specify an expression per row. Something like this (in pseudo-SQL, since remembering the exact syntax boggles me, I can only copy-paste fragments but not freely write them):

WITH
  SELECT .. AS input
WITH
  SELECT low, high
  FROM
    -- this is the transposition that propagates through
    -- the rest of the query
    INSERT (0, 20), (20, 40), (40, 60), (60, 80), (80, 100)
  AS percentiles
WITH
  SELECT
    percentiles.high AS percent,
    PERCENTILE(input.field, percentiles.low) AS low,
    PERCENTILE(input.field, percentiles.high) AS high
  FROM percentiles, input
  AS boundaries
SELECT
  percent,
  ... whatever aggregation ...
FROM boundaries, input
WHERE
  input.field >= boundaries.low
  AND input.field < boundaries.high

It would be nice to have an easier way to do such transpositions too, even aside from the two-pass queries.

Another idea for processing the time series is the ability to build a group from a set of records with sequential IDs/timestamps. Suppose we have time in slices, where an event might be happening or not happening in each slice, and we want to consider an uninterrupted sequence of slices with events as one event. It's easy to build a join that would put together two consecutive events. But how about a hundred events? This would be easy to do with a group-building function that tracks the first and last timestamp in the group, checks whether a new record is consecutive to them, and merges the groups if needed. Note that this is not really an aggregation function. It's a new concept, a group-building function that replaces the fixed rule of all the records in a group having the same key value. It would also work for building a window around an interesting record (with or without merging the close interesting records into the same window), where the window becomes the group.
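A simple batch version of such a group-building function can be sketched as follows. It's only an illustration under my own assumptions: the slices are identified by integer timestamps, two slices are consecutive if their timestamps differ by 1, and sorting the input first stands in for the incremental merging of groups. SliceGroup and buildGroups() are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One built group: an uninterrupted run of time slices with events.
struct SliceGroup {
  std::int64_t first; // timestamp of the first slice in the run
  std::int64_t last;  // timestamp of the last slice in the run
  std::size_t count;  // number of records in the group
};

// Builds the groups from the timestamps of the slices with events.
// A record joins the current group if its slice is the same as or
// directly follows the group's last slice, otherwise it starts a
// new group.
inline std::vector<SliceGroup> buildGroups(
    std::vector<std::int64_t> slices) {
  std::sort(slices.begin(), slices.end());
  std::vector<SliceGroup> groups;
  for (std::int64_t ts : slices) {
    if (!groups.empty() && ts <= groups.back().last + 1) {
      groups.back().last = std::max(groups.back().last, ts);
      ++groups.back().count;
    } else {
      groups.push_back(SliceGroup{ts, ts, 1});
    }
  }
  return groups;
}
```

For the slices 5, 1, 2, 3, 7, 8, 20 this produces four groups: 1 to 3, 5 alone, 7 to 8, and 20 alone. An aggregation can then be computed over each group as usual.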

Sunday, January 5, 2014

added a little missing method

When editing the docs, I've noticed an incompleteness in AggregatorGadget, so I've added the method that was missing:

const IndexType *getIndexType() const;

Not that there was any use for it (and the subclasses could just read the field directly), but still.

Friday, April 19, 2013

Object passing between threads, and Perl code snippets

A limitation of the Perl threads is that no variables can be shared between them. When a new thread gets created, it gets a copy of all the variables of the parent. Well, of all the plain Perl variables. With the XS extensions your luck may vary: the variables might get copied, might become undef, or just become broken (if the XS module is not threads-aware). Supporting the copying of the XS variables requires quite a high overhead at all the other times, so Triceps doesn't do it and all the Triceps objects become undefined in the new thread.

However there is a way to pass around certain objects through the Nexuses.

First, obviously, the Nexuses are intended to pass through the Rowops. These Rowops coming out of a nexus are not the same Rowop objects that went in. Rowop is a single-threaded object and can not be shared by two threads. Instead it gets converted to an internal form while in the nexus, and gets re-created, pointing to the same Row object and to the correct Label in the local facet.

Then, again obviously, the Facets get imported through the Nexus, together with their row types.

And two more types of objects can be exported through a Nexus: the RowTypes and TableTypes. They get exported through the options as in this example:

$fa = $owner->makeNexus(
    name => "nx1",
    labels => [
        one => $rt1,
        two => $lb,
    ], 
    rowTypes => [
        one => $rt2,
        two => $rt1,
    ], 
    tableTypes => [
        one => $tt1,
        two => $tt2,
    ], 
    import => "writer",
); 

As you can see, the namespaces for the labels, row types and table types are completely independent, and the same names can be reused in each of them with a different meaning. All three sections are optional, so if you want, you can export only the types in the nexus, without any labels.

They can then be extracted from the imported facet as:

$rt1 = $fa->impRowType("one");
$tt1 = $fa->impTableType("one");


Or the whole set of name-value pairs can be obtained with:


@rtset = $fa->impRowTypesHash();
@ttset = $fa->impTableTypesHash();


The exact table types and row types (by themselves or in the table types or labels) in the importing thread will be copied. It's technically possible to share the references to the same row type in the C++ code but it's more efficient to make a separate copy for each thread, and thus the Perl API goes along the more efficient way.

The import is smart in the sense that it preserves the sameness of the row types: if in the exporting thread the same row type was referred from multiple places in the labels, row types and table types sections, in the imported facet that would again be the same row type (even though of course not the one that has been exported but its copy). This again helps with the efficiency when various objects decide if the rows created by this and that type are compatible.

This is all well until you want to export a table type that has an index with a Perl sort condition in it, or an aggregator with the Perl code. The Perl code objects are tricky: they get copied OK when a new thread is created but the attempts to import them through a nexus later cause a terrible memory corruption. So Triceps doesn't allow exporting the table types with function references in them. But it provides an alternative solution: the code snippets can be specified as the source code. It gets compiled when the table type gets initialized. When a table type gets imported through a nexus, it brings the source code with it. The imported table types are always uninitialized, so at initialization time the source code gets compiled in the new thread and works.

It all works transparently: just specify a string instead of a function reference when creating the index, and it will be recognized and processed. For example:

$it= Triceps::IndexType->newPerlSorted("b_c", undef, '
    my $res = ($_[0]->get("b") <=> $_[1]->get("b")
        || $_[0]->get("c") <=> $_[1]->get("c"));
    return $res;
    '
);

Before the code gets compiled, it gets wrapped into a 'sub { ... }', so don't write your own sub in the code string, that would be an error.

There is also the issue of arguments that can be specified for these functions. Triceps is now smart enough to handle the arguments that are one of:

  • undef
  • integer
  • floating-point
  • string
  • Triceps::RowType object
  • Triceps::Row object
  • reference to an array or hash thereof

It converts the data to an internal C++ representation in the nexus and then converts it back on import. So, if a TableType has all the code in it in the source form, and the arguments for this code are within the limits of this format, it can be exported through the nexus. Otherwise an attempt to export it will fail.

I've modified the SimpleOrderedIndex to use the source code format, and it will pass through the nexuses as well.

The Aggregators have a similar problem, and I'm working on converting them to the source code format too.

A little more about the differences between the code references and the source code format:

When you compile a function, it carries with it the lexical context. So you can make the closures that refer to the "my" variables in their lexical scope. With the source code you can't do this. The table type compiles them at initialization time in the context of the main package, and that's all they can see. Remember also that the global variables are not shared between the threads, so if you refer to a global variable in the code snippet and rely on a value in that variable, it won't be present in the other threads (unless the other threads are direct descendants and the value was set before their creation).

While working with the custom sorted indexes, I've also fixed the way the errors are reported in their Perl handlers. The errors used to be just printed on stderr. Now they propagate properly through the table, and the table operations die with the Perl handler's error message. Since an error in the sorting function means that things are going very, very wrong, after that the table becomes inoperative and will die on all the subsequent operations as well.

Sunday, February 17, 2013

Aggregator in C++, Part 4

And finally here is the reference of actions available in the aggregator handler:

parentIndexType->groupSize(gh)

Get the size of the group. The result is of the type size_t. This is pretty much the only method of IndexType that should be called directly, and only in the aggregation. The rest of the IndexType methods should be accessed through the similar methods in the Table, and I won't even document them. However if you really, really want to, you can find the description of the other methods in type/IndexType.h and call them in the aggregation as well.

gadget->getLabel()->getType()

Get the result row type.

The rest of the actions are done by calling the Index methods on the argument index. Same as with the IndexType, the aggregation is the only place where these methods are called directly, everywhere else the equivalent actions are done through the Table methods. Because of this I've grouped their description with the aggregation and not separately.

const IndexType *getType() const;

Get the type of this index.

RowHandle *begin() const;


Get the handle of the first row of the group, in the default order according to its first leaf index type. Note that here it's not the whole table's first leaf index type but the first leaf in the index type subtree under this index's type. All the iteration methods return NULL if there are no more rows.

RowHandle *next(const RowHandle *cur) const;

Get the handle of the next row (or NULL if that was the last one) in the default order. The NULL argument makes the NULL result.

RowHandle *last() const;

Get the handle of the last row in the group in the default order.

The rest of the methods of Index aren't really to be used directly.

Unlike the Perl API of AggregatorContext, there are no direct analogs of beginIdx() and such in C++ Index. To get them in C++, you need to translate the iteration to another index type through the Table (and of course, just like in Perl, you would need somehow to get the reference to another index type into your aggregator, and that index type better be in the subtree of the parentIndexType). To translate through the Table, you take any row from the group, usually the first one, and use it with the table methods that accept a sample row.

For example:

RowHandle *sample = index->begin();
RowHandle *rhend =  table->nextGroupIdx(otherIndexType, sample);
for (RowHandle *rhit = table->firstOfGroupIdx(otherIndexType, sample); rhit != rhend; rhit = table->nextIdx(otherIndexType, rhit)) {
  ...
}

This concludes the discussion of the aggregators. This also concludes the description of the whole C++ API, except for the most recent additions.

Aggregator in C++, Part 3

Doing a proper custom aggregator is more involved, and requires making subclasses of both Aggregator and AggregatorType. The test case aggSum shows an example of an aggregator that can sum any int64 field.

The subclass of Aggregator contains only one method that is very similar to the BasicAggregator handler shown before:

class MySumAggregator: public Aggregator
{
public:
  virtual void handle(Table *table, AggregatorGadget *gadget, Index *index,
      const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
      Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh);
};

void MySumAggregator::handle(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
{
  // don't send the NULL record after the group becomes empty
  if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)
    return;

  int fidx = gadget->typeAs<MySumAggregatorType>()->fieldIdx();

  int64_t sum = 0;
  for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
    sum += table->getRowType()->getInt64(rhi->getRow(), fidx, 0);
  }

  // pick the rest of fields from the last row of the group
  RowHandle *lastrh = index->last();

  // build the result row; relies on the aggregator result being of the
  // same type as the rows in the table
  FdataVec fields;
  table->getRowType()->splitInto(lastrh->getRow(), fields);
  fields[fidx].setPtr(true, &sum, sizeof(sum));

  gadget->sendDelayed(dest, fields, opcode);
}

The difference is that the field index is not hardcoded but taken from the aggregator type. The aggregator type is found with

gadget->typeAs<MySumAggregatorType>()

The typeAs template is a very recent addition, I've added it when writing this example. The previous (and still available) equivalent way to do it was to call gadget->getType() and then cast it to the pointer to the proper subclass. The method fieldIdx() is a custom addition to the MySumAggregatorType, not inherited from any base class.

The version of AggregatorGadget::sendDelayed() used here is different:

void sendDelayed(Tray *dest, FdataVec &data, Rowop::Opcode opcode) const;

I've actually added it after writing the first version of this post, then edited the post. It conveniently handles the construction of the row from fields and sending it.

Then the aggregator type needs to be defined with a fixed set of inherited virtual methods plus any needed custom parts.

class MySumAggregatorType: public AggregatorType
{
public:
  // @param fname - field name to sum on
  MySumAggregatorType(const string &name, const string &fname):
    AggregatorType(name, NULL),
    fname_(fname),
    fidx_(-1)
  { }
  // the copy constructor works fine
  // (might set the non-NULL row type, but it will be overwritten
  // during initialization)

  // constructor for deep copy
  // (might set the non-NULL row type, but it will be overwritten
  // during initialization)
  MySumAggregatorType(const MySumAggregatorType &agg, HoldRowTypes *holder):
    AggregatorType(agg, holder),
    fname_(agg.fname_),
    fidx_(agg.fidx_)
  { }

  virtual AggregatorType *copy() const
  {
    return new MySumAggregatorType(*this);
  }

  virtual AggregatorType *deepCopy(HoldRowTypes *holder) const
  {
    return new MySumAggregatorType(*this, holder);
  }

  virtual bool equals(const Type *t) const
  {
    if (this == t)
      return true; // self-comparison, shortcut

    if (!AggregatorType::equals(t))
      return false;

    const MySumAggregatorType *sumt = static_cast<const MySumAggregatorType *>(t);

    if (fname_ != sumt->fname_)
      return false;

    return true;
  }

  virtual bool match(const Type *t) const
  {
    if (this == t)
      return true; // self-comparison, shortcut

    if (!AggregatorType::match(t))
      return false;

    const MySumAggregatorType *sumt = static_cast<const MySumAggregatorType *>(t);

    if (fname_ != sumt->fname_)
      return false;

    return true;
  }

  virtual AggregatorGadget *makeGadget(Table *table, IndexType *intype) const
  {
    return new AggregatorGadget(this, table, intype);
  }

  virtual Aggregator *makeAggregator(Table *table, AggregatorGadget *gadget) const
  {
    return new MySumAggregator;
  }

  virtual void initialize(TableType *tabtype, IndexType *intype)
  {
    const RowType *rt = tabtype->rowType();
    setRowType(rt); // the result has the same type as the argument
    fidx_ = rt->findIdx(fname_);
    if (fidx_ < 0)
      errors_.fAppend(new Errors(rt->print()), "Unknown field '%s' in the row type:", fname_.c_str());
    else {
      if (rt->fields()[fidx_].arsz_ != RowType::Field::AR_SCALAR
      || rt->fields()[fidx_].type_->getTypeId() != Type::TT_INT64)
        errors_.fAppend(new Errors(rt->print()),
          "Field '%s' is not an int64 scalar in the row type:", fname_.c_str());
    }
    AggregatorType::initialize(tabtype, intype);
  }

  // called from the handler
  int fieldIdx() const
  {
    return fidx_;
  }

protected:
  string fname_; // name of the field to sum, must be an int64
  int fidx_; // index of field named fname_
};

The constructor accepts the aggregator name and the name of the field on which it will sum. The field name will be translated to field index during initialization, and made available to MySumAggregator::handle() via the method fieldIdx(). The aggregator type starts with the result row type of NULL, with the actual row type set during initialization. This ability to set the row type later had the latent existence all along, but it's the first time I've made it explicit. It hasn't propagated to the Perl API yet. The idea here is that the result row type of this aggregator is always equal to the input row type, so rather than specifying the result type explicitly and then having to check it for compatibility, why not just take the table's row type when it becomes available? And it works beautifully.

The copy constructor and the constructor with HoldRowTypes are used to implement the virtual methods copy() and deepCopy(). The deepCopy() is yet another recent addition used in the multithreaded support. I'll describe it in detail later, for now just Do It Like This.

The methods match() and equals() follow the same general shape as everywhere else. makeGadget() creates a generic gadget, and makeAggregator() creates an instance of aggregator for each group.

The interesting stuff starts happening in the initialization. The row type gets found from the table and set as the result type. Then the aggregation field is found in the row type and checked for being of the proper type. Its index is remembered for the later use.

errors_.fAppend() is a new part of the Erref API that makes the error construction more convenient. It is smart enough to check the reference for NULL and allocate a new Errors if so, then append a printf-formatted message and nested errors to it. I've been adding the direct printf-formatting for both Errors and Exceptions, this part of the API is to be documented yet.

Aggregator in C++, part 2

Building an aggregator is a fairly complex subject, so next I want to show a working example. It turns out that I've never actually written an aggregator in C++ before, other than some tiny fragments; I was always working through Perl. So I've written some more interesting examples just now. The full text can be found in the unit test file cpp/table/t_Agg.cpp.

First, if your aggregator is truly stateless and fully hardcoded, the easier way to do it is by defining a plain function with the same handler arguments and building a BasicAggregatorType with it. And here is one that sums the values of an int64 field (the test case aggBasicSum):

void sumC(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
{
  // don't send the NULL record after the group becomes empty
  if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)
    return;

  int64_t sum = 0;
  for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
    sum += table->getRowType()->getInt64(rhi->getRow(), 2, 0); // field c at idx 2
  }

  // pick the rest of fields from the last row of the group
  RowHandle *lastrh = index->last();

  // build the result row; relies on the aggregator result being of the
  // same type as the rows in the table
  FdataVec fields;
  table->getRowType()->splitInto(lastrh->getRow(), fields);
  fields[2].setPtr(true, &sum, sizeof(sum)); // set the field c from the sum

  Rowref res(gadget->getLabel()->getType(), fields);
  gadget->sendDelayed(dest, res, opcode);
}

...
  Autoref<TableType> tt = TableType::make(rt1)
    ->addSubIndex("Hashed", HashedIndexType::make( // will be the default index
        (new NameSet())->add("e")
      )->addSubIndex("Fifo", FifoIndexType::make()
        ->setAggregator(new BasicAggregatorType("aggr", rt1, sumC))
      )
    );
...

As I've said before, you create the BasicAggregatorType by giving it the aggregator name, aggregator result row type, and the handler function.

In this case the handler function is completely hardcoded. It works on the int64 field at index 2. The row type I used in this example is:

row {
  uint8[10] a,
  int32[] b,
  int64 c,
  float64 d,
  string e,
}

So the field is actually named "c", and that's why the aggregator function is named "sumC".  But since in this case everything is known in advance, to make it more efficient, the look-up of the field by name has been skipped, and the field index has been pre-computed and placed into the function.

The general outline of the aggregator is exactly the same as in Perl: check for an empty group, then iterate through all the rows in the group and compute the sum, fill the rest of the fields from the last row, and send the result. The difference is that there is no AggregatorContext, and the calls are done directly on the bits and pieces received as arguments.

The group size is computed as

parentIndexType->groupSize(gh)

This is pretty much the reason for parentIndexType and gh to be passed as arguments to the handler. Why it's done like this is a long story. It allows an individual index (as in instance, not type!) to not care about the number of rows in it. Remember, a group may have multiple "parallel" indexes defined on it, with all of them containing the same rows, and thus the same number of rows. The group handle (gh) contains the common information about the group, including the group size. And the parent index type is the one that manages the group handles inside it, and knows how to extract the information from them.

The iteration is done as usual with begin() and next(), only called directly on the index. It will return NULL after the last row. The last row can also be found directly with index->last(); on an empty group it would return NULL, so be careful with that.


The input row type for reading the rows from the group is found as:


table->getRowType()

The result row is built in this example by copying the last row and replacing one field. The data from the last row is split into FdataVec (the data itself is not copied at this point but the data descriptors in the construction vector are made to point to the data in the original row). Then the descriptor for the field "c" is changed to point to the computed sum. Then the new row is built from the descriptor.

In this particular case the type of the input rows and of the result rows is the same, so either could have been used to construct the result rows.  There are two ways to find the result type:


gadget->getType()->getRowType()
gadget->getLabel()->getType()


They are exactly the same, just there are two paths leading to the same object.


Finally, the constructed row is sent. sendDelayed() takes care of constructing the rowop from the components.



Potentially, you could even call directly


gadget->sendDelayed(dest, gadget->getLabel()->getType()->makeRow(fields), opcode);

However, the caveat is that if the table's enqueuing mode is set to EM_IGNORE (which is an old idea; nowadays everyone should use EM_CALL, and eventually there will be no other way than EM_CALL, but for now it could happen), the rows would leak. The reason for the leak is that with EM_IGNORE sendDelayed() skips the rowop creation, and thus never creates a reference to the row, so the row would never be freed. Creating an explicit reference makes sure that the row will always be cleaned up when it becomes unused.
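The mechanics of that leak can be shown with a small self-contained sketch of intrusive reference counting. This is only an illustration under my own assumptions: RowSketch, RowRefSketch and sendDelayedSketch() are hypothetical names, and the "ignore" flag merely imitates the EM_IGNORE behavior described above.

```cpp
#include <cassert>
#include <vector>

// Hypothetical row with an intrusive reference count that starts at 0.
struct RowSketch {
    static int &live() { static int n = 0; return n; } // rows currently alive
    int refs = 0;
    RowSketch() { ++live(); }
    ~RowSketch() { --live(); }
};

// Hypothetical reference holder: frees the row when the last reference goes.
class RowRefSketch {
public:
    explicit RowRefSketch(RowSketch *r) : row_(r) { if (row_) ++row_->refs; }
    RowRefSketch(const RowRefSketch &o) : row_(o.row_) { if (row_) ++row_->refs; }
    RowRefSketch &operator=(const RowRefSketch &) = delete;
    ~RowRefSketch() { if (row_ && --row_->refs == 0) delete row_; }
    RowSketch *get() const { return row_; }
private:
    RowSketch *row_;
};

// Imitation of sendDelayed(): in the "ignore" mode it returns early and
// never creates a reference to the row.
void sendDelayedSketch(RowSketch *row, bool ignore, std::vector<RowRefSketch> &tray) {
    if (ignore)
        return;
    tray.push_back(RowRefSketch(row));
}

// The safe pattern: the caller holds an explicit reference around the call.
int liveRowsAfterSafeSend(bool ignore) {
    std::vector<RowRefSketch> tray;
    {
        RowRefSketch holder(new RowSketch);
        sendDelayedSketch(holder.get(), ignore, tray);
    } // the holder goes away; the row dies here unless the tray references it
    tray.clear(); // the tray's references (if any) are dropped
    return RowSketch::live();
}
```

If the freshly allocated row were passed straight into sendDelayedSketch() with the ignore flag set, its count would stay at 0 forever and nothing would ever delete it.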

Friday, February 15, 2013

Aggregator in C++, part 1

I've described the AggregatorType and AggregatorGadget before. Now, the last piece of the puzzle is the Aggregator class.

An object of a subclass of AggregatorType defines the logic of the aggregation, and there is one of them per TableType. An object of a subclass of AggregatorGadget provides the aggregation output of a Table, and there is one of them per table. An object of a subclass of Aggregator keeps the state of an aggregation group, and there is one of them per Index that holds a group in the table.

Repeating what I wrote before:

"The purpose of the Aggregator object created by AggregatorType's makeAggregator() is to keep the state of the group. If you're doing an additive aggregation, it allows you to keep the previous results. If you're doing the optimization of the deletes, it allows you to keep the previous sent row.

What if your aggregator keeps no state? You still have to make an Aggregator for every group, and no, you can't just return NULL, and no, they are not reference-countable, so you have to make a new copy of it for every group (i.e. for every call of makeAggregator()). This looks decidedly sub-optimal, and eventually I'll get around to straightening it out. The good news though is that most of the real aggregators keep the state anyway, so it doesn't matter much."

Once again, the Aggregator object doesn't normally do the logic by itself, it calls its AggregatorType for the logic. It only provides storage for one group's state and lets the AggregatorType logic use that storage.

The Aggregator is defined in table/Aggregator.h. It defines two things: the constants and the virtual method.

The constants are in the enum Aggregator::AggOp, same as defined before in Perl:

AO_BEFORE_MOD,
AO_AFTER_DELETE,
AO_AFTER_INSERT,
AO_COLLAPSE,

Their meaning is also the same. They can be converted to and from the string representation with methods

static const char *aggOpString(int code, const char *def = "???");
static int stringAggOp(const char *code);

They work the same way as the other constant conversion methods.
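For those who haven't seen the other conversion methods, a pair like this can be sketched as a simple table look-up. This is not the Triceps source, just a stand-in with the same signatures under the names aggOpStringSketch() and stringAggOpSketch(); the return of -1 for an unknown string is my assumption.

```cpp
#include <cassert>
#include <cstring>

// The same constants as in the text, redefined locally for the sketch.
enum AggOpSketch {
    AO_BEFORE_MOD,
    AO_AFTER_DELETE,
    AO_AFTER_INSERT,
    AO_COLLAPSE
};

struct AggOpNameSketch {
    int code;
    const char *name;
};

static const AggOpNameSketch aggOpNames[] = {
    { AO_BEFORE_MOD, "AO_BEFORE_MOD" },
    { AO_AFTER_DELETE, "AO_AFTER_DELETE" },
    { AO_AFTER_INSERT, "AO_AFTER_INSERT" },
    { AO_COLLAPSE, "AO_COLLAPSE" },
};

// Code to string; returns def if the code is unknown.
const char *aggOpStringSketch(int code, const char *def = "???") {
    for (const AggOpNameSketch &e : aggOpNames) {
        if (e.code == code)
            return e.name;
    }
    return def;
}

// String to code; returns -1 if the string is unknown (an assumption).
int stringAggOpSketch(const char *code) {
    for (const AggOpNameSketch &e : aggOpNames) {
        if (std::strcmp(e.name, code) == 0)
            return e.code;
    }
    return -1;
}
```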

Notably, there is no explicit Aggregator constructor. The base class Aggregator doesn't have any knowledge of the group state, so the default constructor is good enough. You add this knowledge of the state in your subclass, and there you define your own constructor. Whether the constructor takes any arguments or not is completely up to you, since your subclass of AggregatorType will be calling that constructor.
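As a rough illustration of this split between the state and the logic, here is a self-contained sketch of an additive sum. None of these classes are the Triceps ones: AggregatorSketch, SumAggregatorSketch and SumAggregatorTypeSketch are hypothetical, and returning a std::unique_ptr from makeAggregator() is my own choice for the example.

```cpp
#include <cassert>
#include <memory>

// Hypothetical base class: knows nothing about the group state.
struct AggregatorSketch {
    virtual ~AggregatorSketch() {}
};

// Hypothetical subclass: holds the state of one group, with its own constructor.
struct SumAggregatorSketch : AggregatorSketch {
    long long sum;
    explicit SumAggregatorSketch(long long initial) : sum(initial) {}
};

// Hypothetical aggregator type: one object shared by all the groups,
// it makes the per-group state objects and contains the logic.
struct SumAggregatorTypeSketch {
    long long initial;
    explicit SumAggregatorTypeSketch(long long init) : initial(init) {}

    // The type decides what arguments the state constructor gets.
    std::unique_ptr<SumAggregatorSketch> makeAggregator() const {
        return std::unique_ptr<SumAggregatorSketch>(new SumAggregatorSketch(initial));
    }

    // The logic works on whatever group state is handed to it.
    void onInsert(SumAggregatorSketch &state, long long value) const {
        state.sum += value;
    }
    void onDelete(SumAggregatorSketch &state, long long value) const {
        state.sum -= value;
    }
};
```

Each group gets its own state object, so the updates to one group never touch the sum of another.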

And the actual work then happens in the method

virtual void handle(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    AggOp aggop, Rowop::Opcode opcode, RowHandle *rh);

Your subclass absolutely has to define this method because it's abstract in the base class. The arguments provide the way to get back to the table, its components and its type, so you don't have to pass them through your Aggregator constructor and keep them in your instance, this saves memory.

Notably absent is the pointer to the AggregatorType. This is because the AggregatorGadget already has this pointer, so all you need to do is call

gadget->getType();

You might also want to cast it to your aggregator's type if you plan to call your custom methods, like:

MyAggregatorType *at = (MyAggregatorType *)gadget->getType();

Some of the argument types, Index and GroupHandle, have not been described yet, and will be described shortly.

But in general the method handle() is expected to work very much like the aggregator handler does in Perl. It's called at the same times (and indeed the Perl handler is called from the C++-level of the handler), with the same aggop and opcode. The rh is still the current modified row.

A bit of a difference is that the Perl way hides the sensitive arguments in an AggregatorContext while in C++ they are passed directly.

The job of the handle() is to do whatever is necessary, possibly iterate through the group, possibly use the previous state, produce the new state (and possibly remember it), then send the rowop to the gadget. This last step usually is:

gadget->sendDelayed(dest, newrow, opcode);



Here gadget is the AggregatorGadget from the arguments, opcode also helpfully comes from the arguments (and if it's OP_NOP, there is no need to send anything, it will be thrown away anyway), and newrow is the newly computed result Row (not RowHandle!). Obviously, the type of the result row must match the output row type of the AggregatorGadget, or all kinds of bad things will happen. dest is also an argument of handle(), and is a tray that will hold the aggregator results until a proper time when they can be sent. The "delayed" part of "sendDelayed" means exactly that: the created rowops are not sent directly to the gadget's output but are collected in a Tray until the Table decides that it's the right time to send them.

You absolutely must not use the Gadget's default method send(), use only sendDelayed(). To help with this discipline, AggregatorGadget hides the send() method and makes only the sendDelayed() visible.

Friday, February 8, 2013

little table type additions

As I've been working on the multithreading, it has brought up a few little additions that had been missed before.

First, I've added a method to copy a whole table type. In Perl it is:

$newtt = $oldtt->copy();

In C++ it is:

TableType *copy();

This copies the table type, along with copying all the index types in it, since each table type must have its own instances of the index types. The copied table type is always uninitialized.

If the table type has collected errors, the errors aren't copied, and you should not copy such a table type. This caveat applies to the C++ code, since the Perl code would not let you create a table type with errors, it would fail on these errors immediately.

Second, in the Perl implementation of AggregatorType I've added a method to get back the row type:

$rt = $aggtype->getRowType();

Saturday, December 29, 2012

AggregatorGadget

AggregatorGadget is a fairly internal class, but I'll describe it as well while at it. Each aggregator in a table has its own gadget, and that's what it is. It carries some extra information.

The grand plan was that the different aggregator types may define their own subclasses of AggregatorGadget but in reality there appears no need to. So far all the aggregators happily live with the base AggregatorGadget.

AggregatorGadget(const AggregatorType *type, Table *table, IndexType *intype);

The type of the aggregator and the index type on which this particular aggregator is defined will be kept as references in the AggregatorGadget. The table will be remembered as a simple pointer (as usual, to avoid the circular references, since the Table references all its AggregatorGadgets).

Table *getTable() const;
const AggregatorType* getType() const;

Get back the information. By now I'm not sure why there is no method to get back the index type. It looks like nothing needs it, so the index type reference from the gadget is fully superfluous. The potential subclasses may read it from the field indexType_.

The normal way to use the AggregatorGadget is to call its method sendDelayed(). And it's called by other classes, not by its subclasses, so it's exported as public. On the other hand, the method send() must never be used with the AggregatorGadget, so it's made private (yes, I know that if you really want, you can use the superclass method, but just don't, the idea here is to guard against the accidental misuse, not against the malicious one).
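The hiding itself is a plain C++ trick, sketched below in a self-contained form. GadgetSketch and AggregatorGadgetSketch are hypothetical classes that only imitate the shape of the real ones, and strings stand in for the rowops.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical base class with a public send().
struct GadgetSketch {
    std::vector<std::string> sent; // what went straight to the output
    void send(const std::string &row) { sent.push_back(row); }
};

// Hypothetical subclass: sendDelayed() is public, send() is hidden.
class AggregatorGadgetSketch : public GadgetSketch {
public:
    // Collects the row in the tray instead of sending it right away.
    void sendDelayed(std::vector<std::string> &dest, const std::string &row) {
        dest.push_back(row);
    }
private:
    using GadgetSketch::send; // re-declared as private: g.send() won't compile
};
```

A direct call of send() on an AggregatorGadgetSketch object is a compile error, while a cast to GadgetSketch still lets it through, which is exactly the "accidental, not malicious" level of protection.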

Saturday, September 22, 2012

AggregatorType and BasicAggregatorType, part 3

The purpose of the Aggregator object created by makeAggregator is to keep the state of the group. If you're doing an additive aggregation, it allows you to keep the previous results. If you're doing the optimization of the deletes, it allows you to keep the previous sent row.

What if your aggregator keeps no state? You still have to make an Aggregator for every group, and no, you can't just return NULL, and no, they are not reference-countable, so you have to make a new copy of it for every group (i.e. for every call of makeAggregator()). This looks decidedly sub-optimal, and eventually I'll get around to straightening it out. The good news though is that most of the real aggregators keep the state anyway, so it doesn't matter much.

More of the AggregatorType working can't be explained without going into the working of the aggregators, which requires looking at the tables first, so it all will be discussed later.

The class BasicAggregatorType (defined in type/BasicAggregatorType.h) provides for a simple case: the stateless aggregation, where the aggregation is done by a single simple C function. This C function has all the arguments of Aggregator::handle forwarded to it:

typedef void Callback(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh, Tray *copyTray);

If you have a function like this, you just give it to the BasicAggregatorType constructor, and you don't need to worry about the rest of it:

BasicAggregatorType(const string &name, const RowType *rt, Callback *cb);

BasicAggregatorType takes care of the rest of the infrastructure: gadgets, aggregators etc.

Thursday, September 20, 2012

AggregatorType, part 2

The other method that you can re-define or leave alone is printTo():

virtual void printTo(string &res, const string &indent = "", const string &subindent = "  ") const;

The default one prints "aggregator (<result row type>) <name>". If you want to print more information, such as the name of the aggregator class and its arguments, you can define your own.

Finally, there are methods that will produce objects that do the actual work:

virtual AggregatorGadget *makeGadget(Table *table, IndexType *intype) const;
virtual Aggregator *makeAggregator(Table *table, AggregatorGadget *gadget);

This exposes quite a bit of the inherent complexity of the aggregators. For the simpler cases you can use the subclass BasicAggregatorType that handles most of this complexity for you and just skip these "make" methods. By the way, the IndexType has a "make" method of this kind too but it was not discussed because unless you define a completely new IndexType, you don't need to worry about it: it just happens under the hood. The SortedIndexType just asks you to define a condition and takes care of the rest, like the BasicAggregatorType for aggregators.

Gadget is a concept that has not been mentioned yet. It's not present in the Perl API, only in C++. Fundamentally it's a general base class that means  "something with an output label". It doesn't have to be limited to one label, it just has one "default" output label and then the subclasses can add anything they want. A table is a gadget. Each aggregator type in a table is a gadget too. So whenever a table is created from a table type, each aggregator type in that table type is called to produce its gadget, and these gadgets are collected in the table. When you call table->getAggregatorLabel("name"), you get the output label from the appropriate gadget.

The gadget construction gets the pointers to the concrete table and concrete index type to which it will be connected. It can store these pointers in the gadget, but it must not make them into references: that would create cyclic references, because the table already references all its aggregator gadgets. There is normally no need to worry that the table will disappear: when the table is destroyed, it will never call the aggregator gadget again, and the dereferencing of the aggregator gadget will likely cause it to be destroyed too (unless you hold another reference to it, which you normally should not).
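The ownership direction can be sketched in a self-contained way with the standard smart pointers. Triceps uses its own reference counting, so this is only an analogy under my own assumptions; TableSketch and AggGadgetSketch are hypothetical names.

```cpp
#include <cassert>
#include <memory>
#include <vector>

struct TableSketch;

// Hypothetical gadget: remembers its table as a plain pointer, not a reference.
struct AggGadgetSketch {
    static int &live() { static int n = 0; return n; } // gadgets currently alive
    TableSketch *table; // non-owning back pointer, no cycle
    explicit AggGadgetSketch(TableSketch *t) : table(t) { ++live(); }
    ~AggGadgetSketch() { --live(); }
};

// Hypothetical table: holds the counted references to all its gadgets.
struct TableSketch {
    std::vector<std::shared_ptr<AggGadgetSketch>> gadgets;
    void addGadget() {
        gadgets.push_back(std::make_shared<AggGadgetSketch>(this));
    }
};

// Creates a table with two gadgets, destroys it, reports the surviving gadgets.
int gadgetsLeftAfterTableDies() {
    {
        TableSketch t;
        t.addGadget();
        t.addGadget();
    } // the table is destroyed here and releases its gadgets
    return AggGadgetSketch::live();
}
```

If the gadget held a counted reference back to the table, the two would keep each other alive and neither would ever be freed.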

Once again, short version: one AggregatorGadget per table per aggregator type.

On the other hand, an Aggregator represents a concrete aggregation on a concrete index (not on an index type, on an index!). Whenever an index of some type is created, an aggregator of its connected type is created with it. A table with a complicated tree structure of indexes can have lots of aggregators of a single type. The difference between an index type and an index is explained in http://triceps.sourceforge.net/docs-latest/guide.html#sc_table_indextree. In short, it's one index per group.

The way it works, whenever some row in the table gets deleted or inserted, the table determines for each index type, which actual index in the tree (i.e. which group) got changed. Then for aggregation purposes, if that index has an aggregator on it, that aggregator is called to do its work on the group. It produces an output row or two (or maybe none) for that group and sends it to the aggregator gadget of the same type.

Once again, short version: one Aggregator object per group, produces the updates when asked, sends them to the single common gadget.

The pointers to the Table and Gadget are given for convenience, the Aggregator doesn't need to remember them. Whenever it is called, it will also be given these pointers as arguments. This is done in an attempt to reduce the amount of data stored per aggregator.

Sunday, September 16, 2012

AggregatorType, part 1

The AggregatorType is a base class in which you define the concrete aggregator types, very much like the sorted index type. It has a chunk of functionality common for all the aggregator types and a bunch of virtual functions that compute the actual aggregation in the subclasses.

AggregatorType(const string &name, const RowType *rt);

The constructor provides a name and the result row type. Remember, that AggregatorType is an abstract class,  and will never be instantiated directly. Instead your subclass that performs a concrete aggregation will invoke this constructor as a part of its constructor.

As has been described in the Perl part of the manual, the aggregator type is unique in the fact that it has a name.  And it's a bit weird name: each aggregator type is kind of by itself and can be reused in multiple table types, but all the aggregator types in a table type must have different names. This is the name that is used to generate the name of the aggregator's output label in a table: '<table_name>.<aggregator_type_name>'. Fundamentally, the aggregator type itself should not have a name, it should be given a name when connected to an index in the table type. But at the time the current idea looked good enough, it's easy, convenient for error messages, and doesn't get much in the way.

The result row type might not be known at the time of the aggregator type creation. All the constructor does with it is place the value into a field, so if the right type is not known, just make up some (as long as it's not NULL!) and use it, then change later at the initialization time.

For 1.1 I've changed this code to accept a NULL result row type until the initialization is completed. If it's still NULL after initialization, this will be reported as an error.

AggregatorType(const AggregatorType &agg);
virtual AggregatorType *copy() const;

An aggregator type must provide a copy constructor that does the deep copy and the virtual method copy() that invokes it. It's the same as with the index types: when an aggregator type gets connected into a table type, it gets actually copied, and the copy must always be uninitialized.

Speaking of the fields, the fields in the AggregatorType and available to the subclasses are:

    const_Autoref<RowType> rowType_; // row type of result
    Erref errors_; // errors from initialization
    string name_; // name inside the table's dotted namespace
    int pos_; // a table has a flat vector of AggregatorGadgets in it, this is the index for this one (-1 if not set)
    bool initialized_; // flag: already initialized, no future changes

rowType_ is the row type of the result. The constructor puts the argument value there but it can be changed at any time (until the initialization is completed) later.

errors_ is a place to put the errors during initialization. It comes set to NULL, so if you want to report any errors, you have to create an Errors object first.

name_ is where the name is kept. Generally, don't change it, treat it as read-only.

pos_ has to do with management of the aggregator types in a table type. Before initialization it's -1, after initialization each aggregator type (that becomes tied to its table type) will be assigned a sequential number. Again, treat it as read-only, and you probably would never need to even read it.

initialized_ shows that the initialization has already happened. Your initialization should call the initialization of the base class, which would set this flag. No matter if the initialization succeeded or failed, this flag gets set. It never gets reset in the original AggregatorType object, it gets reset only in the copies.

const string &getName() const;
const RowType *getRowType() const;
bool isInitialized() const;
virtual Erref getErrors() const;

The convenience getter functions that return the data from the fields. You can override getErrors() but there probably is no point to it.



virtual bool equals(const Type *t) const;
virtual bool match(const Type *t) const;

The equality and match comparisons are as usual. The defaults provided in the base AggregatorType check that the result row type is equal or matching (or, in version 1.1, that both result row types are NULL), and that the typeid of both are the same. So if your aggregator type has no parameters, this is good enough and you don't need to redefine these methods. If you do have parameters, you call the base class method first, if it returns false, you return false, otherwise you check the parameters. Like this:

bool MyAggregatorType::equals(const Type *t) const
{
    if (!AggregatorType::equals(t))
        return false;

    // the typeid matched, so safe to cast
    const MyAggregatorType *at = static_cast<const MyAggregatorType *>(t);
    // ... check the type-specific parameters, returning false on any mismatch ...
    return true;
}
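For a version that can be compiled on its own, here is a sketch of the same pattern with mock classes. TypeSketch and AggregatorTypeSketch are hypothetical stand-ins for the Triceps base classes, an int stands in for the result row type, and the fieldIdx parameter is made up for the example.

```cpp
#include <cassert>
#include <typeinfo>

// Hypothetical root of the type hierarchy.
struct TypeSketch {
    virtual ~TypeSketch() {}
    virtual bool equals(const TypeSketch *t) const = 0;
};

// Hypothetical base aggregator type: checks the typeid and the result row type.
struct AggregatorTypeSketch : TypeSketch {
    int rowTypeId; // stands in for the result row type
    explicit AggregatorTypeSketch(int rt) : rowTypeId(rt) {}

    bool equals(const TypeSketch *t) const override {
        if (this == t)
            return true;
        if (t == nullptr || typeid(*this) != typeid(*t))
            return false;
        return rowTypeId == static_cast<const AggregatorTypeSketch *>(t)->rowTypeId;
    }
};

// A subclass with a parameter of its own.
struct MyAggregatorTypeSketch : AggregatorTypeSketch {
    int fieldIdx; // type-specific parameter (hypothetical)
    MyAggregatorTypeSketch(int rt, int idx) : AggregatorTypeSketch(rt), fieldIdx(idx) {}

    bool equals(const TypeSketch *t) const override {
        if (!AggregatorTypeSketch::equals(t))
            return false;
        // the typeid matched, so safe to cast
        const MyAggregatorTypeSketch *at = static_cast<const MyAggregatorTypeSketch *>(t);
        return fieldIdx == at->fieldIdx;
    }
};
```

Because the base class compares the dynamic typeid of both objects, the static_cast in the subclass can never be applied to an object of a different class.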


Wednesday, June 6, 2012

error handling in the Perl wrappers

Since the API started the transition to confessing on the fatal errors instead of just returning an undef and an error message, I've converted the Perl wrapper methods to do the same. This includes:

AggregatorContext::makeHashSend()
AggregatorContext::makeArraySend()

Label::makeRowopHash()
Label::makeRowopArray()

Table::findBy()
Table::findIdxBy()

Unit::makeHashCall()
Unit::makeArrayCall()
Unit::makeHashSchedule()
Unit::makeArraySchedule()
Unit::makeHashLoopAt()
Unit::makeArrayLoopAt() 

Sunday, May 13, 2012

Tables: no more bundling

I believe I've said before that the table first processes all the rows from an operation on it (only one row is the argument of the operation but it may trigger the deletion of multiple rows with the replacement policies) and only then sends all the results. This is essentially an implicit bundling of the rowops, and has all the issues of the bundling that have been described before. Since a join sees the rowops only after they come out of the table, the processing of the missing matches for the outer joins (described in the last post) could not work reliably. If there are multiple rows at the same join key affected by an operation, when the join looks in the table, it would see the state after all of them have been already applied, and would make the wrong decisions. So, how does it work?

The answer is that now I've changed the way the tables work. No more implicit bundling. Each row gets changed in the table, and a rowop is immediately called on the table's output label. The handler of that rowop can read the table and see it exactly in the state right after that rowop was applied, and none more. Nice, consistent, convenient.

Note though that any labels called from this point may only read the table, not modify it. The table is still in the middle of the previous modification, and starting a new modification at this point would corrupt it. So if you want to modify the table, you have to schedule it for later, after the current modification is completed, using the Unit methods schedule() or loopAt(). But keep in mind that by the time that rowop gets called, many other changes may have already happened to the table. So it's best to schedule not the direct table changes but the more high-level operations which would look at the state of the table at their run time and decide the proper action.

The order of sending the aggregation results has also changed. It used to be the table changes, then all the aggregation results. Now first the aggregation handlers get called with AO_BEFORE_MOD and their results get sent through, then the table modifications work through as described, and then the aggregation handlers get called with AO_AFTER_* and their results go through. The aggregation modifications are still bundled: all the aggregators get called with their results remembered, and then all the results are sent through. This is not very pretty but not such a big deal either. The reason is that the aggregation code has to detect whether each modification is the last one for each aggregation group or not. And it's hard enough to do in the bundled way, and would be quite difficult to unbundle. Besides, the aggregators are specially designed to improve their efficiency by throwing away the intermediate updates on the same group, so there is not much use in unbundling that.

Another new feature of the table is the "pre" label. It can be found with:

$lb = $table->getPreLabel();

It has the name of "TableName.pre". A rowop on this label gets called right before applying the row to the table. Just as with the table's output label, the code that handles that rowop can't do any other direct modification to the table and can't prevent the ongoing modifications from happening. However if it reads the table, it will find it in exactly the state before that modification gets applied. This comes in useful sometimes, in particular for the self-joins that will be shown later. There is also a bit of optimization going on: since the "pre" label gets used fairly rarely, the table code first checks if there are any labels chained from it. If none, the "pre" label doesn't get called at all. If you do the unit tracing, you won't see the call of the "pre" label in the trace unless there are other labels chained from it.

To recap, the new high-level order of the table operation processing is:

  • Execute the replacement policies on all the indexes, find all the rows that need to be deleted first.
  • If any of the index policies forbid the modification, return 0.
  • Call all the aggregators with AO_BEFORE_MOD on all the affected rows.
  • Send these aggregator results.
  • For each affected row:
    • Call the "pre" label (if it has any labels chained to it).
    • Modify the row in the table.
    • Call the "out" label.
  • Call all the aggregators with AO_AFTER_*, on all the affected rows.
  • Send these aggregator results.
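
The same order can be written down as a tiny self-contained simulation that just logs the steps. It is not the table code: tableOpOrderSketch() and its arguments are hypothetical, and the step names are only labels for the items of the list above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Returns the sequence of steps for one table operation.
// affectedRows - names of the rows touched by the operation
// preHasChained - whether any labels are chained from the "pre" label
// policyForbids - whether some index policy forbids the modification
std::vector<std::string> tableOpOrderSketch(
    const std::vector<std::string> &affectedRows,
    bool preHasChained, bool policyForbids)
{
    std::vector<std::string> log;
    log.push_back("replacement policies");
    if (policyForbids) {
        log.push_back("return 0");
        return log;
    }
    log.push_back("aggregators AO_BEFORE_MOD");
    log.push_back("send aggregator results");
    for (const std::string &row : affectedRows) {
        if (preHasChained)
            log.push_back("pre " + row); // skipped when nothing is chained
        log.push_back("modify " + row);
        log.push_back("out " + row);
    }
    log.push_back("aggregators AO_AFTER_*");
    log.push_back("send aggregator results");
    return log;
}
```

Note how the per-row steps are interleaved, while the aggregator results stay bundled before and after the whole set of rows.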

And to let the join find whether some row is the only one in the group or not, I've added another call:

$size = $table->groupSizeIdx($idxType, $row_or_rh);

It works very similarly to the AggregatorContext::groupSize(), only it has no context and has to get the index type and row or row handle as its arguments. It returns the count of rows in the group. If there is no such group in the table, the result will be 0. If the argument is a row handle, that handle may be in the table or not in the table, either will be handled transparently (though calling it for a row handle in the table is more efficient because the group would not need to be looked up first). If the argument is a row, it gets handled similarly to findIdx(): a temporary row handle gets created, used to find the result, and then destroyed.

The $idxType is the one that owns the split. Naturally, it must be a non-leaf index. (Using a leaf index type is not an error but it always returns 0, because there are no groups under it). It's basically the same index type as you would use in findIdx() to find the first row of the group. For example, if you have a table type defined as

our $ttPosition = Triceps::TableType->new($rtPosition)
  ->addSubIndex("primary",
    Triceps::IndexType->newHashed(key => [ "date", "customer", "symbol" ])
  ) 
  ->addSubIndex("currencyLookup", # for joining with currency conversion
    Triceps::IndexType->newHashed(key => [ "date", "currency" ])
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  ) 
  ->addSubIndex("byDate", # for cleaning by date
    Triceps::SimpleOrderedIndex->new(date => "ASC")
    ->addSubIndex("grouping", Triceps::IndexType->newFifo())
  ) 
or die "$!";

Then it would make sense to call groupSizeIdx on the indexes "currencyLookup" or "byDate" but not on "primary", "currencyLookup/grouping" nor "byDate/grouping". Remember, a non-leaf index type defines the groups, and the nested index types under it define the order in those groups (and possibly further break them down into sub-groups).

Sunday, March 25, 2012

AggregatorContext reference

AggregatorContext is one of the arguments passed to the aggregator computation function. It encapsulates the iteration through the aggregation group, in the order of the index on which the aggregator is defined. After the computation function returns, the context becomes invalidated and stops working, so there is no  point in saving it between the calls. There is no way to construct the aggregator context directly.

It provides the following methods:

$result = $ctx->groupSize();

Returns the number of rows in the group. This is currently a unique feature available only in an aggregator, not in the normal iteration through the table.

$rowType = $ctx->resultType();

Returns the row type of the aggregation result.

$rh = $ctx->begin();

The first row handle of the iteration. In case of an empty group would return a null handle.

$rh = $ctx->next($rh);

The next row handle in order. If the argument handle was the last row in the group, returns a null handle. So the iteration through the group with a context is similar to iteration through the whole table: it ends when begin() or next() returns a null handle.

$rh = $ctx->last();

The last row handle in the group. If the group is empty, returns a null handle.

$rh = $ctx->beginIdx($idxType);

The first row in the group, according to a specific index type's order. The index type must belong to the group, otherwise the result is undefined. If the group is empty, will return the same value as endIdx(). If $idxType is non-leaf, the effect is the same as if its first leaf were used.

$rh = $ctx->endIdx($idxType);

The handle past the last row in the group, according to a specific index type's order. The index type must belong to the group, otherwise the result is undefined and might even result in an endless iteration loop. If $idxType is non-leaf, the effect is the same as if its first leaf were used. This kind of iteration uses the table's $t->nextIdx($idxType, $rh) or $rh->next($idxType) to advance the position. The Table reference post said that not every possible order is iterable. Well, with the aggregation context, every order is iterable. You can pick any index in the group and iterate in its order. And aggregation is where this ability counts the most.

If the group happens to be the last group of this index type (not of $idxType but of the index on which the aggregator is defined) in the table, endIdx()  would return a null row handle. If it's also empty, beginIdx() would also return a null handle, and in general, for an empty group beginIdx() would return the same value as endIdx(). If the group is not the last one, endIdx() returns the handle of the first row in the next group.

$rh = $ctx->lastIdx($idxType);

The last row in the group according to a particular index type's order. The index type must belong to the group, otherwise the result is undefined. If the group is empty, returns a null handle.

$ctx->send($opcode, $row) or die "$!";

Constructs a result rowop for the aggregator and arranges for it to be sent to the aggregator's output  label. The actual sending is delayed: it will be done only after all the aggregators run, then the table's changes are sent to the table's normal output label, then each aggregator's changes are sent to its label. Note that the aggregator's output label is not visible in the computation function, so the rowop can not be constructed directly. Instead send() takes care of it. The row must be of a type at least matching the aggregator's result type (and of course the normal practice is to use the aggregator's result type to construct the row). On success returns 1, on error returns undef and the error message.

$ctx->makeHashSend($opcode, $fieldName => $fieldValue, ...) or die "$!";

A convenience function that produces the row from pairs of field names and values and sends it. A combination of makeRowHash() and send().

$ctx->makeArraySend($opcode, @fields) or die "$!";

A convenience function that produces the row from the array of field values and sends it. A combination of makeRowArray() and send().

Note that an aggregator must never change the table. Any attempt to change the table is a fatal error.

Tuesday, March 20, 2012

AggregatorType reference

The aggregator type gets created by the constructor

$at = Triceps::AggregatorType->new($resultRowType, "aggName", $initFunc,
  $handlerFunc, @args...) or die "$!";

The rows created by the aggregator are of $resultRowType. The aggregator name is used to name the aggregator result label in the table, "tableName.aggName". It is also used to get the reference of that label from the table.

The optional args are passed to both the init and handler functions (to which $initFunc and $handlerFunc are references). The init function is called when the row group (contained in an index of the type, on which this aggregator type is set) is created. It initializes the group's aggregation state.  The handler function gets called on the changes to the group, as had been described at length previously.

The methods for comparison, printing and copying work similarly to the index types:

$result = $at1->same($at2);
$result = $at1->equals($at2);
$result = $at1->match($at2);
$result = $at->print(); 
$atCopy = $at->copy();

The matching aggregator types may differ in the aggregator name and in the field names of the result row type. However the function references and their arguments must be the same.

Sunday, March 18, 2012

The guts of SimpleAggregator, part 2

The functions that translate the $%variable names follow the same pattern but have different built-in variables:

# @param varname - variable to replace
# @param func - function name, for error messages
# @param vars - definitions of the function's vars
# @param id - the unique id of this field
# @param argCount - the argument count declared by the function
sub replaceStep # ($varname, $func, $vars, $id, $argCount)
{
  my ($varname, $func, $vars, $id, $argCount) = @_;

  if ($varname eq 'argiter') {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', step computation refers to 'argiter' but the function declares no arguments"
      unless ($argCount > 0);
    return "\$a${id}";
  } elsif ($varname eq 'niter') {
    return "\$npos";
  } elsif ($varname eq 'groupsize') {
    return "\$context->groupSize()";
  } elsif (exists $vars->{$varname}) {
    return "\$v${id}_${varname}";
  } else {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', step computation refers to an unknown variable '$varname'"
  }
}

sub replaceResult # ($varname, $func, $vars, $id, $argCount)
{
  my ($varname, $func, $vars, $id, $argCount) = @_;

  if ($varname eq 'argfirst') {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', result computation refers to '$varname' but the function declares no arguments"
      unless ($argCount > 0);
    return "\$f${id}";
  } elsif ($varname eq 'arglast') {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', result computation refers to '$varname' but the function declares no arguments"
      unless ($argCount > 0);
    return "\$l${id}";
  } elsif ($varname eq 'groupsize') {
    return "\$context->groupSize()";
  } elsif (exists $vars->{$varname}) {
    return "\$v${id}_${varname}";
  } else {
    confess "MySimpleAggregator: internal error in definition of aggregation function '$func', result computation refers to an unknown variable '$varname'"
  }
}

And finally the definition of the aggregation functions:

our $FUNCTIONS = {
  first => {
    result => '$%argfirst',
  },
  last => {
    result => '$%arglast',
  },
  count_star => {
    argcount => 0,
    result => '$%groupsize',
  },
  count => {
    vars => { count => 0 },
    step => '$%count++ if (defined $%argiter);',
    result => '$%count',
  },
  sum => {
    vars => { sum => 0 },
    step => '$%sum += $%argiter;',
    result => '$%sum',
  },
  max => {
    vars => { max => 'undef' },
    step => '$%max = $%argiter if (!defined $%max || $%argiter > $%max);',
    result => '$%max',
  },
  min => {
    vars => { min => 'undef' },
    step => '$%min = $%argiter if (!defined $%min || $%argiter < $%min);',
    result => '$%min',
  },
  avg => {
    vars => { sum => 0, count => 0 },
    step => 'if (defined $%argiter) { $%sum += $%argiter; $%count++; }',
    result => '($%count == 0? undef : $%sum / $%count)',
  },
  avg_perl => { # Perl-like treat the NULLs as 0s
    vars => { sum => 0 },
    step => '$%sum += $%argiter;',
    result => '$%sum / $%groupsize',
  },
  nth_simple => { # inefficient, need proper multi-args for better efficiency
    vars => { n => 'undef', tmp => 'undef', val => 'undef' },
    step => '($%n, $%tmp) = @$%argiter; if ($%n == $%niter) { $%val = $%tmp; }',
    result => '$%val',
  },
};

You can use them as a starting point for building your own. As you can see, this very first simple version of SimpleAggregator didn't include the user-provided functions, but the real one already does.
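To see how the "avg" and "avg_perl" definitions differ on NULL values, here is a minimal sketch in plain Perl that performs the same arithmetic by hand. The input values are made up, and the code runs without Triceps; it only mirrors the step and result expressions from the table above.

```perl
use strict;
use warnings;

# Made-up argument values for one group; undef plays the role of NULL.
my @values = (1, undef, 3);

# avg: skip the undefined values in both the sum and the count.
my ($sum, $count) = (0, 0);
foreach my $v (@values) {
  if (defined $v) { $sum += $v; $count++; }
}
my $avg = ($count == 0 ? undef : $sum / $count);

# avg_perl: an undefined value adds 0 to the sum but still counts as a row.
my $sumAll = 0;
{
  no warnings 'uninitialized'; # adding undef is intentional here
  $sumAll += $_ foreach @values;
}
my $avgPerl = $sumAll / scalar(@values);

print "avg=$avg avg_perl=$avgPerl\n";
```

With these inputs "avg" divides 4 by 2 while "avg_perl" divides 4 by 3, which is the whole difference between the two.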

That's it, the whole aggregator generation.

The guts of SimpleAggregator, part 1

The implementation of the SimpleAggregator has turned out to be surprisingly small. Maybe a little biggish for a single post but still small. I liked it so much that I've even saved the original small version in the file xSimpleAggregator.t. As more features get added, the "official" version of the SimpleAggregator will grow (and already did) but that example file will stay small and simple.

I'll put the commentary interlaced with the code. So, here we go.

package MySimpleAggregator;
use Carp;

use strict;

sub make # (optName => optValue, ...)
{
  my $opts = {}; # the parsed options
  my $myname = "MySimpleAggregator::make";

  &Triceps::Opt::parse("MySimpleAggregator", $opts, { 
      tabType => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "Triceps::TableType") } ],
      name => [ undef, \&Triceps::Opt::ck_mandatory ],
      idxPath => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY", "") } ],
      result => [ undef, sub { &Triceps::Opt::ck_mandatory(@_); &Triceps::Opt::ck_ref(@_, "ARRAY") } ],
      saveRowTypeTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
      saveInitTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
      saveComputeTo => [ undef, sub { &Triceps::Opt::ck_refscalar(@_) } ],
    }, @_);

  # reset the saved source code
  ${$opts->{saveInitTo}} = undef if (defined($opts->{saveInitTo}));
  ${$opts->{saveComputeTo}} = undef if (defined($opts->{saveComputeTo}));
  ${$opts->{saveRowTypeTo}} = undef if (defined($opts->{saveRowTypeTo}));

Triceps::Opt is a class that deals with parsing and doing the basic checks on the options. I'll describe it in detail in a separate post. For now, the important part is that the checked options are copied into the hash pointed to by $opts. If this were a proper object constructor, it would have been $self instead of $opts.

  # find the index type, on which to build the aggregator
  my $idx;
  { 
    my @path = @{$opts->{idxPath}};
    confess "$myname: idxPath must be an array of non-zero length"
      unless ($#path >= 0);
    my $cur = $opts->{tabType}; # the root of the tree
    my $progress = ''; 
    foreach my $p (@path) {
      $progress .= $p;
      $cur = $cur->findSubIndex($p) 
        or confess("$myname: unable to find the index type at path '$progress', table type is:\n" . $opts->{tabType}->print() . " "); 
      $progress .= '.';
    }   
    $idx = $cur;
  } 
  confess "$myname: the index type is already initialized, can not add an aggregator on it"
    if ($idx->isInitialized());

Since the SimpleAggregator uses an existing table with an existing index, it doesn't require the aggregation key: it just takes an index that forms the group, and whatever key leads to this index becomes the aggregation key. The lookup of an index by path should probably become a standard method on a table type, but here it's implemented directly. Obviously, an aggregator cannot be added on an already initialized index.
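The path walk itself is a generic pattern, and it can be tried without Triceps. The sketch below is only an illustration under stated assumptions: a nested hash stands in for the tree of index types, and find_by_path is a hypothetical helper, not a Triceps method. It shows how the accumulated $progress string lets the error message name the exact step where the lookup failed.

```perl
use strict;
use warnings;

# A nested hash stands in for the tree of index types; this is NOT the
# Triceps API, only an illustration of walking a path with error context.
my $tree = { bySymbol => { fifo => {} }, byId => {} };

sub find_by_path {
  my ($root, @path) = @_;
  die "path must be an array of non-zero length\n" unless (@path);
  my $cur = $root;
  my $progress = '';
  foreach my $p (@path) {
    $progress .= $p;
    $cur = $cur->{$p}
      or die "unable to find the index at path '$progress'\n";
    $progress .= '.';
  }
  return $cur;
}

my $found = find_by_path($tree, "bySymbol", "fifo");

# A wrong last element fails, and the message shows how far the walk got.
my $ok = eval { find_by_path($tree, "bySymbol", "lifo"); 1 };
my $err = $@;
print $err unless $ok;
```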

  # check the result definition and build the result row type and code snippets for the computation
  my $rtRes;
  my $needIter = 0; # flag: some of the functions require iteration
  my $needfirst = 0; # the result needs the first row of the group
  my $needlast = 0; # the result needs the last row of the group
  my $codeInit = ''; # code for function initialization
  my $codeStep = ''; # code for iteration
  my $codeResult = ''; # code to compute the intermediate values for the result
  my $codeBuild = ''; # code to build the result row
  my @compArgs; # the field functions are passed as args to the computation
  {
    my $grpstep = 4; # definition grouped by 4 items per result field
    my @resopt = @{$opts->{result}};
    my @rtdefRes; # field definition for the result
    my $id = 0; # numeric id of the field

    while ($#resopt >= 0) {
      confess "$myname: the values in the result definition must go in groups of 4"
        unless ($#resopt >= 3);
      my $fld = shift @resopt;
      my $type = shift @resopt;
      my $func = shift @resopt;
      my $funcarg = shift @resopt;

      confess("$myname: the result field name must be a string, got a " . ref($fld) . " ")
        unless (ref($fld) eq '');
      confess("$myname: the result field type must be a string, got a " . ref($type) . " for field '$fld'")
        unless (ref($type) eq '');
      confess("$myname: the result field function must be a string, got a " . ref($func) . " for field '$fld'")
        unless (ref($func) eq '');

This starts the loop that goes over the result fields and builds the code to create them. The code will be built in multiple snippets that will eventually be combined to produce the compute function. Since the arguments go in groups of 4, it becomes fairly easy to miss one element somewhere, and then everything gets real confusing. So the code attempts to check the types of the arguments, in hopes of catching these off-by-ones as early as possible. The variable $id will be used to produce the unique prefixes for the function's variables.
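To illustrate why the type checks catch the off-by-one mistakes early, here is a stripped-down sketch of the same grouping logic. The sub parse_result_def is a hypothetical stand-in (it is not part of SimpleAggregator), and it keeps only two of the checks.

```perl
use strict;
use warnings;

# Hypothetical parser for a flat list of (name, type, function, argument)
# groups; it dies as soon as a group looks malformed.
sub parse_result_def {
  my @resopt = @_;
  my @fields;
  while (@resopt) {
    die "the values in the result definition must go in groups of 4\n"
      unless (@resopt >= 4);
    my ($fld, $type, $func, $funcarg) = splice(@resopt, 0, 4);
    die "the result field name must be a string, got a " . ref($fld) . "\n"
      unless (ref($fld) eq '');
    die "the result field function must be a string, got a " . ref($func) . " for field '$fld'\n"
      unless (ref($func) eq '');
    push @fields, { name => $fld, type => $type, func => $func, arg => $funcarg };
  }
  return @fields;
}

my @fields = parse_result_def(
  symbol => "string", "last", sub { $_[0]{symbol} },
  count => "int32", "count_star", undef,
);
print scalar(@fields), " fields\n";

# Forgetting the "string" type shifts everything by one position, so the
# closure lands in the function slot and the check reports it.
my $ok = eval {
  parse_result_def(symbol => "last", sub { 1 }, count => "int32", "count_star", undef);
  1;
};
my $err = $@;
print $err unless $ok;
```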

      my $funcDef = $FUNCTIONS->{$func}
        or confess("$myname: function '" . $func . "' is unknown");

      my $argCount = $funcDef->{argcount};
      $argCount = 1 # 1 is the default value
        unless defined($argCount);
      confess("$myname: in field '$fld' function '$func' requires an argument computation that must be a Perl sub reference")
        unless ($argCount == 0 || ref $funcarg eq 'CODE');
      confess("$myname: in field '$fld' function '$func' requires no argument, use undef as a placeholder")
        unless ($argCount != 0 || !defined $funcarg);

      push(@rtdefRes, $fld, $type);

      push(@compArgs, $funcarg)
        if (defined $funcarg);

The definitions of "standard" aggregation functions are kept in $FUNCTIONS (it will be shown later). They are defined in exactly the same way as the vwap function shown before. The types of the fields get collected for the row definition, and the closures (or, technically, functions) that compute the aggregation arguments also get collected, to be passed later as the arguments of the compute function.

      # add to the code snippets

      ### initialization
      my $vars = $funcDef->{vars};
      if (defined $vars) {
        foreach my $v (keys %$vars) {
          # the variable names are given a unique prefix;
          # the initialization values are constants, no substitutions
          $codeInit .= "  my \$v${id}_${v} = " . $vars->{$v} . ";\n";
        }       
      } else {
        $vars = { }; # a dummy
      }     

      ### iteration
      my $step = $funcDef->{step};
      if (defined $step) {
        $needIter = 1;
        $codeStep .= "    # field $fld=$func\n";
        if (defined $funcarg) {
          # compute the function argument from the current row
          $codeStep .= "    my \$a${id} = \$args[" . $#compArgs ."](\$row);\n";
        }       
        # substitute the variables in $step
        $step =~ s/\$\%(\w+)/&replaceStep($1, $func, $vars, $id, $argCount)/ge;
        $codeStep .= "    { $step; }\n";
      }     

The initialization and iteration snippets are produced if defined. The code remembers in $needIter if any of the functions involved needs iteration. The iteration step is placed into a block. An extra ";" is added just in case: it doesn't hurt, and it helps if it was forgotten in the function definition.

      ### result building
      my $result = $funcDef->{result};
      confess "MySimpleAggregator: internal error in definition of aggregation function '$func', missing result computation"
        unless (defined $result);
      # substitute the variables in $result
      if ($result =~ /\$\%argfirst/) {
        $needfirst = 1;
        $codeResult .= "  my \$f${id} = \$args[" . $#compArgs ."](\$rowFirst);\n";
      }
      if ($result =~ /\$\%arglast/) {
        $needlast = 1;
        $codeResult .= "  my \$l${id} = \$args[" . $#compArgs ."](\$rowLast);\n";
      }
      $result =~ s/\$\%(\w+)/&replaceResult($1, $func, $vars, $id, $argCount)/ge;
      $codeBuild .= "    ($result), # $fld\n";

      $id++;
    }
    $rtRes = Triceps::RowType->new(@rtdefRes)
      or confess "$myname: invalid result row type definition: $!";
  }
  ${$opts->{saveRowTypeTo}} = $rtRes if (defined($opts->{saveRowTypeTo}));

In the same way the result computation is created, and remembers if any function wanted the fields from the first or last row. And eventually the result row type is created. Next the compute function gets assembled:

  # build the computation function
  my $compText = "sub {\n";
  $compText .= "  use strict;\n";
  $compText .= "  my (\$table, \$context, \$aggop, \$opcode, \$rh, \$state, \@args) = \@_;\n";
  $compText .= "  return if (\$context->groupSize()==0 || \$opcode == &Triceps::OP_NOP);\n";
  $compText .= $codeInit;
  if ($needIter) {
    $compText .= "  my \$npos = 0;\n";
    $compText .= "  for (my \$rhi = \$context->begin(); !\$rhi->isNull(); \$rhi = \$context->next(\$rhi)) {\n";
    $compText .= "    my \$row = \$rhi->getRow();\n";
    $compText .= $codeStep;
    $compText .= "    \$npos++;\n";
    $compText .= "  }\n";
  }
  if ($needfirst) {
    $compText .= "  my \$rowFirst = \$context->begin()->getRow();\n";
  }
  if ($needlast) {
    $compText .= "  my \$rowLast = \$context->last()->getRow();\n";
  }
  $compText .= $codeResult;
  $compText .= "  \$context->makeArraySend(\$opcode,\n";
  $compText .= $codeBuild;
  $compText .= "  );\n";
  $compText .= "}\n";

  ${$opts->{saveComputeTo}} = $compText if (defined($opts->{saveComputeTo}));

The optional parts get included only if some of the functions needed them.
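The same assembly idea can be tried in miniature without Triceps. In the sketch below everything is a simplifying assumption: build_compute is a hypothetical helper, the rows are plain hashes passed in an array instead of row handles from the context, and the result is returned instead of being sent. It only shows how the loop is generated when there is a step snippet and left out when there is none.

```perl
use strict;
use warnings;

# Hypothetical miniature of the assembly: glue the snippets into the
# source text of an anonymous sub.
sub build_compute {
  my ($codeInit, $codeStep, $codeBuild) = @_;
  my $text = "sub {\n  my (\$rows, \@args) = \@_;\n";
  $text .= $codeInit;
  if ($codeStep ne '') { # the loop is generated only when needed
    $text .= "  foreach my \$row (\@\$rows) {\n$codeStep  }\n";
  }
  $text .= "  return (\n$codeBuild  );\n}\n";
  return $text;
}

# Snippets for a single "sum" field with id 0, written out by hand.
my $text = build_compute(
  "  my \$v0_sum = 0;\n",
  "    my \$a0 = \$args[0](\$row);\n    { \$v0_sum += \$a0; }\n",
  "    (\$v0_sum), # total\n",
);
my $fun = eval $text or die "compilation failed: $@\n$text";

# Made-up rows; the closure extracts the argument from each of them.
my @res = $fun->([ { size => 10 }, { size => 32 } ], sub { $_[0]{size} });
print "@res\n"; # prints 42
```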

  # compile the computation function
  my $compFun = eval $compText
    or confess "$myname: error in compilation of the aggregation computation:\n  $@\nfunction text:\n$compText ";

  # build and add the aggregator
  my $agg = Triceps::AggregatorType->new($rtRes, $opts->{name}, undef, $compFun, @compArgs)
    or confess "$myname: internal error: failed to build an aggregator type: $! ";

  $idx->setAggregator($agg)
    or confess "$myname: failed to set the aggregator in the index type: $! ";

  return $opts->{tabType};
}

Then the compute function is compiled. If the compilation fails, the error message will include both the compilation error and the text of the auto-generated function. Otherwise there would be no way to know what exactly went wrong. Since no user code is included into the auto-generated function, it should never fail, unless there is some bad code in the aggregation function definitions. The compiled function and collected closures are then used to create the aggregator.
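The compile-and-report pattern is worth trying in isolation. Here is a minimal sketch with a hypothetical helper compile_text (not a Triceps function) that uses die instead of confess to stay short. It shows that on a failed string eval the exception carries both the compiler message from $@ and the generated source.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the pattern: compile the generated text,
# and on failure report both the compiler message and the source.
sub compile_text {
  my ($text) = @_;
  my $fun = eval $text
    or die "error in compilation:\n  $@\nfunction text:\n$text";
  return $fun;
}

my $good = compile_text('sub { return $_[0] * 2; }');
print $good->(21), "\n"; # prints 42

# A snippet with a syntax error: the exception includes the source text.
my $bad = 'sub { return (1 + ; }';
my $ok = eval { compile_text($bad); 1 };
my $err = $@;
print "failed as expected\n" unless $ok;
```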

Friday, March 16, 2012

The ubiquitous VWAP

Every CEP supplier loves an example of VWAP calculation: it's small, it's about that quintessential CEP activity: aggregation, and it sounds like something from the real world.

A quick sidebar: what is the VWAP? It's the Volume-Weighted Average Price: the average price for the shares traded during some period of time, usually a day. If you take the price of every share traded during the day and calculate the average, you get the VWAP. What is the volume-weighted part? The shares don't usually get sold one by one. They're sold in variable-sized lots. If you think in terms of lots and not individual shares, you have to weigh the trade prices (not to be confused with costs) for the lots proportionally to the number of shares in them.
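To make the weighting concrete, here is a minimal sketch in plain Perl with made-up lot sizes and prices. The sub vwap_of_lots is just an illustration and not part of Triceps.

```perl
use strict;
use warnings;

# Each trade is a lot of [size, price]; the numbers are invented.
my @trades = ([100, 10.0], [300, 20.0]);

# Hypothetical helper: sum(size * price) / sum(size), undef if no shares.
sub vwap_of_lots {
  my @lots = @_;
  my ($shares, $value) = (0, 0);
  foreach my $lot (@lots) {
    my ($size, $price) = @$lot;
    next unless (defined $size && defined $price);
    $shares += $size;
    $value += $size * $price;
  }
  return $shares == 0 ? undef : $value / $shares;
}

my $vwap = vwap_of_lots(@trades);
my $plain = (10.0 + 20.0) / 2; # unweighted average of the lot prices
print "vwap=$vwap plain=$plain\n"; # prints vwap=17.5 plain=15
```

The larger lot pulls the weighted price towards 20, which the plain average of the two lot prices misses.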

I've been using VWAP for trying out the approaches to the aggregation templates. The cutest so far is actually not a template at all: it's simply a user-defined aggregation function for the SimpleAggregator. Here is how it goes:

# VWAP function definition
my $myAggFunctions = {
  myvwap => {
    vars => { sum => 0, count => 0, size => 0, price => 0 },
    step => '($%size, $%price) = @$%argiter; '
      . 'if (defined $%size && defined $%price) '
        . '{$%count += $%size; $%sum += $%size * $%price;}',
    result => '($%count == 0? undef : $%sum / $%count)',
  },
};

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("byId",
    Triceps::IndexType->newHashed(key => [ "id" ])
  )
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
    ->addSubIndex("fifo", Triceps::IndexType->newFifo())
  )
or die "$!";

# the aggregation result
my $rtVwap;
my $compText; # for debugging

Triceps::SimpleAggregator::make(
  tabType => $ttWindow,
  name => "aggrVwap",
  idxPath => [ "bySymbol", "fifo" ],
  result => [
    symbol => "string", "last", sub {$_[0]->get("symbol");},
    id => "int32", "last", sub {$_[0]->get("id");},
    volume => "float64", "sum", sub {$_[0]->get("size");},
    vwap => "float64", "myvwap", sub { [$_[0]->get("size"), $_[0]->get("price")];},
  ],
  functions => $myAggFunctions,
  saveRowTypeTo => \$rtVwap,
  saveComputeTo => \$compText,
) or die "$!";

The rest of the example is the same as for the previous examples of the trades aggregation.

The option "functions" of Triceps::SimpleAggregator::make() lets you add the custom aggregation functions. They're actually defined in the same way as the "standard" functions that come with the SimpleAggregator. The argument of that option is a reference to a hash, with the names of functions as the keys and references to the function definitions as values. Each definition is again a hash, containing up to 4 keys:

  • argcount - Defines the number of arguments of the function, which may currently be 0 or 1, with 1 being the default.
  • vars - Defines the variables used to keep the context of this function.
  • step - The computation of a single step of iteration.
  • result - The computation of the result of the function. This key is mandatory. The rest can be skipped if not needed.

The vwap function actually has two arguments per row: the trade size and the price. But no more than one argument is supported. So it works in the same way as "nth_simple": it leaves the argcount as the default 1 and packs its two arguments into one, combining them into a single array returned by reference. That's why the closure for this field is

sub { [$_[0]->get("size"), $_[0]->get("price")];}

The single array reference becomes the closure's result and the vwap function's single argument, later unpacked by its code. By the way, the order of the elements in this array is important, first size and then price, not the other way around, or your results will be wrong.
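The packing and unpacking can be seen in a few lines of plain Perl. In this sketch a hash stands in for a Triceps row (so the fields are read with $_[0]{...} instead of get()), and the values are made up.

```perl
use strict;
use warnings;

# A plain hash stands in for a Triceps row; the values are made up.
my $row = { size => 200, price => 12.5 };

# The argument closure packs two values into one array reference.
my $argfn = sub { [ $_[0]{size}, $_[0]{price} ] };

# The function's code unpacks them in the same order: size, then price.
my ($size, $price) = @{ $argfn->($row) };
print "size=$size price=$price value=", $size * $price, "\n";
```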

The value of "vars" is a reference to yet another hash that maps the variable names to their initial values. The variables are always scalars. I just haven't found anything yet that would require a non-scalar. If the need for arrays or hashes arises, you can just create a reference and put it into a scalar. The initial values are strings that are substituted into the generated code as is. For the numbers, you can usually just put them in as numbers and they will work fine: that's what vwap does with its 0s. If you don't particularly want to initialize with anything, put "undef" there, in quotes. If you want to use a string constant, quote it twice, like "'initial'" or '"initial"'. If you ever need an array or hash reference, that would be a "[]" or "{}". The namespace of the variables is local to the functions, and when SimpleAggregator generates the code with them, it will add a unique mangled prefix to make sure that the variables from different fields don't conflict with each other.

The vwap computation defines four variables: two to build the aggregation result and two to temporarily keep the trade size and price extracted from the current row.
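Here is a small sketch of what the substitution of initial values amounts to. The variable names and the field id are made up for illustration; the line that builds $codeInit follows the one in the SimpleAggregator source, and the rest only compiles the snippet to check what the variables end up holding.

```perl
use strict;
use warnings;

# Hypothetical variable definitions; the values are spliced into the
# generated source as is, so a string constant needs its own quotes.
my $vars = { total => 0, best => 'undef', label => "'initial'", seen => '[]' };
my $id = 2; # id of the result field, used as the unique prefix

my $codeInit = '';
foreach my $v (sort keys %$vars) {
  $codeInit .= "  my \$v${id}_${v} = " . $vars->{$v} . ";\n";
}
print $codeInit;

# Wrap the snippet into a sub that returns the freshly initialized values.
my $text = "sub {\n" . $codeInit
  . "  return (\$v${id}_total, \$v${id}_best, \$v${id}_label, \$v${id}_seen);\n}\n";
my $check = eval $text or die "compilation failed: $@\n$text";
my ($total, $best, $label, $seen) = $check->();
```

Without the inner quotes the label would be generated as a bareword, which does not compile under "use strict".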

The presence of "step" is what tells the SimpleAggregator that this function needs to iterate through the rows. Its value is a string defining the code snippet that would be placed into the iteration loop. The step computation can refer to the function's variables through the syntax of "$%varname". All such occurrences just get globally replaced, like in a K&R C macro. This is a fairly rarely occurring combination in the normal Perl code, so there should be little confusion. If you ever need to pass this sequence through as a literal, just break it up: depending on the circumstances, use '$\%' or '${%...}'.
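The global replacement can be reproduced in a few lines. This is a simplified sketch: replace_var is a hypothetical cut-down version of the translation function that knows only about $%argiter and the function's own variables, and the field id 3 is arbitrary.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the translation function: maps a $%name to the
# mangled Perl variable for the field with the given id.
sub replace_var {
  my ($varname, $vars, $id) = @_;
  return "\$a${id}" if ($varname eq 'argiter');
  return "\$v${id}_${varname}" if (exists $vars->{$varname});
  die "step computation refers to an unknown variable '$varname'\n";
}

my $vars = { sum => 0, count => 0 };
my $step = 'if (defined $%argiter) { $%sum += $%argiter; $%count++; }';

# Every $%name gets replaced by the result of the function call.
(my $code = $step) =~ s/\$\%(\w+)/replace_var($1, $vars, 3)/ge;
print "$code\n";
# prints: if (defined $a3) { $v3_sum += $a3; $v3_count++; }
```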

There also are a few pre-defined variables that can be used in "step" (make sure not to name your variables conflicting with those):
  • $%argiter - The function's argument extracted from the current row.
  • $%niter - The number of the current row in the group, starting from 0.
  • $%groupsize - The size of the group ($context->groupSize()).

The step of vwap first extracts the size and price from the current row. It uses @$%argiter, which naturally means "array to which $%argiter refers". If you like to put everything into explicit braces, you could also use @{$%argiter} instead. Then it updates the current count of shares and the sum of the trades' values (size multiplied by price). The check for undef helps keep things consistent, taking into consideration only the rows where both size and price are defined. Without this check, if some row has only the price undefined, its size will still be counted and throw off the result.

The step's code gets enclosed in a block when the code is generated, so it can safely define some scope-local variables in it. Those get used as the normal $var or @var or %var. Here size and price could have been done as scope-local variables instead of being function-wide:

my ($size, $price) = @$%argiter; ...

The value of $%argiter gets actually computed in the generated code in advance, once per row, so it's safe to use it in the "step" code multiple times.

The "result" is very much like "step", only it's mandatory, its string value defines an expression and not a set of statements, and it has different pre-defined variables:

  • $%argfirst - The function argument from the first row of the group.
  • $%arglast - The function argument from the last row of the group.
  • $%groupsize - Still the size of the group.

For vwap the result is a straightforward division, with a little precaution against division by 0.

And that's it, a home-grown aggregation function for vwap. When debugging your aggregation functions, the ability to look at the generated code, as saved with the option saveComputeTo, comes in pretty handy. If the error happens right during the compilation of the generated code, its source text gets printed automatically in the error message. In the future I plan to add the syntax checks for the code snippets of the functions even before embedding them into a full compute function, but things haven't gotten that far yet.