Here is how to do the critical sections (i.e. sleeplock mutex analog) properly in the asynchronous way. Consider that the chains of futures are always computed sequentially, one before another. So that's what we need to do to have the critical sections computed sequentially: arrange them into a dynamically built chain. Note that there can be more than one action chained to the same future, and that's what get used for synchronization: the same future at the end of critical section gets two actions chained to it. One is used to return the result and continue computation in the usual way, another one only looks for completion and starts the next thread's asynchronous section.
Suppose we have pseudo-code like this:
funcA() {
...
// this future completes when the code can enter the
// critical section
shared_ptr<Future<void>> futB = waitSyncB();
auto futX = futB->chain(funcCritC, ctx);
return futX;
}
// this function represents the critical section for serialization
// that may require sleep
funcCritC(
shared_ptr<Future<void>> input,
shared_ptr<SomeCtx> ctx,
shared_ptr<Promise<Sometype>> result)
{
...
auto futD = funcSleepD();
futD->chain(result);
}
The execution of this code can be described as a chain:
funcA -> waitSyncB -> futB -> funcCritC -> funcSleepD ->futD -> futX -> ...
With the critical section in it being the part
futB -> funcCritC -> funcSleepD -> futD -> futX
that may have a sleep in the middle (and funcSleepD also being a part of the critical section) . This chain can execute only when futB allows it, telling that the critical section became free. FutB has no value, it's used purely for synchronization. So if we have three threads executing this code (let's mark them as suffixes 1, 2, 3), they should be arranged in a chain
funcCritC1 -> funcSleepD1 ->futD1 ->futX1 -> futB2 -> funcCritC2 -> funcSleepD2 ->futD2 ->futX2 -> futB3 -> funcCritC3 -> funcSleepD3 ->futD3 -> futX3 ->
Note that the instances of futD here grow an extra connection. In addition to being connected to futX in the original chain, they become connected to funCritC of the original thread.
... futB1 -> funcCritC1 -> funcSleepD1 ->futD1 -> futX1 -> ...
|
V
futB2 -> funcCritC2 -> funcSleepD2 ->futD2 -> futX2
This basically means that the function waitSyncB() should create the connection FutX(N) -> FutB(N+1). And no serial executors are needed, this chain will execute sequentially on any executor!
Here is the pseudcode of an implementation:
class Serial {
public:
Serial()
: tail_(make_shared<Future<void>>())
{
// the critical section starts free
tail_.complete();
}
void serialize(
shared_ptr<Future<void>> &head,
shared_ptr<Promise<void>> &newtail)
{
newtail = make_shared<Promise<void>>();
head = newtail->to_future();
atomic_swap(tail_, head);
}
protected:
shared_ptr<Future<void>> tail_;
}
Then the code of funcA() becomes:
Serial asyncMutex;
funcA() {
...
// this future completes when the code can enter the
// critical section
shared_ptr<Future<void>> futB;
// we need to complete this future when exiting the
// critical section
shared_ptr<Promise<void>> futB_end;
asyncMutex.serialize(futB, futB_end);
auto futX = futB->chain_trampolined(funcCritC, ctx);
futX->chain(futB_end);
return futX;
}
There is a bit of general ugliness of the asynchronous programming where we need to know the result future before creating the chain that produces it (very similar to the issue with Triceps Labels) but otherwise it's straightforward and simple to use.
The chaining with chain_trampolined() is used here to request the next function to be run through a scheduler queue to avoid the situation when a long chain of waiting threads gets built up while one thread is waiting inside the critical section, and then everything in that tries to execute inlined (by direct calls rather than scheduling in the executor) and runs out of stack. In fact, this is such a typical and thorny issue that probably everything should be trampolined by default, and inlining (direct calls) done only when explicitly requested. But the creators of the asynchronous libraries tend to have an opposite opinion.
Another thing to note is that there is no atomic swap defined on the standard library shared pointers. You can make one by either using a traditional mutex as a spinlock, with regular swap under it, or define your own version of shared_ptr just for the futures that does have an atomic swap. It's such a convenient operation that making your own shared_ptr is worth it.
No comments:
Post a Comment