Caching of values becomes easy with futures, because the futures take care of almost all the needed synchronization. The combination of futures and shared_ptr also transparently solves two awkward questions: what to do with the ownership of an object discarded from the cache while it's still in use by a reader (that reader will simply hold the last shared_ptr), and what to do when the cache is full of incomplete reads (add the entry to the cache anyway, so that anyone else trying to read it in the meantime will wait for it instead of initiating another read, and chain the actual reading to the completion of a previous read). The pseudo-code looks like this:
class Cache {
public:
    shared_ptr<Future<Value>> read(Key k) {
        scopelock(mutex_);
        auto it = data_.find(k);
        if (it != data_.end()) {
            // bump the key in LRU order
            lru_.remove(times_[k]);
            times_[k] = ++generation_;
            lru_[generation_] = k;
            // note that the future might not be completed yet!
            // but that's OK, the caller will wait for it
            return it->second;
        }
        tryRemoveOldL();
        // the new key always gets inserted, even if the cache is overflowing
        times_[k] = ++generation_;
        lru_[generation_] = k;
        if (data_.size() < SIZE_LIMIT) {
            data_[k] = reading_ = readValue(k);
        } else {
            // wait for the last read to complete before reading this key
            auto ctx = make_shared<WaitCtx>(this, k);
            shared_ptr<Promise<Value>> result = make_promise<Value>();
            auto prev_reading = reading_;
            data_[k] = reading_ = result->to_future();
            prev_reading->chain(readDelayed, ctx)->chain(result);
        }
        return data_[k];
    }

protected:
    // context for waiting to read a key
    struct WaitCtx {
        Cache *cache;
        Key key;
    };

    // If the cache is overflowing, tries to remove the oldest completed
    // element; expects the object to be already locked.
    // Returns true if either the cache wasn't overflowing or an element
    // got successfully discarded from it.
    bool tryRemoveOldL() {
        if (data_.size() >= SIZE_LIMIT) {
            // lru_ is ordered by generation, so this goes oldest-first
            for (auto it = lru_.begin(); it != lru_.end(); ++it) {
                if (data_[it->second]->is_completed()) {
                    Key kold = it->second;
                    data_.remove(kold);
                    lru_.remove(times_[kold]);
                    times_.remove(kold);
                    return true;
                }
            }
            return false;
        }
        return true;
    }

    // initiates the asynchronous read of a value from the backing store
    shared_ptr<Future<Value>> readValue(Key k) {
        ...
    }

    // chained after a previous read: once that read completes, try to free
    // a slot and start reading the delayed key
    static void readDelayed(
        shared_ptr<Future<Value>> input,
        shared_ptr<WaitCtx> ctx,
        shared_ptr<Promise<Value>> result)
    {
        scopelock(ctx->cache->mutex_);
        if (ctx->cache->tryRemoveOldL()) {
            ctx->cache->readValue(ctx->key)->chain(result);
        } else {
            // oops, at least our input element should have completed,
            // this means it got stolen from us in a race between its
            // completion and this function running, need to delay again
            auto prev_reading = ctx->cache->reading_;
            ctx->cache->reading_ = result->to_future();
            prev_reading->chain(readDelayed, ctx)->chain(result);
        }
    }

    Mutex mutex_;
    map<Key, shared_ptr<Future<Value>>> data_;  // key -> future of the cached value
    map<Key, int64_t> times_;                   // key -> generation of its last use
    map<int64_t, Key> lru_;                     // generation -> key, oldest first
    int64_t generation_ = 0;                    // use counter for LRU ordering
    shared_ptr<Future<Value>> reading_;         // the most recently initiated read
};
This code is much simpler than the example I've shown in "The Practice of Parallel Programming", so finally asynchronous programming is good for something!
However, the scheduling of the asynchronous functions doesn't mesh well with the object methods: they have to be made into static methods (perhaps there is some better solution?).
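One possible workaround, sketched here in plain C++ and not tied to the Future API above: if the scheduling machinery accepted arbitrary callables rather than plain function pointers, a lambda capturing a shared_ptr to the object could forward to an ordinary member method, making both the static wrapper and the explicit context struct unnecessary. The names Worker, makeCallback() and onDone() below are hypothetical stand-ins:

#include <functional>
#include <iostream>
#include <memory>

class Worker : public std::enable_shared_from_this<Worker> {
public:
    // Produces a callback that any scheduler taking a std::function could run;
    // the lambda keeps the Worker alive through the captured shared_ptr.
    std::function<void(int)> makeCallback() {
        auto self = shared_from_this();
        return [self](int v) { self->onDone(v); };
    }
private:
    // an ordinary member method, no static wrapper needed
    void onDone(int v) { std::cout << "done: " << v << "\n"; }
};

int main() {
    auto w = std::make_shared<Worker>();
    auto cb = w->makeCallback();
    cb(7); // in real use the scheduler would invoke this asynchronously
    return 0;
}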
There is also a special case of caching, the lazy reading of exactly one object. That requires only a mutex and a future. Check under the mutex whether the future reference is null; if so, initiate the read and remember the resulting future. Otherwise just return the future; it doesn't matter at this point whether the future is completed or not, it's the caller's responsibility to wait for it.
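A minimal sketch of this single-object lazy read, using std::shared_future from standard C++ rather than the Future class above, with Value and readValue() as placeholder stand-ins:

#include <future>
#include <mutex>

struct Value { int x; };

// stand-in for the slow read of the single object
Value readValue() { return Value{42}; }

class LazyValue {
public:
    std::shared_future<Value> read() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!future_.valid()) {
            // the first caller initiates the read, everyone else reuses the future
            future_ = std::async(std::launch::async, readValue).share();
        }
        // the future might not be completed yet, the caller will wait for it
        return future_;
    }
private:
    std::mutex mutex_;
    std::shared_future<Value> future_;  // starts out invalid ("null")
};

Every caller then gets a copy of the same shared_future and calls get() or wait() on it whenever it actually needs the value.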