Coroutines By Hand¶

  • Implement sequential process as a coroutine

Recall sequential_process implementation¶

namespace bcc {

class sequential_process {
    mutex _mutex;
    condition_variable _condition;
    deque<task> _queue;
    bool _done = false;

    void run_loop();

    thread _thread{[this] { run_loop(); }};

public:
    ~sequential_process();
    void async(task);
};

} // namespace bcc
  • The logical structure of our coroutine will be:
class sequential_process {
    awaitable_queue<task> _queue;
    co_task<void> _running;

public:
    sequential_process() {
        _running = [&]() {
            while (true) {
                (co_await _queue.pop())();
            }
        }
    }
    void async(task f) { _queue.push(move(f)); }
};
  • Tip: When desinging code, sketch the code in an ideal form
  • Without building the infrustructure for awaitable_queue<> and co_task<>
    • We can build the same logical structure directly in seqential_process
  • Build concrete solutions before complex abstractions
  • As a coroutine we no longer need:
    • _thread
    • _condition
    • _done flag
    • run_loop()
    • join() on destuction
  • We will need
    • _running flag
    • resume()
namespace bcc2 {

class sequential_process {
    mutex _mutex;
    bool _running = false;
    deque<task> _queue;

    void resume();

public:
    void async(task);
};

} // namespace bcc2
  • resume() is the body of our coroutine:
while (true) {
    (co_await _queue.pop())();
}
namespace bcc2 {

void sequential_process::resume() {
    task work;
    while (true) {
        {
            unique_lock<mutex> lock(_mutex);

            if (_queue.empty()) {
                _running = false;
                return;
            }
            work = move(_queue.front());
            _queue.pop_front();
        }
        work();
    }
}

} // namespace bcc2
  • async() does a push and resume() if not running
namespace bcc2 {

void sequential_process::async(task f) {
    bool running = true;
    {
        lock_guard<mutex> lock(_mutex);
        _queue.push_back(move(f));
        swap(running, _running);
    }
    if (!running) resume();
}

} // namespace bcc2
  • async_packaged(), shared_pool, and interned_string are unmodified
namespace {

template <class F> // F models R()
auto async_packaged(sequential_process& process, F&& f) {
    using result_t = std::result_of_t<std::decay_t<F>()>;

    auto task_future = stlab::package<result_t()>(stlab::immediate_executor,
                                                  std::forward<F>(f));

    process.async(move(task_future.first));

    return move(task_future.second);
}

} // namespace
namespace {

struct shared_pool {
    unordered_set<string> _pool;
    sequential_process _process;

    auto insert(string a) -> stlab::future<const string*> {
        return async_packaged(
            _process, [this, _a = move(a)]() mutable {
                return &*_pool.insert(move(_a)).first;
            });
    }
};

} // namespace
namespace {

class interned_string {
    static auto pool() -> shared_pool& {
        static shared_pool result;
        return result;
    }

    stlab::future<const string*> _string;

public:
    interned_string(string a) : _string(pool().insert(move(a))) {}

    auto str() const -> stlab::future<reference_wrapper<const string>> {
        return _string.then([](const string* p) { return cref(*p); });
    }
};

} // namespace
  • And is used exactly the same way
{
    interned_string s("Hello World!"s);

    auto done = s.str().then([](const string& s) { cout << s << '\n'; });

    blocking_get(done);
}
  • Advantages to the coroutine implementation
    • No seperate thread overhead
    • No overhead for waiting on condition variable
    • No blocking
    • Possible to implement with lock-free queue
    • resume() need not be executed inline
      • It could be queued to a thread pool
      • Requires some managment of object liftimes
  • Disadvantages
    • inline execution may live lock
void sequential_process::async(task f) {
    bool running = true;
    {
        lock_guard<mutex> lock(_mutex);
        _queue.push_back(move(f));
        swap(running, _running);
    }
    if (!running) async([this]{ resume(); }); // WHAAATTT this???
}