sequential_process
implementation¶namespace bcc {
class sequential_process {
mutex _mutex;
condition_variable _condition;
deque<task> _queue;
bool _done = false;
void run_loop();
thread _thread{[this] { run_loop(); }};
public:
~sequential_process();
void async(task);
};
} // namespace bcc
class sequential_process {
awaitable_queue<task> _queue;
co_task<void> _running;
public:
sequential_process() {
_running = [&]() {
while (true) {
(co_await _queue.pop())();
}
}
}
void async(task f) { _queue.push(move(f)); }
};
awaitable_queue<>
and co_task<>
seqential_process
_thread
_condition
_done
flagrun_loop()
join()
on destuction_running
flagresume()
namespace bcc2 {
class sequential_process {
mutex _mutex;
bool _running = false;
deque<task> _queue;
void resume();
public:
void async(task);
};
} // namespace bcc2
resume()
is the body of our coroutine:while (true) {
(co_await _queue.pop())();
}
namespace bcc2 {
void sequential_process::resume() {
task work;
while (true) {
{
unique_lock<mutex> lock(_mutex);
if (_queue.empty()) {
_running = false;
return;
}
work = move(_queue.front());
_queue.pop_front();
}
work();
}
}
} // namespace bcc2
async()
does a push and resume()
if not runningnamespace bcc2 {
void sequential_process::async(task f) {
bool running = true;
{
lock_guard<mutex> lock(_mutex);
_queue.push_back(move(f));
swap(running, _running);
}
if (!running) resume();
}
} // namespace bcc2
async_packaged()
, shared_pool
, and interned_string
are unmodifiednamespace {
template <class F> // F models R()
auto async_packaged(sequential_process& process, F&& f) {
using result_t = std::result_of_t<std::decay_t<F>()>;
auto task_future = stlab::package<result_t()>(stlab::immediate_executor,
std::forward<F>(f));
process.async(move(task_future.first));
return move(task_future.second);
}
} // namespace
namespace {
struct shared_pool {
unordered_set<string> _pool;
sequential_process _process;
auto insert(string a) -> stlab::future<const string*> {
return async_packaged(
_process, [this, _a = move(a)]() mutable {
return &*_pool.insert(move(_a)).first;
});
}
};
} // namespace
namespace {
class interned_string {
static auto pool() -> shared_pool& {
static shared_pool result;
return result;
}
stlab::future<const string*> _string;
public:
interned_string(string a) : _string(pool().insert(move(a))) {}
auto str() const -> stlab::future<reference_wrapper<const string>> {
return _string.then([](const string* p) { return cref(*p); });
}
};
} // namespace
{
interned_string s("Hello World!"s);
auto done = s.str().then([](const string& s) { cout << s << '\n'; });
blocking_get(done);
}
resume()
need not be executed inlinevoid sequential_process::async(task f) {
bool running = true;
{
lock_guard<mutex> lock(_mutex);
_queue.push_back(move(f));
swap(running, _running);
}
if (!running) async([this]{ resume(); }); // WHAAATTT this???
}