Let's start with the models that are unidirectional graphs, without any loops; they are much easier to reason about.
The basic premise is that the records need to be timestamped when they enter the model, and then processed in the order of these timestamps. The time issues tend to be highlighted in the diamond-shaped graphs, so let's start with one:
.
    input
      |
      V
 timestamper
      |
      V
    +---+
    | A |
    +---+
     / \
    /   \
   V     V
+---+   +---+
| B |   | C |
+---+   +---+
   \     /
    \   /
     V V
   +-----+
   |  D  |
   +-----+
An input record arrives, gets stamped, and goes to the node A (i.e. a Triead, a Triceps thread, but let's talk in more generic terms for now), where it can cause records to be sent to nodes B and/or C. The records produced by A would have the same timestamp as the incoming record. B and C in turn may produce their own records (which again would get the same timestamp) and send them to D.
When two records with the same timestamp arrive from the same node, they have the natural queuing order, and they can be predictably processed in that order. But what if they arrive from different nodes, say B and C? The result of processing might depend on the order, which basically means that we have to define priorities between all the inputs. Here D has the inputs from B and C: one of them (say, B) would get a higher priority, and its records will be processed before the records with the same timestamp from the other input (C).
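To make this concrete, here is a minimal sketch (in C++, with made-up names, not the actual Triceps API) of the comparison D would use: first by timestamp, then by the static input priority, then by the arrival order within one input:

.
#include <cstdint>
#include <queue>
#include <vector>

// A sketch of the deterministic merging at node D; the names here are
// hypothetical, not the actual Triceps API.
struct Timestamp {
    int64_t time;  // assigned once, when the record enters the model
};

struct QueuedRecord {
    Timestamp ts;
    int inputPriority;  // e.g. 0 for the input from B, 1 for the input from C
    uint64_t seq;       // the natural arrival order within one input
};

// Orders by timestamp first, then by the static input priority (B before C),
// then by the natural queuing order within one input.
struct EarlierFirst {
    bool operator()(const QueuedRecord &a, const QueuedRecord &b) const {
        if (a.ts.time != b.ts.time)
            return a.ts.time > b.ts.time;  // std::priority_queue is a max-heap, so invert
        if (a.inputPriority != b.inputPriority)
            return a.inputPriority > b.inputPriority;
        return a.seq > b.seq;
    }
};

using MergeQueue =
    std::priority_queue<QueuedRecord, std::vector<QueuedRecord>, EarlierFirst>;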
Well, that's easy to say, but what should D do when it receives a record with timestamp T from C, and nothing yet from B? For how long should it wait for something from B before it decides that it can process the record from C?
One way is to always send the timestamp metadata along each path, even if there are no actual records. But this looks like a lot of unnecessary overhead. And it also has another potential issue: suppose we have two external inputs that get timestamped. Input 1 gets a lot of records for a period of time, input 2 gets none. How often should input 2 send its timestamp metadata even though it sends no data records? Well, maybe there is another solution.
Another way to resolve it would be for D to ask B: "do you ever plan to send me anything with timestamp T or earlier?" If not, then D can safely go and process the record from C. If yes, then D would have to wait for a notification from B. So the request from D to B can be asynchronous. If B is ready, it will send the timestamp metadata as a callback to D right away; if not, it will send either timestamped records or a bare timestamp later.
But B might not be able to answer this directly. It might have to consult A with the same question first. And A would have to consult the timestamper on its input. If A has two inputs, it would consult both timestampers.
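In code, such a request might look like an asynchronous callback registration. Here is a sketch, with made-up names (not the actual Triceps API):

.
#include <cstdint>
#include <functional>

// A sketch of the asynchronous upstream query; the names are made up.
struct Timestamp { int64_t time; };

// The callback that the upstream node invokes once it can promise that
// it will never again send anything at or before `ts`.
using TimeCallback = std::function<void(Timestamp ts)>;

class UpstreamNode {
public:
    virtual ~UpstreamNode() = default;

    // D calls this on B: "let me know when you are past `ts`". If B
    // already knows the answer, it invokes `done` right away; otherwise
    // it remembers the request, possibly forwards the same question to
    // its own inputs (B asks A, A asks its timestampers), and invokes
    // `done` later, when the promise becomes true.
    virtual void requestPastTime(Timestamp ts, TimeCallback done) = 0;
};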
Well, these callbacks would be very similar to just sending the timestamp metadata records all the time, and there would be additional delays involved with the requests going upstream. But there are positive things too:
* It tells us how often the inactive timestamper would get queried and send its metadata: more or less, once for every record that comes in through any timestamper.
* In reality, not every node in the model would get input for every incoming record. If none of the nodes at some level produce records, the propagation would stop there and won't go farther downstream. And when the requests are sent upstream, they would also reach only the timestampers that could send input to this node.
* The requests can be of two types: that the time T has been reached, and that the time T has been passed. Because the node D prefers B over C, when it gets a record from C with timestamp T, it would have to make sure that B is past T before processing that record. But if it gets a record from B with timestamp T, knowing that C is at T (and not yet past it) would be enough to start processing that record.
* The requests can be amortized. That is, if the node D has records with timestamps T1 and T2 queued from node C, it can ask B once to keep sending the timestamp updates until it gets past T2. In the same way, if D asks B whether it's past T2 and B knows that it's already past a later timestamp T3, it can send the notification about T3 right away, and D will be able to use this knowledge later.
* Finally, this communication doesn't have to go through the regular queues. More efficient data structures can be used to propagate the timestamps between the nodes (see the sketch after this list). The timestamp information is cumulative: once B has notified that it has processed the input up to the timestamp T3, there is no point in keeping the information about it processing the timestamps T1 and T2, they're implied by T3.
* But if the communication goes across the network, the round trip time can become prohibitively expensive for the upstream requests. So the downstream timestamps should be sent on their own, although the cumulative approach would still apply. Of course, if there are multiple streams sent over the network between two systems, all these streams should just be sent using the same sequencer, so there would be no issue with synchronization between them. But there would still be the synchronization issue between multiple external systems, and between them and the local scheduled events. Yet another possible solution for that would be to just restamp them with the local time, if the relative ordering doesn't matter.
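To illustrate the last two points, here is a sketch of such a more efficient structure. Since the timestamp information is cumulative, a whole edge between two nodes can collapse into a single atomic watermark (the names are hypothetical; the "reached" variant of the request would be analogous, with >= in place of >):

.
#include <atomic>
#include <cstdint>

// A sketch of the cumulative timestamp propagation along one edge,
// with hypothetical names. A later timestamp implies all the earlier
// ones, so the whole history collapses into one value.
class EdgeWatermark {
public:
    // The upstream node publishes "I am past this timestamp".
    // Monotonic: a smaller value never overwrites a larger one.
    void publishPast(int64_t ts) {
        int64_t prev = past_.load(std::memory_order_relaxed);
        while (prev < ts &&
               !past_.compare_exchange_weak(prev, ts,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
        }
    }

    // The downstream node asks "are you past ts?". This also covers the
    // amortized case: if the upstream is already past a later T3, a
    // query about T2 gets answered immediately from the same value.
    bool isPast(int64_t ts) const {
        return past_.load(std::memory_order_acquire) > ts;
    }

private:
    std::atomic<int64_t> past_{INT64_MIN};
};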
Well, let's move on to the next problem. When can a node say that it's past a particular timestamp? It depends on the clock granularity. As long as the clock is at T, there might be more records coming in, so it can't say that it's past T yet. So if the clock granularity is 1 millisecond, we'd be stuck waiting for that millisecond to end. What we need is a clock with infinite granularity. It can be simulated by keeping a single global record counter, and building the timestamp as a pair (time, count). The counter would be like the lower bits of the timestamp. This would also allow amortizing the calls to read the clock: if we have 100 records queued up at the timestamper, we can just increase the global counter by 100 and assign each record the same time and the next count from the allocated sequence.
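A minimal sketch of such a clock, with the batch allocation (the names are made up; a real implementation would also have to pick a clock source and deal with overflow):

.
#include <atomic>
#include <chrono>
#include <cstdint>

// A sketch of the "infinite granularity" clock: a timestamp is the pair
// (time, count), with the count acting as the low bits that never repeat.
struct Timestamp {
    int64_t time;    // e.g. microseconds since the epoch
    uint64_t count;  // globally unique sequence, breaks the ties within one time
};

class StampClock {
public:
    // Allocates `n` consecutive counts in one shot: one clock read and one
    // atomic increment get amortized over a whole batch of queued records.
    // The caller stamps record i of the batch with {ts.time, ts.count + i}.
    Timestamp allocateBatch(uint64_t n) {
        int64_t now = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();
        uint64_t first = counter_.fetch_add(n, std::memory_order_relaxed);
        return Timestamp{now, first};
    }

private:
    std::atomic<uint64_t> counter_{0};  // the single global record counter
};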
Why not forget the time altogether and move on to just the counter? We'd still need the time to be able to replay the records later at an accelerated pace, and to handle the time-based events.
Basically, the timed scheduler would run as a timestamper. And yes, it would also have to use the same clock, allocating the same kind of (time, count) pairs from it. If D has some time-based events, the diagram would become:
.
    input
      |
      V
 timestamper <----------------------------- clock
      |                                       |
      V                                       |
    +---+                                     |
    | A |                                     |
    +---+                                     |
     / \                                      |
    /   \                                     |
   V     V                                    |
+---+   +---+                                 |
| B |   | C |   scheduler/timestamper <-------+
+---+   +---+         |
   \     /            |
    \   /  +----------+
     \ /  /
      V  V
    +-----+
    |  D  |
    +-----+
This would mean that when D gets a record from B, it would have to check the time not only in C but also in the timestamper. The timestamper can also do the amortization by telling the time of its next scheduled event. But that's a bit tricky: first, there would have to be some count part of the timestamp associated with that future time. There are two ways to go about it: either first tell the time without the count part (essentially, with it at 0), or allocate the count part in advance, when the event gets scheduled, and then just use that count when the timer fires. Second, it might happen that the processing of an incoming record schedules a new, earlier event. But that's not a problem, because the events can't be scheduled in the past. Since the scheduler would run as a part of D, synchronously with it, it could update its estimation and move the next event forward.
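The second of those two ways might look like this minimal sketch (hypothetical names; it assumes the count gets drawn from the same global counter as in the clock sketch above):

.
#include <cstdint>
#include <set>

// A sketch of pre-allocating the count part when the event gets
// scheduled, so the scheduler can promise a fully-formed timestamp
// for its next event.
struct Timestamp {
    int64_t time;
    uint64_t count;
    bool operator<(const Timestamp &other) const {
        return time != other.time ? time < other.time : count < other.count;
    }
};

class SchedulerTimestamper {
public:
    // Schedule an event at time `when`, with a count allocated in advance
    // from the same global counter the data timestampers use.
    void schedule(int64_t when, uint64_t preallocatedCount) {
        events_.insert(Timestamp{when, preallocatedCount});
    }

    // The promise to the downstream node D: nothing from this scheduler
    // will arrive with a timestamp earlier than this one. Since new
    // events can't be scheduled in the past, this estimate stays valid.
    Timestamp nextEvent() const {
        return events_.empty() ? Timestamp{INT64_MAX, 0} : *events_.begin();
    }

private:
    std::multiset<Timestamp> events_;
};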
It would also have an interesting interaction with the replay: if we want to replay a timestamped log of incoming events and get the model to behave in exactly the same way as before, the scheduler events would have to happen at exactly the same times too. Which means them getting the exact same count parts of the timestamps, and that normally won't be stable because of the races for the clock. But it can be resolved by recording the exact times of all the scheduled events into the same log and reusing them during the replay, the schedulers receiving their timestamps not from the clock but from the log, same as the incoming records.
What if we want to change the model between the reruns? Well, in this case we can discard the log for the schedulers, get fresh count values for them, and write those values into the new log. The running sequence would be slightly different from the first time, but since the model has changed, it wouldn't matter. Or we could even reuse the part of the log that is still applicable, merge it with the events from the new schedulers, and write it all into the new log. Either way, once the new log gets written, it can be reused again to produce the exact same result on the next replay.
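A sketch of that logging logic (hypothetical names): the scheduler draws its timestamps from the old log while it lasts, falls back to the live clock otherwise, and always writes the new log:

.
#include <cstdint>
#include <deque>
#include <utility>

// A sketch of making the scheduler's timestamps replayable. On the first
// run the timestamps come from the live clock and get recorded; on a
// replay they come back from the log, so the count parts, and with them
// the processing order, repeat exactly.
struct Timestamp {
    int64_t time;
    uint64_t count;
};

class ReplayableSource {
public:
    explicit ReplayableSource(std::deque<Timestamp> replayLog)
        : replay_(std::move(replayLog)) {}

    // `live` is what the clock would produce right now; it gets used only
    // when the old log has run out (or was discarded after a model change).
    Timestamp next(Timestamp live, std::deque<Timestamp> &newLog) {
        Timestamp ts = live;
        if (!replay_.empty()) {  // replaying: reuse the recorded stamp
            ts = replay_.front();
            replay_.pop_front();
        }
        newLog.push_back(ts);    // the new log is always written
        return ts;
    }

private:
    std::deque<Timestamp> replay_;
};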
Another interesting thing about the replay is how to transition from a replay to the live performance. If we have a recorded log from yesterday, and want to replay it and then continue with today's data, what do we do with all the scheduled events that would have fired overnight? This basically suggests that we need some kind of "time reset events" that would be used when the time gets moved abruptly. They would allow the application logic to properly reset all the events that had been scheduled for the meantime: either just cancel them or execute them once, depending on the application semantics.
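A minimal sketch of what such a reset notification might look like (entirely hypothetical):

.
#include <cstdint>
#include <functional>
#include <vector>

// A sketch of the "time reset event" idea: when the model's time jumps
// abruptly, e.g. from yesterday's replay to now, every scheduler gets
// notified and decides what to do with the events that would have fired
// in the skipped interval.
enum class ResetPolicy {
    CancelSkipped,   // drop the events that fell into the skipped interval
    FireSkippedOnce, // execute each skipped event once, then continue
};

struct TimeReset {
    int64_t oldTime;  // where the replayed log ended
    int64_t newTime;  // the current live time
};

using ResetHandler = std::function<void(const TimeReset &, ResetPolicy)>;

// The model would keep one such handler per scheduler and invoke them
// all when the time gets moved.
void resetTime(const std::vector<ResetHandler> &handlers,
               const TimeReset &reset, ResetPolicy policy) {
    for (const auto &h : handlers)
        h(reset, policy);
}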
I think this should work quite well for the graphs without loops. The loops add a good deal of trouble. More on them later.