Showing posts with label scheduling. Show all posts
Showing posts with label scheduling. Show all posts

Saturday, February 8, 2025

Asynchronous programming 6 - inlining done right

To recap, "inlining" is when we complete a future that has some function chained to it (directly or through other futures), and that function gets immediately executed from the completion library call. The opposite is "trampolining" where this function gets put into a scheduler queue and executed later.

Inlining allows to save on the cost of scheduling, and also keeps the cache hot: completing a future means that we've just put some value into it, and so reading that value (and other values it's connected to) immediately means that it will still be in the cache.

However inlining can also be self-defeating: suppose we want to complete ten futures, each with a function chained to it. If trampolined, ten CPUs can pick them from the scheduler queue and execute in parallel. But inlining would inadvertently cause them to be executed sequentially.

The reality is that inlining is only efficient when it's done at the tail of the current function. On the other hand, the issues with inlining (stack overflows and bad interactions with mutexes and serializing the parallel execution) can be avoided if the inlined function was called only after the current function returns.

Put this way, the straightforward solution is to replace inlining with a special case of trampolining via a "delayed scheduling slot": have a special thread-local variable in the scheduler, sufficient to hold a reference to a single scheduled function. Whenever a future is completed, put one chained function there and schedule the rest as usual. If the delayed slot is already used, then it can be either left as-is and all the new functions scheduled as usual, or in the hope that the later completions have a hotter cache, move the old contents of the delayed slot into the normal scheduling queue and put the new function there. Then when the current asynchronous function is completed, have the scheduler code check the delay slot, and if not empty, call the function from there.

This can be expressed in pseudocode:

thread_local<Function> delaySlot;

complete_future(Future fut)
{
  FunctionList funcs;
  FutureList recfut;

  recfut.insert(fut);

  for (f in recfut) {
    scopedlock(f);

    if (f.completed)
      continue;  // a double completion, nothing to do 

    funcs.merge(f.chained_functions);
    f.chained_functions.clear();

    recfut.merge(f.chained_futures);
    f.chained_futures.clear();
  }
  if (delaySlot.empty() && funcs.size() == 1) {
    delaySlot = funcs.front();
  } else if (!funcs.empty()) {
    scopelock(schedulerQueue);
    if (!delaySlot.empty()) {
      schedulerQueue.insert(delaySlot);
    }
    delaySlot = funcs.front();
    funcs.pop_front();
    for (fun in funcs) {
      schedulerQueue.insert(fun);
  }
}

scheduler_thread()
{
  while (true) {
    Function f;
    if (!delaySlot.empty()) {
      f = delaySlot;
      delaySlot.clear();
    } else {
      f = getNextFromQueue();
    }
    execute(f);
  }
}

Another interesting point is that cache locality gets improved by unfair scheduling, inserting the new functions at the front of the scheduling queue, with the premise that the more recent inserts will have a hotter cache. It's not exactly unfair either: Remember that in asynchronous programming the sequential execution gets broken up into a sequence of separate small functions. And so the most recently scheduled function is likely the continuation of the previous function, and running it first is completely fair, with the scheduling queue becoming a representation of the call stack, the innermost "return addresses" being at the front of the queue.

This is very similar to Triceps's scheduling logic, following from the same reasoning. Or to put it differently, this is the reason why Triceps's scheduling logic should also be used for the asynchronous programming in general.

Wednesday, February 19, 2020

Scheduling policy for burst multithreading

Over a year ago I've been reading Eric Raymond's blog post on the limitations of multithreading: http://esr.ibiblio.org/?p=8223

That gave me an idea that on the desktop we routinely have the spare CPUs going to waste. If we could reduce the cost of cross-CPU interaction, that would allow to schedule efficiently even the small snippets of parallel code. And we can reduce this cost by scheduling multiple threads of one process together, even if they have nothing to do - then they would just park the CPU in user space, and wake up quickly, directly in the user space, with already pre-heated cache and TLB. And then we can use the fast low-overhead inter-thread communications directly in the user space.

And then I've realized that another way to look at it is kind of like superscalar execution on a higher level, sort of what they've tried to do in Itanium but better.

Well, this finally ended up in a defense publication:
https://www.tdcommons.org/dpubs_series/2896/

Not sure if it will end up in anything useful but at least the idea is out there and you can read the details of it.

Tuesday, October 16, 2012

More of the fork reform

I've made one more change to the execution logic. I'm not sure if I've told anything explicit about this area before, or if it's going to be a new information but here we go anyway:

All the time before it was implicitly assumed that when a label in FnReturn passes the call to a label in its bound FnBinding, it does a proper call. And indeed it was so. But the problem with that is that the bound label can not easily fork a rowop to the frame of its parent label in FnReturn. This was breaking some of the examples that I've been playing with but I haven't shown yet.

So I've changed it to work like a chained label. All along, even in 1.0, the chained labels were executing their rowops reusing the frame of their parent label, from which they are chained. Now the binding labels do the same thing. And they even are shown in the traces as being chained from the labels in FnReturn.

The only exception (this is another subject that I haven't described yet) is when the FnReturn and FnBinding are in the different units. Then the bound label is properly called with its own frame in the unit where it belongs.

Monday, October 8, 2012

Fork revisited

I've been working on the streaming functions, and that gave me an idea for a change in scheduling. Sorry that this description is a little dense, you'd need to get the context of the old ways from the manual for the description of the changes to make sense.

If you'd want to look up the section on Basic scheduling http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_sched_basic, and the section on Loop scheduling http://triceps.sourceforge.net/docs-1.0.1/guide.html#sc_sched_loop, I've been saying that the loop logic could use some simplification, and the forking of the rowops is getting deprecated. Now I've come up with a solution for them both.

The loops required a separate label at the beginning of the loop to put a mark on its queue frame. When the loop's body unwinds and the next iteration starts, it has to avoid pushing more frames with each iteration. So it has to put the rowop for the next iteration into that beginning frame (like fork but farther up the stack), and then unwind the whole body before the beginning label picks the next rowop from its frame and runs the loop body for the next iteration.

But now one little change in the execution of the forked rowops from the frame fixes things: rather than doing a proper call and pushing a new frame for each of them, just execute them using the parent's frame. This muddles up the precise forking sequence a little (where the rowops forked by a label were guaranteed to execute before any other rowops forked by its parent). But this precision doesn't matter much: first, forking is not used much anyway, and second, the forked labels can't have an expectation that the model won't change between them being forked and executed. However this little change is very convenient for the loops.

In a loop the first label of the loop can now put the mark directly on its frame. This mark will stay there until the loop completes, executing every iteration from that point.

If we review the example from the section on Loop scheduling, with the topology

X -> A -> B -> C -> Y
     ^         |
     +---------+

Then the sequence will look like this:

Rowop X1 scheduled  on the outer frame:

[X1]

Rowop X1 executes:

[ ] ~X1
[ ]

Label X calls the first label of the loop, A, with rowop A1:

[ ] ~A1
[ ] ~X1
[ ]

The label A calls setMark() and puts the mark M on itself:

[ ] ~A1, mark M
[ ] ~X1
[ ]


The label A then calls the rowop B1 with calls the rowop C1:


[ ] ~C1

[ ] ~B1

[ ] ~A1, mark M

[ ] ~X1
[ ]


The label C loops the rowop A2 (for the second iteration of the loop) at mark M, thus placing A2 into the A1's frame.

[ ] ~C1

[ ] ~B1

[A2] ~A1, mark M

[ ] ~X1
[ ]


Then the label C returns, label B returns, and label A returns. But A1's frame is not empty yet (* shows that A1 has completed and now it's a frame without a rowop as such).

[A2] *, mark M

[ ] ~X1
[ ]


Then A2 gets taken from the frame and executed with the context of the same frame:

[ ] ~A2, mark M

[ ] ~X1
[ ]


The label A again sets the mark M, which marks the same frame, so it's pretty much a no-op (so A doesn't really have to set the mark the second time, it's just easier this way). And then it proceeds to call B and C again:


[ ] ~C2

[ ] ~B2

[ ] ~A2, mark M

[ ] ~X1
[ ]


The label C loops again back to A:


[ ] ~C2

[ ] ~B2

[A3] ~A2, mark M

[ ] ~X1
[ ]


The stack then unrolls, finds the A2's frame not empty, takes A3 from it, and continues in the same way until C decides to not loop to A any more, calling Y instead.

This has pulled with it a few more changes. The first consequence is that the frame draining doesn't happen between executing the label itself and executing its chained labels. Now it has moved to the very end. Now the label runs, then calls whatever labels are chained from it, then the frame draining happens after all the other processing has completed. If the frame is found not empty, the first label from it gets removed from the frame and "semi-called" with the same frame. If the frame is not empty again (because either the original rowop had forked/looped rowops onto it, or because the "semi-called" one did), the next label gets removed and "semi-called", and so on.

The second consequence is that this has changed the traces of the unit tracers, and I've had to add one more TracerWhen constant. Remembering the difficulties with the nesting of the traces, this was a good time to fix that too, so I've added the second TracerWhen constant. Now all of them go nicely in pairs:

TW_BEFORE, // before calling the label's execution as such
TW_AFTER, // after all the execution is done
TW_BEFORE_CHAINED, // after execution, before calling the chained labels (if they are present)
TW_AFTER_CHAINED, // after calling the chained labels (if they were present)
TW_BEFORE_DRAIN, // before draining the label's frame if it's not empty
TW_AFTER_DRAIN, // after draining the label's frame if was not empty

The TW_BEFORE/AFTER_CHAINED trace points now get called only if there actually were any chained labels to call, and TW_BEFORE/AFTER_DRAIN trace points get called only if there were anything to drain. The DRAIN trace points get always called with the original rowop that pushed this frame onto the stack first (so that matching the "before" and "after" is easy).

The full sequence in the correct order now becomes:

TW_BEFORE
TW_BEFORE_CHAINED
TW_AFTER_CHAINED 
TW_AFTER
TW_BEFORE_DRAIN
TW_AFTER_DRAIN 

But since parts of it are optional, the minimal (and most typical) one is only:

TW_BEFORE
TW_AFTER

There also are new methods to check if a particular constant (in its integer form, not as a string) is a "before" or "after". Their typical usage in a trace function, to print an opening or closing brace, looks like:

     if (Triceps::tracerWhenIsBefore($when)) {
        $msg .= " {";
    } elsif (Triceps::tracerWhenIsAfter($when)) {
        $msg .= " }";
    }


More trace points that are neither "before" or "after" could get added in the future, so a good practice is to use an elsif with both conditions rather than a simple if/else with one condition.



The third consequence is that the methods Unit::makeLoopHead() and Unit::makeLoopAround() now return only a pair of values, not a triplet. The "begin" label is not needed any more, so it's not created and not returned.

Friday, July 6, 2012

more updates

Some more stuff has been getting cleaned up:

$unit->setTracer($tracer);

now confesses on errors, so the problem with its error checking is fixed.

The tables now allow to create rowops of rows of all matching types, not only of the equal types. The approach with matching types was not consistent with what the labels did, so I've changed it.

The TableType now has the method

$tt->getRowType() 

that has the name consistent with the tables and labels. The old method rowType() also still exists.

The Opt::ck_ref() now also accepts the subclasses of the defined classes.

The new helper Fields::isStringType() has been added.

There also have been some major addition of the examples:

I've added a pretty big example of the main loop that includes the full socket handling in the chapter on Scheduling. It's not exactly production-ready but gives some idea.

Another addition in the chapter on Scheduling is an example of a topological loop that computes the Fibonacci numbers.

Many new examples have been added to the chapter on Templates.

Wednesday, June 20, 2012

more Unit methods

I still keep adding stuff. In Unit I've added 3 more methods:

$result = $unit->getStackDepth();

Returns the current depth of the call stack (the number of the  stack frames on the queue). It isn't of any use for the model logic as such but comes handy for debugging, to check in the loops that you haven't accidentally created a stack growing with iterations. When the unit is not running, the stack depth is 1, since the outermost frame always stays on the stack. When a rowop is being executed, the stack depth is at least 2.

($labelBegin, $labelNext, $frameMark) = $unit->makeLoopHead(
    $rowType, "name", $clearSub, $execSub, @args);

($labelBegin, $labelNext, $frameMark) = $unit->makeLoopAround(
    "name", $labelFirst);

The convenience methods to create the whole front part of the topological loop.

These methods use the new error handling convention, confessing on the errors. There is no need to check the result.

makeLoopHead() creates the front part of the loop that starts with a Perl label. It gets the arguments for that label and creates it among the other things. makeLoopAround() creates the front part of the loop around an existing label that will be the first one executed in the loop. makeLoopHead() is really redundant and can be replaced with a combination of makeLabel() and makeLoopAround().

They both return the same results, a triplet:

  • The label where you send a rowop to initiate the loop (remember that the loop consists of one row going through the loop at a time), $labelBegin.
  • The label that you use at the end of the loop in the loopAt() to do the next iteration of the loop, $labelNext.
  • The frame mark that you use in loopAt(), $frameMark. You don't need to set the frame mark, it will be set for you in the wrapper logic.

The name is used to construct the names of the elements by adding a dotted suffix: “name.begin”, “name.next” for makeLoopHead() or “name.wrapnext” for makeLoopAround(), “name.mark”. makeLoopAround() takes the row type for its created labels from the first label that is given to it as an argument.

The manual contains a whole new big example with them, but I see no point in copying it to the blog now, you'll have to read the manual for it.

Friday, June 15, 2012

the Unit update

I've been editing the docs about Units, and there have been a couple of developments.

First, I've noticed that I've already got a decent fix for the more serious scheduling issue. To get the predictable order with the scheduling, just always feed the rowops one by one into the model. I've been doing it in the later examples all over the place.

Second, I think I've found a nice compromise for the tracing that on one hand doesn't create deep indentation, and on the other hand lets to find the nesting boundaries easy: add "{" and "}" at the end of the lines before and after running the label. Then it can be written to a file and an editor like vi or Emacs can be used to jump from one end to the other.

Monday, May 21, 2012

Large deletes in small chunks

If you have worked with Coral8 and similar CEP systems, you should be familiar with the situation when you ask it to delete a million rows from the table and the model goes into self-contemplation for half an hour, not reacting to any requests. It starts responding again only when the deletes are finished. That's because the execution is single-threaded, and deleting a million rows takes time.

Triceps is succeptible to the same issue. So, how to avoid it? Even better, how to make the deletes work "in background", at a low priority, kicking in only when there is no other pending requests?

The solution is do do it in smaller chunks. Delete a few rows (say, a thousand or so) then check if there are any other requests. Keep processing these other request until the model becomes idle. Then continue with deleting the next chunk of rows.

Let's make a small example of it. First, let's make a table.

our $uChunks = Triceps::Unit->new("uChunks") or confess "$!";

# data is just some dumb easily-generated filler
our $rtData = Triceps::RowType->new(
    s => "string",
    i => "int32",
) or confess "$!";

# the data is auto-generated by a sequence
our $seq = 0;

our $ttData = Triceps::TableType->new($rtData)
    ->addSubIndex("fifo", Triceps::IndexType->newFifo())
or confess "$!";
$ttData->initialize() or confess "$!";
our $tData = $uChunks->makeTable($ttData,
    &Triceps::EM_CALL, "tJoin1"
) or confess "$!";
makePrintLabel("lbPrintData", $tData->getOutputLabel());

The data in the table is completely silly, just something to put in there. Even the index is a simple FIFO, just something to keep the table together. And the data will be put in there by the main loop in an equally silly way:

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "data") {
        my $count = shift @data;
        for (; $count > 0; $count--) {
            ++$seq;
            $uChunks->makeHashCall($tData->getInputLabel(), "OP_INSERT",
                s => ("data_" . $seq),
                i => $seq,
            ) or confess "$!";
        }
    } elsif ($type eq "dump") {
        for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $rhit->next()) {
            print("dump: ", $rhit->getRow()->printP(), "\n");
        }
    }
    $uChunks->drainFrame(); # just in case, for completeness
}

When we send the command like "data,3", the mail loop will insert 3 new rows into the table. The contents is generated with sequential numbers, so the rows can be told apart. As the table gets changed, the updates get printed by the label lbPrintData. Also the contents of the table can be dumped with the main loop command "dump". Now let's add a main loop command to clear the table, initially by going through all the data and deleting it at once.

# notifications about the clearing
our $rtNote = Triceps::RowType->new(
    text => "string",
) or confess "$!";

our $lbReportNote = $uChunks->makeDummyLabel($rtNote, "lbReportNote"
) or confess "$!";
makePrintLabel("lbPrintNote", $lbReportNote);

# code that clears the table
our $lbClear = $uChunks->makeLabel($rtNote, "lbClear", undef, sub {
    my $next;
    for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $next) {
        $next = $rhit->next(); # advance before removal
        $tData->remove($rhit);
    }
    $uChunks->makeHashCall($lbReportNote, "OP_INSERT",
        text => "done clearing",
    ) or confess "$!";
}) or confess "$!";

It's done in a bit round-about way: the main loop will send the clearing notification row to the label lbClear. Which does the clearing, then sends a notification that the clearing has completed to the label lbReportNote. Which eventually gets printed.

In the real world not the whole table would be erased but only the old data, from before a certain date. I've shown it before in the traffic aggregation example, ordering the rows by a date field, and deleting until you see a newer row. Here for simplicity all the data get wiped out.

The part of the main loop responsible for the clearing command is:

    elsif ($type eq "clear") {
        $uChunks->makeHashCall($lbClear, "OP_INSERT",
            text => "clear",
        ) or confess "$!";
    } 

With the basic clearing done, time to add the chunking logic. First, add a tray to collect things that need to be done when the model is idle:

our $trayIdle = $uChunks->makeTray();

Then modify the lbClear code to work with the limited chunks:

# code that clears the table in small chunks
our $lbClear = $uChunks->makeLabel($rtNote, "lbClear", undef, sub {
    my $limit = 2; # no more than 2 rows per run
    my $next;
    for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $next) {
        if ($limit-- <= 0) {
            # request to be called again when the model becomes idle
            $trayIdle->push($_[0]->adopt($_[1]));
            return;
        }
        $next = $rhit->next(); # advance before removal
        $tData->remove($rhit);
    }
    $uChunks->makeHashCall($lbReportNote, "OP_INSERT",
        text => "done clearing",
    ) or confess "$!";
}) or confess "$!";

Since it's real inconvenient to play with a million rows, we'll play with just a few rows. And so the chunk size limit is also set smaller, to just two rows instead of a thousand. When the limit is reached, the code pushes the command row to the idle tray for later rescheduling and returns. The adoption part is not strictly necessary, and this small example would work fine without it. But it's a safeguard for the more complicated programs that may have the labels chained, with our clearing label being just one link in a chain. If the incoming rowop gets rescheduled as is, the whole chain will get executed again. which might not be desirable. Re-adopting it to our label will cause only our label (okay, and everything chained from it) to be executed.

How would the rowops in the idle tray get executed? In the real world, the main loop logic would be like this pseudocode:

while(1) {
  if (idle tray empty)
    timeout = infinity;
  else
    timeout = 0;
  poll(file descriptors, timeout);
  if (poll timed out)
    run the idle tray;
  else
    process the incoming data;
}

But it's hugely inconvenient for a toy demonstration, getting the timing right would be a major pain. So instead let's just add an extra command "idle" to the main loop, to trigger the idle logic at will:

    elsif ($type eq "idle") {
        $uChunks->schedule($trayIdle);
        $trayIdle->clear();
    }

And while at it, let's make the "dump" command show the contents of the idle tray as well:

        for my $r ($trayIdle->toArray()) {
            print("when idle: ", $r->printP(), "\n");
        }

All the pieces have been put together, let's run the code. As usual, the input lines are shown in italics:

data,1
tJoin1.out OP_INSERT s="data_1" i="1" 
clear
tJoin1.out OP_DELETE s="data_1" i="1" 
lbReportNote OP_INSERT text="done clearing" 

This is pretty much a dry run: put in one row (less than the chunk size), see it deleted on clearing. And see the completion reported afterwards.

data,5
tJoin1.out OP_INSERT s="data_2" i="2" 
tJoin1.out OP_INSERT s="data_3" i="3" 
tJoin1.out OP_INSERT s="data_4" i="4" 
tJoin1.out OP_INSERT s="data_5" i="5" 
tJoin1.out OP_INSERT s="data_6" i="6" 

Add more data, which will be enough for three chunks.

clear
tJoin1.out OP_DELETE s="data_2" i="2" 
tJoin1.out OP_DELETE s="data_3" i="3" 

Now the clearing  does one chunk and stops, waiting for the idle condition.

dump
dump: s="data_4" i="4" 
dump: s="data_5" i="5" 
dump: s="data_6" i="6" 
when idle: lbClear OP_INSERT text="clear" 

See what's inside: the remaining 3 rows, and a row in the idle tray saying that the clearing is in progress.

idle
tJoin1.out OP_DELETE s="data_4" i="4" 
tJoin1.out OP_DELETE s="data_5" i="5" 

The model goes idle once more, one more chunk of two rows gets deleted.

data,1
tJoin1.out OP_INSERT s="data_7" i="7"
dump
dump: s="data_6" i="6" 
dump: s="data_7" i="7" 
when idle: lbClear OP_INSERT text="clear" 

What will happen if we add more data in between the chunks of clearing? Let's see, let's add one more row. It shows up in the table as usual.

idle
tJoin1.out OP_DELETE s="data_6" i="6" 
tJoin1.out OP_DELETE s="data_7" i="7" 
lbReportNote OP_INSERT text="done clearing" 
dump
idle

And on the next idle condition the clearing picks up whatever was in the table for the next chunk. Since there were only two rows left, it's the last chunk, and the clearing reports a successful completion. And a dump shows that there is nothing left in the table nor in the idle tray. And the next idle condition does nothing, because the idle tray is empty.

The delete-by-chunks logic can be made into a template, just I'm not sure yet what is the best way to do it. It would have to have a lot of configurable parts.

 On another subject, scheduling the things to be done on idle adds an element of unpredictability to the model. It's impossible to predict the exact timing of the incoming requests, and the idle work may get inserted between any of them. Presumably it's OK because the data being deleted should not be participating in any logic at this time any more. For repeatability in the unit tests, make the chunk size adjustable and adjust it to a size larger than the biggest amount of data used in the unit tests.

A similar logic can also be used in querying the data. But it's more difficult. For deletion the continuation is easy: just take the first row in the index, and it will be the place to continue (because the index is ordered correctly, and because the previous rows are getting deleted). For querying you would have to remember the next row handle and continue from it. Which is OK if it can not get deleted in the meantime. But if it can get deleted, you'll have to keep track of that too, and advance to the next row handle when this happens. And if you want to receive a full snapshot with the following subscription to all updates, you'd have to check whether the modified rows are before or after the marked handle, and pass them through if they are before it, letting the user see the updates to the data already received. And since the data is being sent to the user, filling up the output buffer and stopping would stop the whole model too, and not restart until the user reads the buffered data. So there has to be a flow control logic that would stop the query when output buffer fills up, return to the normal operation, and then reschedule the idle job for the query only when the output buffer drains down. I've kind of started on doing an example of the chunked query too, but then because of all these complications decided to leave it for later.

Monday, May 14, 2012

The new error handling

I've been considering the change to the error handling for a while. The repeating checks of the calls for "or confess" are pretty annoying, just dying/confessing by default would be better. And then if anyone wants to catch that death, they can use eval {} around the call.

The need to check for the recursive modification attempts in the tables struck me as something that could particularly benefit from just dying rather than returning an error code that would likely be missed. This pushed me towards starting the shift towards this new error handling scheme.

With the interaction through the Perl and the native C++ code, unrolling the call stack is a pretty tricky proposition but I've got it worked out. If the error is not caught with eval, it keeps unrolling the call sequence through both the Perl call stack and the Triceps unit call stack. If the error is caught with eval, the message and the whole stack trace will be as usual in $@.

The bad news is that so far only a limited subset of the calls use the new scheme. The rest are still setting $! and returning an undef. So overall it's a mix and you need to remember, which calls work which way. In the future versions eventually everything will be converted to the new scheme. For a bit of backwards-compatibility, the error messages from the new-style dying calls are saved in both $@ and $!. This will eventually go away, and only $@ will be used.

The converted calls are:
  • The Table modification methods:
    • insert()
    • remove()
    • deleteRow()
  • The Unit methods that deal with the scheduling:
    • schedule()
    • fork()
    • call()
    • enqueue()
    • setMark()
    • loopAt()
    • callNext()
    • drainFrame()
    • clearLabels()
If the code dies in the middle of executing a Perl label's handler, the scheduler will catch it as before, but instead of just printing an error message on stderr and continuing, it will start unrolling the Unit call stack, collecting the label call trace until it eventually gets back to the Perl level, which in order will die. If not caught by eval, that would continue the unrolling, until the outermost Perl code dies and prints the whole stack trace.

Note though that right now this works only with the label handlers. The handlers for the tracers, aggregators, sorted indexes etc. still work in the old way, with an error message printed to stderr.

These errors from the label handlers are not to be treated lightly. Usually you can't just catch them be an eval and continue on your way. The reason is that as the Unit scheduling stack gets unrolled, any unprocessed rowops in it get thrown away. By the time you catch the error, the data is probably in an inconsistent state, and you can't just dust off and continue. You would have to reset your model to a good state first. Treat such errors as near-fatal. It could have been possible to keep going through the scheduled rowops, collecting the errors along the way and then returning the whole pile. But it seems more important to report the error as soon as possible. And anyway, if something has died somewhere, it has probably already left some state inconsistent, and continuing to run forward as normal would just pile up crap upon crap. If you want the errors to be handled early and lightly, make sure that your Perl code doesn't die in the first place.

Another added item is an explicit check that the labels are not called recursively. That is, if a label is called, it can not call itself again, directly or through the other labels, until it returns. Such recursive calls don't hurt anything by themselves but they are a bad design practice, and it seems more important to catch the accidental errors of this kind early than to leave the door open for the intentional use of them by design. If you want a label's processing to loop back to itself, the proper way it to arrange it with schedule() or loopAt().

Tuesday, January 24, 2012

Yes bundling

Even though Triceps does no bundling in scheduling, there still is a need to store the sequences of row operations. This is an important distinction, since the stored sequences are to be scheduled somewhere in the future (or maybe not scheduled at all, but iterated through manually). If and when they get scheduled, they will be unbundled. The ordered storage only provides the order for that future scheduling or iteration.

The easiest way to store rowops is to put them into the Perl arrays, like:

my @ops = ($rowop1, $rowop2);
push @ops, $rowop3;

However the C++ internals of Triceps do not know about the Perl arrays. And some of them work directly with the sequences of rowops. So Triceps defines an internal sort-of-equivalent of Perl array for rowops, called a Tray.

The trays have first been used to "catch" the side effects of operations on the stateful elements, so the name "tray" came from the metaphor "put a tray under it to catch the drippings".

The trays get created as:

$tray = $unit->makeTray($rowop, ...) or die "$!";

A tray always stores rowops for only one unit. It can be only used in one thread.  A tray can be used in all the scheduling functions, just like the direct rowops:

$unit->call($tray);
$unit->fork($tray);
$unit->schedule($tray);
$unit->loopAt($mark, $tray);

Moreover, the single rowops and  trays can be mixed in the multiple arguments of these functions, like:

$unit->call($rowopStartPkg, $tray, $rowopEndPkg);

In this example nothing really stops you from placing the start and end rows into the tray too. A tray may contain the rowops of any types mixed in any order. This is by design, and it's an important feature that allows to build the protocol blocks out of rowops and perform an orderly data exchange. This feature is an absolute necessity for proper inter-process and inter-thread communication.

The ability to send the rows of multiple types through the same channel in order is a must, and its lack makes the communication with some other CEP systems exceedingly difficult. Coral8 supports only one stream per connection. Aleri (and I believe Sybase R5) allows to send multiple streams through the same connection but has no guarantees of order between them. I don't know about the others, check yourself.

To iterate on a tray, it can be converted to a Perl array:

@array = $tray->toArray();

The size of the tray (the count of rowops in it) can be read directly without a conversion, and the unit can be read back too:

$size = $tray->size();
$traysUnit = $tray->getUnit();

Another way to create a tray is by copying an existing one:

$tray2 = $tray1->copy();

This copies the contents (which is the references to the rowops) and does not create any ties between the trays. The copying is really just a more efficient way to do

$tray2 = $tray1->getUnit()->makeTray($tray1->toArray());

The tray references can be checked, whether they point to the same tray object:

$result = $tray1->same($tray2);

The contents of a tray may be cleared. Which is convenient and more efficient than discarding a tray and creating another one:

$tray->clear();

The data may be added to any tray:

$tray->push($rowop, ...);

Multiple rowops can be pushed in a single call. There are no other Perl-like operations on a tray: it's either create from a set of rowops, push, or convert to Perl array.

Note that the trays are mutable, unlike the rows and rowops. Multiple references to a tray will see the same contents. If a rowop is added to a tray through one reference, it will be visible through all the others.

Wednesday, January 18, 2012

Execution Unit

After discussing the principles of scheduling in Triceps, let's get down to the nuts and bolts.

An execution unit represents one logical thread of Triceps. In some special cases more that one unit per actual thread may be useful, but never one unit shared between threads.

A unit is created as:

$myUnit = Triceps::Unit->new("name") or die "$!";

The name argument as usual is used for later debugging, and by convention should be the same as the name of the unite variable ("myUnit" in this case). The name can also be changed later:

$myUnit->setName("newName");

It returns no value. Though in practice there is no good reason for it, and this call will likely be removed in the future. The name can be received back:

$name = $myUnit->getName();

Also, as usual, the variable $myName here contains a reference to the actual unit object, and two references can be compared, whether they refer to the same object:

$result = $unit1->same($unit2);

The rowops are enqueued with the calls:

$unit->call($rowop, ...) or die "$!";
$unit->fork($rowop, ...) or die "$!";
$unit->schedule($rowop, ...) or die "$!"; 

Also there is a call that selects the enqueueing mode by argument:

$unit->enqueque($mode, $rowop, ...) or die "$!";

Multiple rowops can be specified as arguments. Calling these functions with multiple arguments produces the same result as doing multiple calls with one argument at a time. Not only rowops but also trays (to be discussed later) of rowops can be used as arguments.

The mode for enqueue() is one of either Triceps constants

&Triceps::EM_CALL
&Triceps::EM_FORK
&Triceps::EM_SCHEDULE

or the matching strings "EM_CALL", "EM_FORK", "EM_SCHEDULE". As usual, the constant form is more efficient. There are calls to convert between the constant and string representations:

$string = &Triceps::emString($value);
$value = &Triceps::stringEm($string);

As usual, if the value can not be translated they return undef.

The frame marks for looping are created as their own class:

$mark = Triceps::FrameMark->new("name") or die "$!";

The name can be received back from the mark:

$name = $mark->getName();

Other than that, the frame marks are completely opaque, and can be used only for the loop scheduling. Not even the same() method is supported for them at the moment, though it probably will be in the future. The mark gets set and used as:

$unit->setMark($mark);
$unit->loopAt($mark, $rowop, ...) or die "$!";

The rowop arguments of the loopAt() are the same as for the other enqueueing functions.

The methods for creation of labels have been already discussed. There also are similar methods for creation of tables and trays that will be discussed later:

$label = $unit->makeDummyLabel($rowType, "name") or die "$!";
$label = $unit->makeLabel($rowType, "name",
  $clearSub, $execSub, @args) or die "$!";
$table = $unit->makeTable($tableType, $endMode, "name") or die "$!";
$tray = $unit->makeTray(@rowops) or die "$!"; 

The unit can be checked for the emptiness of its queues:

$result = $unit->empty();

The functions for execution are:

$unit->callNext();
$unit->drainFrame();

The callNext() takes one label from the top stack frame queue and calls it. If the innermost frame happens to be empty, it does nothing. The drainFrame() calls the rowops from the top stack frame until it becomes empty. This includes any rowops that may be created and enqueued as part of the execution of the previous rowops.

A typical way of processing the incoming rowops in a loop is:

$stop = 0; 
while (!$stop) {
  $rowop = readRowop(); # some user-defined function
  $unit->schedule($rowop);
  $unit->drainFrame();
} 

All the unit's labels get cleared with the call

$unit->clearLabels();

To not forget calling it, a separate clearing trigger object can be created:

my $clearUnit = $unit->makeClearingTrigger();

The variable $clearUnit would normally be a global (in a thread) variable. Don't copy the reference to the other variables! Then when the thread completes and the global variables get destroyed, the trigger object will be also destroyed, and will trigger the clearing of the unit's labels, thus breaking up any reference loops and allowing to destroy the bits and pieces.

The only item left is the tracers, and they will be described in a separate post.

Tuesday, January 17, 2012

Issues with Triceps scheduling

Some of the issues have been already mentioned in the description of the loop scheduling: it's a bit confusing that the frame mark is placed on the next outer scheduling stack frame and not on the current one. This leads to the interesting effects in execution order.

But there are issues with the basic execution too:

The schedule() call, when used from inside the scheduled code, introduces unpredictability in the execution order: it puts the rowop after the last rowop in the outermost stack frame. But the outermost stack frame contains the queue of rowops that come from the outside. This means that the exact order of execution will depend on the timing of the rowops arriving from outside. Because of this, if the repeatable execution order is important, schedule() should be used only for the rowops coming from the outside.

The same issue happens with the loops that have been temporarily stopped and then resumed on arrival of more data from outside. The mark of such a loop will be unset when the loop continues, and looping at this mark will be equivalent to schedule(), having the same repeatability problem.

The call fork() is not exactly useful. Its main purpose was when I've thought that it's the solution to the problem of the loops. Which it has turned out to not solve, and another solution had to be devised. Now it really doesn't have much use, and will probably be removed in the future.

I have a few ideas for solutions of these issues, but they will need a bit more experimentation. Just keep in mind that the scheduling will be reformed in the future. It will still have the same general shape but differ in detail.

Sunday, January 15, 2012

loop scheduling

The easiest way to schedule the loops is to do it procedurally, something like this:

foreach my $row (@rowset) {
  $unit->call($lbA->makeRowop(&Triceps::OP_INSERT, $row)); 
}

However the labels topologically connected into a loop can come handy as well. Some logic may be easier to express this way. Suppose the model contains the labels connected in a loop:

X->A->B->C->Y
   ^     |
   +-----+


Suppose a rowop X1 is scheduled for label X, and causes the loop executed twice, with rowops X1, A2, B3, C4, A5, B6, C7, Y8. If each operation is done as a CALL, the stack grows like this: It starts with X1 scheduled.

[X1]

Which then gets executed, with its own execution frame (marked as such for clarity:

[ ] of X1
[ ]

Which then calls A2:

[ ] of A2
[ ] of X1
[ ]

By the time the execution comes to Y8, the stack looks like:

[ ] of Y8
[ ] of C7
[ ] of B6
[ ] of A5
[ ] of C4
[ ] of B3
[ ] of A2
[ ] of X1
[ ]

The loop has been converted into recursion, and the whole length of execution is the deep of the recursion. If the loop executes a million times, the stack will be two million levels deep. Worse yet, it's not just the Triceps scheduler stack that grows, it's also the process (C++) stack.

Would things be better with FORK instead of CALL used throughout the loop? It starts the  same way:

[X1]

Then X1 executes, gets its own frame and forks A2:

[A2] of X1
[ ]

Then A2 executes, gets its own frame and forks B3:

[B3] of A2
[ ] of X1
[ ]

By the end of the loop the picture becomes exactly the same as with CALL. For a while I've thought that optimizing out the empty stack frames would solve the problem, but no, that doesn't work: the problem is that the C++ process stack keeps growing no matter what. The jump back in the loop needs to be placed into an earlier stack frame.

One way to do it would be to use the SCHEDULE operation in C to jump back to A, placing the rowop A5 back onto the outermost frame. The scheduler stack at the end of C4 would look like:

[ ] of C4
[ ] of B3
[ ] of A2
[ ] of X1
[A5]

Then the stack would unwind back to

[A5]

and the next iteration of the loop will start afresh. The problem here is that if X1 wanted to complete the loop and then do something, it can't. By the time the second iteration of the loop starts, X1 is completely gone. It would be better to be able to enqueue the next execution of the loop at the specific point of the stack.

Here the concept of the frame mark comes in: a frame mark is a token object, completely opaque to the program. It can be used only in two operations:

  • setMark() remembers the position in the frame stack, just outside the current frame
  • loopAt() enqueues a rowop at the marked frame

Then the loop wold have its mark object M. The label A will execute setMark(M), and the label C will execute loopAt(M, rowop(A)). The rest of the execution can as well use call().

When A2 calls setMark(M), the stack will look like this:

[ ] of A2
[ ] of X1 * mark M
[ ]

The mark M remembers the frame one outer to the current one. The stack at the end of C4, after it has called loopAt(M, A5), is:

[ ] of C4
[ ] of B3
[ ] of A2
[A5] of X1 * mark M
[ ]

The stack then unwinds until A5 starts its execution:

[ ] of A5
[ ] of X1 * mark M
[ ]

Each iteration starts with a fresh stack, and the stack depth is limited to one iteration. The nested loops can also be properly executed.

Now, why does the mark is placed on the frame that is one out from the current one? Suppose that it did remember the current frame. Then at the end of C4 the stack will be:

[ ] of C4
[ ] of B3
[A5] of A2 * mark M
[ ] of X1
[ ]

The stack will unwind until A5. Which would then have its own frame pushed onto the stack, and call setMark(M) again, moving the mark to its own frame:

[ ] of A5 * mark M
[ ] of A2
[ ] of X1
[ ]

So on each iteration of the loop one extra frame will be pushed onto the stack, and the mark moved by one level. A loop executing a million times will push a million frames, which is bad. Marking the next outer frame prevents this.  Another option would have been to put the mark in X, but that would mean that every loop must have a preceding label that just marks the frame (well, and potentially could do the other initializations too), which seems to be too annoying.

However as things are, another problem is that if X does call(A2), when it returns, the loop would not be completed yet, only the first iteration would be completed. To have the whole loop completed, there would have to be another label W, and when W does call(X1), the loop would be completed.

This is still messy, and I'm still thinking about the ways to improve the situation.

What happens after the stack unwinds past the mark? The mark gets unset. When someone calls loopAt() with an unset mark, the rowop is enqueued in the outermost frame, having the same effect as schedule().

rowop sets the condition, it would free that original row and make it continue through the loop.  Eventually the loop will come to the looping point, calling loopAt(). But the original mark will be long unset. Scheduling at the outermost frame seems to be a logical thing to do at this point.

What if setMark() is called when there is only one frame on the stack? Then there is no second frame outer to it. The mark will simply be left unset.

Sunday, January 8, 2012

Basic scheduling

The Triceps scheduler provides 3 basic ways of executing of a label:

  • Call: execute the label right now, including all the nested calls. All of this will be completed after the call returns.
  • Fork: execute the label after the current label returns but before anything else is done. Obviously, if multiple labels are forked, they will execute in order after the current label returns (but before its caller gets the control back).
  • Schedule: execute the label after everything else is done.
How it works inside:

A scheduler in the execution unit keeps a stack of queues. Each queue is essentially a stack frame. The stack always contains at least one queue, which is called the outermost stack frame. 

When the new rowops come from the outside world, they are added with schedule() to that stack frame. That's what schedule() does: always adds messages to the outermost stack frame. If rowops 1, 2 and 3 are added, the stack looks like this:

[1, 2, 3]

The unit method drainFrame() is then used to process the rowops. It makes the unit call each label on the innermost frame (which is at the moment the same as outermost frame) in order.

First it calls the rowop 1. It's removed from the queue, then a new frame is pushed onto the stack

[ ]
[2, 3]

Then the rowop 1 executes. If it calls rowop 4, another frame is pushed onto the stack

[ ]
[ ]
[2, 3]

then the rowop 4 executes. After it is finished (not calling any other rowops), the outermost empty frame is popped before the execution of rowop 1 continues.

[ ]
[2, 3]


Suppose then rowop 1 forks rowops 5 and 6. They are appended to the innermost frame.

[5, 6]
[2, 3]

If rowop 1 calls rowop 7, again a frame is pushed onto the stack before it executes

[ ]
[5, 6]
[2, 3]


Again, note that a call does not put the target rowop into any scheduler queue. The identity of rowop being processed is just kept in the call context. A call also involves a direct C++ call on the thread stack, and if any Perl code is involved, a Perl call too. Because of this, if you nest the calls too deeply, you may run out of the thread stack space.

Returning back to the sequence, after the call of rowop 7 completes, the scheduler stack returns to

[5, 6]
[2, 3]


Suppose now the execution of rowop 1 completes. But its call is not done yet, because its stack queue is not empty. The scheduler calls drainFrame() recursively, which picks the next rowop from the innermost queue (rowop 5), and calls it, pushing a new stack frame and executing the rowop 5 code:

[ ]
[6]
[2, 3]

If rowop 5 forks rowop 8, the stack becomes:

[8]
[6]
[2, 3]

When the execution of rowop 5 returns, its queue is also not empty. So the scheduler calls rowop 8. During its execution the stack is:

[ ]
[ ]
[6]
[2, 3]

Suppose the rowop 8 doesn't call or fork anything else and returns. Its innermost queue is empty, so the call completes and pops the stack frame.

[ ]
[6]
[2, 3]

Now the queue of rowop 5 is also empty, so its call completes and pops the stack frame.

[6]
[2, 3]

The call of rowop 6 begins.

[ ]
[ ]
[2, 3]

Suppose rowop 6 calls schedule() of rowop 9. Rowop 9 is then added to the outermost queue:

[ ]
[ ]
[2, 3, 9]

Rowop 6 then returns, its queue is empty, so it's popped and its call completes.

[ ]
[2, 3, 9]

Now the queue of rowop 1 has become empty, so it's popped from the stack and the call fo rowop 1 completes.

[2, 3, 9]

The unit method drainFrame() keeps running, now taking the rowop 2 and executing it, and so on, until the outermost queue becomes empty, and drainFrame() returns.

Saturday, January 7, 2012

No bundling

The most important principle of Triceps scheduling is: No Bundling. Every rowop is for itself. The bundling is what messes up the Coral8 scheduler the most.


What is a bundle? It's a set of records that go through the execution together. If you have two functional elements F1 and F2 arranged in a sequential fashion F1-> F2, and a few loose records R1, R2, R3, the normal execution order will be:


F1(R1), F2(R1),
F1(R2), F2(R2),
F1(R3), F2(R3)

If the same records are placed in a bundle, the execution order will be different:


F1(R1), F1(R2), F1(R3),
F2(R1), F2(R2), F2(R3)

That would not be a problem, and even could be occasionally useful, if the bundles were always created explicitly. In reality every time a statement produces multiple record from a single one (think of a join that picks multiple records from another side), it creates a bundle and messes up all the logic after it. Some logic gets affected so badly that a few statements in CCL (like ON UPDATE) had to be designated as always ignoring the bundles, otherwise they would not work at all. At DB I wrote a CCL pattern for breaking up the bundles. It's rather heavyweight and thus could not be used all over the place but provides a generic solution for the most unpleasant cases.

Worse yet, the bundles may get created in Coral8 absolutely accidentally: if two records happen to have the same timestamp, for all practical purposes they would act as a bundle. In models that were designed without the appropriate guards, this leads to the time-based bugs that are hard to catch and debug. Writing these guards correctly is hard, and testing them is even harder.

Another issue with bundles is that they make the large queries slower. Suppose you do a query from a window that returns a million records.  All of them will be collected in a bundle, then the bundle will be sent to the interface gateway that would build one huge protocol packet, which will then be sent to the client, which will receive the whole packet and then finally iterate on the records in it. Assuming that nothing runs out of memory along the way, it will be a long time until the client sees the first record.  Very, very annoying.

Aleri also has its own version of bundles, called transactions, but a more smart one. Aleri always relies on the primary keys. The condition for a transaction is that it must never contain multiple modification for the same primary key. Since there are no execution order guarantees between the functional elements, in this respect the transactions work in the same way as loose records, only with a more efficient communication between threads. Still, if the primary key changes in an element, the condition does not propagate through it. Such elements have to internally collapse the outgoing transactions along the new key, adding overhead.

Introduction to Triceps scheduling

The main point of an execution unit in Triceps is scheduling of the execution of the row operations. It keeps a queue of the operations and selects, which one to execute next. The scheduling is important for a predictable execution order within a single thread.

There are multiple approaches to scheduling. Aleri essentially doesn't have any, except for the flow control between threads, because each its element is a separate thread. Coral8 has an intricate scheduling algorithm. Sybase R5 has the same logic as Coral8 inside each thread. StreamBase presumably also has some.

The scheduling logic in Triceps is different from the other CEP systems. The Coral8 logic looks at first like the only reasonable way to go, but could not be used for three reasons: First, it's a trade secret, so it can't be simply reused. If I'd never seen it, that would not be an issue but I've worked on it and implemented its version for R5. Second, it relies on the properties that the compiler computes from the model graph analysis. Triceps has no compiler, and could not do this. Third, in reality it simply doesn't work that well. There are quite a few cases when the Coral8 scheduler comes up with a strange and troublesome execution order.

For a while I've hoped that Triceps would need no scheduler at all, and everything would be handled by the procedural calls.This has proved to have its own limitations, and thus the labels and their scheduling were born. The Triceps scheduling still has issues to resolve, but overall still feels much better than the Coral8 one.