Thursday, June 6, 2013

reordering the data from multiple threads, part 2

Before going to the example itself, let's talk more about the general issues of the data partitioning.

If the data processing is stateless, it's easy: just partition by the primary key, and each thread can happily work on its own. If the data depends on the previous data with the same primary key, the partitioning is still easy: each thread keeps the state for its part of the keys and updates it with the new data.

But what if you need to join two tables with independent primary keys, where the matches of the keys between them are fairly arbitrary? Then you can partition by one table but you have to give a copy of the second table to each thread. Typically, if one table is larger than the other, you would partition by the key of the big table, and copy the small table everywhere. Since Triceps handles the data rows by reference, the actual data of the smaller table won't be copied; it will just be referenced from multiple threads. But each copy will still have the overhead of its own index.

With some luck, and having enough CPUs, you might be able to save a little overhead by doing a matrix: if you have one table partitioned into the parts A, B and C, and the other table into parts 1, 2 and 3, you can then do a matrix-combination into 9 threads processing the combinations A1, A2, A3, B1, B2, B3, C1, C2, C3. If both tables are of about the same size, this would create a total of 18 indexes, each keeping 1/3 of one original table, so the total size of indexes will be 6 times the size of one original table (or 3 times the combined sizes of both tables). On the other hand, if you were to copy the first table to each thread and split the second table into 9 parts, creating the same 9 threads, the total size of indexes will be 9 times the first table and 1 times the second table, resulting in 10 times the size of an original table (or 5 times the combined sizes of both tables). There is a win to this, but the catch is that the results from this kind of 3x3 matrix will really have to be restored to the correct order afterwards.
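The arithmetic above can be checked with a small sketch. The function names are made up for this illustration, and it assumes that both tables are of the same size and that an index over 1/n of a table costs 1/n of the index over the whole table:

```python
from fractions import Fraction


def matrix_index_overhead(parts_a: int, parts_b: int) -> Fraction:
    """Total index size, in units of one table, for a parts_a x parts_b matrix.

    Assumption: equal-sized tables, index cost proportional to the row count.
    """
    threads = parts_a * parts_b
    # Each thread indexes one part of the first table and one part of the second.
    return threads * (Fraction(1, parts_a) + Fraction(1, parts_b))


def copy_index_overhead(threads: int) -> Fraction:
    """Total index size when the first table is copied to every thread
    and the second table is split evenly between the threads."""
    return threads * (1 + Fraction(1, threads))


print(matrix_index_overhead(3, 3))  # 6
print(copy_index_overhead(9))       # 10
```

Under these assumptions the matrix layout keeps 6 table-sizes of indexes against 10 for the copy-everywhere layout with the same 9 threads.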

The reason is that when a row in the first table changes, it might make it join, say, in the thread A2 instead of the thread A1. So the thread A1 would generate a DELETE for the join result, and the thread A2 would generate a following INSERT. With two separate threads, the resulting order will be unpredictable, and the INSERT coming before the DELETE would be bad. The post-reordering is really in order.

By contrast, if you just partition the first table and copy the second everywhere, you get 9 threads A, B, C, D, E, F, G, H, I, and a change in a row will still keep it in the same thread, so the updates will come out of that thread strictly sequentially. If you don't care about the order changes between different primary keys, you can get away without the post-reordering. Of course, if a key field might change and you care about it being processed in order, you'd still need the post-reordering.

The example I'm going to show is somewhat of a strange mix. It's an adaptation of the Forex arbitration example from the section 12.13, "Self-join done manually". As you can see from the name of the section, it's doing a self-join, kind of like going through the same table 3 times.

The partitioning in this example works as follows:
  • All the data is sent to all the threads. All the threads keep a full copy of the table and update it according to the input. 
  • But then they compute the join only if the first currency name in the update falls into the thread's partition. 
  • The partitioning is done by the first letter of the symbol, with interleaving: the symbols starting with A are handled by the thread 0, with B by the thread 1, and so on until the threads run out, and then continuing again with the thread 0. A production-ready implementation would use a hash function instead. But the interleaving approach makes it much easier to predict which symbol goes to which thread in the example.
  • Naturally, all this means that a loop of 3 currencies might get created by a change in one pair and then very quickly disappear by a change to another pair of currencies. So the post-reordering of the result is important to keep things consistent.
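
The partitioning rule from the list above can be sketched as follows. This is only an illustration in Python, not the code of the example itself; the function names are hypothetical, and it assumes that the symbols start with an ASCII letter:

```python
def thread_for_symbol(symbol: str, num_threads: int) -> int:
    """Pick the thread by the first letter, interleaved: A -> 0, B -> 1, ...

    Assumption: the symbol starts with an ASCII letter. A production-ready
    version would use a hash function instead.
    """
    first = symbol[0].upper()
    return (ord(first) - ord("A")) % num_threads


def should_join(ccy1: str, my_thread: int, num_threads: int) -> bool:
    """Every thread updates its own copy of the table, but only the thread
    that owns the first currency of the update computes the join."""
    return thread_for_symbol(ccy1, num_threads) == my_thread


for sym in ("AAA", "BBB", "CCC"):
    print(sym, thread_for_symbol(sym, 2))
```

With two threads, the symbols starting with A and C go to the thread 0 and the symbols starting with B go to the thread 1.
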
I've also added a tweak that artificially slows down the thread 0, making the incorrect order show up reliably and making the reordering code really work. For example, suppose the following input is sent quickly:

OP_INSERT,AAA,BBB,1.30
OP_INSERT,BBB,AAA,0.74
OP_INSERT,AAA,CCC,1.98
OP_INSERT,CCC,AAA,0.49
OP_INSERT,BBB,CCC,1.28
OP_INSERT,CCC,BBB,0.78
OP_DELETE,BBB,AAA,0.74
OP_INSERT,BBB,AAA,0.64

With two threads, and thread 0 working slowly, it would produce the raw result:

BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"
BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"

Here the BEGIN lines are generated by the code and show the sequence number of the input row and the id of the thread that did the join. The result lines show the arbitration opportunities produced by the join. Obviously, most of the updates don't produce a new result. But the INSERT and DELETE in the result come in the wrong order: the update 7 has overtaken the update 6.

The post-reordering comes to the rescue and restores the order:

BEGIN OP_INSERT seq="1" triead="0"
BEGIN OP_INSERT seq="2" triead="1"
BEGIN OP_INSERT seq="3" triead="0"
BEGIN OP_INSERT seq="4" triead="0"
BEGIN OP_INSERT seq="5" triead="1"
BEGIN OP_INSERT seq="6" triead="0"
result OP_INSERT ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="7" triead="1"
result OP_DELETE ccy1="AAA" ccy2="CCC" ccy3="BBB" rate1="1.98" rate2="0.78" rate3="0.74" looprate="1.142856"
BEGIN OP_INSERT seq="8" triead="1"

As you can see, now the sequence numbers go in the sequential order.
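
The idea of restoring the order by the sequence numbers can be sketched like this. It is a simplified stand-in and not the Triceps code of the example; the class `Reorderer` and its method `push` are made up for the illustration. It assumes that the sequence numbers start at 1 and that every input row reports back exactly once, even if its join produced no results:

```python
from typing import Dict, List, Tuple


class Reorderer:
    """Holds back the out-of-order batches until all the earlier ones arrive."""

    def __init__(self, first_seq: int = 1) -> None:
        self._next_seq = first_seq
        self._pending: Dict[int, List[str]] = {}

    def push(self, seq: int, results: List[str]) -> List[Tuple[int, List[str]]]:
        """Accept the batch for one input row; return the batches that
        have become ready, in the order of their sequence numbers."""
        self._pending[seq] = results
        ready = []
        while self._next_seq in self._pending:
            ready.append((self._next_seq, self._pending.pop(self._next_seq)))
            self._next_seq += 1
        return ready


# The arrival order from the raw output above: the thread 1 finishes first.
arrivals = [(2, []), (5, []), (7, ["DELETE"]), (8, []),
            (1, []), (3, []), (4, []), (6, ["INSERT"])]
reorderer = Reorderer()
released = [batch for seq, res in arrivals for batch in reorderer.push(seq, res)]
print([seq for seq, _ in released])  # [1, 2, 3, 4, 5, 6, 7, 8]
```

The batches 2, 5, 7 and 8 are held back until the batch 1 arrives, so the INSERT from the update 6 is released before the DELETE from the update 7. A gap in the sequence numbers would stall this sketch forever, which is why it needs a batch for every input row, including the empty ones.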
