Monday, May 21, 2012

Large deletes in small chunks

If you have worked with Coral8 and similar CEP systems, you should be familiar with the situation where you ask the system to delete a million rows from a table and the model goes into self-contemplation for half an hour, not reacting to any requests. It starts responding again only when the deletes are finished. That's because the execution is single-threaded, and deleting a million rows takes time.

Triceps is susceptible to the same issue. So, how to avoid it? Even better, how to make the deletes work "in the background", at a low priority, kicking in only when there are no other pending requests?

The solution is to do it in smaller chunks. Delete a few rows (say, a thousand or so), then check if there are any other requests. Keep processing these other requests until the model becomes idle. Then continue with deleting the next chunk of rows.

Let's make a small example of it. First, let's make a table.

our $uChunks = Triceps::Unit->new("uChunks") or confess "$!";

# data is just some dumb easily-generated filler
our $rtData = Triceps::RowType->new(
    s => "string",
    i => "int32",
) or confess "$!";

# the data is auto-generated by a sequence
our $seq = 0;

our $ttData = Triceps::TableType->new($rtData)
    ->addSubIndex("fifo", Triceps::IndexType->newFifo())
or confess "$!";
$ttData->initialize() or confess "$!";
our $tData = $uChunks->makeTable($ttData,
    &Triceps::EM_CALL, "tJoin1"
) or confess "$!";
makePrintLabel("lbPrintData", $tData->getOutputLabel());

The data in the table is completely silly, just something to put in there. Even the index is a simple FIFO, just something to keep the table together. And the data will be put in there by the main loop in an equally silly way:

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then its arguments
    my $type = shift @data;
    if ($type eq "data") {
        my $count = shift @data;
        for (; $count > 0; $count--) {
            ++$seq;
            $uChunks->makeHashCall($tData->getInputLabel(), "OP_INSERT",
                s => ("data_" . $seq),
                i => $seq,
            ) or confess "$!";
        }
    } elsif ($type eq "dump") {
        for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $rhit->next()) {
            print("dump: ", $rhit->getRow()->printP(), "\n");
        }
    }
    $uChunks->drainFrame(); # just in case, for completeness
}

When we send a command like "data,3", the main loop will insert 3 new rows into the table. Their contents are generated from a sequential number, so the rows can be told apart. As the table gets changed, the updates get printed by the label lbPrintData. The contents of the table can also be dumped with the main loop command "dump". Now let's add a main loop command to clear the table, initially by going through all the data and deleting it at once.

# notifications about the clearing
our $rtNote = Triceps::RowType->new(
    text => "string",
) or confess "$!";

our $lbReportNote = $uChunks->makeDummyLabel($rtNote, "lbReportNote"
) or confess "$!";
makePrintLabel("lbPrintNote", $lbReportNote);

# code that clears the table
our $lbClear = $uChunks->makeLabel($rtNote, "lbClear", undef, sub {
    my $next;
    for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $next) {
        $next = $rhit->next(); # advance before removal
        $tData->remove($rhit);
    }
    $uChunks->makeHashCall($lbReportNote, "OP_INSERT",
        text => "done clearing",
    ) or confess "$!";
}) or confess "$!";

It's done in a somewhat round-about way: the main loop sends the clearing command row to the label lbClear, which does the clearing and then sends a notification that the clearing has completed to the label lbReportNote, which eventually gets printed.

In the real world, usually not the whole table would be erased but only the old data, from before a certain date. I've shown this before in the traffic aggregation example: keep the rows ordered by a date field and delete from the oldest until you see a row that is new enough. Here for simplicity all the data gets wiped out.
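
For reference, here is a minimal sketch of that kind of clearing, assuming that the rows have a field "date" by which the table's index keeps them ordered; the variable $cutoff is hypothetical:

# delete only the rows older than $cutoff; since the index keeps the
# rows ordered by the date, stop at the first row that is new enough
my $next;
for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $next) {
    last if ($rhit->getRow()->get("date") >= $cutoff);
    $next = $rhit->next(); # advance before removal
    $tData->remove($rhit);
}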

The part of the main loop responsible for the clearing command is:

    elsif ($type eq "clear") {
        $uChunks->makeHashCall($lbClear, "OP_INSERT",
            text => "clear",
        ) or confess "$!";
    } 

With the basic clearing done, time to add the chunking logic. First, add a tray to collect things that need to be done when the model is idle:

our $trayIdle = $uChunks->makeTray();

Then modify the lbClear code to work with the limited chunks:

# code that clears the table in small chunks
our $lbClear = $uChunks->makeLabel($rtNote, "lbClear", undef, sub {
    my $limit = 2; # no more than 2 rows per run
    my $next;
    for (my $rhit = $tData->begin(); !$rhit->isNull(); $rhit = $next) {
        if ($limit-- <= 0) {
            # request to be called again when the model becomes idle
            $trayIdle->push($_[0]->adopt($_[1]));
            return;
        }
        $next = $rhit->next(); # advance before removal
        $tData->remove($rhit);
    }
    $uChunks->makeHashCall($lbReportNote, "OP_INSERT",
        text => "done clearing",
    ) or confess "$!";
}) or confess "$!";

Since it's really inconvenient to play with a million rows, we'll play with just a few. And so the chunk size limit is also set smaller, to just two rows instead of a thousand. When the limit is reached, the code pushes the command row to the idle tray for later rescheduling and returns. The adoption part is not strictly necessary, and this small example would work fine without it. But it's a safeguard for the more complicated programs that may have the labels chained, with our clearing label being just one link in a chain. If the incoming rowop gets rescheduled as is, the whole chain will get executed again, which might not be desirable. Re-adopting it to our label will cause only our label (okay, and everything chained from it) to be executed.
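
To make the difference concrete, here are the two variants side by side (the second one is what the example uses; in the label handler $_[0] is the label and $_[1] is the incoming rowop):

$trayIdle->push($_[1]); # reschedule the original rowop, re-running its whole chain
$trayIdle->push($_[0]->adopt($_[1])); # re-adopt it, re-running only our label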

How would the rowops in the idle tray get executed? In the real world, the main loop logic would be like this pseudocode:

while(1) {
  if (idle tray empty)
    timeout = infinity;
  else
    timeout = 0;
  poll(file descriptors, timeout);
  if (poll timed out)
    run the idle tray;
  else
    process the incoming data;
}
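
In Perl such a loop could look approximately like this; it's only a sketch, assuming that the commands arrive on STDIN and using IO::Select for the polling:

use IO::Select;

my $sel = IO::Select->new(\*STDIN);
while (1) {
    # an undef timeout makes can_read() block indefinitely
    my $timeout = $trayIdle->size() == 0 ? undef : 0;
    if ($sel->can_read($timeout)) {
        my $line = <STDIN>;
        last if (!defined $line);
        # ... process the incoming command as in the main loop above ...
    } else {
        # timed out: run the idle work collected so far
        $uChunks->schedule($trayIdle);
        $trayIdle->clear();
    }
}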

But that's hugely inconvenient for a toy demonstration: getting the timing right would be a major pain. So instead let's just add an extra command "idle" to the main loop, to trigger the idle logic at will:

    elsif ($type eq "idle") {
        $uChunks->schedule($trayIdle);
        $trayIdle->clear();
    }

And while at it, let's make the "dump" command show the contents of the idle tray as well:

        for my $r ($trayIdle->toArray()) {
            print("when idle: ", $r->printP(), "\n");
        }

All the pieces have been put together; let's run the code. As usual, the input lines are shown in italics:

data,1
tJoin1.out OP_INSERT s="data_1" i="1" 
clear
tJoin1.out OP_DELETE s="data_1" i="1" 
lbReportNote OP_INSERT text="done clearing" 

This is pretty much a dry run: put in one row (less than the chunk size), see it deleted on clearing. And see the completion reported afterwards.

data,5
tJoin1.out OP_INSERT s="data_2" i="2" 
tJoin1.out OP_INSERT s="data_3" i="3" 
tJoin1.out OP_INSERT s="data_4" i="4" 
tJoin1.out OP_INSERT s="data_5" i="5" 
tJoin1.out OP_INSERT s="data_6" i="6" 

Add more data, which will be enough for three chunks.

clear
tJoin1.out OP_DELETE s="data_2" i="2" 
tJoin1.out OP_DELETE s="data_3" i="3" 

Now the clearing does one chunk and stops, waiting for the idle condition.

dump
dump: s="data_4" i="4" 
dump: s="data_5" i="5" 
dump: s="data_6" i="6" 
when idle: lbClear OP_INSERT text="clear" 

See what's inside: the remaining 3 rows, and a row in the idle tray saying that the clearing is in progress.

idle
tJoin1.out OP_DELETE s="data_4" i="4" 
tJoin1.out OP_DELETE s="data_5" i="5" 

When the model goes idle once more, one more chunk of two rows gets deleted.

data,1
tJoin1.out OP_INSERT s="data_7" i="7"
dump
dump: s="data_6" i="6" 
dump: s="data_7" i="7" 
when idle: lbClear OP_INSERT text="clear" 

What will happen if we add more data in between the chunks of clearing? Let's see: add one more row. It shows up in the table as usual, and the dump shows that the clearing is still pending in the idle tray.

idle
tJoin1.out OP_DELETE s="data_6" i="6" 
tJoin1.out OP_DELETE s="data_7" i="7" 
lbReportNote OP_INSERT text="done clearing" 
dump
idle

On the next idle condition the clearing picks up whatever is in the table for the next chunk. Since there were only two rows left, it's the last chunk, and the clearing reports a successful completion. A dump then shows that there is nothing left, either in the table or in the idle tray. And the next idle condition does nothing, because the idle tray is empty.

The delete-by-chunks logic can be made into a template, but I'm not sure yet what the best way to do it is. It would have to have a lot of configurable parts.
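
For illustration, here is one possible shape such a template could take, with the configurable parts passed as arguments; the function makeChunkedClear and its signature are made up for this sketch:

# builds and returns a label that clears $table in chunks of $chunkSize
# rows, parking itself on $trayIdle between the chunks and sending a
# completion row to the label $lbDone at the end
sub makeChunkedClear # ($unit, $name, $rowType, $table, $chunkSize, $trayIdle, $lbDone)
{
    my ($unit, $name, $rowType, $table, $chunkSize, $trayIdle, $lbDone) = @_;
    my $lb = $unit->makeLabel($rowType, $name, undef, sub {
        my $limit = $chunkSize;
        my $next;
        for (my $rhit = $table->begin(); !$rhit->isNull(); $rhit = $next) {
            if ($limit-- <= 0) {
                # not done yet, continue when the model becomes idle
                $trayIdle->push($_[0]->adopt($_[1]));
                return;
            }
            $next = $rhit->next(); # advance before removal
            $table->remove($rhit);
        }
        $unit->makeHashCall($lbDone, "OP_INSERT",
            text => "done clearing",
        ) or confess "$!";
    }) or confess "$!";
    return $lb;
}

With it, the clearing label of this example would be created as

our $lbClear = makeChunkedClear($uChunks, "lbClear", $rtNote, $tData, 2, $trayIdle, $lbReportNote);

and a unit test that needs repeatability could simply pass a chunk size larger than its biggest data set.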

On another subject, scheduling things to be done on idle adds an element of unpredictability to the model. It's impossible to predict the exact timing of the incoming requests, and the idle work may get inserted between any of them. Presumably that's OK, because the data being deleted should not be participating in any logic any more by that time. For repeatability in the unit tests, make the chunk size adjustable, and set it to a size larger than the biggest amount of data used in the tests.

A similar logic can also be used for querying the data, but that is more difficult. For deletion the continuation point is easy to find: just take the first row in the index (because the index is ordered correctly, and because the previous rows have been deleted). For querying you would have to remember the next row handle and continue from it. That works as long as the handle cannot get deleted in the meantime. But if it can get deleted, you'll have to keep track of that too, and advance to the following row handle when it happens. And if you want the query to return a full snapshot followed by a subscription to all updates, you'd have to check whether the modified rows are before or after the remembered handle, and pass them through if they are before it, letting the user see the updates to the data already received.

There is also the matter of flow control: since the data is being sent to the user, filling up the output buffer and stopping would stop the whole model too, and not restart until the user reads the buffered data. So there has to be a flow control logic that stops the query when the output buffer fills up, returns to the normal operation, and reschedules the idle job for the query only when the output buffer drains down. I had started on an example of the chunked query too, but because of all these complications decided to leave it for later.
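
Still, just to show the continuation idea, here is a minimal sketch that ignores all the complications above: it assumes that the remembered row handle cannot get deleted in the meantime, and the label name lbQuery is made up:

# the row handle to continue the query from; undef means no query in progress
our $queryPos;

our $lbQuery = $uChunks->makeLabel($rtNote, "lbQuery", undef, sub {
    my $limit = 2; # no more than 2 rows per run
    my $rhit = defined $queryPos ? $queryPos : $tData->begin();
    for (; !$rhit->isNull(); $rhit = $rhit->next()) {
        if ($limit-- <= 0) {
            $queryPos = $rhit; # remember the continuation point
            $trayIdle->push($_[0]->adopt($_[1]));
            return;
        }
        print("query: ", $rhit->getRow()->printP(), "\n");
    }
    $queryPos = undef; # the query has completed
}) or confess "$!";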
