Thursday, June 6, 2013

reordering the data from multiple threads, part 1

The pipelined examples shown before very conveniently preserved the order of the data while spreading the computational load among multiple threads. But that approach forced the computation to pass sequentially through every thread, increasing the latency and adding overhead. The other frequently used option is to farm out the work to a number of parallel threads and then collect the results back from them, as shown in Fig. 1. This topology is colloquially known as "diamond" or "fork-join" (it has nothing to do with the SQL joins, just that the arrows first fork from one box to multiple boxes and then join back into one box).

Fig. 1. Diamond topology.

There are multiple ways to decide which thread gets which unit of data to process. One possibility that provides natural load balancing is to keep a common queue of work items and have the worker threads (B and C in Fig. 1) read the next work item whenever they finish processing the previous one. But this way has an important limitation: there is no way to tell in advance which work item will go to which thread, so there is no way for the worker threads to keep any state. All the state would have to be encapsulated into the work item (which would be a tray in Triceps terms). And at the moment Triceps provides no way to maintain such a shared queue.
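To illustrate the general idea of the shared queue, here is a small sketch in plain Perl threads with Thread::Queue. Nothing in it is Triceps-specific, and all the names are made up for the example:

use strict;
use warnings;
use threads;
use Thread::Queue;

# A shared queue of self-contained work items: each item must carry
# all the state it needs, since there is no telling which worker gets it.
my $workq = Thread::Queue->new();

sub worker {
    while (defined(my $item = $workq->dequeue())) {
        # ... process $item here ...
    }
}

my @workers = map { threads->create(\&worker) } (1, 2);  # threads B and C

$workq->enqueue($_) for (1 .. 10);      # thread A farms out the work items
$workq->enqueue(undef) for @workers;    # one undef per worker tells it to stop
$_->join() for @workers;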

The other way is to partition the data between the worker threads based on the primary key. Usually it's done by using a hash of the key, which distributes the data randomly and hopefully evenly. Then a worker thread can keep the state for its subset of keys, forming a partition (also known as a "shard"), and process the sequential updates for this partition. The way to send a rowop only to thread B or only to thread C is to have a separate designated nexus for each worker thread, with thread A making the decision based on the hash of the primary key in the data. The processed data from all the worker threads can then be sent to a single nexus feeding the thread D.
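The partitioning decision itself can be quite simple. For example, in plain Perl (this is not any official Triceps API, just a sketch; the key and the worker count are made up):

use strict;
use warnings;
use Digest::MD5 qw(md5);

# Pick the worker (partition) index for a given primary key value.
# The hash spreads the keys approximately evenly between the workers.
sub pick_worker {
    my ($key, $nworkers) = @_;
    my $h = unpack("N", md5($key));  # fold the first 4 bytes of the digest into an integer
    return $h % $nworkers;
}

# Thread A would then write the rowop into the nexus of the chosen worker:
# index 0 feeds thread B, index 1 feeds thread C.
my $idx = pick_worker("AAA", 2);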

But either way the data will arrive at D in an unpredictable order. The balance of load between the worker threads is not perfect, and there is no way to predict how long each tray will take to process. A tray following one path may overtake a tray following another path.

If the order of the data is important, D must collate the data back into the original order before processing it further. Obviously, for that it must know what the original order was, so A must tag the data with sequential identifiers. Since the data rows themselves may get multiplied, the tagging is best done at the tray level (what happens if the trays get split in the worker threads is a separate story for the future; for now the solution is simply "keep the tray together").

When the data goes through a nexus, Triceps always keeps a tray as an indivisible bundle. It always gets read from the reading facet together, without being interspersed with the data from the other trays. As the data is sent into a writer facet, it's collected into a tray, and sent on only when the facet gets flushed, with TrieadOwner::flushWriters() or Facet::flushWriter().
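In the Perl API it looks approximately like this (a sketch only; it assumes that $owner is the TrieadOwner, $unit is its main unit, and $faOut is an already imported writer facet with a label "data"; the label and field names are made up for the example):

my $lbData = $faOut->getLabel("data");

# These calls only collect the rowops into the facet's tray...
$unit->makeHashCall($lbData, "OP_INSERT", symbol => "AAA", price => 10);
$unit->makeHashCall($lbData, "OP_INSERT", symbol => "BBB", price => 20);

# ...and the flush sends the whole tray into the nexus as one bundle.
$owner->flushWriters();  # flushes all the writer facets of this thread
# or, to flush only this one facet:
# $faOut->flushWriter();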

There are also two special labels defined on every nexus that tell the tray boundaries to the reading thread:

  • _BEGIN_ is called at the start of the tray.
  • _END_ is called at the end of the tray.

These labels can be defined explicitly; otherwise they get defined implicitly anyway. If they get defined implicitly, they get the empty row type (one with no fields). If you define them explicitly, you can use any row type you please, and this is a good place for the tray sequence id.
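For example, thread A could create its output nexus with an explicit _BEGIN_ label carrying the sequence id (a sketch; the nexus, label and field names are made up, and the rest of the thread logic is skipped):

my $rtData = Triceps::RowType->new(
    symbol => "string",
    price => "float64",
);
my $rtBegin = Triceps::RowType->new(
    seq => "int64",  # the sequential id of the tray
);

my $faOut = $owner->makeNexus(
    name => "toD",
    labels => [
        _BEGIN_ => $rtBegin,
        data => $rtData,
    ],
    import => "writer",
);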

And in a writer facet you can send data to these labels. When you want to put a sequence id on a tray, you define the _BEGIN_ label with that sequence id field, and then call the _BEGIN_ label with the appropriate id values. Even if you don't call the _BEGIN_ and _END_ labels, they still get called (not quite called but placed onto the tray) automatically, with the opcode of OP_INSERT and all the fields NULL. The direct calling of these labels will also cause the facet to be flushed: _BEGIN_ flushes any data previously collected on the tray and then adds itself; _END_ adds itself and then flushes the tray.
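Continuing the sketch above, tagging each outgoing tray in thread A would look approximately like this (again, all the names are made up for the example):

my $lbBegin = $faOut->getLabel("_BEGIN_");
my $lbData = $faOut->getLabel("data");
my $seq = 0;

# for each incoming unit of work:
$unit->makeHashCall($lbBegin, "OP_INSERT", seq => ++$seq);  # tag the tray
$unit->makeHashCall($lbData, "OP_INSERT", symbol => "AAA", price => 10);
$owner->flushWriters();  # the whole tagged tray goes out together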

The exact rules of when the _BEGIN_ and _END_ labels get called are actually somewhat complicated, having to deal with the optimizations for the case when nobody is interested in them, but they do what makes sense intuitively.

The case when these labels get called with OP_INSERT and no data in the fields gets optimized out by not placing the rowop into the actual Xtray, even if it was called explicitly. Then in the reader facet these implicit rowops get re-created, but only if there is a chaining for their labels from the facet's FnReturn, or if they are defined in the currently pushed FnBinding. So if you do a trace on the reading thread's unit, you will see these implicit rowops called only when they are actually used.
