I've been wondering how much faster the native implementation of an atomic integer (such as the one Triceps imports from NSPR) is than a simple mutex-based approach (lock the mutex, change the value, unlock the mutex).
I finally got interested enough to measure it. On my machine, compiled with GCC at optimization level -O3, the native version is about 2.5-3 times faster. The ratio is about the same with and without contention. So it's faster but not hugely so.
An interesting thing is that when two threads contend for the same mutex, Linux shows the total user CPU time about the same as the real time. When two threads contend for the same atomic integer, the user CPU time is twice the real time. It looks like Linux manages to account for the wait time properly even in these tiny increments and without adding a huge amount of overhead.
And the real time with two threads contending for the same mutex (that is, threads doing nothing but locking and unlocking the mutex in a loop) is about 3-3.5 times higher than with the same two threads running sequentially. The same goes for the atomic integers.
The program I used was:
#include <stdlib.h>
#include <sched.h>
#include <vector>
#include "pw/ptwrap.h"
#include "mem/Atomic.h"
using namespace Triceps;
const int NTHREADS = 2;
pw::pmutex m;
AtomicInt ai(0);
class TestThread: public pw::pwthread
{
public:
    TestThread():
        p_(NULL)
    { }
    virtual void *execute()
    {
        // wait until main() unlocks the mutex, so that all threads start together
        m.lock();
        m.unlock();
        for (int i = 0; i < 10*1000*1000; i++) {
            if (p_)
                p_();
            // ai.inc();
            m.lock();
            m.unlock();
        }
        return NULL;
    }
    void (*p_)();
};
std::vector<TestThread *> t;
int main(int argc, char **argv)
{
    m.lock();
    for (int i = 0; i < NTHREADS; i++) {
        TestThread *job = new TestThread;
        t.push_back(job);
        job->start();
    }
    // give the threads a chance to start and block on the mutex
    sched_yield();
    sched_yield();
    sched_yield();
    sched_yield();
    sched_yield();
    sched_yield();
    sched_yield();
    sched_yield();
    sched_yield();
    m.unlock(); // the run starts!
    for (int i = 0; i < NTHREADS; i++) {
        t[i]->join();
        delete t[i];
    }
    return 0;
}
Comment, uncomment and adjust the constants as needed. The iteration count of 10 million may be much too low for newer, faster machines.
The build command for me was:
g++ -DTRICEPS_NSPR=4 -I $TRICEPS/trunk/cpp -O3 -o x x.cpp -lnspr4 -lpthread $TRICEPS/trunk/cpp/build/libtriceps.a
Adjust the library names and TRICEPS_NSPR value as needed (for Ubuntu it would likely be "-DTRICEPS_NSPR=0 -lnspr"). To try the simple-minded implementation of atomics through the mutex, remove "-DTRICEPS_NSPR".
To measure the time, run it in the time command:
time ./x
When comparing the times, don't forget that when you run 2 threads, they make twice as many iterations in total!
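For readers without a Triceps build at hand, a similar comparison can be sketched with only the C++11 standard library. This is not the program I measured with: std::atomic and std::mutex stand in for the NSPR atomics and pw::pmutex, the mutex variant increments a plain counter under the lock, and the names timeRun() and runComparison() are made up for this sketch. It reports only the wall-clock time, so run it under the time command to see the user CPU time.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: runs `nthreads` threads, each calling op() `iters`
// times, and returns the elapsed wall-clock time in seconds.
template <typename Op>
double timeRun(int nthreads, long iters, Op op)
{
    assert(nthreads > 0);
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
    std::vector<std::thread> threads;
    for (int t = 0; t < nthreads; t++) {
        threads.emplace_back([iters, &op] {
            for (long i = 0; i < iters; i++)
                op();
        });
    }
    for (size_t t = 0; t < threads.size(); t++)
        threads[t].join();
    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    return elapsed.count();
}

// Times the atomic and the mutex-protected increments, prints both timings,
// and returns true if neither variant lost an increment.
bool runComparison(int nthreads, long iters)
{
    std::atomic<long> atomicCount(0);
    std::mutex m;
    long lockedCount = 0;

    double tAtomic = timeRun(nthreads, iters, [&atomicCount] {
        atomicCount.fetch_add(1);
    });
    double tMutex = timeRun(nthreads, iters, [&m, &lockedCount] {
        std::lock_guard<std::mutex> lock(m);
        lockedCount++;
    });
    std::printf("%d thread(s): atomic %.3f s, mutex %.3f s\n", nthreads, tAtomic, tMutex);

    long expected = nthreads * iters;
    return atomicCount.load() == expected && lockedCount == expected;
}
```

Calling runComparison(1, N) and then runComparison(2, N) gives the uncontended and contended cases. Unlike the original program, the threads here are not held back on a start mutex, so for small iteration counts the thread start-up time skews the result.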
This started as my thoughts on the field of Complex Event Processing, mostly about my OpenSource project Triceps. But now it's about all kinds of software-related things.
Sunday, February 24, 2013
Sunday, February 17, 2013
Aggregator in C++, Part 4
And finally here is the reference of actions available in the aggregator handler:
parentIndexType->groupSize(gh)
Get the size of the group. The result is of the type size_t. This is pretty much the only method of IndexType that should be called directly, and only in the aggregation. The rest of the IndexType methods should be accessed through the similar methods in the Table, and I won't even document them. However if you really, really want to, you can find the description of the other methods in type/IndexType.h and call them in the aggregation as well.
gadget->getLabel()->getType()
Get the result row type.
The rest of the actions are done by calling the Index methods on the argument index. As with the IndexType, the aggregation is the only place where these methods are called directly; everywhere else the equivalent actions are done through the Table methods. Because of this I've grouped their description with the aggregation and not separately.
const IndexType *getType() const;
Get the type of this index.
RowHandle *begin() const;
Get the handle of the first row of the group, in the default order according to its first leaf index type. Note that here it's not the whole table's first leaf index type but the first leaf in the index type subtree under this index's type. All the iteration methods return NULL if there are no more rows.
RowHandle *next(const RowHandle *cur) const;
Get the handle of the next row (or NULL if that was the last one) in the default order. A NULL argument produces a NULL result.
RowHandle *last() const;
Get the handle of the last row in the group in the default order.
The rest of the methods of Index aren't really to be used directly.
Unlike the Perl API of AggregatorContext, there are no direct analogs of beginIdx() and such in the C++ Index. To get them in C++, you need to translate the iteration to another index type through the Table (and of course, just like in Perl, you would somehow need to get the reference to the other index type into your aggregator, and that index type had better be in the subtree of the parentIndexType). To translate through the Table, you take any row from the group, usually the first one, and use it with the table methods that accept a sample row.
For example:
RowHandle *sample = index->begin();
RowHandle *rhend = table->nextGroupIdx(otherIndexType, sample);
for (RowHandle *rhit = table->firstOfGroupIdx(otherIndexType, sample); rhit != rhend; rhit = table->nextIdx(otherIndexType, rhit)) {
    ...
}
This concludes the discussion of the aggregators. This also concludes the description of the whole C++ API, except for the most recent additions.
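The loop over a group in another index uses a half-open range: it starts at the first row of the group and stops at the first row of the next group. If this shape is unfamiliar, here is a loose analogy in plain standard C++. It is not Triceps code: a std::multimap stands in for the other index, the key stands in for the group, and the names ToyIndex and rowsOfGroup() are made up for the sketch.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Analogy only: the multimap plays the role of an index, the key plays the
// role of the group, and the mapped int plays the role of a row.
typedef std::multimap<std::string, int> ToyIndex;

// Collects all the rows from the same group as the sample row.
std::vector<int> rowsOfGroup(const ToyIndex &idx, ToyIndex::const_iterator sample)
{
    assert(sample != idx.end()); // a sample row is required

    // like firstOfGroupIdx(): the first row of the sample's group
    ToyIndex::const_iterator it = idx.lower_bound(sample->first);
    // like nextGroupIdx(): the first row past the group, used as the end marker
    ToyIndex::const_iterator end = idx.upper_bound(sample->first);

    std::vector<int> result;
    for (; it != end; ++it) // like stepping with nextIdx()
        result.push_back(it->second);
    return result;
}
```

As in the Triceps loop, the end marker is computed once before the iteration, and any row of the group works as the sample.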
Aggregator in C++, Part 3
Doing a proper custom aggregator is more involved, and requires making subclasses of both Aggregator and AggregatorType. The test case aggSum shows an example of an aggregator that can sum any int64 field.
The subclass of Aggregator contains only one method that is very similar to the BasicAggregator handler shown before:
class MySumAggregator: public Aggregator
{
public:
    virtual void handle(Table *table, AggregatorGadget *gadget, Index *index,
        const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
        Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh);
};
void MySumAggregator::handle(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
{
    // don't send the NULL record after the group becomes empty
    if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)
        return;
    int fidx = gadget->typeAs<MySumAggregatorType>()->fieldIdx();
    int64_t sum = 0;
    for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
        sum += table->getRowType()->getInt64(rhi->getRow(), fidx, 0);
    }
    // pick the rest of fields from the last row of the group
    RowHandle *lastrh = index->last();
    // build the result row; relies on the aggregator result being of the
    // same type as the rows in the table
    FdataVec fields;
    table->getRowType()->splitInto(lastrh->getRow(), fields);
    fields[fidx].setPtr(true, &sum, sizeof(sum));
    gadget->sendDelayed(dest, fields, opcode);
}
The difference is that the field index is not hardcoded but taken from the aggregator type. The aggregator type is found with
gadget->typeAs<MySumAggregatorType>()
The typeAs template is a very recent addition; I added it when writing this example. The previous (and still available) equivalent way to do it was to call gadget->getType() and then cast the result to the pointer to the proper subclass. The method fieldIdx() is a custom addition to the MySumAggregatorType, not inherited from any base class.
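To show the two ways side by side, here is a minimal stand-alone sketch. The classes ToyAggType, ToySumType and ToyGadget are stand-ins made up for the illustration, and the use of static_cast inside typeAs() is my assumption for the sketch, not a statement about the real implementation.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for the base aggregator type class.
struct ToyAggType {
    virtual ~ToyAggType() { }
};

// Stand-in for a custom subclass that adds its own method.
struct ToySumType: public ToyAggType {
    explicit ToySumType(int fidx): fidx_(fidx) { }
    int fieldIdx() const { return fidx_; }
    int fidx_;
};

// Stand-in for the gadget, which knows its type only as the base class.
class ToyGadget {
public:
    explicit ToyGadget(const ToyAggType *type): type_(type)
    {
        assert(type != NULL);
    }
    const ToyAggType *getType() const { return type_; }
    // the convenience wrapper: the same cast, written once
    template <class C>
    const C *typeAs() const { return static_cast<const C *>(type_); }
private:
    const ToyAggType *type_;
};

// the older way: get the base type and cast it by hand
int fieldIdxLongWay(const ToyGadget &g)
{
    return static_cast<const ToySumType *>(g.getType())->fieldIdx();
}

// the newer way: let the template do the cast
int fieldIdxShortWay(const ToyGadget &g)
{
    return g.typeAs<ToySumType>()->fieldIdx();
}
```

Both functions return the same value; the template just removes the repetitive cast from the calling code. As with any static cast, it is on the caller to name the right subclass.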
The version of AggregatorGadget::sendDelayed() used here is different:
void sendDelayed(Tray *dest, FdataVec &data, Rowop::Opcode opcode) const;
I've actually added it after writing the first version of this post, then edited the post. It conveniently handles the construction of the row from fields and sending it.
Then the aggregator type needs to be defined with a fixed set of inherited virtual methods plus any needed custom parts.
class MySumAggregatorType: public AggregatorType
{
public:
    // @param fname - field name to sum on
    MySumAggregatorType(const string &name, const string &fname):
        AggregatorType(name, NULL),
        fname_(fname),
        fidx_(-1)
    { }
    // the copy constructor works fine
    // (might set the non-NULL row type, but it will be overwritten
    // during initialization)
    // constructor for deep copy
    // (might set the non-NULL row type, but it will be overwritten
    // during initialization)
    MySumAggregatorType(const MySumAggregatorType &agg, HoldRowTypes *holder):
        AggregatorType(agg, holder),
        fname_(agg.fname_),
        fidx_(agg.fidx_)
    { }
    virtual AggregatorType *copy() const
    {
        return new MySumAggregatorType(*this);
    }
    virtual AggregatorType *deepCopy(HoldRowTypes *holder) const
    {
        return new MySumAggregatorType(*this, holder);
    }
    virtual bool equals(const Type *t) const
    {
        if (this == t)
            return true; // self-comparison, shortcut
        if (!AggregatorType::equals(t))
            return false;
        const MySumAggregatorType *sumt = static_cast<const MySumAggregatorType *>(t);
        if (fname_ != sumt->fname_)
            return false;
        return true;
    }
    virtual bool match(const Type *t) const
    {
        if (this == t)
            return true; // self-comparison, shortcut
        if (!AggregatorType::match(t))
            return false;
        const MySumAggregatorType *sumt = static_cast<const MySumAggregatorType *>(t);
        if (fname_ != sumt->fname_)
            return false;
        return true;
    }
    virtual AggregatorGadget *makeGadget(Table *table, IndexType *intype) const
    {
        return new AggregatorGadget(this, table, intype);
    }
    virtual Aggregator *makeAggregator(Table *table, AggregatorGadget *gadget) const
    {
        return new MySumAggregator;
    }
    virtual void initialize(TableType *tabtype, IndexType *intype)
    {
        const RowType *rt = tabtype->rowType();
        setRowType(rt); // the result has the same type as the argument
        fidx_ = rt->findIdx(fname_);
        if (fidx_ < 0)
            errors_.fAppend(new Errors(rt->print()), "Unknown field '%s' in the row type:", fname_.c_str());
        else {
            if (rt->fields()[fidx_].arsz_ != RowType::Field::AR_SCALAR
            || rt->fields()[fidx_].type_->getTypeId() != Type::TT_INT64)
                errors_.fAppend(new Errors(rt->print()),
                    "Field '%s' is not an int64 scalar in the row type:", fname_.c_str());
        }
        AggregatorType::initialize(tabtype, intype);
    }
    // called from the handler
    int fieldIdx() const
    {
        return fidx_;
    }
protected:
    string fname_; // name of the field to sum, must be an int64
    int fidx_; // index of field named fname_
};
The constructor accepts the aggregator name and the name of the field on which it will sum. The field name will be translated to the field index during initialization, and made available to MySumAggregator::handle() via the method fieldIdx(). The aggregator type starts with the result row type of NULL, with the actual row type set during initialization. This ability to set the row type later has existed latently all along, but this is the first time I've made it explicit. It hasn't propagated to the Perl API yet. The idea here is that the result row type of this aggregator is always equal to the input row type, so rather than specifying the result type explicitly and then having to check it for compatibility, why not just take the table's row type when it becomes available? And it works beautifully.
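The pattern of remembering the name at construction and resolving it to an index later, when the row type becomes known, can be shown in isolation. The following is only a toy model with made-up names (NamedRowType, FieldSummer, IntRow); the rows are plain vectors of integers, nothing like the real Triceps rows.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy stand-in for a row type: just an ordered list of field names.
struct NamedRowType {
    std::vector<std::string> names;
    // Returns the index of the named field, or -1 if there is no such field.
    int findIdx(const std::string &name) const
    {
        for (size_t i = 0; i < names.size(); i++) {
            if (names[i] == name)
                return (int)i;
        }
        return -1;
    }
};

typedef std::vector<long long> IntRow; // toy row: every field is an integer

class FieldSummer {
public:
    // Only the name is known at construction; the index stays unresolved.
    explicit FieldSummer(const std::string &fname):
        fname_(fname),
        fidx_(-1)
    { }
    // Late binding: resolve the name once, when the row type becomes available.
    // Returns false if the field is not found.
    bool initialize(const NamedRowType &rt)
    {
        fidx_ = rt.findIdx(fname_);
        return fidx_ >= 0;
    }
    int fieldIdx() const { return fidx_; }
    // The per-group work uses the pre-computed index, with no look-ups by name.
    long long sum(const std::vector<IntRow> &group) const
    {
        assert(fidx_ >= 0); // must be initialized successfully first
        long long s = 0;
        for (size_t i = 0; i < group.size(); i++)
            s += group[i][fidx_];
        return s;
    }
private:
    std::string fname_; // name of the field to sum
    int fidx_;          // index of the field named fname_, -1 until resolved
};
```

The look-up by name happens once per type, while the summing that runs on every change of every group works with the plain index.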
The copy constructor and the constructor with HoldRowTypes are the implementations of the virtual methods copy() and deepCopy(). The deepCopy() is yet another recent addition used in the multithreaded support. I'll describe it in detail later, for now just Do It Like This.
The methods match() and equals() follow the same general shape as everywhere else. makeGadget() creates a generic gadget, and makeAggregator() creates an instance of aggregator for each group.
The interesting stuff starts happening in the initialization. The row type gets found from the table and set as the result type. Then the aggregation field is found in the row type and checked for being of the proper type. Its index is remembered for later use.
errors_.fAppend() is a new part of the Erref API that makes the error construction more convenient. It is smart enough to check the reference for NULL and allocate a new Errors if so, then append a printf-formatted message and nested errors to it. I've been adding direct printf-formatting for both Errors and Exceptions; this part of the API is yet to be documented.
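Since this part of the API is not documented yet, here is a rough sketch of the idea in plain C++: allocate the error container on first use, format the message printf-style, and append the nested errors under it. The classes ToyErrors and ToyErref are made up for the illustration and are not the real Errors and Erref; in particular, the fixed-size buffer, the two-space indentation and the nested errors being passed as a plain pointer without taking ownership are my simplifications.

```cpp
#include <cassert>
#include <cstdarg>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <string>
#include <vector>

// Toy error container: just a list of message lines.
struct ToyErrors {
    std::vector<std::string> lines;
};

// Toy stand-in for an error reference that starts out empty (NULL).
class ToyErref {
public:
    // Formats the message printf-style, allocates the container if this is
    // the first error, then appends the nested lines indented under the message.
    void fAppend(const ToyErrors *nested, const char *fmt, ...)
    {
        assert(fmt != NULL);
        char buf[512]; // longer messages get truncated in this sketch
        va_list ap;
        va_start(ap, fmt);
        std::vsnprintf(buf, sizeof(buf), fmt, ap);
        va_end(ap);

        if (!errs_)
            errs_.reset(new ToyErrors); // lazy allocation on the first error
        errs_->lines.push_back(buf);
        if (nested != NULL) {
            for (size_t i = 0; i < nested->lines.size(); i++)
                errs_->lines.push_back("  " + nested->lines[i]);
        }
    }
    bool hasError() const { return errs_ && !errs_->lines.empty(); }
    const ToyErrors *get() const { return errs_.get(); }
private:
    std::unique_ptr<ToyErrors> errs_; // stays NULL until something is appended
};
```

The convenience is on the calling side: the caller never checks whether the container already exists, it just appends.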
Aggregator in C++, part 2
Building an aggregator is a fairly complex subject, so next I want to show a working example. It turns out that I've never actually written an aggregator in C++ before, other than some tiny fragments; I was always working through Perl. So I've written some more interesting examples just now. The full text can be found in the unit test file cpp/table/t_Agg.cpp.
First, if your aggregator is truly stateless and fully hardcoded, the easier way to do it is by defining a plain function with the same handler arguments and building a BasicAggregatorType with it. And here is one that sums the values of an int64 field (the test case aggBasicSum):
void sumC(Table *table, AggregatorGadget *gadget, Index *index,
    const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
    Aggregator::AggOp aggop, Rowop::Opcode opcode, RowHandle *rh)
{
    // don't send the NULL record after the group becomes empty
    if (opcode == Rowop::OP_NOP || parentIndexType->groupSize(gh) == 0)
        return;
    int64_t sum = 0;
    for (RowHandle *rhi = index->begin(); rhi != NULL; rhi = index->next(rhi)) {
        sum += table->getRowType()->getInt64(rhi->getRow(), 2, 0); // field c at idx 2
    }
    // pick the rest of fields from the last row of the group
    RowHandle *lastrh = index->last();
    // build the result row; relies on the aggregator result being of the
    // same type as the rows in the table
    FdataVec fields;
    table->getRowType()->splitInto(lastrh->getRow(), fields);
    fields[2].setPtr(true, &sum, sizeof(sum)); // set the field c from the sum
    Rowref res(gadget->getLabel()->getType(), fields);
    gadget->sendDelayed(dest, res, opcode);
}
...
Autoref<TableType> tt = TableType::make(rt1)
    ->addSubIndex("Hashed", HashedIndexType::make( // will be the default index
            (new NameSet())->add("e")
        )->addSubIndex("Fifo", FifoIndexType::make()
            ->setAggregator(new BasicAggregatorType("aggr", rt1, sumC))
        )
    );
...
As I've said before, you create the BasicAggregatorType by giving it the aggregator name, aggregator result row type, and the handler function.
In this case the handler function is completely hardcoded. It works on the int64 field at index 2. The row type I used in this example is:
row {
    uint8[10] a,
    int32[] b,
    int64 c,
    float64 d,
    string e,
}
So the field is actually named "c", and that's why the aggregator function is named "sumC". But since in this case everything is known in advance, to make it more efficient, the look-up of the field by name has been skipped, and the field index has been pre-computed and placed into the function.
The general outline of the aggregator is exactly the same as in Perl: check for an empty group, then iterate through all the rows in the group and compute the sum, fill the rest of the fields from the last row, and send the result. The difference is that there is no AggregatorContext, and the calls are done directly on the bits and pieces received as arguments.
The group size is computed as
parentIndexType->groupSize(gh)
This is pretty much the reason for parentIndexType and gh to be passed as arguments to the handler. Why it's done like this is a long story. It allows an individual index (as in instance, not type!) to not care about the number of rows in it. Remember, a group may have multiple "parallel" indexes defined on it, with all of them containing the same rows, and thus the same number of rows. The group handle (gh) contains the common information about the group, including the group size. And the parent index type is the one that manages the group handles inside it, and knows how to extract the information from them.
The iteration is done as usual with the begin() and next(), only called directly on the index. It will return NULL after the last row. The last row can also be found directly with index->last(); on an empty group it would return NULL, so be careful with that.
The input row type for reading the rows from the group is found as:
table->getRowType()
The result row is built in this example by copying the last row and replacing one field. The data from the last row is split into FdataVec (the data itself is not copied at this point but the data descriptors in the construction vector are made to point to the data in the original row). Then the descriptor for the field "c" is changed to point to the computed sum. Then the new row is built from the descriptor.
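The split-replace-build sequence can be modeled in a few lines of plain C++. This is a toy model with made-up names (FieldDesc, splitRow(), buildRow(), rowWithSum()), where a row is simply a vector of byte buffers; the real FdataVec and row format are more involved. The point it shows is that the descriptors only point at the data, and the copying happens once, when the new row is built.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

typedef std::vector<char> Bytes;     // the data of one field
typedef std::vector<Bytes> PlainRow; // toy row: one byte buffer per field

// Toy descriptor: points at field data that lives elsewhere, copies nothing.
struct FieldDesc {
    const void *data;
    size_t len;
};

// Makes the descriptors point at the fields of an existing row.
std::vector<FieldDesc> splitRow(const PlainRow &row)
{
    std::vector<FieldDesc> fields(row.size());
    for (size_t i = 0; i < row.size(); i++) {
        fields[i].data = row[i].data();
        fields[i].len = row[i].size();
    }
    return fields;
}

// Builds a new row by copying the data that the descriptors point at.
PlainRow buildRow(const std::vector<FieldDesc> &fields)
{
    PlainRow row;
    for (size_t i = 0; i < fields.size(); i++) {
        const char *p = static_cast<const char *>(fields[i].data);
        row.push_back(Bytes(p, p + fields[i].len));
    }
    return row;
}

// Helpers to store an int64 in a field buffer and to read it back.
Bytes int64Bytes(int64_t v)
{
    Bytes b(sizeof(v));
    std::memcpy(b.data(), &v, sizeof(v));
    return b;
}
int64_t int64Value(const Bytes &b)
{
    assert(b.size() == sizeof(int64_t));
    int64_t v = 0;
    std::memcpy(&v, b.data(), sizeof(v));
    return v;
}

// Copies the row `last`, replacing the int64 field at `fidx` with `sum`.
PlainRow rowWithSum(const PlainRow &last, size_t fidx, int64_t sum)
{
    std::vector<FieldDesc> fields = splitRow(last);
    assert(fidx < fields.size());
    fields[fidx].data = &sum; // re-point one descriptor at the computed value
    fields[fidx].len = sizeof(sum);
    return buildRow(fields);  // the only place where the data gets copied
}
```

Note that the variable holding the sum has to stay alive until the row is built, because the descriptor holds only a pointer to it. The same applies to the local variable sum in the handler above.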
In this particular case the type of the input rows and of the result rows is the same, so either could have been used to construct the result rows. There are two ways to find the result type:
gadget->getType()->getRowType()
gadget->getLabel()->getType()
They are exactly the same, just there are two paths leading to the same object.
Finally, the constructed row is sent. sendDelayed() takes care of constructing the rowop from the components.
Potentially, you could even call directly
gadget->sendDelayed(dest, gadget->getLabel()->getType()->makeRow(fields), opcode);
However the caveat is that if the table's enqueuing mode is set to EM_IGNORE (which is an old idea, nowadays everyone should use EM_CALL and eventually there will be no other way than EM_CALL, but for now it could happen), the rows would leak. The reason for that leak is that sendDelayed() will skip the rowop creation with EM_IGNORE, and thus will skip the reference creation, and the row would never have a reference to it created and would never be freed. Creating an explicit reference makes sure that the row will always be cleaned up when it becomes unused.
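The mechanics of this leak can be shown with a toy reference counter. Everything here is made up for the illustration (LeakRow, LeakRowref, toySend() and its ignore flag standing in for EM_IGNORE); the real classes are different. A row is created with a reference count of 0 and is freed only when a reference to it goes away.

```cpp
#include <cassert>
#include <cstddef>

// Toy row with an intrusive reference count; starts with no references.
struct LeakRow {
    int refs;
    static int live; // the number of rows currently allocated
    LeakRow(): refs(0) { ++live; }
    ~LeakRow() { --live; }
};
int LeakRow::live = 0;

// Toy counting reference: frees the row when the last reference goes away.
class LeakRowref {
public:
    explicit LeakRowref(LeakRow *r): r_(r)
    {
        assert(r_ != NULL);
        ++r_->refs;
    }
    ~LeakRowref()
    {
        if (--r_->refs == 0)
            delete r_;
    }
    LeakRow *get() const { return r_; }
private:
    LeakRowref(const LeakRowref &);     // copying is not needed in this sketch
    void operator=(const LeakRowref &);
    LeakRow *r_;
};

// Stand-in for a send that may skip the rowop creation.
void toySend(LeakRow *row, bool ignore)
{
    if (ignore)
        return; // no rowop, so nobody ever takes a reference to the row
    LeakRowref held(row); // the "rowop" holds the row while it is processed
}
```

With toySend(new LeakRow, true) nothing ever references the row, so nothing ever frees it. With a LeakRowref created first and ref.get() passed to toySend(), the row is freed when the reference goes out of scope, whether the send was skipped or not.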
Friday, February 15, 2013
Aggregator in C++, part 1
I've described the AggregatorType and AggregatorGadget before. Now, the last piece of the puzzle is the Aggregator class.
An object of a subclass of AggregatorType defines the logic of the aggregation, and there is one of them per TableType. An object of a subclass of AggregatorGadget provides the aggregation output of a Table, and there is one of them per table. An object of a subclass of Aggregator keeps the state of an aggregation group, and there is one of them per Index that holds a group in the table.
Repeating what I wrote before:
"The purpose of the Aggregator object created by AggregatorType's makeAggregator() is to keep the state of the group. If you're doing an additive aggregation, it allows you to keep the previous results. If you're doing the optimization of the deletes, it allows you to keep the previous sent row.
What if your aggregator keeps no state? You still have to make an Aggregator for every group, and no, you can't just return NULL, and no, they are not reference-countable, so you have to make a new copy of it for every group (i.e. for every call of makeAggregator()). This looks decidedly sub-optimal, and eventually I'll get around to straightening it out. The good news though is that most of the real aggregators keep the state anyway, so it doesn't matter much."
Once again, the Aggregator object doesn't normally do the logic by itself, it calls its AggregatorType for the logic. It only provides storage for one group's state and lets the AggregatorType logic use that storage.
The Aggregator is defined in table/Aggregator.h. It defines two things: the constants and the virtual method.
The constants are in the enum Aggregator::AggOp, same as defined before in Perl:
AO_BEFORE_MOD,
AO_AFTER_DELETE,
AO_AFTER_INSERT,
AO_COLLAPSE,
Their meaning is also the same. They can be converted to and from the string representation with methods
static const char *aggOpString(int code, const char *def = "???");
static int stringAggOp(const char *code);
They work the same way as the other constant conversion methods.
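For illustration, here is a minimal stand-alone sketch of how such a pair of conversions can be built on a lookup table. It is not the Triceps source: the toy names, the numeric values of the constants, and the return of -1 for an unknown string are all assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Stand-alone copy of the constant names; the numeric values are assumed.
enum ToyAggOp {
    AO_BEFORE_MOD,
    AO_AFTER_DELETE,
    AO_AFTER_INSERT,
    AO_COLLAPSE,
};

struct ToyAggOpName {
    int code;
    const char *name;
};

static const ToyAggOpName toyAggOpNames[] = {
    { AO_BEFORE_MOD, "AO_BEFORE_MOD" },
    { AO_AFTER_DELETE, "AO_AFTER_DELETE" },
    { AO_AFTER_INSERT, "AO_AFTER_INSERT" },
    { AO_COLLAPSE, "AO_COLLAPSE" },
};
static const std::size_t toyAggOpCount =
    sizeof(toyAggOpNames) / sizeof(toyAggOpNames[0]);

// Code to string; returns def if the code is unknown.
const char *toyAggOpString(int code, const char *def = "???") {
    for (std::size_t i = 0; i < toyAggOpCount; i++) {
        if (toyAggOpNames[i].code == code)
            return toyAggOpNames[i].name;
    }
    return def;
}

// String to code; returns -1 if the string is unknown (an assumption).
int toyStringAggOp(const char *name) {
    assert(name != nullptr);
    for (std::size_t i = 0; i < toyAggOpCount; i++) {
        if (std::strcmp(toyAggOpNames[i].name, name) == 0)
            return toyAggOpNames[i].code;
    }
    return -1;
}
```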
Notably, there is no explicit Aggregator constructor. The base class Aggregator doesn't have any knowledge of the group state, so the default constructor is good enough. You add this knowledge of the state in your subclass, and there you define your own constructor. Whether the constructor takes any arguments or not is completely up to you, since your subclass of AggregatorType will be calling that constructor.
And the actual work then happens in the method
virtual void handle(Table *table, AggregatorGadget *gadget, Index *index,
const IndexType *parentIndexType, GroupHandle *gh, Tray *dest,
AggOp aggop, Rowop::Opcode opcode, RowHandle *rh);
Your subclass absolutely has to define this method because it's abstract in the base class. The arguments provide the way to get back to the table, its components and its type, so you don't have to pass them through your Aggregator constructor and keep them in your instance, which saves memory.
Notably absent is the pointer to the AggregatorType. This is because the AggregatorGadget already has this pointer, so all you need to do is call
gadget->getType();
You might also want to cast it to your aggregator's type if you plan to call your custom methods, like:
MyAggregatorType *at = (MyAggregatorType *)gadget->getType();
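The division of labor (state in the aggregator, logic in the type, the type found through the gadget) can be sketched on toy classes. ToyAggType, ToyAggGadget, MySumType and MySumAggregator are hypothetical stand-ins with a much simplified handle(), not the Triceps classes.

```cpp
#include <cassert>

// Hypothetical stand-in for the base aggregator type.
class ToyAggType {
public:
    virtual ~ToyAggType() {}
};

// Hypothetical stand-in for the gadget; it remembers its type.
class ToyAggGadget {
public:
    explicit ToyAggGadget(const ToyAggType *t) : type_(t) {}
    const ToyAggType *getType() const { return type_; }
private:
    const ToyAggType *type_;
};

// The type holds the logic and the settings shared by all the groups.
class MySumType : public ToyAggType {
public:
    explicit MySumType(long scale) : scale_(scale) {}
    // A custom method that is not in the base class.
    long combine(long prevSum, long delta) const {
        return prevSum + delta * scale_;
    }
private:
    long scale_;
};

// The aggregator holds only the state of one group.
class MySumAggregator {
public:
    MySumAggregator() : sum_(0) {}
    long handle(ToyAggGadget *gadget, long delta) {
        assert(gadget != nullptr);
        // The downcast is safe only as long as this aggregator is always
        // created by a MySumType, which this sketch assumes.
        const MySumType *at = static_cast<const MySumType *>(gadget->getType());
        sum_ = at->combine(sum_, delta);
        return sum_;
    }
private:
    long sum_;
};
```

Two aggregators sharing one gadget and one type still keep their sums separately, one per group.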
Some of the argument types, Index and GroupHandle, have not been described yet, and will be described shortly.
But in general the method handle() is expected to work very much like the aggregator handler does in Perl. It's called at the same times (and indeed the Perl handler is called from the C++-level of the handler), with the same aggop and opcode. The rh is still the current modified row.
A bit of a difference is that the Perl way hides the sensitive arguments in an AggregatorContext while in C++ they are passed directly.
The job of the handle() is to do whatever is necessary, possibly iterate through the group, possibly use the previous state, produce the new state (and possibly remember it), then send the rowop to the gadget. This last step usually is:
gadget->sendDelayed(dest, newrow, opcode);
Here gadget is the AggregatorGadget from the arguments, opcode also helpfully comes from the arguments (and if it's OP_NOP, there is no need to send anything, it will be thrown away anyway), and newrow is the newly computed result Row (not RowHandle!). Obviously, the type of the result row must match the output row type of the AggregatorGadget, or all kinds of bad things will happen. dest is also an argument of handle(), and is a tray that will hold the aggregator results until a proper time when they can be sent. The "delayed" part of "sendDelayed" means exactly that: the created rowops are not sent directly to the gadget's output but are collected in a Tray until the Table decides that it's the right time to send them.
You absolutely must not use the Gadget's default method send(); use only sendDelayed(). To help with this discipline, AggregatorGadget hides the send() method and makes only the sendDelayed() visible.
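One way to get this kind of hiding in C++ is a private using-declaration in the subclass. The sketch below shows it on toy classes together with the delayed collection in a tray. ToyGadget, ToyAggGadget, ToyTray and flush() are hypothetical stand-ins, and whether AggregatorGadget hides send() in exactly this way is an assumption.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins, not the Triceps classes.
struct ToyRowop {
    int opcode;
    long value;
};
typedef std::vector<ToyRowop> ToyTray;

class ToyGadget {
public:
    // Immediate send: the rowop gets delivered right now.
    void send(long value, int opcode) {
        ToyRowop rop = { opcode, value };
        delivered.push_back(rop);
    }
    std::vector<ToyRowop> delivered;
};

class ToyAggGadget : public ToyGadget {
public:
    // Delayed send: the rowop is only collected in the tray.
    void sendDelayed(ToyTray *dest, long value, int opcode) {
        assert(dest != nullptr);
        ToyRowop rop = { opcode, value };
        dest->push_back(rop);
    }
    // Stand-in for the table deciding that the time is right.
    void flush(ToyTray *dest) {
        assert(dest != nullptr);
        for (const ToyRowop &rop : *dest)
            ToyGadget::send(rop.value, rop.opcode);
        dest->clear();
    }
private:
    using ToyGadget::send;  // calling send() on a ToyAggGadget won't compile
};
```

Nothing reaches the output until flush() is called, and a call like g.send(10, 1) on a ToyAggGadget g is rejected by the compiler.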
Friday, February 8, 2013
little table type additions
Working on the multithreading has brought up some little additions that had been missed before.
First, I've added a method to copy a whole table type. In Perl it is:
$newtt = $oldtt->copy();
In C++ it is:
TableType *copy();
This copies the table type, along with copying all the index types in it, since each table type must have its own instances of the index types. The copied table type is always uninitialized.
If the table type has collected errors, the errors aren't copied, and you should not copy such a table type. This caveat applies to the C++ code, since the Perl code would not let you create a table type with errors, it would fail on these errors immediately.
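The copy semantics can be sketched on a toy model: the copy gets its own index type instances and starts uninitialized, no matter what state the original is in. ToyTableType and ToyIndexType are hypothetical stand-ins, and returning a std::unique_ptr is a choice of this sketch, not the signature of the real copy().

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins, not the Triceps classes.
struct ToyIndexType {
    std::string name;
    bool initialized;
    explicit ToyIndexType(const std::string &n) : name(n), initialized(false) {}
};

class ToyTableType {
public:
    ToyTableType() : initialized_(false) {}

    void addIndex(const std::string &name) {
        indexes_.push_back(std::unique_ptr<ToyIndexType>(new ToyIndexType(name)));
    }

    // Initialization ties the index types to this table type.
    void initialize() {
        for (auto &ix : indexes_)
            ix->initialized = true;
        initialized_ = true;
    }

    // Deep copy: new index type instances, always uninitialized.
    std::unique_ptr<ToyTableType> copy() const {
        std::unique_ptr<ToyTableType> c(new ToyTableType);
        for (const auto &ix : indexes_)
            c->addIndex(ix->name);
        return c;
    }

    bool isInitialized() const { return initialized_; }
    const ToyIndexType *index(std::size_t i) const { return indexes_.at(i).get(); }

private:
    std::vector<std::unique_ptr<ToyIndexType>> indexes_;
    bool initialized_;
};
```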
Second, in the Perl implementation of AggregatorType I've added a method to get back the row type:
$rt = $aggtype->getRowType();