Showing posts with label aggr_function. Show all posts

Wednesday, May 15, 2024

unusual aggregations

I've been doing a sizable amount of SQL querying, and it turns out the modern systems have some new and interesting aggregation functions. For example, MAX_BY and MIN_BY return the value of one expression from the row where another expression is maximal or minimal. And there are more creative functions if your system supports nested fields, array fields, or map fields.
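To make the semantics of MAX_BY and MIN_BY concrete, here is a minimal sketch in Python. The rows and the helper names max_by and min_by are made up for illustration and are not any particular SQL engine's implementation.

```python
# Rows as (name, score) pairs; the data is made up for illustration.
rows = [("alice", 7), ("bob", 12), ("carol", 3)]

def max_by(rows, value, key):
    """Return value(row) for the row where key(row) is largest."""
    return value(max(rows, key=key))

def min_by(rows, value, key):
    """Return value(row) for the row where key(row) is smallest."""
    return value(min(rows, key=key))

# The name from the row with the highest and the lowest score.
top_name = max_by(rows, value=lambda r: r[0], key=lambda r: r[1])
bottom_name = min_by(rows, value=lambda r: r[0], key=lambda r: r[1])
```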

The other real nice syntax is using the indexes of the fields in the GROUP BY clause instead of the actual expressions - this avoids the need to write the same expressions twice. It could be made even better by specifying the names of the output fields instead of indexes.

There are a few more aggregation functions that would be nice to have.

One is being able to specify aggregation in a group vs aggregation across all records. A frequent problem is to find the percentage of record count in a group relative to the whole set. This would be easy to implement as (COUNT(*) * 100 / COUNT_GLOBAL(*)). And this could be generalized to nested aggregations (similarly to how Triceps allows the nested indexes) by specifying the nesting level. For example, if we have 4 fields in a GROUP BY clause, the generalized form COUNT_G(4, *) would be equivalent to COUNT(*), COUNT_G(0, *) will be the global count, and the intermediate values would do the grouping in the larger groups by fewer fields.
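A rough sketch of what the proposed COUNT_G(level, *) could mean, written in Python since no SQL system has it. The function count_g, the field names and the data are all hypothetical; the only assumption is that the level selects how many of the leading GROUP BY fields form the group.

```python
from collections import Counter

def count_g(rows, group_fields, level):
    """Count rows per group formed by the first `level` GROUP BY fields.

    Hypothetical semantics: level == len(group_fields) is the ordinary
    COUNT(*), level == 0 is the global count.
    """
    return Counter(tuple(r[f] for f in group_fields[:level]) for r in rows)

rows = [
    {"country": "US", "city": "NYC"},
    {"country": "US", "city": "NYC"},
    {"country": "US", "city": "LA"},
    {"country": "CA", "city": "Toronto"},
]
fields = ["country", "city"]

per_group = count_g(rows, fields, 2)    # like COUNT(*)
per_country = count_g(rows, fields, 1)  # like COUNT_G(1, *)
total = count_g(rows, fields, 0)[()]    # like COUNT_G(0, *)

# Percentage of each group relative to the whole set.
percent = {k: n * 100 / total for k, n in per_group.items()}
```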

It would also be nice to have some simpler syntax to specify the two-pass querying. It can be done with joins but it's very annoying to write those joins manually in plain SQL. I'm not sure yet what a good syntax would be, I just have an idea of the semantics for now.

One example of that would be the grouping by percentiles. On the first pass you'd compute the percentiles of some expression, on the second pass you'd group the records by where they fit between these percentiles. So if you do the percentiles with a step of 10%, you'd get ten records with the summaries of these percentiles. Well, if you store the records you work on (as in a CEP system), it doesn't even have to be two passes, it can easily be a single pass where you decide in which bucket to fit a record, and then adjust all the buckets accordingly.
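The two passes can be sketched in Python like this. It's only an illustration on made-up data (the numbers 1 to 100), using the standard library's statistics.quantiles for the boundaries; a real system would pick its own percentile definition.

```python
from bisect import bisect_right
from statistics import quantiles

values = list(range(1, 101))  # made-up input: 1..100

# Pass 1: compute the 10%-step boundaries (9 cut points for 10 buckets).
cuts = quantiles(values, n=10)

# Pass 2: assign every record to a bucket and aggregate per bucket.
summary = {}  # bucket number -> (count, sum of values)
for v in values:
    bucket = bisect_right(cuts, v)  # 0..9
    count, total = summary.get(bucket, (0, 0))
    summary[bucket] = (count + 1, total + v)
```

The result is ten summary records, one per percentile bucket, each here with a count and a sum.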

Another example of two-pass querying would be bringing the CEP windows into the plain SQL, in case you want to identify some interesting records, and then do an aggregation on the other related records that preceded them within a time interval. Again, if you have the records come in the time order and cached for that time interval, it can be done with a single pass, but even if that is not available, two passes would do it in any circumstances. Very, very useful for time series analysis.
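A small Python sketch of this windowing done in two passes. The record layout, the "interesting" flag and the window length are assumptions made up for the example.

```python
# Made-up records: (timestamp, value, is_interesting)
records = [
    (1, 10, False), (2, 20, False), (5, 30, True),
    (9, 40, False), (10, 50, True),
]
WINDOW = 3  # hypothetical interval length, in the units of the timestamps

# Pass 1: find the timestamps of the interesting records.
marks = [ts for ts, _, hot in records if hot]

# Pass 2: for each interesting record, sum the values of the records
# that preceded it within the window.
sums = {
    m: sum(v for ts, v, _ in records if m - WINDOW <= ts < m)
    for m in marks
}
```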

Or if you think about two-pass aggregation as being done with a map-reduce aggregator, the first pass would seed the mapping function and instantiate the reducers, where the second pass would feed the records into these reducers.

Note also that if you write such a join manually, you end up doing essentially a transposition of a normal query, where instead of specifying an expression per record, you specify an expression per row. Something like this (in pseudo-SQL, since remembering the exact syntax boggles me, I can only copy-paste fragments but not freely write them):

-- still pseudo-SQL: PERCENTILE() stands in for whatever percentile
-- aggregate the system provides
WITH
  input AS (
    SELECT ..
  ),
  percentiles AS (
    -- this is the transposition that propagates through
    -- the rest of the query
    SELECT low, high
    FROM (VALUES (0, 20), (20, 40), (40, 60), (60, 80), (80, 100))
      AS p(low, high)
  ),
  boundaries AS (
    SELECT
      percentiles.high AS percent,
      PERCENTILE(input.field, percentiles.low) AS low,
      PERCENTILE(input.field, percentiles.high) AS high
    FROM percentiles, input
    GROUP BY percentiles.low, percentiles.high
  )
SELECT
  percent,
  ... whatever aggregation ...
FROM boundaries, input
WHERE
  input.field >= boundaries.low
  AND input.field < boundaries.high
GROUP BY percent

It would be nice to have an easier way to do such transpositions too, even aside from the two-pass queries.

Another idea for processing the time series is the ability to build a group from a set of records with sequential IDs/timestamps. Suppose we have time in slices, where an event might be happening or not happening in each slice, and we want to consider an uninterrupted sequence of slices with events as one event. It's easy to build a join that would put together two consecutive events. But how about a hundred events? This would be easy to do with a group-building function that tracks the first and last timestamp in the group, checks whether a new record is consecutive to an existing group, and merges the groups if needed. Note that this is not an aggregation function, really. It's a new concept, a group-building function that replaces the fixed rule of all the records in a group having the same key value. It would also work for building a window around an interesting record (with or without merging the close interesting records into the same window), where the window becomes the group.
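Here is one way such a group-building function could work, sketched in Python under the assumption that the slices are numbered with consecutive integers. The name build_groups and the data layout are made up; it only shows the tracking of first and last timestamps and the merging.

```python
def build_groups(timestamps):
    """Merge consecutive integer timestamps into (first, last) groups.

    Records may arrive in any order; a new record either extends a group,
    bridges two groups (merging them), or starts a new one.
    """
    first_of = {}  # last timestamp of a group -> its first timestamp
    last_of = {}   # first timestamp of a group -> its last timestamp
    seen = set()
    for ts in timestamps:
        if ts in seen:
            continue  # a repeated slice changes nothing
        seen.add(ts)
        # A group ending right before ts, if any, supplies the new first.
        first = first_of.pop(ts - 1, ts)
        # A group starting right after ts, if any, supplies the new last.
        last = last_of.pop(ts + 1, ts)
        # Record the (possibly merged) group, overwriting stale entries.
        first_of[last] = first
        last_of[first] = last
    return sorted(last_of.items())

groups = build_groups([1, 2, 3, 7, 5, 6, 10])
```

In this run the record 6 arrives after 5 and 7 and bridges them into one group, which is the merging case.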

Friday, April 19, 2013

Object passing between threads, and Perl code snippets

A limitation of the Perl threads is that no variables can be shared between them. When a new thread gets created, it gets a copy of all the variables of the parent. Well, of all the plain Perl variables. With the XS extensions your luck may vary: the variables might get copied, might become undef, or just become broken (if the XS module is not threads-aware). Copying the XS variables requires quite a high overhead at all the other times, so Triceps doesn't do it and all the Triceps objects become undefined in the new thread.

However there is a way to pass around certain objects through the Nexuses.

First, obviously, the Nexuses are intended to pass through the Rowops. These Rowops coming out of a nexus are not the same Rowop objects that went in. Rowop is a single-threaded object and can not be shared by two threads. Instead it gets converted to an internal form while in the nexus, and gets re-created, pointing to the same Row object and to the correct Label in the local facet.

Then, again obviously, the Facets get imported through the Nexus, together with their row types.

And two more types of objects can be exported through a Nexus: the RowTypes and TableTypes. They get exported through the options as in this example:

$fa = $owner->makeNexus(
    name => "nx1",
    labels => [
        one => $rt1,
        two => $lb,
    ], 
    rowTypes => [
        one => $rt2,
        two => $rt1,
    ], 
    tableTypes => [
        one => $tt1,
        two => $tt2,
    ], 
    import => "writer",
); 

As you can see, the namespaces for the labels, row types and table types are completely independent, and the same names can be reused in each of them with different meanings. All the three sections are optional, so if you want, you can export only the types in the nexus, without any labels.

They can then be extracted from the imported facet as:

$rt1 = $fa->impRowType("one");
$tt1 = $fa->impTableType("one");


Or the whole set of name-value pairs can be obtained with:


@rtset = $fa->impRowTypesHash();
@ttset = $fa->impTableTypesHash();


The exact table types and row types (by themselves or in the table types or labels) in the importing thread will be copied. It's technically possible to share the references to the same row type in the C++ code but it's more efficient to make a separate copy for each thread, and thus the Perl API goes along the more efficient way.

The import is smart in the sense that it preserves the sameness of the row types: if in the exporting thread the same row type was referred from multiple places in the labels, row types and table types sections, in the imported facet that would again be the same row type (even though of course not the one that has been exported but its copy). This again helps with the efficiency when various objects decide if the rows created by this and that type are compatible.

This is all well until you want to export a table type that has an index with a Perl sort condition in it, or an aggregator with the Perl code. The Perl code objects are tricky: they get copied OK when a new thread is created but the attempts to import them through a nexus later cause a terrible memory corruption. So Triceps doesn't allow exporting the table types with function references in them. But it provides an alternative solution: the code snippets can be specified as the source code. It gets compiled when the table type gets initialized. When a table type gets imported through a nexus, it brings the source code with it. The imported table types are always uninitialized, so at initialization time the source code gets compiled in the new thread and works.

It all works transparently: just specify a string instead of a function reference when creating the index, and it will be recognized and processed. For example:

$it= Triceps::IndexType->newPerlSorted("b_c", undef, '
    my $res = ($_[0]->get("b") <=> $_[1]->get("b")
        || $_[0]->get("c") <=> $_[1]->get("c"));
    return $res;
    '
);

Before the code gets compiled, it gets wrapped into a 'sub { ... }', so don't write your own sub in the code string, that would be an error.

There is also the issue of arguments that can be specified for these functions. Triceps is now smart enough to handle the arguments that are one of:

  • undef
  • integer
  • floating-point
  • string
  • Triceps::RowType object
  • Triceps::Row object
  • reference to an array or hash thereof

It converts the data to an internal C++ representation in the nexus and then converts it back on import. So, if a TableType has all the code in it in the source form, and the arguments for this code are within the limits of this format, it can be exported through the nexus. Otherwise an attempt to export it will fail.

I've modified the SimpleOrderedIndex to use the source code format, and it will pass through the nexuses as well.

The Aggregators have a similar problem, and I'm working on converting them to the source code format too.

A little more about the differences between the code references and the source code format:

When you compile a function, it carries with it the lexical context. So you can make the closures that refer to the "my" variables in their lexical scope. With the source code you can't do this. The table type compiles them at initialization time in the context of the main package, and that's all they can see. Remember also that the global variables are not shared between the threads, so if you refer to a global variable in the code snippet and rely on a value in that variable, it won't be present in the other threads (unless the other threads are direct descendants and the value was set before their creation).
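The same distinction can be shown by analogy in Python (this is not Triceps or Perl code, just the general idea): a function object keeps its lexical context, while source text compiled later in a fresh namespace sees only what is in the string itself. All the names here are made up.

```python
def make_comparators():
    field = "b"  # a local, playing the role of a Perl "my" variable

    # A function object is a closure: it keeps access to `field`.
    by_closure = lambda row: row[field]

    # Source text compiled later in a fresh global namespace cannot see
    # `field`; everything it needs must be spelled out in the string.
    by_source = eval('lambda row: row["b"]', {})
    return by_closure, by_source

by_closure, by_source = make_comparators()
row = {"b": 5}

# A snippet that relies on the lexical variable fails when run from source.
try:
    eval("lambda row: row[field]", {})(row)
    sees_local = True
except NameError:
    sees_local = False
```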

While working with the custom sorted indexes, I've also fixed the way the errors are reported in their Perl handlers. The errors used to be just printed on stderr. Now they propagate properly through the table, and the table operations die with the Perl handler's error message. Since an error in the sorting function means that things are going very, very wrong, after that the table becomes inoperative and will die on all the subsequent operations as well.

Sunday, March 25, 2012

AggregatorContext reference

AggregatorContext is one of the arguments passed to the aggregator computation function. It encapsulates the iteration through the aggregation group, in the order of the index on which the aggregator is defined. After the computation function returns, the context becomes invalidated and stops working, so there is no point in saving it between the calls. There is no way to construct the aggregator context directly.

It provides the following methods:

$result = $ctx->groupSize();

Returns the number of rows in the group. This is currently a unique feature available only in an aggregator, not in the normal iteration through the table.

$rowType = $ctx->resultType();

Returns the row type of the aggregation result.

$rh = $ctx->begin();

The first row handle of the iteration. In case of an empty group would return a null handle.

$rh = $ctx->next($rh);

The next row handle in order. If the argument handle was the last row in the group, returns a null handle. So the iteration through the group with a context is similar to iteration through the whole table: it ends when begin() or next() returns a null handle.

$rh = $ctx->last();

The last row handle in the group. If the group is empty, returns a null handle.

$rh = $ctx->beginIdx($idxType);

The first row in the group, according to a specific index type's order. The index type must belong to the group, otherwise the result is undefined. If the group is empty, will return the same value as endIdx(). If $idxType is non-leaf, the effect is the same as if its first leaf were used.

$rh = $ctx->endIdx($idxType);

The handle past the last row in the group, according to a specific index type's order. The index type must belong to the group, otherwise the result is undefined and might even result in an endless iteration loop. If $idxType is non-leaf, the effect is the same as if its first leaf were used. This kind of iteration uses the table's $t->nextIdx($idxType, $rh) or $rh->next($idxType) to advance the position. The Table reference post said that not every possible order is iterable. Well, with the aggregation context, every order is iterable. You can pick any index in the group and iterate in its order. And aggregation is where this ability counts the most.

If the group happens to be the last group of this index type (not of $idxType but of the index on which the aggregator is defined) in the table, endIdx()  would return a null row handle. If it's also empty, beginIdx() would also return a null handle, and in general, for an empty group beginIdx() would return the same value as endIdx(). If the group is not the last one, endIdx() returns the handle of the first row in the next group.

$rh = $ctx->lastIdx($idxType);

The last row in the group according to a particular index type's order. The index type must belong to the group, otherwise the result is undefined. If the group is empty, returns a null handle.

$ctx->send($opcode, $row) or die "$!";

Constructs a result rowop for the aggregator and arranges for it to be sent to the aggregator's output  label. The actual sending is delayed: it will be done only after all the aggregators run, then the table's changes are sent to the table's normal output label, then each aggregator's changes are sent to its label. Note that the aggregator's output label is not visible in the computation function, so the rowop can not be constructed directly. Instead send() takes care of it. The row must be of a type at least matching the aggregator's result type (and of course the normal practice is to use the aggregator's result type to construct the row). On success returns 1, on error returns undef and the error message.

$ctx->makeHashSend($opcode, $fieldName => $fieldValue, ...) or die "$!";

A convenience function that produces the row from pairs of field names and values and sends it. A combination of makeRowHash() and send().

$ctx->makeArraySend($opcode, @fields) or die "$!";

A convenience function that produces the row from the array of field values and sends it. A combination of makeRowArray() and send().

Note that an aggregator must never change the table. Any attempt to change the table is a fatal error.

Friday, March 16, 2012

The ubiquitous VWAP

Every CEP supplier loves an example of VWAP calculation: it's small, it's about that quintessential CEP activity: aggregation, and it sounds like something from the real world.

A quick sidebar: what is the VWAP? It's the Volume-Weighted Average Price: the average price for the shares traded during some period of time, usually a day. If you take the price of every share traded during the day and calculate the average, you get the VWAP. What is the volume-weighted part? The shares don't usually get sold one by one. They're sold in the variable-sized lots. If you think in the terms of lots and not individual shares, you have to weigh the trade prices (not to be confused with costs) for the lots proportional to the number of shares in them.
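The arithmetic of that weighting, as a tiny Python sketch on two made-up trades:

```python
# Made-up trades as (size, price) lots.
trades = [(100, 10.0), (300, 12.0)]

total_shares = sum(size for size, _ in trades)
total_value = sum(size * price for size, price in trades)

vwap = total_value / total_shares                # weighted by lot size
naive = sum(p for _, p in trades) / len(trades)  # ignores the lot sizes
```

The bigger lot pulls the VWAP towards its price, which the plain average of the two lot prices doesn't show.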

I've been using VWAP for trying out the approaches to the aggregation templates. The cutest so far is actually not a template at all: it's simply a user-defined aggregation function for the SimpleAggregator. Here is how it goes:

# VWAP function definition
my $myAggFunctions = {
  myvwap => {
    vars => { sum => 0, count => 0, size => 0, price => 0 },
    step => '($%size, $%price) = @$%argiter; '
      . 'if (defined $%size && defined $%price) '
        . '{$%count += $%size; $%sum += $%size * $%price;}',
    result => '($%count == 0? undef : $%sum / $%count)',
  },
};

my $ttWindow = Triceps::TableType->new($rtTrade)
  ->addSubIndex("byId",
    Triceps::IndexType->newHashed(key => [ "id" ])
  )
  ->addSubIndex("bySymbol",
    Triceps::IndexType->newHashed(key => [ "symbol" ])
    ->addSubIndex("fifo", Triceps::IndexType->newFifo())
  )
or die "$!";

# the aggregation result
my $rtVwap;
my $compText; # for debugging

Triceps::SimpleAggregator::make(
  tabType => $ttWindow,
  name => "aggrVwap",
  idxPath => [ "bySymbol", "fifo" ],
  result => [
    symbol => "string", "last", sub {$_[0]->get("symbol");},
    id => "int32", "last", sub {$_[0]->get("id");},
    volume => "float64", "sum", sub {$_[0]->get("size");},
    vwap => "float64", "myvwap", sub { [$_[0]->get("size"), $_[0]->get("price")];},
  ],
  functions => $myAggFunctions,
  saveRowTypeTo => \$rtVwap,
  saveComputeTo => \$compText,
) or die "$!";

The rest of the example is the same as for the previous examples of the trades aggregation.

The option "functions" of  Triceps::SimpleAggregator::make() lets you add the custom aggregation functions. They're actually defined in the same way as the "standard" functions that come with the SimpleAggregator. The argument of that option is a reference to a hash, with the names of functions as the key and references to the function definitions as values. Each definition is again a hash, containing up to 4 keys:

  • argcount - Defines the number of arguments of the function, which may currently be 0 or 1, with 1 being the default.
  • vars - Defines the variables used to keep the context of this function.
  • step - The computation of a single step of iteration.
  • result - The computation of the result of the function. This key is mandatory. The rest can be skipped if not needed.

The vwap function actually has two arguments per row: the trade size and the price. But no more than one argument is supported. So it works in the same way as "nth_simple": it leaves the argcount as the default 1 and packs its two arguments into one, combining them into a single array returned by reference. That's why the closure for this field is

sub { [$_[0]->get("size"), $_[0]->get("price")];}

The single array reference becomes the closure's result and the vwap function's single argument, later unpacked by its code. By the way, the order of the elements in this array is important, first size and then price, not the other way around, or your results will be wrong.

The value of "vars" is a reference to yet another hash that maps the variable names to their initial values. The variables are always scalars. I just didn't find anything yet that would require a non-scalar. If the need for arrays or hashes arises, you can just create a reference and put it into a scalar. The initial values are strings that are substituted into the generated code as is. For the numbers, you can usually just put them in as numbers and they will work fine: that's  what vwap does with its 0s. If you don't particularly want to initialize with anything, put "undef" there - in quotes. If you want to use a string constant, quote it twice, like "'initial'" or '"initial"'. If you ever need an array or hash reference, that would be a "[]" or "{}". The namespace of the  variables is local to the functions, and when SimpleAggregator generates the code with them, it will add a unique mangled prefix to make sure that the variables from different fields don't conflict with each other.

The vwap computation defines four variables:  two to build the aggregation result and two to keep temporarily the trade size and price extracted from the current row.

The presence of "step" is what tells the SimpleAggregator that this function needs to iterate through the rows. Its value is a string defining the code snippet that would be placed into the iteration loop. The step computation can refer to the function's variables through the syntax of "$%varname". All such occurrences just get globally replaced, like in a K&R C macro. This is a fairly rarely occurring combination in the normal Perl code, so there should be little confusion. If you ever need to pass this sequence through as a literal, just break it up: depending on the circumstances, use '$\%' or '${%...}'.
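To picture the macro-like replacement, here is a sketch in Python of how such a substitution could be done. It is not the SimpleAggregator's actual code; the function expand and the prefix "v3_" are made up, and the real mangling scheme may differ.

```python
import re

def expand(snippet, prefix):
    """Replace every $%name with $<prefix>name, macro-style."""
    return re.sub(r"\$%(\w+)", lambda m: "$" + prefix + m.group(1), snippet)

step = "$%count += $%size; $%sum += $%size * $%price;"
generated = expand(step, "v3_")  # "v3_" is a made-up mangled prefix

# A broken-up sequence is left alone by this pattern.
literal = expand(r"print '$\%x'", "v3_")
```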

There also are a few pre-defined variables that can be used in "step" (make sure not to name your variables conflicting with those):
  • $%argiter - The function's argument extracted from the current row.
  • $%niter - The number of the current row in the group, starting from 0.
  • $%groupsize - The size of the group ($context->groupSize()).

The step of vwap first extracts the size and price from the current row. It uses @$%argiter, which naturally means "array to which $%argiter refers". If you like to put everything into explicit parentheses, you could also use @{$%argiter} instead. Then it updates the current count of shares and the sum of the trade values (size times price). The check for undef helps keep things consistent, taking into consideration only the rows where both size and price are defined. Without this check, if some row has only the price undefined, its size will still be counted and throw off the result.
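The effect of the undef check can be seen in a small Python model of the step and result logic. This is an illustration only: None plays the role of undef, and treating a missing value as 0 mimics what Perl arithmetic does with undef.

```python
def vwap(rows, check_defined=True):
    """rows are (size, price) pairs; None plays the role of Perl's undef."""
    count = 0
    total = 0
    for size, price in rows:
        if check_defined and (size is None or price is None):
            continue  # skip the incomplete rows entirely
        # Without the check, a missing value acts as 0, as in Perl.
        count += size or 0
        total += (size or 0) * (price or 0)
    return None if count == 0 else total / count

rows = [(100, 10.0), (100, None)]  # the second row has no price
with_check = vwap(rows)
without_check = vwap(rows, check_defined=False)
```

Without the check the second row adds 100 shares to the count but nothing to the sum, so the result drops to half of the only known price.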

The step's code gets enclosed in a block when the code is generated, so it can safely define some scope-local variables in it. Those get used as the normal $var or @var or %var. Here size and price could have been done as scope-local variables instead of being function-wide:

my ($size, $price) = @$%argiter; ...

The value of $%argiter gets actually computed in the generated code in advance, once per row, so it's safe to use it in the "step" code multiple times.

The "result" is very much like "step", only it's mandatory, its string value defines an expression and not a set of statements, and it has the different pre-defined variables:

  • $%argfirst - The function argument from the first row of the group.
  • $%arglast - The function argument from the last row of the group.
  • $%groupsize - Still the size of the group.

For vwap the result is a straightforward division, with a little precaution against division by 0.

And that's it, a home-grown aggregation function for vwap. When debugging your aggregation functions, the ability to look at the generated code, as saved with the option saveComputeTo, comes in pretty handy. If the error happens right during the compilation of the generated code, its source text gets printed automatically in the error message. In the future I plan to add the syntax checks for the code snippets of the functions even before embedding them into a full compute function, but things haven't gotten that far yet.