The ordered index implemented in Perl has been pretty slow, so I've finally got around to implementing one in C++. It became much faster. Here is an excerpt from the performance test results (with an optimized build):
Table insert makeRowArray (single hashed idx, direct) 0.551577 s, 181298.48 per second.
Excluding makeRowArray 0.311936 s, 320578.44 per second.
Table insert makeRowArray (single ordered int idx, direct) 0.598462 s, 167095.09 per second.
Excluding makeRowArray 0.358821 s, 278690.37 per second.
Table insert makeRowArray (single ordered string idx, direct) 1.070565 s, 93408.64 per second.
Excluding makeRowArray 0.830924 s, 120347.91 per second.
Table insert makeRowArray (single perl sorted idx, direct) 21.810374 s, 4584.97 per second.
Excluding makeRowArray 21.570734 s, 4635.91 per second.
Table lookup (single hashed idx) 0.147418 s, 678342.02 per second.
Table lookup (single ordered int idx) 0.174585 s, 572785.77 per second.
Table lookup (single ordered string idx) 0.385963 s, 259092.38 per second.
Table lookup (single perl sorted idx) 6.840266 s, 14619.32 per second.
Here the "ordered" is the new ordered index in C++ (on an integer field and on a string field), and "perl sorted" is the ordering in Perl, on an integer field. Even though the ordered index is a little slower than the hashed index, it's about 40-70 times faster than the ordering implemented in Perl.
Naturally, ordering by a string field is slower than by an integer one, since it not only has to compare more bytes one-by-one but also honors the locale-specific collation order.
In Perl, the ordered index type is created very similarly to the hashed index type:
Triceps::IndexType->newOrdered(key => [ "a", "!b" ])
The single option "key" specifies the array with the names of the key fields of the index. A "!" in front of a field name says that the order by this field is descending; otherwise the order is ascending. This is a more compact and arguably easier-to-read format than the one used by the SimpleOrderedIndex.
As usual, the NULL values in the key fields are permitted, and are considered less than any non-NULL value.
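To make the ordering rules concrete, here is a minimal standalone sketch of such a comparison. It is not the Triceps code: DemoValue, DemoRow, DemoKeyField, parseKey() and compareRows() are names made up for this illustration, the rows hold only integers, and it assumes that the descending order simply reverses the result, which would put the NULLs last on a "!" field.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a field value: either NULL or an integer.
struct DemoValue {
    bool isNull;
    long value;
};

// Hypothetical stand-in for a row: field name -> value.
typedef std::map<std::string, DemoValue> DemoRow;

// One parsed element of the key: the plain field name and the direction.
struct DemoKeyField {
    std::string name;
    bool descending;
};

// Parse a key such as {"a", "!b"}: a leading "!" marks the descending order.
std::vector<DemoKeyField> parseKey(const std::vector<std::string> &spec) {
    std::vector<DemoKeyField> key;
    for (const std::string &s : spec) {
        DemoKeyField kf;
        kf.descending = (!s.empty() && s[0] == '!');
        kf.name = kf.descending ? s.substr(1) : s;
        assert(!kf.name.empty()); // an empty field name is a caller error
        key.push_back(kf);
    }
    return key;
}

// Three-way comparison: negative if r1 sorts first, positive if r2 does.
// Assumes that both rows contain every key field.
int compareRows(const std::vector<DemoKeyField> &key,
        const DemoRow &r1, const DemoRow &r2) {
    for (const DemoKeyField &kf : key) {
        const DemoValue &v1 = r1.at(kf.name);
        const DemoValue &v2 = r2.at(kf.name);
        int res;
        if (v1.isNull || v2.isNull)
            res = (v1.isNull ? 0 : 1) - (v2.isNull ? 0 : 1); // NULL < non-NULL
        else
            res = (v1.value < v2.value) ? -1 : (v1.value > v2.value) ? 1 : 0;
        if (res != 0)
            return kf.descending ? -res : res; // "!" flips the direction
    }
    return 0; // equal on all the key fields
}
```

With the key parsed from "a" and "!b", two rows with equal "a" get ordered by "b" from the larger value to the smaller one, and a row with a NULL in "a" goes before any row with a value in it.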
The array fields may also be used in the ordered indexes.
The ordered index does return the list of key fields with getKey(), so it can be used in joins and can be found by key fields with TableType::findIndexPathForKeys() and friends, just like the hashed index.
The getKey() for an ordered index returns the list of plain field names, without the indications of descending order. Thus the finding of the index by key fields just works out of the box, unchanged.
To get the key fields as they were specified in the index creation, including the possible "!", use the new method:
@fields = $indexType->getKeyExpr();
The idea of this method is that the contents of the array it returns depend on the index type and form an "expression" that can be used to build another instance of the same index type. For the hashed index it simply returns the same data as getKey(). For the ordered index it returns the list of keys with the order indications. For the indexes with Perl conditions it still returns nothing, though in the future it might be used to store the condition.
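The relation between the two methods can be sketched in a few lines. DemoOrderedIndexType below is a hypothetical stand-in, not the real class; it only assumes that the stored expression is the key exactly as given at creation and that getKey() is derived from it by dropping the "!" prefixes.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ordered index type; not the Triceps class.
class DemoOrderedIndexType {
public:
    explicit DemoOrderedIndexType(std::vector<std::string> keyExpr)
        : keyExpr_(std::move(keyExpr)) {
        for (const std::string &s : keyExpr_)
            assert(!s.empty()); // an empty key element is a caller error
    }

    // The key exactly as given at creation, including any "!" prefixes.
    const std::vector<std::string> &getKeyExpr() const { return keyExpr_; }

    // The plain field names, with the direction markers stripped.
    std::vector<std::string> getKey() const {
        std::vector<std::string> names;
        for (const std::string &s : keyExpr_)
            names.push_back((s[0] == '!') ? s.substr(1) : s);
        return names;
    }

private:
    std::vector<std::string> keyExpr_;
};
```

Feeding the result of getKeyExpr() into the constructor of another instance produces an index type with the same key and the same directions, which is the whole point of keeping the expression around.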
In the C++ API the index type gets created with the constructor:
OrderedIndexType(NameSet *key = NULL);
static OrderedIndexType *make(NameSet *key = NULL);
The key can also be set after construction:
OrderedIndexType *setKey(NameSet *key);
Same as in Perl, the field names in the NameSet can be prefixed with a "!" to specify the descending order on that field.
The new method in the IndexType is:
virtual const NameSet *getKeyExpr() const;
And the new constant for the index id is IT_ORDERED, available in both C++ and Perl.
Friday, April 17, 2015
Friday, April 19, 2013
Object passing between threads, and Perl code snippets
A limitation of the Perl threads is that no variables can be shared between them. When a new thread gets created, it gets a copy of all the variables of the parent. Well, of all the plain Perl variables. With the XS extensions your luck may vary: the variables might get copied, might become undef, or might just become broken (if the XS module is not threads-aware). Copying the XS variables requires quite a high overhead at all other times, so Triceps doesn't do it, and all the Triceps objects become undefined in the new thread.
However there is a way to pass around certain objects through the Nexuses.
First, obviously, the Nexuses are intended to pass through the Rowops. These Rowops coming out of a nexus are not the same Rowop objects that went in. Rowop is a single-threaded object and can not be shared by two threads. Instead it gets converted to an internal form while in the nexus, and gets re-created, pointing to the same Row object and to the correct Label in the local facet.
Then, again obviously, the Facets get imported through the Nexus, together with their row types.
And two more types of objects can be exported through a Nexus: the RowTypes and TableTypes. They get exported through the options as in this example:
$fa = $owner->makeNexus(
    name => "nx1",
    labels => [
        one => $rt1,
        two => $lb,
    ],
    rowTypes => [
        one => $rt2,
        two => $rt1,
    ],
    tableTypes => [
        one => $tt1,
        two => $tt2,
    ],
    import => "writer",
);
As you can see, the namespaces for the labels, row types and table types are completely independent, and the same names can be reused in each of them with a different meaning. All three sections are optional, so if you want, you can export only the types in the nexus, without any labels.
They can then be extracted from the imported facet as:
$rt1 = $fa->impRowType("one");
$tt1 = $fa->impTableType("one");
Or the whole set of name-value pairs can be obtained with:
@rtset = $fa->impRowTypesHash();
@ttset = $fa->impTableTypesHash();
The importing thread gets exact copies of the table types and row types (by themselves or in the table types or labels). It's technically possible to share the references to the same row type in the C++ code, but it's more efficient to make a separate copy for each thread, and thus the Perl API goes the more efficient way.
The import is smart in the sense that it preserves the sameness of the row types: if in the exporting thread the same row type was referred from multiple places in the labels, row types and table types sections, in the imported facet that would again be the same row type (even though of course not the one that has been exported but its copy). This again helps with the efficiency when various objects decide if the rows created by this and that type are compatible.
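The sameness-preserving copy can be sketched as a copy that remembers which objects it has already copied. This is only an illustration of the idea, not how Triceps does it internally; DemoRowType, DemoTypeMap and copyPreservingSameness() are made-up names.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a row type; only identity and content matter here.
struct DemoRowType {
    std::vector<std::string> fields;
};

// A named set of types, like one section of the nexus options.
typedef std::map<std::string, std::shared_ptr<DemoRowType>> DemoTypeMap;

// Copy every type in the set, but copy each distinct object only once,
// so the names that shared one type before the copy share one copy after it.
DemoTypeMap copyPreservingSameness(const DemoTypeMap &exported) {
    // original object -> its copy, filled in as the originals are encountered
    std::map<const DemoRowType *, std::shared_ptr<DemoRowType>> copies;
    DemoTypeMap imported;
    for (const auto &entry : exported) {
        assert(entry.second); // a null type in the set is a caller error
        std::shared_ptr<DemoRowType> &copy = copies[entry.second.get()];
        if (!copy)
            copy = std::make_shared<DemoRowType>(*entry.second);
        imported[entry.first] = copy;
    }
    return imported;
}
```

If the names "two" and "three" referred to the same type before the copy, they refer to one shared copy afterwards, and a check for the same type can stay a plain pointer comparison.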
This is all well until you want to export a table type that has an index with a Perl sort condition in it, or an aggregator with the Perl code. The Perl code objects are tricky: they get copied OK when a new thread is created, but the attempts to import them through a nexus later cause a terrible memory corruption. So Triceps doesn't allow exporting the table types that have function references in them. But it provides an alternative solution: the code snippets can be specified as the source code. It gets compiled when the table type gets initialized. When a table type gets imported through a nexus, it brings the source code with it. The imported table types are always uninitialized, so at initialization time the source code gets compiled in the new thread and works.
It all works transparently: just specify a string instead of a function reference when creating the index, and it will be recognized and processed. For example:
$it = Triceps::IndexType->newPerlSorted("b_c", undef, '
    my $res = ($_[0]->get("b") <=> $_[1]->get("b")
        || $_[0]->get("c") <=> $_[1]->get("c"));
    return $res;
    '
);
Before the code gets compiled, it gets wrapped into a 'sub { ... }', so don't write your own sub in the code string, that would be an error.
There is also the issue of arguments that can be specified for these functions. Triceps is now smart enough to handle the arguments that are one of:
- undef
- integer
- floating-point
- string
- Triceps::RowType object
- Triceps::Row object
- reference to an array or hash thereof
It converts the data to an internal C++ representation in the nexus and then converts it back on import. So, if a TableType has all the code in it in the source form, and the arguments for this code are within the limits of this format, it can be exported through the nexus. Otherwise an attempt to export it will fail.
I've modified the SimpleOrderedIndex to use the source code format, and it will pass through the nexuses as well.
The Aggregators have a similar problem, and I'm working on converting them to the source code format too.
A little more about the differences between the code references and the source code format:
When you compile a function, it carries with it the lexical context. So you can make the closures that refer to the "my" variables in their lexical scope. With the source code you can't do this. The table type compiles them at initialization time in the context of the main package, and that's all they can see. Remember also that the global variables are not shared between the threads, so if you refer to a global variable in the code snippet and rely on a value in that variable, it won't be present in the other threads (unless the other threads are direct descendants and the value was set before their creation).
While working with the custom sorted indexes, I've also fixed the way the errors are reported in their Perl handlers. The errors used to be just printed on stderr. Now they propagate properly through the table, and the table operations die with the Perl handler's error message. Since an error in the sorting function means that things are going very, very wrong, after that the table becomes inoperative and will die on all the subsequent operations as well.
Sunday, February 17, 2013
Aggregator in C++, Part 4
And finally here is the reference of actions available in the aggregator handler:
parentIndexType->groupSize(gh)
Get the size of the group. The result is of the type size_t. This is pretty much the only method of IndexType that should be called directly, and only in the aggregation. The rest of the IndexType methods should be accessed through the similar methods in the Table, and I won't even document them. However if you really, really want to, you can find the description of the other methods in type/IndexType.h and call them in the aggregation as well.
gadget->getLabel()->getType()
Get the result row type.
The rest of the actions are done by calling the Index methods on the argument index. Same as with the IndexType, the aggregation is the only place where these methods are called directly; everywhere else the equivalent actions are done through the Table methods. Because of this I've grouped their description with the aggregation and not separately.
const IndexType *getType() const;
Get the type of this index.
RowHandle *begin() const;
Get the handle of the first row of the group, in the default order according to its first leaf index type. Note that here it's not the whole table's first leaf index type but the first leaf in the index type subtree under this index's type. All the iteration methods return NULL if there are no more rows.
RowHandle *next(const RowHandle *cur) const;
Get the handle of the next row (or NULL if that was the last one) in the default order. The NULL argument makes the NULL result.
RowHandle *last() const;
Get the handle of the last row in the group in the default order.
The rest of the methods of Index aren't really to be used directly.
Unlike the Perl API of AggregatorContext, there are no direct analogs of beginIdx() and such in C++ Index. To get them in C++, you need to translate the iteration to another index type through the Table (and of course, just like in Perl, you would need somehow to get the reference to another index type into your aggregator, and that index type better be in the subtree of the parentIndexType). To translate through the Table, you take any row from the group, usually the first one, and use it with the table methods that accept a sample row.
For example:
RowHandle *sample = index->begin();
RowHandle *rhend = table->nextGroupIdx(otherIndexType, sample);
for (RowHandle *rhit = table->firstOfGroupIdx(otherIndexType, sample);
        rhit != rhend; rhit = table->nextIdx(otherIndexType, rhit)) {
    ...
}
This concludes the discussion of the aggregators. This also concludes the description of the whole C++ API, except for the most recent additions.
Aggregator in C++, part 2
Building an aggregator is a fairly complex subject, so next I want to show a working example. It turns out that I've never actually written an aggregator in C++ before, other than some tiny fragments; I was always working through Perl. So I've written some more interesting examples just now. The full text can be found in the unit test file cpp/table/t_Agg.cpp.
First, if your aggregator is truly stateless and fully hardcoded, the easier way to do it is by defining a plain function with the same handler arguments and building a BasicAggregatorType with it. And here is one that sums the values of an int64 field (the test case aggBasicSum):
void sumC(Table *table, AggregatorGadget *gadget, Index *index,
        const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
        Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
{
    // don't send the NULL record after the group becomes empty
    if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)
        return;
    int64_t sum = 0;
    for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
        sum += table->getRowType()->getInt64(rhi->getRow(), 2, 0); // field c at idx 2
    }
    // pick the rest of fields from the last row of the group
    RowHandle *lastrh = index->last();
    // build the result row; relies on the aggregator result being of the
    // same type as the rows in the table
    FdataVec fields;
    table->getRowType()->splitInto(lastrh->getRow(), fields);
    fields[2].setPtr(true, &sum, sizeof(sum)); // set the field c from the sum
    Rowref res(gadget->getLabel()->getType(), fields);
    gadget->sendDelayed(dest, res, opcode);
}
...
Autoref<TableType> tt = TableType::make(rt1)
    ->addSubIndex("Hashed", HashedIndexType::make( // will be the default index
            (new NameSet())->add("e")
        )->addSubIndex("Fifo", FifoIndexType::make()
            ->setAggregator(new BasicAggregatorType("aggr", rt1, sumC))
        )
    );
...
As I've said before, you create the BasicAggregatorType by giving it the aggregator name, aggregator result row type, and the handler function.
In this case the handler function is completely hardcoded. It works on the int64 field at index 2. The row type I used in this example is:
row {
    uint8[10] a,
    int32[] b,
    int64 c,
    float64 d,
    string e,
}
So the field is actually named "c", and that's why the aggregator function is named "sumC". But since in this case everything is known in advance, to make it more efficient, the look-up of the field by name has been skipped, and the field index has been pre-computed and placed into the function.
The general outline of the aggregator is exactly the same as in Perl: check for an empty group, then iterate through all the rows in the group and compute the sum, fill the rest of the fields from the last row, and send the result. The difference is that there is no AggregatorContext, and the calls are done directly on the bits and pieces received as arguments.
The group size is computed as
parentIndexType->groupSize(gh)
This is pretty much the reason for parentIndexType and gh to be passed as arguments to the handler. Why it's done like this is a long story. It allows an individual index (as in instance, not type!) to not care about the number of rows in it. Remember, a group may have multiple "parallel" indexes defined on it, with all of them containing the same rows, and thus the same number of rows. The group handle (gh) contains the common information about the group, including the group size. And the parent index type is the one that manages the group handles inside it, and knows how to extract the information from them.
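A rough sketch of this arrangement, with made-up simplified types (DemoGroupHandle, DemoIndex, DemoParentIndexType) rather than the real Triceps classes, might look like this. The only point it tries to show is that the count lives in one place per group, no matter how many parallel indexes the group has.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical group handle: the common information about the group.
struct DemoGroupHandle {
    std::size_t size = 0;
};

// Hypothetical index instance: it only orders the rows, and nothing in
// this sketch ever asks it for a count.
struct DemoIndex {
    std::vector<int> rows;
};

// Hypothetical parent index type: it manages the group handles and knows
// how to extract the information from them.
struct DemoParentIndexType {
    std::size_t groupSize(const DemoGroupHandle *gh) const {
        return gh == nullptr ? 0 : gh->size;
    }

    // Insert a row into every parallel index of the group, counting it once.
    void insert(DemoGroupHandle *gh, const std::vector<DemoIndex *> &parallel,
            int row) const {
        assert(gh != nullptr);
        for (DemoIndex *idx : parallel)
            idx->rows.push_back(row);
        ++gh->size;
    }
};
```

In the sketch, as in the handler above, the size is asked of the parent index type with the group handle as an argument, never of the index instance itself.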
The iteration is done as usual with the begin() and next(), only called directly on the index. It will return NULL after the last row. The last row can also be found directly with index->last(); on an empty group it would return NULL, so be careful with that.
The input row type for reading the rows from the group is found as:
table->getRowType()
The result row is built in this example by copying the last row and replacing one field. The data from the last row is split into FdataVec (the data itself is not copied at this point but the data descriptors in the construction vector are made to point to the data in the original row). Then the descriptor for the field "c" is changed to point to the computed sum. Then the new row is built from the descriptor.
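The descriptor mechanics can be illustrated with a small standalone sketch. DemoFdata, DemoRowData and the free functions splitInto() and makeRow() are hypothetical simplifications that only mimic the idea; the real FdataVec and RowType differ in detail.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical field descriptor: it points at bytes owned by someone else
// and copies nothing by itself.
struct DemoFdata {
    bool notNull = false;
    const void *data = nullptr;
    std::size_t len = 0;

    void setPtr(bool notNullArg, const void *dataArg, std::size_t lenArg) {
        notNull = notNullArg;
        data = dataArg;
        len = lenArg;
    }
};

// A demo row owns one byte buffer per field.
typedef std::vector<std::vector<unsigned char>> DemoRowData;

// Fill the descriptors so that they point into the existing row.
void splitInto(const DemoRowData &row, std::vector<DemoFdata> &fields) {
    fields.assign(row.size(), DemoFdata());
    for (std::size_t i = 0; i < row.size(); ++i)
        fields[i].setPtr(true, row[i].data(), row[i].size());
}

// Build a new row; this is the point where the bytes finally get copied.
DemoRowData makeRow(const std::vector<DemoFdata> &fields) {
    DemoRowData row(fields.size());
    for (std::size_t i = 0; i < fields.size(); ++i) {
        if (!fields[i].notNull || fields[i].len == 0)
            continue; // a NULL or empty field stays empty
        assert(fields[i].data != nullptr);
        row[i].resize(fields[i].len);
        std::memcpy(row[i].data(), fields[i].data, fields[i].len);
    }
    return row;
}
```

Redirecting one descriptor to a local variable before calling makeRow() gives a row that takes that field from the variable and all the other fields from the original row. The variable only has to stay alive until the row is built.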
In this particular case the type of the input rows and of the result rows is the same, so either could have been used to construct the result rows. There are two ways to find the result type:
gadget->getType()->getRowType()
gadget->getLabel()->getType()
They are exactly the same, just there are two paths leading to the same object.
Finally, the constructed row is sent. sendDelayed() takes care of constructing the rowop from the components.
Potentially, you could even call directly
gadget->sendDelayed(dest, gadget->getLabel()->getType()->makeRow(fields), opcode);
However the caveat is that if the table's enqueuing mode is set to EM_IGNORE (which is an old idea, nowadays everyone should use EM_CALL and eventually there will be no other way than EM_CALL, but for now it could happen), the rows would leak. The reason for that leak is that sendDelayed() will skip the rowop creation with EM_IGNORE, and thus will skip the reference creation, and the row would never have a reference to it created and would never be freed. Creating an explicit reference makes sure that the row will always be cleaned up when it becomes unused.
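The leak can be reproduced in miniature with a hand-made reference counter. Everything in this sketch (DemoRow, DemoRowref, demoSend() and its ignore flag) is hypothetical and only imitates the behavior described above: a freshly made row has no references, and it gets freed only when its last reference goes away.

```cpp
#include <cassert>

// Hypothetical intrusively counted row; it starts with no references at all.
struct DemoRow {
    int refs = 0;
    static int live; // number of rows currently allocated, for the demonstration
    DemoRow() { ++live; }
    ~DemoRow() { --live; }
};
int DemoRow::live = 0;

// Hypothetical counterpart of Rowref: holds exactly one reference and
// frees the row when the last reference goes away.
class DemoRowref {
public:
    explicit DemoRowref(DemoRow *row) : row_(row) { ++row_->refs; }
    ~DemoRowref() {
        assert(row_->refs > 0);
        if (--row_->refs == 0)
            delete row_;
    }
    DemoRowref(const DemoRowref &) = delete;
    DemoRowref &operator=(const DemoRowref &) = delete;
    DemoRow *get() const { return row_; }

private:
    DemoRow *row_;
};

// Stand-in for sending. When ignore is set (think of EM_IGNORE), the row
// is dropped without ever creating a reference to it.
void demoSend(DemoRow *row, bool ignore) {
    if (ignore)
        return; // nobody references the row; a bare pointer is lost here
    DemoRowref held(row); // stands for the rowop referencing the row
}
```

Passing a bare new DemoRow() to demoSend() with ignore set leaves the row allocated forever, while holding it in a DemoRowref for the duration of the call gets it freed in either mode.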