Showing posts with label time. Show all posts
Showing posts with label time. Show all posts

Saturday, May 9, 2020

consistent time and loops

It's obvious that the graph loops cannot be treated like the rest of the links with consistent time, or there would be no pipelining: we'd be forced to wait for an update from the looping link every time we send a record down the loop. Or we might end up sending a whole lot of records down the loop before reading back from the looping link, accumulating a whole lot of records in the queue of the looping link.

So what is the right thing to do? I think it depends on the circumstances, on what we're trying to achieve. I can think of the following uses:

1. Record the processing at real time and be able to reproduce it exactly in an accelerated playback.

2. Process records in accelerated time to start with, but as in real time don't care too much about the exact outcome on the first run as long as it's within the reasonable constraints. (But be able to replay it again in exactly the same way).

3. Process records in accelerated time to start with, and make sure that they get treated in an exactly consistent way, any run from scratch producing the exact same result.

The case(1) can be reasonably resolved by treating the looping links like the inputs form the external systems where we don't particularly care about the exact synchronization: re-timestamping the incoming records, logging them, and then processing with the new timestamp. On replay, read the logged records with their timestamps, and process them in the same way.

A few more words about how the re-timestamping would work: tag the record in the log with both the original timestamp and the new one, and use the new timestamp in the further processing. A simple-minded way to handle the replay would be to just read the log, and ignore the records sent through the looping link on the replay (since they presumably should be the same). A smarter way would be to receive the records form the link and compare them with the records in the log (including the original timestamp). If they match, all is well, the new timestamp from the log can be used. If they diverge then something somewhere has changed and either the replay should be aborted or some mitigation should be done, perhaps in the same way as for the case (2).

So, how to solve the case (2)? The trouble there is that the model's clock gets driven by the incoming records, that get queued, allowing the clock to go far ahead before the records get processed and arrive back through a looping link. The solution seems to be to limit, how far can the records be delayed at the re-timestamping point. Suppose we choose some time interval that is large enough to let the records go through the loop (to allow the queues to do an efficient pipelining) but not outlandishly large, let's call this interval L. Say, 1 second or 0.1 second. Set it as the limit. Then when a record arrives through the looping link (such as, in the figure below, B receiving a record back from D) with a timestamp T, we re-stamp it with a timestamp that is the smaller of: (T + L) and the lowest timestamp of the next record in the incoming queues (going up the links as usual to get the timestamp of the record they would send next, and since the records from the looping links are processed first, this in effect means "go just before that record").

.
           input
             |
             V
        timestamper
             |
             V
           +---+
           | A |
           +---+
           /   \
   +----+ /     \
   |    V V      V
   |    +---+  +---+
   |    | B |  | C |
   |    +---+  +---+
   |       \    /
   |        \  /
   |         V V
   |       +-----+
   |       |  D  |
   |       +-----+
   |        |
   +--------+

This would essentially allow the looping links to "run behind the clock" by up to L. So if B receives from A a record with timestamp T2, it can treat the looping link "almost normally", by asking it if it has any earlier records, but using the time limit (T2 - L) instead of T2 (as it would for the normal "downstream" links). This is very much the same as the case (1), only it limits by how much the clock could run ahead. This solution would actually work fine for both the cases (1) and (2). And since it preserves the high priority of the looping links, it would prevent them from buffering too much data.

It can also be stretched to cover (3) by always using (T + L) for the looped records, and not the optimistic minimum. The trouble there is that a large number of records might enter the loop in the interval L, and they will collect on the looping link's queue. But it should be fine for the small test models with a small amount of data.

I've thought about fixing this issue by using a semi-blocking queue on the looping link: essentially compose the looping link from a blocking queue with the usual limit (and special logic) followed by a non-blocking queue. The re-timestamping would happen when moving the records from the blocking queue to the non-blocking one. If the non-blocking queue is empty, treat the records from the blocking queue as having the timestamps behind their original ones by L, and move them to the non-blocking queue when the time (T + L) is reached. Unless the loop is about to go deadlocked. Then instead of deadlocking, move the first record from the blocking queue on the looping link to the non-blocking queue, re-stamping it with the lowest timestamp of the next record in the incoming queues as in the solution for (2). Since the total depth of the queues in a loop is fixed for a given model, that would make the ordering of the records fully predictable, even if they get processed before (T + L).

But this fix has its own trouble: for consistency it requires that the loop must be fully deadlocked before breaking this deadlock. Not just that the queues to and from the top node are full, but that every queue in the loop is full. Which is not easy to track. Triceps already finds the loops between the Trieads, so that part is not a problem, but tracking at runtime how the queues become full, with all the possibilities of intertwined loops, might add a good deal of overhead. So it might not be a practical fix.

This might need more thinking or maybe it's just a problem that doesn't need to be fixed.

In the meantime, there is one more aspect to the loops. A looping link might be used essentially to send a timing event, telling the nodes upstream to re-examine their state. In the example below, the looping link from D to B would be driven by the scheduler on D:

.
           input
             |
             V
        timestamper
             |
             V
           +---+
           | A |
           +---+
           /   \
   +----+ /     \
   |    V V      V
   |    +---+  +---+
   |    | B |  | C |
   |    +---+  +---+    scheduler
   |       \    /        |
   |        \  / /+------+
   |         V V V
   |       +-----+
   |       |  D  |
   |       +-----+
   |        |
   +--------+

In this case adding the delay L looks weird, since the time of the scheduler event might already be well past L from the original record that triggered it. It would make sense to make a special scheduler driven by a looping link instead:

.
           input
             |
             V
        timestamper
             |
             V
           +---+
           | A |
           +---+
scheduler  /   \
   ^    | /     \
   |    V V      V
   |    +---+  +---+
   |    | B |  | C |
   |    +---+  +---+
   |       \    /
   |        \  /
   |         V V
   |       +-----+
   |       |  D  |
   |       +-----+
   |        |
   +--------+

This scheduler's notion of time would be driven by D but it would generate the events for B. With a special limitation that the events must be scheduled by at least L into the future. Any lower delays would be rounded up to L.

Oh, and one more thing about the models that would run predictably every time from scratch: their infinite-precision clock can't use a single common record counter for the high-precision part, because that would cause race conditions. Instead each node would have to keep its own record counter, and the low bits of the timestamp would consist of (node_id, per_node_counter). Which might actually be to the best, since it would remove the contention point of the common counter. The schedulers can be given their own node ids. And to think of it, the schedulers should probably be given the lower ids than the input nodes, because the timed events should probably be processed before the records coming from outside at the same time.

Tuesday, May 5, 2020

consistent time

I've done some more thinking on the issues of consistent time in Triceps models, and came up with a design. It's not a final design, more of design notes, so that I won't forget them until I get to an implementation. But I think it's quite interesting.

Let's start with the models that are the unidirectional graphs, without any loops, they are much easier to reason about.

The basic premise is that the records need to be timestamped when they enter the model, and then processed in the order of these timestamps. The time issues tend to be highlighted in the diamond-shaped graphs, so let's start with one:

.
           input
             |
             V
        timestamper
             |
             V
           +---+
           | A |
           +---+
           /   \
          /     \
          V      V
        +---+  +---+
        | B |  | C |
        +---+  +---+
           \    /
            \  /
             V V
           +-----+
           |  D  |
           +-----+

An input record arrives, gets stamped, goes to the node (i.e. Triead, a triceps thread, but let's talk in more generic terms for now) A, where it can cause the records to be sent to nodes B and/or C. The records produced by A would have the same timestamp as the incoming record. B and C in turn may produce their own records (which again would get the same timestamp) and send them to D.

When two records with the same timestamp arrive from the same node, they have the natural queuing order, and they can be predictably processed in that order. But what if they arrive from different nodes, say B and C? The result of processing might depend on the order, which basically means that we have to define priorities between all the inputs. Here D has the inputs from B and C: one of them (say, B) would get a higher priority, and its records will be processed before the records with the same timestamp from the other input (C).

Well, that's easy to say, but what should D do when it receives a record with timestamp T from C, and nothing yet from B. For how long should it wait for something from B before it decides that it can process the record from C?

One way is to always send the timestamp metadata along each path even if there are no actual records. But this looks like a lot of unnecessary overhead. And it also has another potential issue: suppose we have two external inputs that get timestamped. The input 1 gets a lot of records for a period of time, the input 2 gets none. How often should the input 2 send its timestamp metadata even if it sends no data records? Well, maybe there is another solution.

Another way to resolve it would be for D to ask B, do you ever plan to send me anything with timestamp T or earlier? If not then D can safely go and process the record from C. If yes then D would have to wait for a notification from B. So the request from D to B can be asynchronous. If B is ready, it will send a timestamp metadata as a callback to D right away, if not then it will send either timestamped records or a bare timestamp later.

But B might not be able to answer this directly. It might have to consult A with the same question first. And A would have to consult the timestamper on its input. If A has two inputs, it would consult both timestampers.

Well, these callbacks would be very similar to just sending the timestamp metadata records all the time, and here would be additional delays involved with the requests upstream. But there are positive things too:

* It tells us, how often the inactive timestamper would get queried and send its metadata: more or less, for every record that comes in through any timestamper.

* In reality, not every node in the model would get input for every incoming record. If none of the nodes at some level produce records, the propagation would stop there and won't go farther downstream. And when the requests are sent upstream, they also would reach only to the timestampers that could send input to this node.

* The requests can be of two types: that the time T has been reached and that the time T has been passed. Because the node D prefers B over C, when gets a record from C with timestamp T, it would have to make sure that B is past T before processing that record. But if it gets a record from B with timestamp T, knowing that C is at T (and not yet past it) would be enough to start processing that record.

* The requests can be amortized. I.e. if the node D has records with timestamps T1 and T2 queued from node C, it can ask B once to send the timestamp updates until it reaches past T2. In the same way, if D asks B whether it's past T2 and B knows that it's already past a later timestamp T3, it can send the notification about T3 right away, and D will be able to use this knowledge later.

* Finally, this communication doesn't have to go through the regular queues. The more efficient data structures can be used to propagate the timestamps between the nodes. The timestamp information is cumulative: once B has notified that it had processed the input up to the timestamp T3, there is no point in keeping the information about it processing the timestamps T1 and T2, they're implied in T3.

* But if the communication goes across the network, the round trip time can become prohibitively expensive for the upstream requests. So the downstream timestamps should be sent on their own, although the cumulative approach would still apply. Of course, if there are multiple streams sent over the network between two systems, all these streams should just be sent across using the same sequencer, so there would be no issue with synchronization between them. But there would still be the synchronization issue between multiple external systems, and between them and the local scheduled events. Yet another possible solution for that would be to just restamp them with the local time if the relative ordering doesn't matter.

Well, let's move on to the next problem. When can a node say that it's past a particular timestamp? It depends on the clock granularity. As long as the clock is at T, there might be more records coming in, so it can't say that it's past T yet. So if the clock granularity is 1 millisecond, we'd be stuck waiting for that millisecond to end. What we need is a clock with infinite granularity. It can be simulated by keeping a single global record counter, and building the timestamp as a pair (time, count). The counter would be like the lower bits of the timestamp. This would also allow to amortize the calls to read the clock: if we have 100 records queued up at the timestamper, we can just increase the global counter by 100, and assign each record the same time and the next count in the allocated sequence.

Why not forget the time at all and move on to just the counter? We'd need time to be able to replay the records later at an accelerated pace, and still handle the time-based events.

Basically, the timed scheduler would run as a timestamper. And yes, it would also have to use the same clock, and do the same (time, count) pairs allocated from that clock. If D has some time-based events, the diagram would become:

.
           input
             |
             V
        timestamper <----------------------------- clock
             |                                       |
             V                                       |
           +---+                                     |
           | A |                                     |
           +---+                                     |
           /   \                                     |
          /     \                                    |
          V      V                                   |
        +---+  +---+                                 |
        | B |  | C |                                 |
        +---+  +---+    scheduler/timestamper <------+
           \    /        +
            \  //+-------+
             V VV
           +-----+
           |  D  |
           +-----+

This would mean that when D gets a record from B, it would have to check the time not only in C but also in the timestamper. The timestamper can also do the amortization by telling the time of its next scheduled event. But that's a bit tricky: first, there would have to be some count part of the timestamp associated with that future time. There are two ways to go about it: either first tell the time without the count part (essentially, with it at 0), or allocate the count part in advance, when the event gets scheduled, and then just use that count when the timer fires. Second, it might happen that the processing of an incoming record would schedule a new, earlier event. But that's not a problem because the events can't be scheduled in the past. Since the scheduler would run as a part of D and synchronous with it, it could update its estimation and move the next event forward.

It would also have an interesting interaction with the replay: if we want to replay a timestamped log of incoming events and get the model to behave in exactly the same way as before, the scheduler events would have to happen at exactly the same times too. Which means them getting the exact same count part of the timestamp, and that normally won't be stable due to the races for the clock. But it can be resolved by recording the exact time of all the scheduled events into the same log and reusing it during the replay, the schedulers receiving their timestamps not from the clock but from the log, same as the incoming records.

What if we want to change the model between the reruns? Well, in this case we can discard the log for the schedulers, and just get the count values for them form fresh, and write the values into the new log. The running sequence would be slightly different than the first time, but since the model has changed, it wouldn't matter. Or we could even reuse the part of the log that is still applicable, merge it with the events from the new schedulers, and write into the new log. Either way, once the new log gets written, it can be reused again to produce the exact same result on the next replay.

Another interesting thing about the replay, is how do we transition from a replay to the live performance? If we have a recorded log from yesterday, want to replay it and then continue with today's data, what do we do with all the scheduled events that would have fired overnight? This basically suggests that we need to have some kind of "time reset events" that would be used when the time gets moved abruptly. It would allow the application logic to reset properly all the events that have been scheduled for the meantime- either just cancel them or execute them once, depending on the application semantics.

I think this should work quite well for the graphs without loops. The loops add a good deal of trouble. More on them later.

Tuesday, March 13, 2012

The squiggly time line

The aggregation is a big subject, and I will return to it yet. But since I've touched the time-based processing, now I want to talk more about it.

What the couple of last examples did manually, with the data expiration by time, the more mature CEP systems do internally, using the statements for the time-based work.

Which isn't always better though. The typical issues are with:
  • fast replay of data
  • order of execution
  • synchronization between modules

The problem with the fast replay is that those time based-statements use the real time and not the timestamps from the incoming rows. Sure, in Coral8 you can use the incoming row timestamps but they still are expected to have the time generally synchronized with the local clock (they are an attempt to solve the inter-module synchronization problem, not fast replay). You can't run them fast. And considering the Coral8 fashion of dropping the data when the input buffer overflows, you don't want to feed the data into it too fast to start with. In the Aleri system you can accelerate the time but it's by a fixed factor. You can run the logical time there say 10 times faster and feed the data 10 times faster but there are no timestamps in the input rows, and you simply can't feed the data precisely enough to reproduce the exact timing. And 10 times faster is not the same thing as just as fast as possible. I don't know for sure what the Streambase does, it seems to have the time acceleration by a fixed rate too.

Your typical problem with fast replay in Coral8/CCL is this: you create a time limited window

create window ... keep 3 hours;

and then feed the data for a couple of days in say 20 minutes. Provided that you don't feed it too fast and none of it gets dropped, all of the data ends up in the window and none of it expires, since the window goes by the physical time, and the physical time was only 20 minutes. The first issue is that you may not have enough memory to store the data for two days, and everything would run out of memory and crash. The second issue is that if you want to do some time-based aggregation relying on the window expiration, you're out of luck.

Why would you want to feed the data so fast in the first place? Two reasons:
  • Testing. When you test your time-based logic, you don't want your unit test to take 3 hours, let alone multiple days. You also want your unit tests to be fully repeatable, without any fuzz.
  • State restoration after a planned shutdown or crash. No matter what everyone says, the built-in persistence features work right only for a small subset of the simple models. Getting the persistence work for the more complex models is difficult, and for all I know nobody has bothered to get it working right. The best approach in reality is to preserve a subset of the state, and get the rest of it by replaying the recent input data after restart. The faster you re-feed the data, the faster your model comes back online. (Incidentally, that's what Aleri does with the "persistent source streams", only losing all the timing information of the rows and having the same above-mentioned issue as CCL).
Next issue, the execution order. My last example was relying on $currentHour being updated before flushOldPackets() runs. Otherwise the deletions would propagate through the aggregator where they should not.  In a system like Aleri with each element running in its own thread there is no way to ensure any particular timing between the threads. In a system with single-threaded logic, like Coral8/Sybase or Streambase, there is a way. But getting the order right is tricky. It depends on what the compiler and scheduler decide, and may require a few attempts to get the order right. The procedural execution makes things much more straightforward.

Now, the synchronization between modules. When the data is passed between multiple threads or processes, there is always a jigger in the way the data goes through the inter-process communications and even more so through the network. Relying on the timing of the data after it arrives is usually a bad idea if you want to get any repeatability and precision. Instead the data has to be timestamped by the sender and then these timestamps used by the receiver instead of the real time.

And Coral8 allows you to do so. But what if there is no data coming? What do you do with the time-based processing? The Coral8 approach is to allow some delay and then proceed at the rate of the local clock. Note that the logical time is not exactly the same as the local clock, it generally gets behind the local clock by no more than the delay amount, or might go faster if the sender's clock goes faster. The question is, what delay amount do you choose? If you make it too short, the small hiccups in the data flow throw the timing off, the local clock runs ahead, and then the incoming data gets thrown away because it's too old. If you make it too long, you potentially add a large amount of latency. As it turns out, no reasonable amount of delay works well with Coral8. To get things working at least sort of reliably, you need horrendous delays, on the order of 10 seconds or more. Even then the sender may get hit by a long-running request and the connection would go haywire anyway.

The only reliable solution is to drive the time completely by the sender. Even if there is no data to send, it must still send the periodic time updates, and the receiver must use the incoming timestamps for its time-based processing. Sending one or even ten time-update packets per second is not a whole lot of overhead, and sure works much better than the 10-second delays. And along the way it gives the perfect repeatability and fast replay for the unit testing. So unless your CEP system can be controlled in this way, getting any decent distributed timing control requires doing it manually. The reality is that Aleri can't, Coral8 can't, the Sybase R4/R5 descended from them can't, and I could not find anything related to the time control in the Streambase documentation, so my guess is that it can't either.

And if you have to control the time-based processing manually, doing it in the procedural way is at least easier.

An interesting side subject is the relation of the logical time to the real time. If the input data arrives faster than the CEP model can process it, the logical time will be getting behind the real time. Or if the data is fed at the artificially accelerated rate, the logical time will be getting ahead of the real time. There could even be a combination thereof: making the "real" time also artificial (driven by the sender) and artificially make the data get behind it for the testing purposes. The getting-behind can be detected and used to change the algorithm. For example, if we aggregate the traffic data in multiple stages, to the hour, to the day and to the month, the whole chain does not have to be updated on every packet Just update the first level on every packet, and then propagate further when the traffic burst subsides and gives the model a breather.

So far the major CEP systems don't seem to have a whole lot of direct support for it. There are ways to reduce the load by reducing the update frequency to a fixed period (like the OUTPUT EVERY statement in CCL, or periodic subscription in Aleri), but not much of the load-based kind. If the system provides ways to get both the real time and logical time of the row, the logic can be implemented manually. But the optimizations of the time-reading, like in Coral8, might make it unstable.

The way to do it in Triceps is by handling it in the Perl (or C++) code of the main event loop. When it has no data to read, it can create an "idle" row that would push through the results as a more efficient batch. Well, more about this later.

Monday, March 5, 2012

Time-limited propagation, part 3

In the last example if we keep aggregating the data from hours to days and the days to months, then the arrival of each new packet will update the whole chain. Sometimes that's what we want, sometimes it isn't. The daily stats might be fed into some complicated computation, with nobody looking at the results until the next day. In this situation each packet will trigger these complicated computations, for no good reason, since nobody cares of them until the day is closed.

These unnecessary computations can be prevented by disconnecting the daily data from the hourly data, and performing the manual aggregation only when the day changes. Then these complicated computations would happen only once a day, not many times per second.

Here is how the last example gets amended to produce the once-a-day daily summaries of all the traffic (as before, in multiple snippets, this time showing only the added or changed code):

# an hourly summary, now with the day extracted
our $rtHourly = Triceps::RowType->new(
    time => "int64", # hour's timestamp, microseconds
    day => "string", # in YYYYMMDD
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    bytes => "int64", # bytes sent in an hour
) or die "$!";

# a daily summary: just all traffic for that day
our $rtDaily = Triceps::RowType->new(
    day => "string", # in YYYYMMDD
    bytes => "int64", # bytes sent in an hour
) or die "$!";

The hourly rows get an extra field, for convenient aggregation by day. This notion of the day is calculated as:

# compute the date of a timestamp, a string YYYYMMDD
sub dateStamp # (time)
{
    my @ts = gmtime($_[0]/1000000); # microseconds to seconds
    return sprintf("%04d%02d%02d", $ts[5]+1900, $ts[4]+1, $ts[3]);
}

The calculation is done in GMT, so that the code produces the same result all around the world. If you're doing this kind of project for real, you may want to use the local time zone instead.

The packets-to-hour aggregation function now populates this extra field:

sub computeHourly # (table, context, aggop, opcode, rh, state, args...)
{
...
    my $res = $context->resultType()->makeRowHash(
        time => $hourstamp,
        day => &dateStamp($hourstamp),
 ...
}

And the model keeps a global notion of the current day in addition to the current hour:

# the current day stamp that keeps being updated
our $currentDay;

The hourly table type grows an extra secondary index for the manuall aggregation into the daily data:

# the aggregated hourly stats, kept longer
our $ttHourly = Triceps::TableType->new($rtHourly)
    ->addSubIndex("byAggr",
        Triceps::SimpleOrderedIndex->new(
            time => "ASC", local_ip => "ASC", remote_ip => "ASC")
    )
    ->addSubIndex("byDay",
        Triceps::IndexType->newHashed(key => [ "day" ])
        ->addSubIndex("group",
            Triceps::IndexType->newFifo()
        )
    )
or die "$!";

# remember the daily secondary index type
our $idxHourlyByDay = $ttHourly->findSubIndex("byDay")
    or die "$!";
our $idxHourlyByDayGroup = $idxHourlyByDay->findSubIndex("group")
    or die "$!";

And a table for the daily data is created but not connected to any other tables:

# the aggregated daily stats, kept even longer
our $ttDaily = Triceps::TableType->new($rtDaily)
    ->addSubIndex("byDay",
        Triceps::IndexType->newHashed(key => [ "day" ])
    )
or die "$!";

$ttDaily->initialize() or die "$!";
our $tDaily = $uTraffic->makeTable($ttDaily,
    &Triceps::EM_CALL, "tDaily") or die "$!";

# label to print the changes to the daily stats
makePrintLabel("lbPrintDaily", $tDaily->getOutputLabel());

Instead it gets updated manually with the function that performs the manual aggregation of the hourly data:

sub computeDay # ($dateStamp)
{
    our $uTraffic;
    my $bytes = 0;

    my $rhFirst = $tHourly->findIdxBy($idxHourlyByDay, day => $_[0])
        or die "$!";
    my $rhEnd = $rhFirst->nextGroupIdx($idxHourlyByDayGroup)
        or die "$!";
    for (my $rhi = $rhFirst;
            !$rhi->same($rhEnd); $rhi = $rhi->nextIdx($idxHourlyByDay)) {
        $bytes += $rhi->getRow()->get("bytes");
    }
    $uTraffic->makeHashCall($tDaily->getInputLabel(), "OP_INSERT",
        day => $_[0],
        bytes => $bytes,
    ) or die "$!";
}

This logic doesn't check whether any data for that day existed. If none did, it would just produce a row with traffic of 0 bytes anyway. This is different from the normal aggregation but here may actually be desirable: it shows for sure that yes, the aggregation for that day really did happen.

The main loop then gets extended with the day-keeping logic and with the extra command to dump the daily data:

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
        my $rowop = $tPackets->getInputLabel()->makeRowopArray(@data)
            or die "$!";
        # update the current notion of time (simplistic)
        $currentHour = &hourStamp($rowop->getRow()->get("time"));
        my $lastDay = $currentDay;
        $currentDay = &dateStamp($currentHour);
        if (defined($rowop->getRow()->get("local_ip"))) {
            $uTraffic->call($rowop) or die "$!";
        }
        &flushOldPackets(); # flush the packets
        if (defined $lastDay && $lastDay ne $currentDay) {
            &computeDay($lastDay); # manual aggregation
        }
        $uTraffic->drainFrame(); # just in case, for completeness
    } elsif ($type eq "dumpPackets") {
        &dumpTable($tPackets);
    } elsif ($type eq "dumpHourly") {
        &dumpTable($tHourly);
    } elsif ($type eq "dumpDaily") {
        &dumpTable($tDaily);
    }
}

It now maintains the current day, and after the packet computation is done, looks, whether the day has changed. If it did, it calls the manual aggregation of the last day.

And here is an example of its work:

new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
tPackets.out OP_INSERT time="1330886011000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100" 
tHourly.out OP_INSERT time="1330884000000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100" 
new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
tPackets.out OP_INSERT time="1330886012000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50" 
tHourly.out OP_DELETE time="1330884000000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="100" 
tHourly.out OP_INSERT time="1330884000000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150" 
new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
tPackets.out OP_INSERT time="1330889811000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300" 
tHourly.out OP_INSERT time="1330887600000000" day="20120304"
 local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300" 

So far all the 3 packets are for the same day, and nothing new has happened.

new,OP_INSERT,1330972411000000,1.2.3.5,5.6.7.9,3000,80,200
tPackets.out OP_INSERT time="1330972411000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200" 
tHourly.out OP_INSERT time="1330970400000000" day="20120305"
 local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200" 
tPackets.out OP_DELETE time="1330886011000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100" 
tPackets.out OP_DELETE time="1330886012000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50" 
tPackets.out OP_DELETE time="1330889811000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300" 
tDaily.out OP_INSERT day="20120304" bytes="450"

When a packet for the next day arrives, it has three effects: (1) inserts the packet data as usual, (2) finds that the previous packet data is obsolete and flushes it (without upsetting the hourly summaries) and (3) finds that the day has changed and performs the manual aggregation of last day's hourly data into daily.

new,OP_INSERT,1331058811000000
tPackets.out OP_DELETE time="1330972411000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200" 
tDaily.out OP_INSERT day="20120305" bytes="200" 

A time update for the yet next day flushes out the previous day's detailed packets and again builds the daily summary of that day.

new,OP_INSERT,1331145211000000
tDaily.out OP_INSERT day="20120306" bytes="0" 

Yet another day's time roll now has no old data to delete (since none arrived in the previous day) but still produces the daily summary of 0 bytes.

dumpDaily
day="20120305" bytes="200" 
day="20120304" bytes="450" 
day="20120306" bytes="0" 

This shows the eventual contents of the daily summaries. The order of the rows is fairly random, because of the hashed index. Note that the hourly summaries weren't flushed either, they are all still there too. If you want them eventually deleted after some time, you would need to provide more of the manual logic for that.

Sunday, March 4, 2012

Time-limited propagation, part 2

Now a run of the model. Its printout is also broken up into the separately commented pieces. Of course, it's not like a real run, it just contains one or two packets per hour to show how things work.

new,OP_INSERT,1330886011000000,1.2.3.4,5.6.7.8,2000,80,100
tPackets.out OP_INSERT time="1330886011000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100" 
tHourly.out OP_INSERT time="1330884000000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" bytes="100" 
new,OP_INSERT,1330886012000000,1.2.3.4,5.6.7.8,2000,80,50
tPackets.out OP_INSERT time="1330886012000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50" 
tHourly.out OP_DELETE time="1330884000000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" bytes="100" 
tHourly.out OP_INSERT time="1330884000000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" bytes="150" 

The two input rows in the first hour refer to the same connection, so they go into the same group and get aggregated together in the hourly table. The rows for the current hour in the hourly table get updated immediately as more data comes in.

new,OP_INSERT,1330889811000000,1.2.3.4,5.6.7.8,2000,80,300
tPackets.out OP_INSERT time="1330889811000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300" 
tHourly.out OP_INSERT time="1330887600000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" bytes="300" 

Only one packet arrives in the next hour.

new,OP_INSERT,1330894211000000,1.2.3.5,5.6.7.9,3000,80,200
tPackets.out OP_INSERT time="1330894211000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200" 
tHourly.out OP_INSERT time="1330891200000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" bytes="200" 
new,OP_INSERT,1330894211000000,1.2.3.4,5.6.7.8,2000,80,500
tPackets.out OP_INSERT time="1330894211000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="500" 
tHourly.out OP_INSERT time="1330891200000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" bytes="500" 

And two more packets in the next hour.  They are for the different connections, so they do not get summed together in the aggregation. When the hour changes again, the old data will start being deleted, so let's take a snapshot of the tables' contents.

dumpPackets
time="1330886011000000" local_ip="1.2.3.4" remote_ip="5.6.7.8"
 local_port="2000" remote_port="80" bytes="100" 
time="1330886012000000" local_ip="1.2.3.4" remote_ip="5.6.7.8"
 local_port="2000" remote_port="80" bytes="50" 
time="1330889811000000" local_ip="1.2.3.4" remote_ip="5.6.7.8"
 local_port="2000" remote_port="80" bytes="300" 
time="1330894211000000" local_ip="1.2.3.4" remote_ip="5.6.7.8"
 local_port="2000" remote_port="80" bytes="500" 
time="1330894211000000" local_ip="1.2.3.5" remote_ip="5.6.7.9"
 local_port="3000" remote_port="80" bytes="200" 
dumpHourly
time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150" 
time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300" 
time="1330891200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="500" 
time="1330891200000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200" 

The packets table shows all the 5 packets received so far, and the hourly aggregation results for all 3 hours (with two separate aggregation groups in the same last hour, for different ip pairs).

new,OP_INSERT,1330896811000000,1.2.3.5,5.6.7.9,3000,80,10
tPackets.out OP_INSERT time="1330896811000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="10" 
tHourly.out OP_INSERT time="1330894800000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" bytes="10" 
tPackets.out OP_DELETE time="1330886011000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="100" 
tPackets.out OP_DELETE time="1330886012000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="50" 

When the next hour's packet arrives, it gets processed as usual, but then the removal logic finds the packet rows that have become too old to keep. It kicks in and deletes them. But notice that the deletions affect only the packets table, the aggregator ignores this activity as too old and does not propagate it to the hourly table.

new,OP_INSERT,1330900411000000,1.2.3.4,5.6.7.8,2000,80,40
tPackets.out OP_INSERT time="1330900411000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="40" 
tHourly.out OP_INSERT time="1330898400000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" bytes="40" 
tPackets.out OP_DELETE time="1330889811000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="300" 

One more hour's packet.

new,OP_INSERT,1330904011000000
tPackets.out OP_DELETE time="1330894211000000" local_ip="1.2.3.4"
 remote_ip="5.6.7.8" local_port="2000" remote_port="80" bytes="500" 
tPackets.out OP_DELETE time="1330894211000000" local_ip="1.2.3.5"
 remote_ip="5.6.7.9" local_port="3000" remote_port="80" bytes="200" 

And just a time update for another hour, when no packets have been received. The removal logic still kicks in and works the same way.  After all this activity let's dump the tables again:

dumpPackets
time="1330896811000000" local_ip="1.2.3.5" remote_ip="5.6.7.9"
 local_port="3000" remote_port="80" bytes="10" 
time="1330900411000000" local_ip="1.2.3.4" remote_ip="5.6.7.8"
 local_port="2000" remote_port="80" bytes="40" 
dumpHourly
time="1330884000000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="150" 
time="1330887600000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="300" 
time="1330891200000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="500" 
time="1330891200000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="200" 
time="1330894800000000" local_ip="1.2.3.5" remote_ip="5.6.7.9" bytes="10" 
time="1330898400000000" local_ip="1.2.3.4" remote_ip="5.6.7.8" bytes="40" 

The packets table only has the data for the last 3 hours (there are no rows for the last hour because none have arrived). But the hourly table contains all the history. The rows weren't getting deleted here.

Time-limited propagation, part 1

When aggregating data, often the results of the aggregation stay relevant longer than the original data.

For example, in the financials the data gets collected and aggregated for the current business day. After the day is closed, the day's detailed data are not interesting any more, and can be deleted in preparation for the next day. However the daily results stay interesting for a long time, and may even be archived for years.

This is not limited to the financials. A long time ago, in the times of slow and expensive Internet connections, I've done a traffic accounting system. It did the same: as the time went by, less and less detail was kept about the traffic usage. The modern accounting of the click-through advertisement also works in a similar way.

An easy way to achieve this result is to put a filter on the way of the aggregation results. It would compare the current idea of time and the time in the rows going by, and throw away the rows that are too old. This can be done as a label that gets the data from the aggregator and then forwards or not the data to the real destination. This solves the propagation problem but as the obsolete original data gets deleted, the aggregator will still be churning and producing the updates, only to have them thrown away at the filter. A more efficient way is to stop the churn by placing the filter right into the aggregator.

The next example demonstrates such an aggregator, in a simplified version of that traffic accounting system that I've once done. The example is actually about more than just stopping the data propagation. That stopping accounts for about 3 lines in it. But I also want to show a simple example of traffic accounting as such. And to show that the lack of the direct time support in Triceps does not stop you from doing any time-based processing. Because of this I'll show the whole example and not just snippets from it. But since the example is biggish, I'll paste it into the text in pieces with commentaries for each piece.

our $uTraffic = Triceps::Unit->new("uTraffic") or die "$!";

# one packet's header
our $rtPacket = Triceps::RowType->new(
    time => "int64", # packet's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    local_port => "int32",
    remote_port => "int32",
    bytes => "int32", # size of the packet
) or die "$!";

# an hourly summary
our $rtHourly = Triceps::RowType->new(
    time => "int64", # hour's timestamp, microseconds
    local_ip => "string", # string to make easier to read
    remote_ip => "string", # string to make easier to read
    bytes => "int64", # bytes sent in an hour
) or die "$!";

The router to the Internet provider forwards us the packet header information from all the packets that go though the outside link. The local_ip is always the address of a machine on our network, remote_ip outside our network, no matter in which direction the packet went. With a slow and expensive connection, we want to know two things: First, that the provider's billing at the end of the month is correct. Second, to be able to find out the high traffic users, and then maybe look whether that traffic was used for the business purposes or not.  This example goes up to aggregation of the hourly summaries and then stops, since the further aggregation by days and months is straightforward to do.

If there is no traffic for a while, the router is expected to periodically communicate its changing idea of time as the same kind of records but with the non-timestamp fields as NULLs. That by the way is the right way to communicate the time-based information between two machines: do not rely on any local synchronization and timeouts but have the master send the periodic time updates to the slave even if it has no data to send. The logic is then driven by the time reported by the master. A nice side effect is that the logic can also easily be replayed later, using these timestamps and without any concern of the real time. If there are multiple masters, the slave would have to order the data coming from them according to the timestamps, thus synchronizing them together.

The hourly data drops the port information, and sums up the traffic between two addresses in the hour. It still has the timestamp but now this timestamp is rounded to the start of the hour:

# compute an hour-rounded timestamp
sub hourStamp # (time)
{
    return $_[0]  - ($_[0] % (1000*1000*3600));
}

Next, to the aggregation:

# the current hour stamp that keeps being updated
our $currentHour;

# aggregation handler: recalculate the summary for the last hour
sub computeHourly # (table, context, aggop, opcode, rh, state, args...)
{
    my ($table, $context, $aggop, $opcode, $rh, $state, @args) = @_;
    our $currentHour;

    # don't send the NULL record after the group becomes empty
    return if ($context->groupSize()==0
        || $opcode == &Triceps::OP_NOP);

    my $rhFirst = $context->begin();
    my $rFirst = $rhFirst->getRow();
    my $hourstamp = &hourStamp($rFirst->get("time"));

    return if ($hourstamp < $currentHour);

    if ($opcode == &Triceps::OP_DELETE) {
        $context->send($opcode, $$state) or die "$!";
        return;
    }

    my $bytes = 0;
    for (my $rhi = $rhFirst; !$rhi->isNull();
            $rhi = $context->next($rhi)) {
        $bytes += $rhi->getRow()->get("bytes");
    }

    my $res = $context->resultType()->makeRowHash(
        time => $hourstamp,
        local_ip => $rFirst->get("local_ip"),
        remote_ip => $rFirst->get("remote_ip"),
        bytes => $bytes,
    ) or die "$!";
    ${$state} = $res;
    $context->send($opcode, $res) or die "$!";
}

sub initHourly #  (@args)
{
    my $refvar;
    return \$refvar;
}

The aggregation doesn't try to optimize by being additive, to keep the example simpler. The model keeps the notion of the current hour. As soon as the hour stops being current, the aggregation for it stops. The result of that aggregation will then be kept unchanged in the hourly result table, no matter what happens to the original data.

The tables are defined and connected thusly:

# the full stats for the recent time
our $ttPackets = Triceps::TableType->new($rtPacket)
    ->addSubIndex("byHour",
        Triceps::IndexType->newPerlSorted("byHour", undef, sub {
            return &hourStamp($_[0]->get("time")) <=> &hourStamp($_[1]->get("time"));
        })
        ->addSubIndex("byIP",
            Triceps::IndexType->newHashed(key => [ "local_ip", "remote_ip" ])
            ->addSubIndex("group",
                Triceps::IndexType->newFifo()
                ->setAggregator(Triceps::AggregatorType->new(
                    $rtHourly, "aggrHourly", \&initHourly, \&computeHourly)
                )
            )
        )
    )
or die "$!";

$ttPackets->initialize() or die "$!";
our $tPackets = $uTraffic->makeTable($ttPackets,
    &Triceps::EM_CALL, "tPackets") or die "$!";

# the aggregated hourly stats, kept longer
our $ttHourly = Triceps::TableType->new($rtHourly)
    ->addSubIndex("byAggr",
        Triceps::SimpleOrderedIndex->new(
            time => "ASC", local_ip => "ASC", remote_ip => "ASC")
    )
or die "$!";

$ttHourly->initialize() or die "$!";
our $tHourly = $uTraffic->makeTable($ttHourly,
    &Triceps::EM_CALL, "tHourly") or die "$!";

# connect the tables
$tPackets->getAggregatorLabel("aggrHourly")->chain($tHourly->getInputLabel())
    or die "$!";

The table of incoming packets has a 3-level index: it starts with being sorted by the hour part of the timestamp, then goes by the ip addresses to complete the aggregation key, and then a FIFO for each aggregation group. Arguably, maybe it would have been better to include the ip addresses straight into the top-level sorting index, I don't know, and it doesn't seem worth measuring. The top-level ordering by the hour is important, it will be used to delete the rows that have become old.

The table of hourly aggregated stats uses the same kind of index, only now there is no need for a FIFO because there is only one row per this key. And the timestamp is already rounded to the hour right in the rows, so a SimpleOrderedIndex can be used without writing a manual comparison function, and the ip fields have been merged into it too.

The output of the aggregator on the packets table is connected to the input of the hourly table.

The next part has to do with displaying the result of the work:

# a template to make a label that prints the data passing through another label
sub makePrintLabel # ($print_label_name, $parent_label)
{
    my $name = shift;
    my $lbParent = shift;
    my $lb = $lbParent->getUnit()->makeLabel($lbParent->getType(), $name,
        undef, sub { # (label, rowop)
            print($_[1]->printP(), "\n");
        }) or die "$!";
    $lbParent->chain($lb) or die "$!";
    return $lb;
}

# label to print the changes to the detailed stats
makePrintLabel("lbPrintPackets", $tPackets->getOutputLabel());
# label to print the changes to the hourly stats
makePrintLabel("lbPrintHourly", $tHourly->getOutputLabel());

The printing of the result row gets reused pretty often, so I've made a simple template function for it, which would generate the label and the printing on it, and connect it where it belongs. It will probably eventually become a part of the Triceps library. That template is then used to generate the debugging printouts from both tables.

Next go a couple of helper functions:

# dump a table's contents
sub dumpTable # ($table)
{
    my $table = shift;
    for (my $rhit = $table->begin(); !$rhit->isNull(); $rhit = $rhit->next()) {
        print($rhit->getRow()->printP(), "\n");
    }
}

# how long to keep the detailed data, hours
our $keepHours = 2;

# flush the data older than $keepHours from $tPackets
sub flushOldPackets
{
    my $earliest = $currentHour - $keepHours * (1000*1000*3600);
    my $next;
    # the default iteration of $tPackets goes in the hour stamp order
    for (my $rhit = $tPackets->begin(); !$rhit->isNull(); $rhit = $next) {
        last if (&hourStamp($rhit->getRow()->get("time")) >= $earliest);
        $next = $rhit->next(); # advance before removal
        $tPackets->remove($rhit);
    }
}

The dumpTable() is a straightforward iteration through a table and print. It can be used on any table, printP() takes care of any differences.

The flushing goes through the packets table and deletes the rows that belong to an older hour than the current one or $keepHours before it. For this to work right, the rows must go in the order of the hour stamps, which the outer index "byHour" takes care of.

All the time-related logic expects that the time never goes backwards. This is a simplification to make the example shorter, a production code can not assume this.

And the final part is the main loop:

while(<STDIN>) {
    chomp;
    my @data = split(/,/); # starts with a command, then string opcode
    my $type = shift @data;
    if ($type eq "new") {
        my $rowop = $tPackets->getInputLabel()->makeRowopArray(@data)
            or die "$!";
        # update the current notion of time (simplistic)
        $currentHour = &hourStamp($rowop->getRow()->get("time"));
        if (defined($rowop->getRow()->get("local_ip"))) {
            $uTraffic->call($rowop) or die "$!";
        }
        &flushOldPackets(); # flush the packets
        $uTraffic->drainFrame(); # just in case, for completeness
    } elsif ($type eq "dumpPackets") {
        &dumpTable($tPackets);
    } elsif ($type eq "dumpHourly") {
        &dumpTable($tHourly);
    }
}

The input comes in the CSV form as a command followed by more data. If the command is "new" then the data is the opcode and data fields, as it would be sent by the router. The commands "dumpPackets" and "dumpHourly" are used to print the contents of the tables, to see, what is going on in them.

In an honest implementation there would be a separate label that would differentiate between a reported packet and just a time update from the router. Here for simplicity this logic is placed right into the main loop. On each input record it updates the model's idea of the current timestamp, then if there is a packet data, it gets processed, and finally the rows that have become too old for the new timestamp get flushed.