Monday, March 26, 2012

The dreaded diamond and the execution order

The "diamond" is a particular topology of the data flow, where the computation splits based on some condition and then merges again: block A sends each row either to block B or to block C, and both B and C send their output to block D.

It is also known as "fork-join" (the "join" here has nothing to do with the SQL join; it just means that the arrows merge into the same block).

This topology is a known source of two problems. The first problem is about the execution order.

To make things easier to see, let's consider a simple example.

Suppose the rows come into the block A with the schema:

key string
value int32

And come out of the blocks B and C into D with the schema:

key string
value int32
negative int32

With the logic in the blocks:

A: if value < 0 then B else C
B: negative = 1
C: negative = 0

Yes, this is a very dumb example that can usually be handled by a conditional expression in a single block. But that's to keep it simple. A real example would often include some SQL joins, with different joins done depending on the condition.

Suppose A then gets the input, in CSV form:

INSERT,key1,10
DELETE,key1,10
INSERT,key1,20
DELETE,key1,20
INSERT,key1,-1

What arrives at D should be:

INSERT,key1,10,0
DELETE,key1,10,0
INSERT,key1,20,0
DELETE,key1,20,0
INSERT,key1,-1,1

With the first four rows this is not a problem: they follow the same path and are queued sequentially, so their order is preserved. But the last row follows a different path. The last two rows logically represent a single update and would likely arrive close together, so the last row might overtake the one before it, and D would see an incorrect result:

INSERT,key1,10,0
DELETE,key1,10,0
INSERT,key1,20,0
INSERT,key1,-1,1
DELETE,key1,20,0

If all these input rows arrive close together, the last row might overtake even more of them and produce an even more interesting result, like:

INSERT,key1,-1,1
INSERT,key1,10,0
DELETE,key1,10,0
INSERT,key1,20,0
DELETE,key1,20,0

Such misorderings may also happen between rows with different keys. Those are usually less of a problem: if D keeps a table, the rows with different keys may be updated in any order without losing their meaning. But if D keeps a FIFO index (say, for a window based on a row count) and the two keys fall into the same FIFO bucket, their misordering would affect the logic as well.
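A toy illustration of the FIFO case, in plain Perl rather than the Triceps API. The window size of 2, the extra keys key2 and key3, and the sub window_contents() are all made up for the example. The window keeps the last two rows regardless of their keys, so when the row for key2 overtakes the row for key1, a different row gets evicted:

```perl
use strict;
use warnings;

# Hypothetical count-based window at D: keeps only the last $limit rows,
# regardless of their keys, evicting the oldest ones when it overflows.
sub window_contents {
    my ($limit, @rows) = @_;
    my @fifo;
    for my $row (@rows) {
        push @fifo, $row;
        shift @fifo while @fifo > $limit;   # evict the oldest rows
    }
    return @fifo;
}

# Correct order: key2 (negative, so it went through B) is between key1 and key3.
my @inOrder  = ("key1,10,0", "key2,-5,1", "key3,7,0");
# Misordered: the row for key2 overtook the row for key1.
my @misorder = ("key2,-5,1", "key1,10,0", "key3,7,0");

my @w1 = window_contents(2, @inOrder);
my @w2 = window_contents(2, @misorder);
print "in order:   @w1\n";   # key2,-5,1 key3,7,0
print "misordered: @w2\n";   # key1,10,0 key3,7,0
```

Both sequences contain exactly the same rows, yet the window ends up holding different ones.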

The reasons for this can be subdivided further into two classes:
  • asynchronous execution
  • incorrect scheduling in the synchronous execution


If each block executes asynchronously in its own thread, there is no way to predict in which order they will actually execute. If some data is sent to B and C at about the same time, it becomes a race between them. One of the paths might also be longer than the other, making one alternative always win the race. This kind of problem is fairly common in the Aleri system, which is highly multithreaded. But it is a problem for absolutely any CEP engine that splits the execution across multiple threads or processes.
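The effect of one path being longer can be modeled deterministically by giving each path a fixed delay. In this plain-Perl sketch the arrival times and the delays (2 time units through B, 5 through C) are invented numbers, not measurements of Aleri or any other engine. With them, the last row overtakes exactly one row and produces the incorrect order shown earlier:

```perl
use strict;
use warnings;

# Hypothetical timing model: each input row enters A at a given time, and
# each path through the diamond adds a fixed delay before the row reaches D.
my %delay = (B => 2, C => 5);   # the path through C is the slower one

my @input = (   # [arrival time at A, opcode, key, value]
    [0, "INSERT", "key1", 10],
    [2, "DELETE", "key1", 10],
    [4, "INSERT", "key1", 20],
    [6, "DELETE", "key1", 20],
    [8, "INSERT", "key1", -1],
);

my @events;   # [arrival time at D, row as it comes out of B or C]
for my $r (@input) {
    my ($t, $op, $key, $value) = @$r;
    my $path = $value < 0 ? "B" : "C";
    my $negative = $path eq "B" ? 1 : 0;
    push @events, [$t + $delay{$path}, "$op,$key,$value,$negative"];
}

# D sees the rows in the order they finish their paths,
# not in the order they entered A.
my @atD = map { $_->[1] } sort { $a->[0] <=> $b->[0] } @events;
print "$_\n" for @atD;
```

Shrinking the gap between the input rows, or increasing the difference between the delays, makes the last row overtake more of its predecessors.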

But single-threaded execution is not necessarily a cure either: then the order of execution is up to the scheduler. If the scheduler gets all these rows close together and decides to process all the input of A, then all the input of B, then of C, and then of D, D will receive the rows in the order:

INSERT,key1,-1,1
INSERT,key1,10,0
DELETE,key1,10,0
INSERT,key1,20,0
DELETE,key1,20,0

This is typical for, say, Coral8 if all the input rows arrive in a single bundle (see also the separate post on bundling).
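A toy model of such a scheduler, in plain Perl: every block has its own input queue, and the scheduler drains the queues one block at a time, in the order A, B, C, D. The queues, the hash-based rows and the dispatch table %logic are assumptions of this sketch, not Coral8's or Triceps' actual internals; the point is only that this scheduling policy reproduces the order above.

```perl
use strict;
use warnings;

my @atD;   # rows in the order D processes them

# Input queues of the four blocks of the diamond.
my %queue = (A => [], B => [], C => [], D => []);

# Logic of each block: take one row (a hash ref), push the result to the next queue.
my %logic = (
    A => sub { my $r = shift; push @{ $queue{ $r->{value} < 0 ? "B" : "C" } }, $r; },
    B => sub { my $r = shift; push @{ $queue{D} }, { %$r, negative => 1 }; },
    C => sub { my $r = shift; push @{ $queue{D} }, { %$r, negative => 0 }; },
    D => sub { my $r = shift; push @atD, join(",", @{$r}{qw(op key value negative)}); },
);

# Turn a CSV line "opcode,key,value" into a hash-based row.
sub parse_row {
    my ($op, $key, $value) = split /,/, shift;
    return { op => $op, key => $key, value => $value };
}

# The whole bundle of input rows arrives into A at once.
push @{ $queue{A} }, map { parse_row($_) }
    "INSERT,key1,10", "DELETE,key1,10", "INSERT,key1,20", "DELETE,key1,20", "INSERT,key1,-1";

# Drain all of A's input, then all of B's, then C's, then D's.
for my $block (qw(A B C D)) {
    while (my $r = shift @{ $queue{$block} }) {
        $logic{$block}->($r);
    }
}
print "$_\n" for @atD;
```

Because B is drained completely before C, the single negative row reaches D's queue ahead of all four rows that went through C.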

At the moment Triceps does not directly support multithreaded execution, which renders the first sub-case moot. But I have ideas on how to get this working without too much trouble.

As far as single-threaded scheduling is concerned, Triceps provides two answers.

First, the conditional logic can often be expressed procedurally:

# D() stands for the code that handles a row in block D
if ($a->get("value") < 0) {
  D($rtD->makeRowHash($a->toHash(), negative => 1));
} else {
  D($rtD->makeRowHash($a->toHash(), negative => 0));
}

The procedural if-else logic can easily handle not only simple expressions but also things like table look-ups and modifications.

Second, if the logic is broken into separate labels, the label call semantics provides the same ordering:

$lbA = $unit->makeLabel($rtA, "A", undef, sub {
  my $rop = $_[1];
  my $op = $rop->getOpcode(); my $a = $rop->getRow();
  # $lbB and $lbC are created below; they only need to exist by the time A gets called
  if ($a->get("value") < 0) {
    $unit->call($lbB->makeRowop($op, $a));
  } else {
    $unit->call($lbC->makeRowop($op, $a));
  }
}) or die "$!";

$lbB = $unit->makeLabel($rtA, "B", undef, sub {
  my $rop = $_[1];
  my $op = $rop->getOpcode(); my $a = $rop->getRow();
  # pass the row on to D, with the extra field set
  $unit->makeHashCall($lbD, $op, $a->toHash(), negative => 1)
    or die "$!";
}) or die "$!";

$lbC = $unit->makeLabel($rtA, "C", undef, sub {
  my $rop = $_[1];
  my $op = $rop->getOpcode(); my $a = $rop->getRow();
  $unit->makeHashCall($lbD, $op, $a->toHash(), negative => 0)
    or die "$!";
}) or die "$!";

When the label A calls the label B or C, which calls the label D, A does not get to see its next input row until the whole chain of calls to D and beyond completes. B and C may be replaced with the label chains of arbitrary complexity, including loops, without disturbing the logic.
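For comparison with queue-per-block scheduling, here is a plain-Perl sketch of the call semantics, without Triceps: ordinary subs stand in for the labels, and a direct sub call stands in for $unit->call(). The sub names are hypothetical, and B is deliberately stretched into a two-step chain (B1, B2) to show that the length of a branch does not affect the order:

```perl
use strict;
use warnings;

my @atD;   # rows in the order D processes them

# Each "label" is a plain sub; calling the next one directly mimics the
# call semantics: the caller resumes only after the whole chain returns.
sub labelD  { my $r = shift; push @atD, join(",", @{$r}{qw(op key value negative)}); }
sub labelB2 { my $r = shift; labelD({ %$r, negative => 1 }); }
sub labelB1 { labelB2(shift); }   # B stretched into a longer chain B1 -> B2
sub labelC  { my $r = shift; labelD({ %$r, negative => 0 }); }
sub labelA  { my $r = shift; $r->{value} < 0 ? labelB1($r) : labelC($r); }

# Only the input of A is queued; each row runs through the whole diamond
# before the next one is taken off the queue.
my @inputQueue = map {
    my ($op, $key, $value) = split /,/;
    +{ op => $op, key => $key, value => $value };
} "INSERT,key1,10", "DELETE,key1,10", "INSERT,key1,20", "DELETE,key1,20", "INSERT,key1,-1";

while (my $r = shift @inputQueue) {
    labelA($r);
}
print "$_\n" for @atD;
```

D receives the rows in exactly the input order, no matter how many steps each branch takes.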
