Saturday, February 15, 2025

Asynchronous programming 11 - writing libraries

Writing libraries in the asynchronous way is a special challenge. There is so much context attached to the futures, and implicitly inherited between them (especially through inlining), that it tends to spill over in all the wrong places. A library has to take explicit steps to prevent this spill-over both from its callers and to its callers. And of course the libraries at the deeper levels have to do the same.

The most frequent and absolutely worst issue is with inlining. Getting rid of the whole inlining debacle by using a delayed scheduling slot solves this problem. Otherwise you have to explicitly chain every future entering your library to your own trampolined future, and also mark every future exiting your library as trampolined.

The executors represent another issue, just as bad. You don't want your code to run on the caller's executor, and you don't want the caller's code to run on your executor. So not only trampoline the inputs but trampoline them on your executor. This still doesn't solve the exit side: keeping the user code that processes your result from running on your executor. You need to not only have the caller provide the result promises but also make sure that they're trampolined on a specific executor. However, like for inlining, there is an easy solution: get rid of the explicit executors in the asynchronous subsystem altogether. There is no good reason to use the serial executors, they're all pain and no gain, and all the parallel executors are effectively equivalent, so having one global executor per process is sufficient. If you want to limit the degree of multithreading, use some form of semaphores.

Next, do the error handling and cancellations right; this usually requires thinking through your code.

Another item that can make sense is priorities. There would be priority inversions as usual, but not any worse than with synchronous programming. In fact, they could be resolved with priority inheritance in a fairly straightforward way: the cancellations propagate (and stop propagating) in the same way as priority inheritance would, so all we need to do is stick the priority propagation on top of the pre-existing cancellation mechanism (if, of course, it does pre-exist).

And as the last point, try to avoid using raw pointers as arguments; this will save a great many issues with the memory getting freed under them. Use reference-counted shared pointers instead whenever possible.

Asynchronous programming 10 - error handling

What if a computation fails? Then the future still gets completed but has to return an error. At the very least, if the future can return only one value, the returned object should have a place for error indication. But a better asynchronous library would have a place for error indication right in the futures. BTW, to do it right, it shouldn't be just an error code, it should be a proper error object that allows nested errors, and ideally also error lists, like Error in Triceps. 
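A minimal sketch of what such an error object could look like, in plain C++ (all the field and method names here are made up for illustration):

// An error object that allows nesting and lists of errors, similar in
// spirit to Error in Triceps.
struct Error {
  int code;                          // machine-readable code
  string message;                    // human-readable description
  vector<shared_ptr<Error>> nested;  // the lower-level causes, possibly many

  // wrap a lower-level error, adding the higher-level details
  static shared_ptr<Error> wrap(
    int code, const string &message, shared_ptr<Error> cause)
  {
    auto e = make_shared<Error>(Error{code, message, {}});
    if (cause != nullptr)
      e->nested.push_back(cause);
    return e;
  }
};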

So suppose that our futures do have an error indication. How can these errors be handled? 

Chaining between futures is easy: the error just chains through in the same way as the value. A nice consequence is that the asynchronous lock pattern and other patterns like it just work transparently, releasing the lock in any case, error or no error. However, an error object may hold references to objects that we might not want to get stuck in the state of an asynchronous lock. And we don't want the unrelated code that locks the mutex next to start with an error. An error is applicable even to a void future, so it would get stuck even in one of those. So there should be a separate kind of chaining to a void future that passes through only the completion but not the error. If your library doesn't have one, you can make one with a function that is chained to the first future and freshly completes the second future, ignoring the error.
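A minimal sketch of such a function in the pseudo-code style of this series (it assumes the kind of chaining that runs the function even when the input carries an error, like the "finally" behavior discussed below):

// Passes through only the fact of completion, deliberately dropping any
// error from the input, so that an unrelated asynchronous lock doesn't
// get poisoned by it.
static void completeIgnoringError(
  shared_ptr<FutureBase> input,
  shared_ptr<void> ctx,
  shared_ptr<Promise<void>> result)
{
  result->complete();  // complete fresh, never copying the error over
}

// usage: instead of fut->chain(lock_promise), do
//   fut->chain(completeIgnoringError, nullptr)->chain(lock_promise);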

Chaining functions is more complicated. In the simplest case we can just let the function check for an error and handle it (and that's a good reason to have the whole input future as an argument and not just the value from it). But that means writing the same boilerplate error propagation code in a lot of functions.

The other option is to have the chaining code propagate the error directly from the input future to the result promise, ignoring the function in this case, and basically cancelling the chain. This is very much like how exceptions work in normal programming, just skipping over the rest of the function and returning an error, so this behavior should be the "normal" chaining, while a function that handles the error in its input future is more like a "catch" or "finally" statement. Note that if the function gets skipped in case of an error, it doesn't really need to see the whole input future, it could as well get just the value from it. With this option, if you've prepared a million-long chain for a loop (not a great idea, better to generate each iteration one by one), it will all get cancelled on the first error.

The third option (and these options are not mutually exclusive, they all can be available to use as needed) is to chain a function specifically as an error handler, to be run on error only. Which is even more like a "catch" statement than the previous case. But there is a catch: this makes the chain branch, which means that eventually the normal and error paths have to join back together with an AllOf. Which is not only a pain to always add explicitly but it also implies that the error path somehow has to complete even if there is no error, so again there has to be chain cancellation logic for the error handlers, but working in the opposite way, ignoring the functions on success. That's probably not worth the trouble, so the handling of errors by a separate function makes sense mostly as a one-off where, depending on the success of the input future, either the normal function or the error handler function gets called, sending their result to the same promise, so the fork gets immediately joined back. This is just like doing an if-else in one function but has the benefit of allowing composition, reusing the same error handling function with many normal processing functions, being truly like a "catch" statement. This pattern is particularly convenient for adding some higher-level details to the error object (as in building a stack trace in normal programming).
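As a sketch of that one-off in the pseudo-code style of this series (Value, StepCtx, processStep(), handleStepError() and has_error() are all made-up names here):

// A one-off "catch": depending on the success of the input future, either
// the normal processing or the reusable error handler runs, and both send
// their result into the same promise, joining the fork right back.
static void stepWithCatch(
  shared_ptr<Future<Value>> input,
  shared_ptr<StepCtx> ctx,
  shared_ptr<Promise<Value>> result)
{
  if (input->has_error()) {
    // the reusable handler, e.g. wrapping the error with higher-level details
    handleStepError(input, ctx, result);
  } else {
    processStep(input, ctx, result);
  }
}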

The next item is the error handling in AllOf, or in its generalization to map-reduce. How much do we care that some of the branches have failed? A typical case would probably be where we want all of them to succeed, so for that AllOf should collect all the errors from all the branches into one error object.

What about a semaphore? There, in a natural way, any returned error would cause the whole semaphore to be cancelled. If the semaphore represents a loop with limited parallelism, that's what we'd probably want anyway. Well, due to the parallelism, there might be hiccups where some more iterations get scheduled by the other futures completing normally at the same time as the one with the error. One possible race comes from the semaphore logic picking one future from the run queue, executing it, and then coming back when its result promise completes. The queue itself is unlocked after the head future got picked from it, so another completed future would pick the next head future from the queue before the first one completes the loop of error propagation. Another possible race comes from the part where it's OK to add more work to the semaphore while running on one of its own chains: instead of pre-generating all the iteration head futures in advance, we put into the semaphore one future with a function that, on running, generates the head of one iteration (but doesn't start it yet!), then reattaches itself to the semaphore, and then lets that iteration run by chaining it to the input future. And of course if this iteration generation function doesn't check for errors, the reattached copy can grab a successfully completed future and run an iteration. So it would help to also add explicit logic to the semaphore that just cancels all the outstanding incoming futures once one of them gets an error. It could also pay attention to errors in the iteration generation function, to stop generating once an error is seen.

Of course, not every semaphore needs to be self-collapsing on error, some of them are used for general synchronization, and should ignore the errors.

The most complicated question of error handling is: can we stop the ongoing parallel iterations of a parallel loop when one iteration gets an error? This can be done by setting a flag in a common context, checking it in every function, and bailing out immediately with a special error if this flag is set. This is kind of a pain to do all over the place. So maybe this can be folded into the futures/promises themselves: create a cancellation object and attach it to the futures, so that when completing a future with a cancellation object attached, the completion code would check whether the cancellation is set and replace the result with a cancellation error instead. Note that this would not happen everywhere but only on the futures where the cancellation object is attached. So when you call into a library, you can't attach the cancellation objects to the futures created inside the library. And you can't always quickly cancel the future that waits for the library call to return, because the library might still be using some memory in your state (although this of course depends a lot on the library API; if all the memory is controlled by reference counters then we don't care, we can just let it run and ignore the result).
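A minimal sketch of such a cancellation object, assuming the pseudo-API used in this series (the attachment point and the complete_with_error() hook are assumptions about where a library would wire it in):

// A shared cancellation flag that can be attached to selected futures.
struct Cancellation : public ContextBase {
  atomic_bool cancelled_{false};

  void cancel() { cancelled_ = true; }
  bool is_cancelled() const { return cancelled_; }
};

// pseudo-code of the check inside the future completion:
//   if (fut->cancellation_ != nullptr && fut->cancellation_->is_cancelled())
//     fut->complete_with_error(cancellation_error());
//   else
//     fut->complete(value);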

Can we propagate the cancellation object between futures, so that they would even go through the insides of a library? Generally, yes, we can do it on chaining, but that takes some care.

First, the propagation must stop once we reach the end of the whole logical operation, and also must stop when we go to the void futures for the patterns like the asynchronous mutex. And stop even for non-void futures in the patterns like the cache, where one caller asking to cancel the read shouldn't cancel the read for everyone. 

Second, the functions that create intermediate promises from scratch must have a way to propagate the cancellations from their inputs to these newly created promises.

Third, the libraries need to be able to do their own cancellations too, so it's not a single cancellation object but a set of cancellation objects per future, with the overhead of checking them all on every step (and yes, also with the overhead of attaching them all to every step). Although if the sets are not modified often, maybe an optimized version can be devised where the overhead is taken at the set creation time and then the set consolidates the state of all the cancellations in it, making it necessary to attach only one set and check the state of only one set.

Fourth, what about the system calls to the OS, which on a microkernel OS would likely translate to calls into other processes? The cancellation state cannot be read from other address spaces. Which basically means that as we cross the address space boundary, we need to create a matching cancellation object on the other side (and here treating the whole set of cancellation objects as one object helps too), remember this match on our side, and then have a system call that would propagate the cancellation to the other side. Fairly complicated but I think doable. Of course, at some point this whole path will get down to the hardware, and there we won't be able to actually interrupt an ongoing operation, but we can arrange to ignore its result and return. And there are things that can't be ignored: for example, an app might suddenly stop caring whether its buffer write has succeeded or not, but a filesystem can't ignore whether a metadata block write succeeded or not. However, this filesystem shouldn't keep the app waiting: if the app has lost interest, the filesystem can sort out its metadata writes in the background.

Fifth, between this filesystem write example and the cache example, a cancellation flag also needs to have a future connected to it that would get completed with a cancellation error when the cancellation is triggered. We can then chain from this future directly to the result future of the cache read or block write, "overtaking" the normal result to essentially do an "anyOf", with the first completion setting the result (including an error) into the future and any following attempts to set the result getting ignored. A catch is that when one path completes, the other will still hold a reference on the result future, potentially causing unpleasant memory leaks. And also the cancellation future would keep accumulating these chainings to it after each operation under it gets normally completed. Maybe the cancellation objects would be short-lived and this wouldn't be a real problem. Or maybe this will require thinking of a way to un-chain once one path gets overtaken by the completion of another.

The final thing to say is that the C++ coroutines don't seem smart enough to translate the error handling in promises to look like exception handling at the high level. And this is a very important subject, so maybe the coroutines are not the answer yet.

Wednesday, February 12, 2025

Asynchronous programming 9 - composition

 Consider this traditional function:

void writeHeader()
{
  char buf[512];
  // ... populate the buffer ...
  write(buf, 512);
}

and its asynchronous version:

void writeHeader(
  shared_ptr<FutureBase> input,
  shared_ptr<HeaderCtx> ctx,
  shared_ptr<WritePromise> result)
{
  char buf[512];
  // ... populate the buffer ...
  write(buf, 512)->chain(result);
}

What is wrong with it? The buffer on the stack gets freed before the write completes and the next scheduled function fills it with garbage. The next version:

struct HeaderCtx {
  ...
  char buf[512];
};

void writeHeader(
  shared_ptr<FutureBase> input,
  shared_ptr<HeaderCtx> ctx,
  shared_ptr<WritePromise> result)
{
  // ... populate the buffer ...
  write(ctx->buf, 512)->chain(result);
}

Potentially better, with the buffer in the context (remember, the context is an analog of a stack frame in the normal functions), but now the context gets freed immediately after writeHeader() returns too! So no, not really better. What we need is to keep the context alive until the write completes. It can be done like this:

void empty(
  shared_ptr<FutureBase> input,
  shared_ptr<void> ctx,
  shared_ptr<Promise<void>> result)
{}

void writeHeader(
  shared_ptr<FutureBase> input,
  shared_ptr<HeaderCtx> ctx,
  shared_ptr<WritePromise> result)
{
  // ... populate the buffer ...
  auto wres = write(ctx->buf, 512);
  wres->chain(result);
  wres->chain(empty, ctx);
}

or in a slightly different version:

void empty(
  shared_ptr<FutureBase> input,
  shared_ptr<void> ctx,
  shared_ptr<PromiseBase> result)
{
  input->chain(result);
}

void writeHeader(
  shared_ptr<FutureBase> input,
  shared_ptr<HeaderCtx> ctx,
  shared_ptr<WritePromise> result)
{
  // ... populate the buffer ...
  auto wres = write(ctx->buf, 512);
  wres->chain(empty, ctx)->chain(result);
}

The empty function does nothing, it's just a placeholder for the context to be kept alive in a chained promise until the write completes. Note that the same empty function can be used in all the places where this functionality is needed.

Which brings us to the point that instead of writing custom snippets for everything, we might be able to compose a good deal of computation out of pre-defined functions.

One repeating example has been storing the result of a computation in a variable. It can be done as a reusable function that gets the address to store to as its context (and that's one of the examples where the context would be better as just a pointer instead of a shared_ptr) and stores the value of a given type from its input future. Considering that a future has two separate meanings, returning the value and signaling the completion, we could even define a separate specialized kind of future that would store the value at a given address instead of keeping it internally.
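A sketch of such a reusable storing function, in the pseudo-code style of this series (it assumes a chaining variant that accepts a plain pointer as the context; the caller must guarantee that the target outlives this step):

// Stores the value from the input future at the target address and
// passes the completion on.
template <typename T>
void storeValue(
  shared_ptr<Future<T>> input,
  T *ctx,                       // the address to store into
  shared_ptr<Promise<void>> result)
{
  *ctx = input->value();
  result->complete();
}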

Another obvious possible composition is in collecting the arguments of the asynchronous functions. It would make sense to be able to compute the arguments in parallel, then call the function. And it's not that hard to do. An asynchronous function in any case consists of multiple plain functions: a "header function" and "continuation functions", with the context passed to the continuation functions serving as the stack frame of the asynchronous function; the context gets allocated and the needed arguments copied into it by the header part. How about we make the function arguments into a structure and pass it as the context to the header part of the asynchronous function? The header part would then become not called directly but chained to the completion of the context. Which in turn would be driven by an AllOf for the completion of the computation of all the arguments (stored into the structure on completion, as discussed above), and sometimes perhaps one more input, telling that the previous computation in the sequence has completed. Not every argument has to be computed asynchronously, they could be assigned synchronously, and then there just won't be a future for this argument to include into the AllOf. To reduce the overhead, potentially the arguments structure can be passed not as a shared_ptr but as a plain pointer, owned by the calling function (as the arguments are on the stack for the plain functions) - then of course the calling function needs to make sure that the argument structure lives throughout the call, as has been shown above with the buffer.
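A sketch of how this composition could look, reusing the storeValue() helper sketched above and the AllOf from the installment on loops (WriteArgs, Buffer, computeData() and writeData() are made-up names, and the raw-pointer context is the same assumption as in storeValue()):

// The function arguments collected into a structure that doubles as the
// context of the header part of the asynchronous function.
struct WriteArgs : public ContextBase {
  shared_ptr<Buffer> data;  // computed asynchronously
  size_t offset = 0;        // assigned synchronously
};

// the header part, chained to the completion of all the arguments
static void writeData(
  shared_ptr<FutureBase> input,  // the AllOf result, its value is ignored
  shared_ptr<WriteArgs> args,    // the arguments double as the context
  shared_ptr<Promise<void>> result);

// the call site
  auto args = make_shared<WriteArgs>();
  args->offset = 0;  // the synchronous argument, no future needed for it
  AllOf ready(1);
  // the asynchronous argument: its value gets stored into args->data
  ready.addInput(
    computeData()->chain(storeValue<shared_ptr<Buffer>>, &args->data));
  ready.getResult()->chain(writeData, args);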

Well, if you're using coroutines, the compiler would probably do all that for you, generating just enough of the small functions on the fly. If the coroutines don't work for you, the missing custom fragments can probably be filled in with lambdas in modern C++. Lambdas can be combined with the macros too if you really want to.

One thing that can be said about large collections of small functions calling each other through a scheduler is that they'll never be very efficient. Although they can be a little more efficient if instead of returning back they could be made to jump straight to the next function in the chain, such as if the entry address of the next function is pushed onto the stack instead of the return address. In fact, I've started writing this series because of something that I've read recently, about a virtual machine in an ancient DBMS that worked exactly like this, instead of returning from a function having an instruction (PC = (RCHAIN)+), so that a sequence of function addresses to call would be prepared in memory, RCHAIN initialized pointing to the start of it, and then calling this instruction to jump to the first function in the sequence.

Sunday, February 9, 2025

Asynchronous programming 8 - loops and a semaphore

The two basic varieties of loops are where you either execute everything sequentially or schedule all the iterations in parallel. Suppose a loop consists of three parts A, B, C, connected by futures, and suppose the loop has 100 iterations. In the sequential form it unrolls into:

-> A1 -> B1 -> C1 -> A2 -> B2 -> C2 -> ... -> A100 -> B100 -> C100 ->

 In the parallel form it becomes

   /->A1 -> B1 -> C1 ---\
->          ...          ->
   \->A100->B100->C100 -/

An important thing to keep in mind for both forms is to prevent inlining at least at the start of each iteration. In the parallel form the inlining would kill the parallelism, and in either form it will likely overflow the stack with the recursive calls (300 of them in this example). This is a very good reason to avoid inlining altogether, and the delayed scheduling slot from the installment on inlining is a good solution there.

In the parallel form all the futures at the end of each iteration are joined into one (usually ignoring the return value and becoming a void future) with an AllOf primitive, where the final future gets completed only after all the inputs get completed. It can be easily implemented by counting the completed futures, with pseudo-code like:

class AllOf {
public:
  AllOf(int count):
    ctx_(make_shared<Context>(count))
  {}

  // inputs - a container of input future references
  template<typename Container>
  AllOf(const Container &inputs):
    ctx_(make_shared<Context>(inputs.size()))
  {
    for (const auto &inf : inputs) {
      addInput(inf);
    }
  }

  void addInput(shared_ptr<FutureBase> input)
  {
    // Note that the result promise is ignored
    input->chain(advance, ctx_);
  }

  shared_ptr<Future<void>> getResult() const
  {
    return ctx_->result_->to_future();
  }

  static void advance(
    shared_ptr<FutureBase> input,
    shared_ptr<Context> ctx,
    shared_ptr<Promise<void>> result /*ignored*/)
  {
    if (atomic_decrease(ctx->count_) == 0) {
      ctx->result_->complete();
    }
  }

protected:
  struct Context : public ContextBase {
    Context(int count) : count_(count) {}

    atomic_int count_;
    shared_ptr<Promise<void>> result_ = make_promise<void>();
  };
  shared_ptr<Context> ctx_;
};

// used like this:
  AllOf res(100);
  for (int i = 0; i < 100; i++) {
    res.addInput(schedule_iteration(ctx));
  }
  res.getResult()->chain(...)

Although fundamentally this doesn't have to be just a counter, it can as well run a map-reduce aggregation pattern, where the "map" part is done in parallel as loop iterations, and "reduce" combines the results under a mutex in a class similar to AllOf here, and after all the inputs are processed, completes the output promise.
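A sketch of such a reducing variant, structured like the AllOf above (here the aggregation is a simple integer sum, purely for illustration):

// Like AllOf but also aggregating the values from the inputs ("reduce"),
// completing the result with the total once all inputs are done.
class SumOf {
public:
  SumOf(int count):
    ctx_(make_shared<Context>(count))
  {}

  void addInput(shared_ptr<Future<int>> input)
  {
    input->chain(advance, ctx_);
  }

  shared_ptr<Future<int>> getResult() const
  {
    return ctx_->result_->to_future();
  }

  static void advance(
    shared_ptr<Future<int>> input,
    shared_ptr<Context> ctx,
    shared_ptr<Promise<void>> result /*ignored*/)
  {
    {
      scopelock(ctx->mutex_);  // the "reduce" step runs under a mutex
      ctx->sum_ += input->value();
    }
    if (atomic_decrease(ctx->count_) == 0) {
      ctx->result_->return_value(ctx->sum_);
    }
  }

protected:
  struct Context : public ContextBase {
    Context(int count) : count_(count) {}

    Mutex mutex_;
    int sum_ = 0;
    atomic_int count_;
    shared_ptr<Promise<int>> result_ = make_promise<int>();
  };
  shared_ptr<Context> ctx_;
};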

Note that there is no need to even collect all the inputs in an array as some libraries do, the constructor from a container is just a convenience. Once they're properly chained together, only the count matters. And it's easy to make a variation of the implementation where the count would be accumulated dynamically as the inputs are connected, and then finalized (it's important to not send out the result before finalization).

Now, what if you want to do something in between, a parallel loop with limited parallelism? An executor with a limited number of threads won't work here because it limits the computational parallelism but here we want to limit the resource parallelism. Think for example of each iteration opening, processing, and closing a file descriptor, with a limited number of descriptors to be open at a time. Here we need a semaphore implemented with futures. A semaphore is kind of a mix of the AllOf pattern shown here and the asynchronous mutex pattern shown before. Of course, the easiest way is to just serialize the loop by partitioning it into K parts, each part fully serial. But that's not the same thing as the semaphore, where the next iteration gets started when any of the running iterations completes; a semaphore is better at balancing the load.

A path to implementing a semaphore would be to keep two lists, one of completed futures, another one of waiting promises, with at most one of the lists being non-empty at any given time. If a new promise gets added when there is a spare completed future on the list, they can be "annihilated" by using the future to complete the promise and forgetting both (but pre-rigging the chain of execution off that promise that will return a completed future to the semaphore at the end). When a completed future gets added and there is a promise waiting, they can also be "annihilated" in the same way. Otherwise the future or promise gets added to the appropriate list. If we start by adding all the promises first, and telling the semaphore the degree of parallelism K, then the semaphore can also signal the completion of work when it collects all K futures on their list, completing another special promise.
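A sketch of this semaphore in the same pseudo-code style (the class and method names are made up; note that the chaining is done outside the mutex, following the rule about not chaining to a possibly-completed future under a lock):

// The two-list semaphore: the waiting side adds a promise whose completion
// starts one unit of work, the releasing side adds a completed future.
class FutureSemaphore {
public:
  // chain your work off the future of this promise before calling
  void addWaiter(shared_ptr<Promise<void>> waiter) {
    shared_ptr<Future<void>> freeSlot;
    {
      scopelock(mutex_);
      if (completed_.empty()) {
        waiters_.push_back(waiter);
        return;
      }
      freeSlot = completed_.front();
      completed_.pop_front();
    }
    freeSlot->chain(waiter);  // "annihilate" outside the mutex
  }

  // called with a completed future when one unit of work releases its slot
  void release(shared_ptr<Future<void>> done) {
    shared_ptr<Promise<void>> waiter;
    {
      scopelock(mutex_);
      if (waiters_.empty()) {
        completed_.push_back(done);
        return;
      }
      waiter = waiters_.front();
      waiters_.pop_front();
    }
    done->chain(waiter);  // "annihilate" outside the mutex
  }

protected:
  Mutex mutex_;
  list<shared_ptr<Future<void>>> completed_;  // the spare slots
  list<shared_ptr<Promise<void>>> waiters_;   // the work waiting for a slot
};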

Finally, there is a retry-loop pattern for manual programming that allows reducing the number of small functions. Sometimes you need to prepare multiple resources before starting an operation:

if (resource_a == nullptr) {
  resource_a = acquire_a(); // may sleep
}
if (resource_b == nullptr) {
  resource_b = acquire_b(); // may sleep
}
operation(resource_a, resource_b);

An interesting property of this preparation is that it can be restarted from scratch with little cost. So instead of writing little fragment functions to do one check and one assignment per function, we can write one function and schedule it recursively after each acquisition, making a state machine to remember where we left off last time:

void startOperation(
  shared_ptr<FutureBase> input,
  shared_ptr<Context> ctx,
  shared_ptr<Promise<Sometype>> result)
{
  switch(ctx->state) {
  case AFTER_A:
    ctx->resource_a =
      static_cast<Future<ResourceA> *>(input.get())->value();
    break;
  case AFTER_B:
    ctx->resource_b =
      static_cast<Future<ResourceB> *>(input.get())->value();
    break;
  }
  ctx->state = INIT;

  if (ctx->resource_a == nullptr) {
    ctx->state = AFTER_A;
    acquire_a()->chain(startOperation, ctx);
    return;
  }
  if (ctx->resource_b == nullptr) {
    ctx->state = AFTER_B;
    acquire_b()->chain(startOperation, ctx);
    return;
  }
  operation(ctx->resource_a, ctx->resource_b)->chain(result);
}

It doesn't look like the most efficient way of execution but surprisingly, if the resources are typically already acquired, it is, doing the whole check and initiating the operation in a single function. It's also a very effective way to make the program more readable to humans, without a myriad of tiny functions.

P.S. See more about the loops in the installment on error handling.

Saturday, February 8, 2025

Asynchronous programming 7 - caching

Caching of values becomes easy with the futures because the futures take care of almost all the needed synchronization. The combination of futures and shared_ptr also transparently solves the issues of what to do with the ownership of an object discarded from the cache if it's still used by a reader (that reader will have the last shared_ptr) and what to do if the cache is full of incomplete reads (add the entry to the cache anyway, so that if anyone else tries to read it in the meantime, they will wait for it and won't initiate another read, and then chain the reading from the completion of the previous read). The pseudo-code looks like this:

class Cache {
public:
  shared_ptr<Future<Value>> read(Key k) {
    scopelock(mutex_);
    auto it = data_.find(k);
    if (it != data_.end()) {
      // bump the key in LRU order
      lru_.remove(times_[k]);
      times_[k] = ++generation_;
      lru_[generation_] = k;
      // note that the future might not be completed yet!
      // but that's OK, the caller will wait for it
      return it->second;
    }
    tryRemoveOldL();
    // the new key always gets inserted, even if the cache is overflowing
    times_[k] = ++generation_;
    lru_[generation_] = k;
    if (data_.size() < SIZE_LIMIT) {
      data_[k] = reading_ = readValue(k);
    } else {
      // wait for the last read to complete before reading this key
      auto ctx = make_shared<WaitCtx>(WaitCtx{this, k});
      shared_ptr<Promise<Value>> result = make_promise<Value>();
      auto prev_reading = reading_;
      data_[k] = reading_ = result->to_future();
      prev_reading->chain(readDelayed, ctx)->chain(result);
    }
    return data_[k];
  }

protected:
  // context for waiting to read a key
  struct WaitCtx {
    Cache *cache;
    Key key;
  };

  // If the cache is overflowing, tries to remove the oldest completed
  // element, expects the object to be already locked.
  // Returns true if either the cache wasn't overflowing or an element
  // got successfully discarded from it.
  bool tryRemoveOldL() {
    if (data_.size() >= SIZE_LIMIT) {
      for (const auto &entry : lru_) {
        if (data_[entry.second]->is_completed()) {
          Key kold = entry.second;
          data_.remove(kold);
          lru_.remove(times_[kold]);
          times_.remove(kold);
          return true;
        }
      }
      return false;
    }
    return true;
  }

  shared_ptr<Future<Value>> readValue(Key k) {
    ...
  }

  static void readDelayed(
    shared_ptr<Future<Value>> input,
    shared_ptr<WaitCtx> ctx,
    shared_ptr<Promise<Value>> result)
  {
    scopelock(ctx->cache->mutex_);
    if (ctx->cache->tryRemoveOldL()) {
      ctx->cache->readValue(ctx->key)->chain(result);
    } else {
      // oops, at least our input element should have completed,
      // this means it got stolen from us in a race between its
      // completion and this function running, need to delay again
      auto prev_reading = ctx->cache->reading_;
      ctx->cache->reading_ = result->to_future();
      prev_reading->chain(readDelayed, ctx)->chain(result);
    }
  }

  Mutex mutex_;
  map<Key, shared_ptr<Future<Value>>> data_;
  map<Key, int64_t> times_;
  map<int64_t, Key> lru_;
  int64_t generation_ = 0;
  shared_ptr<Future<Value>> reading_;
};

This code is much simpler than the example I've shown in "The Practice of Parallel Programming", so finally the asynchronous programming is good for something!

However, the scheduling of the asynchronous functions doesn't mesh well with object methods; those have to be made into static methods (perhaps there is some better solution?).

There is also a special case of caching, the lazy reading of exactly one object. That requires only a mutex and a future. Check under the mutex if the future reference is null, and if so, initiate the read and remember the result future from it. Otherwise just return the future; it doesn't matter at this point whether the future is completed or not, it will be the caller's responsibility to wait for it.
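In the same pseudo-code style, this special case comes out very short (Value and readValue() are placeholders for the actual object type and read):

class LazyValue {
public:
  shared_ptr<Future<Value>> get() {
    scopelock(mutex_);
    if (read_ == nullptr) {
      read_ = readValue();  // initiate the read exactly once
    }
    // may or may not be completed yet, the caller just chains to it
    return read_;
  }

protected:
  shared_ptr<Future<Value>> readValue();  // the actual asynchronous read

  Mutex mutex_;
  shared_ptr<Future<Value>> read_;
};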

Asynchronous programming 6 - inlining done right

To recap, "inlining" is when we complete a future that has some function chained to it (directly or through other futures), and that function gets immediately executed from the completion library call. The opposite is "trampolining" where this function gets put into a scheduler queue and executed later.

Inlining allows saving on the cost of scheduling, and also keeps the cache hot: completing a future means that we've just put some value into it, and so reading that value (and other values it's connected to) immediately means that it will still be in the cache.

However inlining can also be self-defeating: suppose we want to complete ten futures, each with a function chained to it. If trampolined, ten CPUs can pick them from the scheduler queue and execute in parallel. But inlining would inadvertently cause them to be executed sequentially.

The reality is that inlining is only efficient when it's done at the tail of the current function. On the other hand, the issues with inlining (stack overflows and bad interactions with mutexes and serializing the parallel execution) can be avoided if the inlined function was called only after the current function returns.

Put this way, the straightforward solution is to replace inlining with a special case of trampolining via a "delayed scheduling slot": have a special thread-local variable in the scheduler, sufficient to hold a reference to a single scheduled function. Whenever a future is completed, put one chained function there and schedule the rest as usual. If the delayed slot is already used, then it can be either left as-is and all the new functions scheduled as usual, or in the hope that the later completions have a hotter cache, move the old contents of the delayed slot into the normal scheduling queue and put the new function there. Then when the current asynchronous function is completed, have the scheduler code check the delay slot, and if not empty, call the function from there.

This can be expressed in pseudocode:

thread_local<Function> delaySlot;

complete_future(Future fut)
{
  FunctionList funcs;
  FutureList recfut;

  recfut.insert(fut);

  for (f in recfut) {
    scopelock(f);

    if (f.completed)
      continue;  // a double completion, nothing to do
    f.completed = true;  // mark as completed before collecting the chains

    funcs.merge(f.chained_functions);
    f.chained_functions.clear();

    recfut.merge(f.chained_futures);
    f.chained_futures.clear();
  }
  if (delaySlot.empty() && funcs.size() == 1) {
    delaySlot = funcs.front();
  } else if (!funcs.empty()) {
    scopelock(schedulerQueue);
    if (!delaySlot.empty()) {
      schedulerQueue.insert(delaySlot);
    }
    delaySlot = funcs.front();
    funcs.pop_front();
    for (fun in funcs) {
      schedulerQueue.insert(fun);
    }
  }
}

scheduler_thread()
{
  while (true) {
    Function f;
    if (!delaySlot.empty()) {
      f = delaySlot;
      delaySlot.clear();
    } else {
      f = getNextFromQueue();
    }
    execute(f);
  }
}

Another interesting point is that cache locality gets improved by unfair scheduling, inserting the new functions at the front of the scheduling queue, with the premise that the more recent inserts will have a hotter cache. It's not exactly unfair either: Remember that in asynchronous programming the sequential execution gets broken up into a sequence of separate small functions. And so the most recently scheduled function is likely the continuation of the previous function, and running it first is completely fair, with the scheduling queue becoming a representation of the call stack, the innermost "return addresses" being at the front of the queue.

This is very similar to Triceps's scheduling logic, following from the same reasoning. Or to put it differently, this is the reason why Triceps's scheduling logic should also be used for the asynchronous programming in general.

Asynchronous programming 5 - critical sections the right way

Here is how to do the critical sections (i.e. an analog of a sleeping-lock mutex) properly in the asynchronous way. Consider that the chains of futures are always computed sequentially, one before another. So that's what we need to do to have the critical sections computed sequentially: arrange them into a dynamically built chain. Note that there can be more than one action chained to the same future, and that's what gets used for synchronization: the same future at the end of a critical section gets two actions chained to it. One is used to return the result and continue computation in the usual way, the other one only looks for the completion and starts the next thread's critical section.

Suppose we have pseudo-code like this:

funcA() {
  ...
  // this future completes when the code can enter the
  // critical section
  shared_ptr<Future<void>> futB = waitSyncB();
  auto futX = futB->chain(funcCritC, ctx);
  return futX;
}

// this function represents the critical section for serialization
// that may require sleep
funcCritC(
  shared_ptr<Future<void>> input,
  shared_ptr<SomeCtx> ctx,
  shared_ptr<Promise<Sometype>> result)
{
  ...
  auto futD = funcSleepD();
  futD->chain(result);
}

The execution of this code can be described as a chain:

funcA -> waitSyncB -> futB -> funcCritC -> funcSleepD ->futD -> futX -> ...

With the critical section in it being the part

futB -> funcCritC -> funcSleepD -> futD -> futX

that may have a sleep in the middle (with funcSleepD also being a part of the critical section). This chain can execute only when futB allows it, telling that the critical section became free. futB has no value, it's used purely for synchronization. So if we have three threads executing this code (let's mark them with suffixes 1, 2, 3), they should be arranged in a chain

funcCritC1 -> funcSleepD1 ->futD1 ->futX1 -> futB2 -> funcCritC2 -> funcSleepD2 ->futD2 ->futX2 -> futB3 -> funcCritC3 -> funcSleepD3 ->futD3 -> futX3 ->

Note that the instances of futX here grow an extra connection. In addition to being connected to the rest of the chain in their own thread, they become connected to futB of the next thread.

... futB1 -> funcCritC1 -> funcSleepD1 ->futD1 -> futX1 -> ...
                                                  |
                                                  V
                                                  futB2 -> funcCritC2 -> funcSleepD2 ->futD2 -> futX2

This basically means that the function waitSyncB() should create the connection FutX(N) -> FutB(N+1). And no serial executors are needed, this chain will execute sequentially on any executor!

Here is the pseudocode of an implementation:

class Serial {
public:
  Serial()
    : tail_(make_shared<Future<void>>())
  {
    // the critical section starts free
    tail_->complete();
  }

  void serialize(
    shared_ptr<Future<void>> &head,
    shared_ptr<Promise<void>> &newtail)
  {
    newtail = make_shared<Promise<void>>();
    head = newtail->to_future();
    atomic_swap(tail_, head);
  }

protected:
  shared_ptr<Future<void>> tail_;
};

Then the code of funcA() becomes:

Serial asyncMutex;

funcA() {
  ...
  // this future completes when the code can enter the
  // critical section
  shared_ptr<Future<void>> futB;
  // we need to complete this promise when exiting the
  // critical section
  shared_ptr<Promise<void>> futB_end;
  asyncMutex.serialize(futB, futB_end);
  auto futX = futB->chain_trampolined(funcCritC, ctx);
  futX->chain(futB_end);
  return futX;
}

There is a bit of general ugliness of the asynchronous programming where we need to know the result future before creating the chain that produces it (very similar to the issue with Triceps Labels) but otherwise it's straightforward and simple to use.

The chaining with chain_trampolined() is used here to request the next function to be run through a scheduler queue, to avoid the situation when a long chain of waiting threads gets built up while one thread is waiting inside the critical section, and then everything in that chain tries to execute inlined (by direct calls rather than scheduling in the executor) and runs out of stack. In fact, this is such a typical and thorny issue that probably everything should be trampolined by default, and inlining (direct calls) done only when explicitly requested. But the creators of the asynchronous libraries tend to have the opposite opinion.

Another thing to note is that there is no atomic swap defined on the standard library shared pointers. You can make one by either using a traditional mutex as a spinlock, with a regular swap under it, or by defining your own version of shared_ptr just for the futures that does have an atomic swap. It's such a convenient operation that making your own shared_ptr is worth it.
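A sketch of the first option, in the same pseudo-code style (whether the spinlock mutex is global or a member of Serial is a separate design choice):

Mutex swapMutex;  // used as a spinlock, held only for the swap itself

void atomic_swap(
  shared_ptr<Future<void>> &a,
  shared_ptr<Future<void>> &b)
{
  scopelock(swapMutex);
  a.swap(b);  // the plain, non-atomic swap, made atomic by the lock
}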

Wednesday, February 5, 2025

Asynchronous programming 4 - a look under the carpet

 

In this part I want to go over some simplifications I've made in the first part, mostly because some things are wrong and should never be used. Here I want to talk about them, and also about the solutions for the same problems that should be used instead.

Back then I've said that there is no await() in asynchronous programming, but usually there is. Just that it should never be used because it leads to very bad deadlocks. In particular, people tend to use it in combination with the serial executor as a way of locking, to run some blocking code without releasing the executor thread. If some of the called code wants to run on the same executor (essentially, doing a recursive lock), the code will wait on its queue forever and thus deadlock. It's not really await()'s problem, the same would happen with any recursive lock including the patterns that I'll show soon, but people are less aware of the issue with await() and proudly feel like they've "cheated the system".

And I've already mentioned that there is no good reason to use the serial executor at all, there are better patterns. These better patterns rely on something that I haven't mentioned yet: the mutexes. Which are available with asynchronous programming but need to be treated somewhat differently than in common programs. In asynchronous programming they need to be treated as spinlocks, to protect a short and quick chunk of code. Sometimes they can even be replaced by the lock-free code (finally a good use for it!). 

Instead the futures should be used as the mechanism for the long waits. A future has a nice property that avoids the race conditions: it might get completed after a function got chained to it, or before a function got chained to it, the function will run in either case when the future gets completed. So as I showed before, sometimes we don't even care about the value returned in a future but care about the fact of its completion to let some other code run. But a thing to keep in mind is that if a future is already completed before we chain a function to it, the function will usually run immediately on chaining, although there is no guarantee of that. This leads to the difficult to diagnose errors when some function assumes that it has an exclusive access to some data while it assembles a chain of futures and functions but in reality the first future in the chain sometimes completes on another CPU before the whole chain is assembled. Then the functions in the chain start running on the other CPUs, and our function in question at some point ends up with chaining another function that accesses the same data to an already completed future, and that another function gets called immediately, accessing the same data. 

This mostly happens with the serial executors, when both the current function and the chained one rely on the same serial executor for serialization (another reason to never use the serial executors). The executor gets specified in the chaining arguments, but since it's the same executor as the currently running one, the chaining thinks that it's fine to call directly. But it can also happen on any executor, while using mutexes in a slightly easier to diagnose pattern, where one function assembles a chain under mutex, and one of the functions in the chain tries to lock the same mutex, which becomes a recursive lock, and everything deadlocks. 

Hence the rule: either do no chaining under a locked mutex, or if you really need to, make sure that the first future in the chain won't get completed until after you unlock the mutex. In the latter case you'd usually start by creating a promise, then build a chain on its future side, and finally, after unlocking the mutex, chain that first promise to some future that might have been completed.
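A sketch of that pattern in the pseudo-code of this series (stepOne, stepTwo, ctx_, mutex_ and Value are made-up names):

shared_ptr<Future<Value>> startStep(shared_ptr<Future<Value>> mayBeCompleted)
{
  auto head = make_promise<Value>();
  shared_ptr<Future<Value>> tail;
  {
    scopelock(mutex_);
    // build the whole chain off the promise's future side; nothing
    // here can run yet because head is not completed
    tail = head->to_future()->chain(stepOne, ctx_)->chain(stepTwo, ctx_);
  }
  // only after the mutex is unlocked, let the chain start
  mayBeCompleted->chain(head);
  return tail;
}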

Another thing that I didn't mention is that usually the executors have a direct way to schedule a function to be executed on them. The trouble is that the signature of that function is usually different from that of a function that gets chained to a future, because with direct scheduling there are no future and no result promise arguments to the function. So if you need a function used both ways, you can't, because the signatures are different. In this situation, instead of the direct scheduling, you can use chaining on a future that is either pre-completed or gets completed right after chaining. However with plain chaining it will cause the function to be called right there (and this is known as "inlining", as opposed to scheduling on an executor, which is known as "trampolining"). So you'd have to use the kind of chaining that allows explicitly disabling the inlining. Or if this option is not available in your asynchronous library, then there is no other choice than to do an explicit scheduling.

Disabling the immediate inlined execution on chaining also resolves the other potential issues mentioned above (at the cost of additional overhead of scheduling). Or if it's not available, a chain can be made run through an explicit scheduling with pseudo-code like this (pseudo, since it plays a bit loose with the types):

// it didn't occur to me before but the contexts do have to have
// a virtual base class for their destruction to work
// correctly in shared_ptr
struct SchedBarrierCtx : public ContextBase {
  AsyncFunctionBase func;
  shared_ptr<FutureBase> input;
  shared_ptr<ContextBase> ctx;
  shared_ptr<Scheduler> sched;
  shared_ptr<PromiseBase> output;
};

template <typename Tin, typename Tout, typename Tctx>
shared_ptr<Future<Tout>> sched_barrier(
  shared_ptr<Future<Tin>> input,
  AsyncFunction<Tin, Tout, Tctx> func,
  shared_ptr<Tctx> ctx,
  shared_ptr<Scheduler> sched)
{
  auto barrier_ctx = make_shared<SchedBarrierCtx>(SchedBarrierCtx{
    func, input, ctx, sched, /*output*/ nullptr});
  // no need to specify the executor for chain(),
  // because barrier_trampoline1() will do that anyway,
  // and it's cheaper to inline it on any current executor
  return input->chain(barrier_trampoline1, barrier_ctx);
}

void barrier_trampoline1(
  shared_ptr<FutureBase> input,
  shared_ptr<SchedBarrierCtx> ctx,
  shared_ptr<PromiseBase> result)
{
  ctx->output = result;
  ctx->sched->schedule(barrier_trampoline2, ctx);
}

void barrier_trampoline2(shared_ptr<SchedBarrierCtx> ctx)
{
  ctx->func(ctx->input, ctx->ctx, ctx->output);
}

The arguments for the chained function get passed through scheduling by saving them in the barrier context.

Note that, barrier or not, the scheduled function can still complete before chain() returns! It's not very probable, because it requires another CPU to pick the scheduled work and complete it while the current CPU gets delayed by something else (perhaps an interrupt handler in the kernel), or for the kernel scheduler to do something unusual, but it's possible. The only thing guaranteed here is that the chained function will run in another kernel thread, and so if that kernel thread blocks, the one that called the chaining can still continue.

Sunday, February 2, 2025

Asynchronous programming 3 - some assistance

I was saying it 20 years ago, and 15 years ago in the TPOPP book, and I'm still saying it now: asynchronous programming has to be assisted by a compiler, otherwise it's just a huge pain of doing manually the things that a compiler normally does. Fortunately, I think now we have an out-of-the-box solution: the C++ coroutines in C++20, as described for example here: https://en.cppreference.com/w/cpp/language/coroutines . I haven't quite tried to do an actual implementation with them but it looks like the right thing. You define your Promise class (note that coroutines don't differentiate between the Future and Promise sides and call everything a Promise), and then the coroutine statements take that Promise class as a template argument and arrange the splitting of the sequential code into fragments. And you do the explicit parallelism on your own.

Another solution that I played with, doing a partial implementation, would work with plain C too: a preprocessor. It can be done in some smart way, as a whole pre-parser like cfront of yore, or a lot can be achieved even with the standard C preprocessor. The only trick is to generate the unique function names, and these can be generated by using the macro __LINE__. Since the line number stays the same within a macro invocation, each invocation gets a unique number that can be used repeatedly within the macro body. In modern C++, of course, we could also use the lambdas, making the naming issue moot; it's more of a plain C issue.
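One subtlety to keep in mind when actually building such macros: the ## operator pastes the literal token __LINE__ unless it goes through an extra level of macro expansion, so a small helper pair is needed, something like:

/* expands __LINE__ before pasting, so that ASYNC_NAME(foo) used in a
   macro expanded on line 42 becomes foo42 rather than foo__LINE__ */
#define ASYNC_CONCAT_(a, b) a##b
#define ASYNC_CONCAT(a, b) ASYNC_CONCAT_(a, b)
#define ASYNC_NAME(fname) ASYNC_CONCAT(fname, __LINE__)

The macros shown further below keep the shorter fname##__LINE__ form for readability, so in a real implementation they'd go through a helper like this one.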

The most difficult part is that we'll need to use the same call and return macros in both the "header" part of the function and the "continuation" part. Which means that all the functions have to have the same result type, and return the value in the same way. So let's take the example from the last post and reformat it to fit into this approach. The original example from the previous installment was:

struct FuncContext {
  int a;
};

Future<int> func()
{
  auto ctx = make_shared<FuncContext>();
  shared_ptr<Future<int>> futa = get_a();
  return futa->chain(func2, ctx) // use default executor
    ->chain(func3, ctx);
}

void func2(shared_ptr<FuncContext> ctx, shared_ptr<Future<int>> arg, shared_ptr<Promise<int>> result) {
  ctx->a = arg->value();
  get_b(ctx->a)->chain(result);
}

void func3(shared_ptr<FuncContext> ctx, shared_ptr<Future<int>> arg, shared_ptr<Promise<int>> result) {
  int b = arg->value();
  result->return_value(ctx->a + b);
}

To get the same return type throughout we change the "header" part to return void and pass the returned future back via an argument. 

The other problem is the type of that return promise's value: carrying it through all the "continuation" parts is difficult, so we'd have to revert to the base promise type that doesn't care about the return value and cast it only when setting the value. This base type has to exist for the scheduler to juggle all these promises in its queues. Also, remember, the premise here is that coroutines are not available, which would often mean plain C, and there the promises can't be templatized in the first place.

The code becomes:

struct FuncContext {
  int a;
};

void func(shared_ptr<Future<int>>* result_future)
{
  auto ctx = make_shared<FuncContext>();
  auto result = make_shared<Promise<int>>();
  *result_future = result->to_future();
  shared_ptr<Future<int>> fut_cont;
  get_a(&fut_cont);
  fut_cont->chain(func2, ctx)->chain(result);
}

void func2(shared_ptr<FuncContext> ctx, shared_ptr<Future<int>> arg, shared_ptr<PromiseBase> result) {
  ctx->a = arg->value();
  shared_ptr<Future<int>> fut_cont;
  get_b(ctx->a, &fut_cont);
  fut_cont->chain(func3, ctx)->chain(result);
}

void func3(shared_ptr<FuncContext> ctx, shared_ptr<Future<int>> arg, shared_ptr<PromiseBase> result) {
  int b = arg->value();
  static_cast<Promise<int>*>(result.get())->return_value(ctx->a + b);
}

Then we want to make it look like this:

ASYNC_FUNC_0ARG(func, int, {
  int a; // this is the context
}) {
  ASYNC_CALL_0ARG(func, ctx->a, int, get_a);
  ASYNC_CALL_1ARG(func, int b, int, get_b, ctx->a);
  ASYNC_FUNC_RETURN(int, ctx->a + b);
} ASYNC_FUNC_END

Here for simplicity I've just used separate macros for definitions and calls of functions with different numbers of arguments. It's definitely possible to use the macros with a variable number of arguments, just it's not something that I use often and I'm too lazy to look it up now. The invocation of ASYNC_FUNC_END is needed to balance out the curly braces. The name of the calling function is needed in the CALL macros to refer to the context type name, this unfortunately can't be avoided, and then incidentally it can be used to generate the names of the continuation functions. Alternatively, we could define the function name as a macro before the function definition and undef it afterwards, then everything in between could just use that macro for the function name.

There is a bit of ugliness but still, looks much shorter and simpler than before, doesn't it? Now all we do is to define the macros that will translate one into another by copy-pasting from the long example (I haven't actually tried these macros right now, so they might contain small bugs but it shows the idea, and I did get a similar system working in the past):

#define ASYNC_FUNC_0ARG(fname, func_return_type, context_body) \
struct fname##Context context_body; \
void fname(shared_ptr<Future<func_return_type>>* result_future) \
{ \
  using return_type = func_return_type; \
  auto ctx = make_shared<fname##Context>(); \
  auto result = make_shared<Promise<return_type>>(); \
  *result_future = result->to_future();

#define ASYNC_FUNC_END }

#define ASYNC_CALL_0ARG(fname, assign_to, call_return_type, call) \
    shared_ptr<Future<call_return_type>> fut_cont; \
    call(&fut_cont); \
    static void fname##__LINE__(shared_ptr<fname##Context> ctx, shared_ptr<Future<call_return_type>> arg, shared_ptr<PromiseBase> result); \
    fut_cont->chain(fname##__LINE__, ctx)->chain(result); \
  } \
} \
static void fname##__LINE__(shared_ptr<fname##Context> ctx, shared_ptr<Future<call_return_type>> arg, shared_ptr<PromiseBase> result) { \
  assign_to = arg->value(); \
  {

#define ASYNC_CALL_1ARG(fname, assign_to, call_return_type, call, call_arg1) \
    shared_ptr<Future<call_return_type>> fut_cont; \
    call(call_arg1, &fut_cont); \
    static void fname##__LINE__(shared_ptr<fname##Context> ctx, shared_ptr<Future<call_return_type>> arg, shared_ptr<PromiseBase> result); \
    fut_cont->chain(fname##__LINE__, ctx)->chain(result); \
  } \
} \
static void fname##__LINE__(shared_ptr<fname##Context> ctx, shared_ptr<Future<call_return_type>> arg, shared_ptr<PromiseBase> result) { \
  assign_to = arg->value(); \
  {

#define ASYNC_FUNC_RETURN(return_type, expr) \
  static_cast<Promise<return_type>*>(result.get())->return_value(expr)

There are a couple more things to explain in the ASYNC_CALL macros. One is that they have to declare the continuation function before using it; this is something that I've glossed over before, because if you write these continuation functions manually, you'd collect all the declarations up front. But if they're generated on the fly, the declarations also have to come on the fly. These functions can be static because they're not called from outside the file. The second thing is that the current function gets closed with two curly braces, and the next one gets opened with two curly braces. This is because ASYNC_FUNC opens the function with a curly brace for the generated definitions, and then another brace comes after the macro, and then we need to maintain the same brace depth throughout.

Note that the execution of the asynchronous functions here is strictly sequential, no ifs nor loops. However, similar macros can be made for the ifs and loops, and if I ever get around to transforming this text into a chapter for a newer version of my book on parallel programming, I'll do them too. They'd be ugly but still better than writing things manually. And a specialized preprocessor like cfront can reduce the ugliness of having to repeat the names (which can't be remembered between the C preprocessor macros) and of having to explicitly specify the level of nesting for the ifs and loops.

 

Saturday, January 25, 2025

Asynchronous programming 2 - filling in the types

For the sake of a quick introduction, I've glossed over some things in part 1. Here I want to come back and show them.

Let's start with a small code snippet similar to what was shown in part 1:

func(context, arg)
{
  ...
}

Future fut1;
fut1.chain(func, context, executor);

Note that there are no types in this snippet, I've dropped them to avoid getting mired in them. Let's fill them in, and the answer might vary depending on the specific asynchronous library.

Let's start with the function argument arg. Note that it's not explicitly mentioned anywhere in chain(). That's because the argument comes from the future fut1, it's the value that becomes stored in it. So if, suppose, the type of fut1 is actually Future<int>, the argument might actually be

int arg

but the more typical solution is to pass the whole input future as an argument:

Future<int> arg

Except that normally the futures wouldn't be copied but passed by reference. And considering in how many places they get referred to, the only reasonable way is to use either reference counting or garbage collection. Reference counting is more natural for C++ and C, so the type would become:

shared_ptr<Future<int>> arg

Next, what is the function's return value? Being an asynchronous function, its return value must be returned through a Promise. Moreover, that Promise's Future side needs to be returned at the time of the chaining, so the chaining becomes (assuming that the returned value is of type double):

shared_ptr<Future<int>> fut1;
shared_ptr<Future<double>> fut2 = fut1->chain(func, context, executor);

But how will the function know where to return that value? It has to receive that result Promise as an argument too:

void func(context, shared_ptr<Future<int>> arg, shared_ptr<Promise<double>> result);

Since the result is returned through an argument, the normal function's return type becomes void. It's the responsibility of the function to make sure that the result promise will be completed, however this doesn't have to happen by the function's return time. Instead it can schedule some other code that will complete this promise later (for example, by chaining it from some other future that it creates). Which, yes, is another potential source of errors, when the promise completion gets forgotten in one of the branches of execution and the rest of the logic gets deadlocked waiting for it. The way to debug this is to have the library keep track of the futures that have some dependency chained to them but haven't been completed and haven't been scheduled to run and haven't been chained to something else. However this can also be a normal intermediate state of a future being still prepared, or of a future stored in some data structure to be found and completed later, so the program can't just abort every time on seeing such a future. Instead it has to be a tool that can run and list all the suspicious futures whenever a deadlock is suspected. Or there can be a special flag that would let the future be temporarily exempted, that gets cleared on exiting the constructing scope unless explicitly preserved. Then any non-compliant future without this flag can be an immediate reason for a program abort, but if the flag gets mismanaged, the deadlocks could still happen. As I've said many times before, the asynchronous programming is fragile and hard to debug.

The executor would generally also be a shared_ptr. The final part is the context, which is normally also a shared_ptr to some object. What object it is depends on the function. Consider a classic function:

int func()
{
  int a = get_a();
  int b = get_b(a);
  return a+b;
}

If the functions get_a() and get_b() can block (and I've made get_b() dependent on a to make the execution sequential), in the asynchronous form this function gets split:

struct FuncContext {
  int a;
};

shared_ptr<Future<int>> func()
{
  auto ctx = make_shared<FuncContext>();
  shared_ptr<Future<int>> futa = get_a();
  return futa->chain(func2, ctx) // use default executor
    ->chain(func3, ctx);
}

// fragment: receives the result of get_a(), starts get_b()
void func2(shared_ptr<FuncContext> ctx, shared_ptr<Future<int>> arg, shared_ptr<Promise<int>> result) {
  ctx->a = arg->value();
  get_b(ctx->a)->chain(result); // get_b()'s future will complete the result promise
}

// fragment: receives the result of get_b(), computes the sum
void func3(shared_ptr<FuncContext> ctx, shared_ptr<Future<int>> arg, shared_ptr<Promise<int>> result) {
  int b = arg->value();
  result->return_value(ctx->a + b);
}

This highlights how the asynchronous code is typically written:

  • There are two kinds of asynchronous functions: the "head parts" of the actual meaningful high-level functions, like func(), and the split-out internal fragments of the meaningful functions, like func2() and func3(). They're usually written differently: the heads take arguments just like the common functions and return a future with the result, while the fragments are tied to some future representing the result of another asynchronous function call, do the next step of computation until calling another asynchronous function, and then return that function's result as their own (at least in this pattern where the fragments are pre-chained in advance).
  • The context carries the values between the fragments, and is an analog of a stack frame in a normal function. It's possible to fine-tune each step's context but that's usually more trouble than it's worth, so other than for some obvious optimizations (such as b here not getting stored in the context because it's used in only one fragment), it's much easier and better to just carry the same context throughout all the fragments.

Note that all the dynamic objects are nicely auto-destroyed by reference counting after the function completes, and in the meantime are held alive in the scheduling queues and future chains. However the implication there is that a value stays alive as long as the future containing it stays alive, and if that future is kept for a long time, the value would also be kept.

Why would a future be kept for a long time? Because a future represents both a value and the fact of completion, and the fact of completion might be interesting for much longer than the value, as will be shown in the future installments. In this case it might be useful to chain a future with a value to a future without a value:

shared_ptr<Future<SomeType>> fut_a;
shared_ptr<Promise<void>> prom_b;
...
fut_a->chain(prom_b);

However normally the chaining expects that the types of values on both sides are the same. So this is a special case of converting to void that should be included in the library. If it isn't in the library, it can be implemented as:

template<typename T>
void convert_to_void_impl(shared_ptr<void> ctx, shared_ptr<Future<T>> input, shared_ptr<Promise<void>> result)
{
  result->return_value(); // ignore the input value, propagate only the completion
}

template<typename T>
shared_ptr<Future<void>> chain_to_void(shared_ptr<Future<T>> input) {
  return input->chain(convert_to_void_impl<T>, nullptr, input->getExecutor());
}

using an intermediate function to change the type. And if some particular library doesn't support a void future, you can always use an int future instead and never look at its value, just be satisfied that it has one.
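For example, the same conversion with an int future standing in for the void one could look like this sketch, with the same hypothetical API as above and made-up names convert_to_int_impl and chain_to_int:

template<typename T>
void convert_to_int_impl(shared_ptr<void> ctx, shared_ptr<Future<T>> input, shared_ptr<Promise<int>> result)
{
  result->return_value(0); // the value is a meaningless placeholder, only the completion matters
}

template<typename T>
shared_ptr<Future<int>> chain_to_int(shared_ptr<Future<T>> input) {
  return input->chain(convert_to_int_impl<T>, nullptr, input->getExecutor());
}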

Tuesday, January 21, 2025

Asynchronous programming 1 - Futures and Promises and Executors

A little while ago I worked on an OS project that was written in an entirely asynchronous way, without explicit synchronization. Which is pretty much what I've described in my book in Section 6.4 "Queues as the sole synchronization mechanism", but in a new-fashioned way. So I want to write up the lessons from that experience before I completely forget them.

First, how does the basic programming go in this paradigm? The basic synchronization unit there is a "future" that is used to signal the completion of some operation that may involve a wait, and to return a value resulting from that operation. In pseudocode it looks like this:

Future f = start_some_operation(args);

In the plain programming with futures, you have to wait for the future to be completed before reading the value from it (for now we'll skip over the part of how the values of different types are passed through the futures - basically, in C++ it means that the future object would be a template with the value type as its parameter, and in plain C it's much more painful):

MyValue v = await(f);

However the fully asynchronous programming allows no waits. Instead you chain the future to call the next function:

f.chain(another_operation, context);

When the future becomes completed, it will use its result value to call:

another_operation(context, value)

How does a future get completed? There are two varieties of doing this. First, there is something in the runtime environment that completes the futures when the wait is finished. In a fully asynchronous OS this can be the kernel itself, or in a POSIX environment this would be some glue code that translates the end of a system call in some background thread to a completion of a future. Second, this could be done from another chunk of code right in the same environment. Fundamentally, both ways are the same: it's always some other code completing the future, it just can be a part of the same process and address space and logical unit, or be somewhere outside of it.

The futures really have two separate APIs, one for the receiving side where the returned value gets tied to some other code, one for the sending side that puts the returned value into the future and marks it completed. For this reason, the sending API is sometimes given a separate name, a "promise". So it's a promise to return a value, which eventually gets fulfilled, and the value comes out on the other side as the completion of a future.
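For example, the POSIX glue code mentioned above would use the promise side. Here is a rough sketch in the same pseudo-code, with error handling omitted; run_in_background_thread() is a made-up helper, not part of any real library:

Future<Buffer> read_async(fd, len)
{
  p = new_promise<Buffer>();
  run_in_background_thread([=]() {
    buf = read(fd, len); // blocks, but only in the background thread
    p.return_value(buf); // fulfills the promise, completing the future on the other side
  });
  return p.to_future();
}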

How does the function chained to the future execute? There are two ways: one is to call it immediately, another one is to schedule it for execution later with some kind of a scheduler. They have different trade-offs: the immediate execution can be faster but as the function gets called, it grows the current stack, and a long enough chain can overflow the available stack size. The scheduling is slower but limits the stack growth and can be used across the process boundaries, such as when the kernel code completes a userspace future. This has a very close relation to how things work in Triceps: a future is similar to a Triceps Label, except for the part that the Labels are connected once into a pipeline that is then used to send a continuous stream of data, while the futures are built into ephemeral pipelines that are used only once and then discarded, and new pipelines need to be built for more data. Unlike futures, Triceps always schedules the data that comes through a Label to avoid the stack depth troubles, and also provides certain guarantees of the scheduling order for the labels within a Unit.
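To summarize the two sides so far in pseudo-C++ declarations: this is only a sketch of the API used in these examples, with the executor argument (introduced just below) and the memory-management details (covered in the next installment) left out.

template <typename T>
class Future {
public:
  // receiving side: attach the next function to be called with the result;
  // a fuller library would return a future of that function's result,
  // and also provide some form of await() for the plain use shown earlier
  void chain(void (*func)(void *context, T value), void *context);
  const T &value() const; // readable only after the completion
};

template <typename T>
class Promise {
public:
  Future<T> &to_future();     // the matching receiving side of the same object
  void return_value(T value); // sending side: store the value and mark the future completed
};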

However the futures model also has an analog of a Triceps Unit, called an "executor". It's a scheduler with its queue, and also some CPU threads to perform the execution, potentially in a multithreaded way. Looking back, the idea of running short thread snippets on multiple CPUs in https://babkin-cep.blogspot.com/2020/02/scheduling-policy-for-burst.html is exactly such a combination of asynchronous logic with a multithreaded executor.

There can be multiple executors in a process, and not all of them are multithreaded. The single-threaded executors have a special use: they are used as mutexes. Since the functions in the scheduling queue of a single-threaded executor run sequentially, this has the same effect as serializing them on a mutex. So you'd have a separate single-threaded executor for every place where you'd lock a mutex in a common program. Well, sort of, as long as you don't try to do any other asynchronous operation when holding the lock - then as soon as you leave the current function, your lock gets released, so the serial executors are more like spinlocks without performance advantages, and you need to build special patterns to make sleeplocks. All the caveats of locking still apply, and you can cause deadlocks just as in the regular programs (they manifest a little differently, as all futures being incomplete, but still lock up the execution). If you want, you can also create executors with specific scheduling order logic, such as a single-threaded executor with the same logic as a Triceps Unit.

The multithreaded executors are generally all equivalent (since they provide no guarantees about the order of execution), so you'd normally have only one, with as many threads as there are physical CPUs. With some exceptions: you may have snippets executing at different priorities by using different executors (again, one executor per priority is enough), or you may want to limit the load imposed by some code by limiting its parallelism, in which case making a separate executor with a smaller number of threads makes sense.
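For instance, with a made-up Executor constructor that takes a thread count (and, purely hypothetically, a thread priority), setting these up could look like the sketch below; some_future, rebuild_index and ctx are just placeholders:

auto mainExec = make_shared<Executor>(hardware_concurrency()); // the one general-purpose executor
auto lowPrioExec = make_shared<Executor>(hardware_concurrency(), LOW_PRIORITY); // a separate executor per priority
auto throttledExec = make_shared<Executor>(2); // cap some heavy code at 2 threads

// the heavy background work gets chained onto the throttled executor
some_future.chain(rebuild_index, ctx, throttledExec);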

When creating more executors, don't forget that the underlying resources are not unlimited, and their threads will be fighting it out for the real CPUs at the OS level. And yes, the threads may get preempted at the OS level at the most inconvenient times. So this is the case where having both some OS support and coordination between the executors within the process to synchronize the preemption at the points where the threads switch over to handling the next future can really help.

With the executors, the chaining of the code to a future gets an extra argument, the executor to schedule this code on:

 f.chain(another_operation, context, executor);

And then the chained operation can be executed as a direct function call only if it should run on the same executor as the code that completes the future, and only if the executor's scheduling logic allows it.
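A hypothetical sketch of that decision inside the library (ChainedEntry, Value and the methods on Executor are all made up here, just to show the trade-off between the direct call and the scheduling):

// what completing a future might do with one chained entry
void run_chained(const ChainedEntry &entry, const Value &value, Executor *completing_executor)
{
  if (entry.executor == completing_executor && entry.executor->allows_inlining()) {
    entry.func(entry.context, value); // direct call: fast, but grows the current stack
  } else {
    entry.executor->schedule(entry, value); // queue it: bounded stack, works across boundaries
  }
}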

To give a small example, this pseudo-code with locking:

foo()
{
  before_lock();
  lock(mutexA);
  inside_lock();
  unlock(mutexA);
  after_lock();
}

becomes:

foo()
{
  before_lock();
  p = new_promise<int>();
  p.to_future().chain(inside_lock, NULL, singleExecutorA);
  p.return_value(0);
}

inside_lock(context, value)
{
  ...
  p = new_promise<int>();
  p.to_future().chain(after_lock, NULL, multiExecutor);
  p.return_value(0);
}

after_lock(context, value)
{
  ...
}

A lot more code. With the chaining now built into the functions themselves. And this example also glossed over the fact that if foo() returned a value, it must now return a future of this value instead (so yes, the non-blocking and potentially blocking functions become different). This code also assumes that mutexA was held as a spinlock only for a very short period of time and no asynchronous operations were called from inside_lock(). Things get very confusing very fast, and organizing them in certain patterns does help. More on that in the next installments.
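For illustration, if foo() computed an int, the head would create a result promise, pass it along in a context, and return its future side; after_lock() would then complete that promise. A rough sketch in the same pseudo-code, with FooContext made up for the example:

struct FooContext {
  Promise<int> result; // to be completed by after_lock()
};

Future<int> foo()
{
  before_lock();
  ctx = make_shared<FooContext>();
  p = new_promise<int>();
  p.to_future().chain(inside_lock, ctx, singleExecutorA);
  p.return_value(0);
  return ctx->result.to_future(); // the caller chains its continuation onto this
}

// inside_lock() stays as before but passes ctx on to after_lock(),
// and after_lock() ends with:
//   context->result.return_value(the_computed_value);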