Tuesday, March 13, 2012

The squiggly time line

Aggregation is a big subject, and I will return to it later. But since I've touched on time-based processing, I want to talk more about it now.

What the last couple of examples did manually, expiring the data by time, the more mature CEP systems do internally, with built-in statements for time-based work.

Which isn't always better though. The typical issues are with:
  • fast replay of data
  • order of execution
  • synchronization between modules

The problem with fast replay is that those time-based statements use the real time and not the timestamps from the incoming rows. Sure, in Coral8 you can use the incoming row timestamps, but they are still expected to be generally synchronized with the local clock (they are an attempt to solve the inter-module synchronization problem, not fast replay). You can't run them fast. And considering the Coral8 fashion of dropping the data when the input buffer overflows, you don't want to feed the data into it too fast to start with. In the Aleri system you can accelerate the time, but only by a fixed factor. You can run the logical time there, say, 10 times faster and feed the data 10 times faster, but there are no timestamps in the input rows, and you simply can't feed the data precisely enough to reproduce the exact timing. And 10 times faster is not the same thing as just as fast as possible. I don't know for sure what Streambase does, but it seems to have time acceleration by a fixed rate too.

Your typical problem with fast replay in Coral8/CCL is this: you create a time-limited window

create window ... keep 3 hours;

and then feed the data for a couple of days in say 20 minutes. Provided that you don't feed it too fast and none of it gets dropped, all of the data ends up in the window and none of it expires, since the window goes by the physical time, and the physical time was only 20 minutes. The first issue is that you may not have enough memory to store the data for two days, and everything would run out of memory and crash. The second issue is that if you want to do some time-based aggregation relying on the window expiration, you're out of luck.
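
To make the contrast concrete, here is a minimal Python sketch of a window that expires rows by the timestamps carried in the rows rather than by the wall clock. It is not CCL and not any vendor's implementation; the class TimeLimitedWindow and its methods are hypothetical names made up for this illustration, and it assumes the rows arrive in non-decreasing timestamp order.

```python
from collections import deque


class TimeLimitedWindow:
    """Hypothetical window keeping only rows newer than `keep_seconds`,
    measured against the row timestamps, not the wall clock."""

    def __init__(self, keep_seconds):
        self.keep_seconds = keep_seconds
        self.rows = deque()  # (timestamp, payload) in arrival order

    def insert(self, timestamp, payload):
        # Assumption: timestamps never go backwards.
        self.rows.append((timestamp, payload))
        self.expire(timestamp)

    def expire(self, now):
        # Remove and return the rows that have aged out relative to `now`.
        expired = []
        while self.rows and self.rows[0][0] <= now - self.keep_seconds:
            expired.append(self.rows.popleft())
        return expired


HOUR = 3600
window = TimeLimitedWindow(keep_seconds=3 * HOUR)

# Two days of data, one row per minute, fed as fast as the loop can run.
for minute in range(2 * 24 * 60):
    window.insert(minute * 60, {"minute": minute})

# Only the last 3 hours of rows remain, no matter how fast they were fed.
print(len(window.rows))
```

Because the expiration is keyed to the data and not to the physical time, the memory use stays bounded during the replay and any logic hanging off the expired rows sees the same sequence on every run.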

Why would you want to feed the data so fast in the first place? Two reasons:
  • Testing. When you test your time-based logic, you don't want your unit test to take 3 hours, let alone multiple days. You also want your unit tests to be fully repeatable, without any fuzz.
  • State restoration after a planned shutdown or crash. No matter what everyone says, the built-in persistence features work right only for a small subset of the simple models. Getting the persistence to work for the more complex models is difficult, and for all I know nobody has bothered to get it working right. The best approach in reality is to preserve a subset of the state, and get the rest of it by replaying the recent input data after restart. The faster you re-feed the data, the faster your model comes back online. (Incidentally, that's what Aleri does with the "persistent source streams", only losing all the timing information of the rows and having the same above-mentioned issue as CCL.)

Next issue, the execution order. My last example was relying on $currentHour being updated before flushOldPackets() runs. Otherwise the deletions would propagate through the aggregator where they should not. In a system like Aleri, with each element running in its own thread, there is no way to ensure any particular timing between the threads. In a system with single-threaded logic, like Coral8/Sybase or Streambase, there is a way. But getting the order right is tricky. It depends on what the compiler and scheduler decide, and may require a few attempts. The procedural execution makes things much more straightforward.

Now, the synchronization between modules. When the data is passed between multiple threads or processes, there is always jitter in the way the data goes through the inter-process communications, and even more so through the network. Relying on the timing of the data after it arrives is usually a bad idea if you want to get any repeatability and precision. Instead the data has to be timestamped by the sender, and then these timestamps have to be used by the receiver instead of the real time.

And Coral8 allows you to do so. But what if there is no data coming? What do you do with the time-based processing? The Coral8 approach is to allow some delay and then proceed at the rate of the local clock. Note that the logical time is not exactly the same as the local clock: it generally lags behind the local clock by no more than the delay amount, or might run faster if the sender's clock runs faster. The question is, what delay amount do you choose? If you make it too short, the small hiccups in the data flow throw the timing off, the local clock runs ahead, and then the incoming data gets thrown away because it's too old. If you make it too long, you potentially add a large amount of latency. As it turns out, no reasonable amount of delay works well with Coral8. To get things working at least sort of reliably, you need horrendous delays, on the order of 10 seconds or more. Even then the sender may get hit by a long-running request and the connection would go haywire anyway.
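
The trade-off can be seen in a toy Python model. This is only my simplified reading of the behavior described above, not Coral8's actual algorithm: the function replay_with_delay and the rule "logical time is the larger of the newest row timestamp and the local clock minus the delay" are assumptions made for the illustration.

```python
def replay_with_delay(arrivals, max_delay):
    """Toy model: `arrivals` is a list of (local_clock, row_timestamp).

    Assumed rule: with no fresh data, logical time trails the local clock
    by `max_delay`; a row older than the logical time is thrown away.
    """
    logical = 0.0
    accepted, dropped = [], []
    for local_clock, row_ts in arrivals:
        # The local clock drags the logical time forward during silence.
        logical = max(logical, local_clock - max_delay)
        if row_ts < logical:
            dropped.append(row_ts)   # too old by the time it arrived
        else:
            accepted.append(row_ts)
            logical = max(logical, row_ts)
    return accepted, dropped


# Rows stamped 3, 4 and 5 are held up by a 2.5-second hiccup in transit.
arrivals = [(1.0, 1.0), (2.0, 2.0), (5.5, 3.0), (5.6, 4.0), (5.7, 5.0)]

print(replay_with_delay(arrivals, max_delay=1.0))  # short delay loses rows
print(replay_with_delay(arrivals, max_delay=3.0))  # long delay keeps them all
```

In this model the 1-second delay drops the rows stamped 3 and 4, while the 3-second delay keeps everything at the price of the logical time trailing the local clock by up to 3 seconds whenever the input goes quiet.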

The only reliable solution is to drive the time completely by the sender. Even if there is no data to send, it must still send the periodic time updates, and the receiver must use the incoming timestamps for its time-based processing. Sending one or even ten time-update packets per second is not a whole lot of overhead, and sure works much better than the 10-second delays. And along the way it gives the perfect repeatability and fast replay for the unit testing. So unless your CEP system can be controlled in this way, getting any decent distributed timing control requires doing it manually. The reality is that Aleri can't, Coral8 can't, the Sybase R4/R5 descended from them can't, and I could not find anything related to the time control in the Streambase documentation, so my guess is that it can't either.
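
A minimal Python sketch of the sender-driven approach could look like this. Everything in it is hypothetical: the LogicalClock class, the ("data" / "time", timestamp, payload) message layout and the receive() loop are made up for the illustration and don't correspond to any particular CEP product.

```python
import heapq
import itertools


class LogicalClock:
    """Hypothetical clock that advances only when the sender says so."""

    def __init__(self):
        self.now = 0
        self._timers = []               # heap of (deadline, seq, callback)
        self._seq = itertools.count()   # tie-breaker so callbacks are never compared

    def call_at(self, deadline, callback):
        heapq.heappush(self._timers, (deadline, next(self._seq), callback))

    def advance(self, timestamp):
        # Never move backwards, even if a stale timestamp shows up.
        self.now = max(self.now, timestamp)
        while self._timers and self._timers[0][0] <= self.now:
            deadline, _, callback = heapq.heappop(self._timers)
            callback(deadline)


def receive(clock, messages, on_data):
    for kind, timestamp, payload in messages:
        clock.advance(timestamp)        # every message carries the sender's time
        if kind == "data":
            on_data(timestamp, payload)
        # kind == "time" is a pure time update: advancing the clock is all it does


clock = LogicalClock()
log = []
clock.call_at(10, lambda t: log.append(("timer", t)))

messages = [
    ("data", 3, "a"),
    ("time", 8, None),
    ("time", 12, None),   # no data around time 10, but the timer still fires
    ("data", 15, "b"),
]
receive(clock, messages, lambda t, p: log.append(("data", t, p)))
print(log)
```

The receiver never looks at its own clock, so the same message list produces the same log whether it is replayed in real time or as fast as the loop can run.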

And if you have to control the time-based processing manually, doing it in the procedural way is at least easier.

An interesting side subject is the relation of the logical time to the real time. If the input data arrives faster than the CEP model can process it, the logical time will fall behind the real time. Or if the data is fed at an artificially accelerated rate, the logical time will get ahead of the real time. There could even be a combination thereof: making the "real" time also artificial (driven by the sender) and artificially making the data fall behind it for testing purposes. The falling behind can be detected and used to change the algorithm. For example, if we aggregate the traffic data in multiple stages, to the hour, to the day and to the month, the whole chain does not have to be updated on every packet. Just update the first level on every packet, and then propagate further when the traffic burst subsides and gives the model a breather.
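
Here is a rough Python sketch of that deferred propagation, cut down to two levels (hour and day) to keep it short. The TrafficAggregator class and its on_packet() and on_idle() methods are hypothetical names for the illustration; how the main loop decides that it is idle is left out.

```python
from collections import defaultdict


class TrafficAggregator:
    """Hypothetical two-level aggregation with deferred propagation."""

    def __init__(self):
        self.hourly = defaultdict(int)    # hour index -> bytes
        self.daily = defaultdict(int)     # day index -> bytes
        self.pending = defaultdict(int)   # hour index -> bytes not yet in `daily`

    def on_packet(self, timestamp, size):
        # Cheap path, run on every packet: touch only the first level.
        hour = timestamp // 3600
        self.hourly[hour] += size
        self.pending[hour] += size

    def on_idle(self):
        # Run when the burst subsides: push the accumulated deltas further.
        for hour, delta in self.pending.items():
            self.daily[hour // 24] += delta
        self.pending.clear()


agg = TrafficAggregator()
for timestamp, size in [(10, 100), (20, 200), (3700, 50)]:
    agg.on_packet(timestamp, size)

before_idle = dict(agg.daily)   # the second level has not been touched yet
agg.on_idle()
print(before_idle, dict(agg.hourly), dict(agg.daily))
```

The second level gets one update per dirty hour instead of one per packet, which is where the savings come from during a burst.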

So far the major CEP systems don't seem to have a whole lot of direct support for this kind of load-based adaptation. There are ways to reduce the load by reducing the update frequency to a fixed period (like the OUTPUT EVERY statement in CCL, or periodic subscription in Aleri), but not much of the load-based kind. If the system provides ways to get both the real time and logical time of the row, the logic can be implemented manually. But the optimizations of the time-reading, like in Coral8, might make it unstable.

The way to do it in Triceps is by handling it in the Perl (or C++) code of the main event loop. When it has no data to read, it can create an "idle" row that would push through the results as a more efficient batch. Well, more about this later.


  1. Esper can give complete control over time to an application, and there is no limit on the rate at which events can be processed when time is controlled externally.

  2. Thanks, interesting to know. Well, technically, Aleri can do it too: you can run in artificial time, setting and stopping it. So you can stop the time, set it to the record's timestamp, feed the record, wait for processing to complete, advance time, wait for any time-based processing to complete, and so on. I'm not sure if it made it to Sybase R5, but it definitely worked on Aleri. However there was no tool that did it for you easily, and also all these synchronous calls present a pretty high overhead.