The two basic varieties of loops are where you either execute everything sequentially or schedule all the iterations in parallel. Suppose a loop consists of three parts A, B, C, connected by futures, and suppose the loop has 100 iterations. In the sequential form it unrolls into:
-> A1 -> B1 -> C1 -> A2 -> B2 -> C2 -> ... -> A100 -> B100 -> C100 ->
In the parallel form it becomes:
/->A1 -> B1 -> C1 ---\
-> ... ->
\->A100->B100->C100 -/
An important thing to keep in mind for both forms is to prevent inlining, at least at the start of each iteration. In the parallel form the inlining would kill the parallelism, and in either form it would likely overflow the stack with recursive calls (300 of them in this example). This is a very good reason to avoid inlining, and the tail scheduling slot from the previous installment is a good solution there.
In the parallel form all the futures at the end of each iteration are joined into one (usually ignoring the return value and becoming a void future) with an AllOf primitive, where the final future gets completed only after all the inputs get completed. It can be easily implemented by counting the completed futures, with pseudo-code like:
class AllOf {
public:
AllOf(count):
ctx_(make_shared<Context>(count))
{}
// inputs - a container of input future references
template<typename Container>
AllOf(const Container &inputs):
ctx_(make_shared<Context>(inputs.size()))
{
for (const auto &inf : inputs) {
addInput(inf);
}
}
void addInput(shared_ptr<FutureBase> input)
{
// Note that the result promise is ignored
input->chain(advance, ctx_);
}
shared_ptr<Future<void>> getResult() const
{
return ctx_->result_->to_future();
}
static void advance(
shared_ptr<FutureBase> input,
shared_ptr<Context> ctx,
shared_ptr<Promise<void>> result /*ignored*/)
{
if (atomic_decrease(ctx->count_) == 0) {
ctx->result_->complete();
}
}
protected:
struct Context : public ContextBase {
Context(int count) : count_(count) {}
atomic_int count_;
shared_ptr<Promise<void>> result_ = make_promise<void>();
};
shared_ptr<Context> ctx_;
};
// used like this:
AllOf res(100);
for (int i = 0; i < 100; i++) {
res.addInput(schedule_iteration(ctx));
}
res.getResult()->chain(...)
Fundamentally, this doesn't have to be just a counter: it can as well run a map-reduce aggregation pattern, where the "map" part is done in parallel as loop iterations, and "reduce" combines the results under a mutex in a class similar to AllOf here, and after all the inputs are processed, completes the output promise.
Note that there is no need to even collect all the inputs in an array as some libraries do, the constructor from a container is just a convenience. Once they're properly chained together, only the count matters. And it's easy to make a variation of the implementation where the count would be accumulated dynamically as the inputs are connected, and then finalized (it's important to not send out the result before finalization).
Now, what if you want to do something in between: a parallel loop with limited parallelism? An executor with a limited number of threads won't work here, because it limits the computational parallelism, while here we want to limit the resource parallelism. Think, for example, of each iteration opening, processing, and closing a file descriptor, with a limited number of descriptors allowed to be open at a time. Here we need a semaphore implemented with futures. A semaphore is a kind of mix of the AllOf pattern shown here and the asynchronous mutex pattern shown before. Of course, the easiest way is to just serialize the loop by partitioning it into K parts, each part fully serial. But that's not the same thing as a semaphore, where the next iteration gets started when any of the running iterations completes; a semaphore is better at balancing the load.
A path to implementing a semaphore would be to keep two lists, one of the completed futures, another one of waiting promises, one of the lists being non-empty at each time. If a new promise gets chained when there is a spare completed future on the list, they can be "annihilated" by using the future to complete the promise and forgetting both (but pre-rigging the chain of execution off that promise that will return a completed future to the semaphore at the end). When a completed future gets added and there is a promise waiting, they can also be "annihilated" in the same way. Otherwise the future or promise gets added to the appropriate list. If we start by adding all the promises first, and telling the semaphore the degree of parallelism K, then the semaphore can also signal the completion of work when it collects all K futures on their list, completing another special promise.
Finally, there is a retry loop pattern for manual programming that allows reducing the number of small functions. Sometimes you need to prepare multiple resources before starting an operation:
if (resource_a == nullptr) {
resource_a = acquire_a(); // may sleep
}
if (resource_b == nullptr) {
resource_b = acquire_b(); // may sleep
}
operation(resource_a, resource_b);
An interesting property of this preparation is that it can be restarted from scratch with little cost. So instead of writing little fragment functions to do one check and one assignment per function, we can write one function and schedule it recursively after each acquisition, making a state machine to remember where we left off last time:
void startOperation(
shared_ptr<FutureBase> input,
shared_ptr<Context> ctx,
shared_ptr<Promise<Sometype>> result)
{
switch(ctx->state) {
case AFTER_A:
ctx->resource_a =
static_cast<Future<ResourceA> *>(input.get())->value();
break;
case AFTER_B:
ctx->resource_b =
static_cast<Future<ResourceB> *>(input.get())->value();
break;
}
ctx->state = INIT;
if (ctx->resource_a == nullptr) {
ctx->state = AFTER_A;
acquire_a()->chain(startOperation, ctx);
return;
}
if (ctx->resource_b == nullptr) {
ctx->state = AFTER_B;
acquire_b()->chain(startOperation, ctx);
return;
}
operation(ctx->resource_a, ctx->resource_b)->chain(result);
}
It doesn't look like the most efficient way of execution but surprisingly, if the resources are typically already acquired, it is, doing the whole check and initiating the operation in a single function. It's also a very effective way to make the program more readable to humans, without a myriad of tiny functions.
P.S. See more about the loops in the installment on error handling.