resume()
on a thread pool causes on object life time problem.sequential_process
may destruct before resume()
is invoked.namespace bcc {
void sequential_process::async(task f) {
bool running = true;
{
lock_guard<mutex> lock(_mutex);
_queue.push_back(move(f));
swap(running, _running);
}
if (!running) pool_async([this] { resume(); }); // <--- FIX ME !!!
}
} // namespace
process
may destruct before the lambda executes, causing a data race{
sequential_process process;
process.async([] {
this_thread::sleep_for(1s);
cout << "Made it!" << endl;
});
}
==================
WARNING: ThreadSanitizer: data race (pid=7699)
Read of size 8 at 0x7ffeefbff498 by main thread:
#0 std::__1::__deque_base<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::begin() deque:1061 (scratch:x86_64+0x1000154e5)
#1 std::__1::__deque_base<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::clear() deque:1167 (scratch:x86_64+0x100014ce0)
#2 std::__1::__deque_base<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::~__deque_base() deque:1105 (scratch:x86_64+0x100014a3a)
#3 std::__1::deque<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::~deque() deque:1187 (scratch:x86_64+0x1000149d8)
#4 std::__1::deque<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::~deque() deque:1187 (scratch:x86_64+0x100014998)
#5 (anonymous namespace)::sequential_process::~sequential_process() main.cpp:182 (scratch:x86_64+0x100024956)
#6 (anonymous namespace)::sequential_process::~sequential_process() main.cpp:182 (scratch:x86_64+0x100005878)
#7 main main.cpp:290 (scratch:x86_64+0x1000045d6)
Previous write of size 8 at 0x7ffeefbff498 by thread T4 (mutexes: write M272):
#0 std::__1::deque<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::pop_front() deque:2568 (scratch:x86_64+0x10001ac75)
#1 (anonymous namespace)::sequential_process::resume() main.cpp:214 (scratch:x86_64+0x1000244e1)
#2 (anonymous namespace)::sequential_process::async((anonymous namespace)::task)::'lambda'()::operator()() const main.cpp:247 (scratch:x86_64+0x100024149)
#3 (anonymous namespace)::task::model<(anonymous namespace)::sequential_process::async((anonymous namespace)::task)::'lambda'()>::invoke() main.cpp:36 (scratch:x86_64+0x100023fed)
#4 (anonymous namespace)::task::operator()() main.cpp:46 (scratch:x86_64+0x10001a3ce)
#5 (anonymous namespace)::task_system::run(unsigned int) main.cpp:123 (scratch:x86_64+0x1000196b0)
#6 (anonymous namespace)::task_system::task_system()::'lambda'()::operator()() const main.cpp:130 (scratch:x86_64+0x10001940a)
#7 std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, (anonymous namespace)::task_system::task_system()::'lambda'()> >(void*, void*) type_traits:4323 (scratch:x86_64+0x100017d54)
As if synchronized via sleep:
#0 nanosleep <null> (libclang_rt.tsan_osx_dynamic.dylib:x86_64h+0x270e3)
#1 std::__1::this_thread::sleep_for(std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > const&) <null> (libc++.1.dylib:x86_64+0x47933)
#2 main main.cpp:288 (scratch:x86_64+0x1000045c5)
Location is stack of main thread.
Mutex M272 (0x7ffeefbff430) created at:
#0 pthread_mutex_lock <null> (libclang_rt.tsan_osx_dynamic.dylib:x86_64h+0x37aae)
#1 std::__1::mutex::lock() <null> (libc++.1.dylib:x86_64+0x39c7e)
#2 main main.cpp:283 (scratch:x86_64+0x100004582)
Thread T4 (tid=440231, running) created by main thread at:
#0 pthread_create <null> (libclang_rt.tsan_osx_dynamic.dylib:x86_64h+0x283ed)
#1 std::__1::thread::thread<(anonymous namespace)::task_system::task_system()::'lambda'(), void>((anonymous namespace)::task_system::task_system()::'lambda'()&&) __threading_support:327 (scratch:x86_64+0x100016e18)
#2 std::__1::thread::thread<(anonymous namespace)::task_system::task_system()::'lambda'(), void>((anonymous namespace)::task_system::task_system()::'lambda'()&&) thread:360 (scratch:x86_64+0x100016318)
#3 _ZNSt3__16vectorINS_6threadENS_9allocatorIS1_EEE24__emplace_back_slow_pathIJZN12_GLOBAL__N_111task_systemC1EvEUlvE_EEEvDpOT_ memory:1759 (scratch:x86_64+0x100016087)
#4 (anonymous namespace)::task_system::task_system() vector:1644 (scratch:x86_64+0x100012499)
#5 (anonymous namespace)::task_system::task_system() main.cpp:128 (scratch:x86_64+0x100011938)
#6 void (anonymous namespace)::pool_async<(anonymous namespace)::sequential_process::async((anonymous namespace)::task)::'lambda'()>((anonymous namespace)::sequential_process::async((anonymous namespace)::task)::'lambda'()&&) main.cpp:157 (scratch:x86_64+0x100006b93)
#7 (anonymous namespace)::sequential_process::async((anonymous namespace)::task) main.cpp:247 (scratch:x86_64+0x100004986)
#8 main main.cpp:283 (scratch:x86_64+0x100004582)
SUMMARY: ThreadSanitizer: data race deque:1061 in std::__1::__deque_base<(anonymous namespace)::task, std::__1::allocator<(anonymous namespace)::task> >::begin()
==================
ThreadSanitizer report breakpoint hit. Use 'thread info -s' to get extended information about the report.
process
to fix the invocation{
auto process = make_shared<sequential_process>();
process->async([process] {
this_thread::sleep_for(1s);
cout << "Made it!" << endl;
});
this_thread::sleep_for(2s); // This line is here for my slides
}
sequential_process
are donesequential_process
_done
flag_condition
variablenamespace bcc2 {
class sequential_process {
function<void(task)> _executor;
mutex _mutex;
bool _running = false;
deque<task> _queue;
condition_variable _condition; // <---
bool _done = false; // <---
void resume();
public:
~sequential_process(); // <---
void async(task f);
};
} // namespace bcc2
namespace bcc2 {
sequential_process::~sequential_process() {
unique_lock<mutex> lock(_mutex);
if (!_running) return;
_done = true;
while (_running)
_condition.wait(lock);
}
} // namespace bcc
resume()
if we are done then notify when we stop runningnamespace bcc2 {
void sequential_process::resume() {
task work;
while (true) {
{
unique_lock<mutex> lock(_mutex);
if (_queue.empty()) {
_running = false;
if (_done) _condition.notify_one(); // <---
return;
}
work = move(_queue.front());
_queue.pop_front();
}
move(work)();
}
}
} // namespace bcc
process
N
processes we need N+1
threads available in the pool to guarantee no deadlocksshared_ptr
into the implementationnamespace bcc3 {
class sequential_process {
struct implementation;
shared_ptr<implementation> _self;
public:
sequential_process();
void async(task);
};
} // namespace bcc3
sequential_process
class becomes the implementationnamespace bcc3 {
struct sequential_process::implementation
: enable_shared_from_this<implementation> { // <---
mutex _mutex;
deque<task> _queue;
bool _running = false;
void resume();
void async(task);
};
} // namespace bcc3
implementation::async()
can then attach a shared pointer from this to resume()
namespace bcc3 {
void sequential_process::implementation::async(task f) {
bool running = true;
{
lock_guard<mutex> lock(_mutex);
_queue.push_back(move(f));
swap(running, _running);
}
if (!running)
pool_async([_self = shared_from_this()] { _self->resume(); }); // <---
}
} // namespace bcc3
weak_from_this()
in the lambda has the effect of canceling any operations that haven't started before destructionnamespace bcc4 {
class sequential_process {
struct implementation;
shared_ptr<implementation> _self;
public:
explicit sequential_process(task); // <---
void async(task);
};
} // namespace bcc4
namespace bcc4 {
struct sequential_process::implementation
: enable_shared_from_this<implementation> {
mutex _mutex;
deque<task> _queue;
bool _running = false;
task _completion; // <---
void resume();
implementation(task completion) : _completion(move(completion)) {} // <---
~implementation() { move(_completion)(); } // <---
void async(task);
};
} // namespace bcc4
{
bcc4::sequential_process process([] { cout << "End" << endl; });
process.async([] {
this_thread::sleep_for(1s);
cout << "Made it!" << endl;
});
}
this_thread::sleep_for(2s);
namespace bcc5 {
class sequential_process {
struct implementation;
shared_ptr<implementation> _self;
public:
sequential_process();
sequential_process(const sequential_process&) = delete;
sequential_process(sequential_process&&) noexcept = default;
sequential_process& operator=(const sequential_process&) = delete;
sequential_process& operator=(sequential_process&&) noexcept = default;
~sequential_process();
void async(task f);
};
} // namespace bcc5
_running
and _done
flagsenum
for the statenamespace bcc5 {
struct sequential_process::implementation : enable_shared_from_this<implementation> {
function<void(task)> _executor;
mutex _mutex;
deque<task> _queue;
condition_variable _condition;
enum { idle, queued, running, done } _state = idle; // <---
void resume();
void wait(); // <---
void async(task f);
};
} // namespace bcc5
queued
if we were idle
namespace bcc5 {
void sequential_process::implementation::async(task f) {
bool was_idle = false;
{
lock_guard<mutex> lock(_mutex);
_queue.push_back(move(f));
was_idle = _state == idle; // <---
if (was_idle) _state = queued; // <---
}
if (was_idle) pool_async([_self = shared_from_this()] { _self->resume(); });
}
} // namespace bcc5
resume()
handles the various statesidle
-> idle
(canceled)queued
-> running
_queue.empty()
-> idle (notify if was done
)resume()
we can safely call notify_one()
outside the lockvoid bcc5::sequential_process::implementation::resume() {
task work;
while (true) {
{
unique_lock<mutex> lock(_mutex);
if (_state == idle) return; // <---
if (_state == queued) _state = running; // <---
if (_queue.empty()) {
auto last_state = _state;
_state = idle;
if (last_state == done) break; // <---
return;
}
work = move(_queue.front());
_queue.pop_front();
}
move(work)();
}
_condition.notify_one(); // <---
}
wait()
operation, called from ~sequential_process()
idle
we are done and destructqueued
we take ownership of executionrunning
we signal done
void bcc5::sequential_process::implementation::wait() {
bool was_queued = false;
{
unique_lock<mutex> lock(_mutex);
if (_state == idle) return;
if (_state == queued) {
_state = idle;
was_queued = true;
} else {
_state = done;
while (_state == done)
_condition.wait(lock);
}
}
if (!was_queued) return;
while (!_queue.empty()) {
move(_queue.front())();
_queue.pop_front();
}
}
{
bcc5::sequential_process process;
process.async([] {
this_thread::sleep_for(1s);
cout << "Made it!" << endl;
});
}
cout << "process destructed" << endl;