diff --git a/platforms/posix/src/lockstep_scheduler/CMakeLists.txt b/platforms/posix/src/lockstep_scheduler/CMakeLists.txt
index 33e8f70666e1de6015cf2e02e96bb839af681f0a..a51e57cb1cec37d057bb14c5fd83bef67252046a 100644
--- a/platforms/posix/src/lockstep_scheduler/CMakeLists.txt
+++ b/platforms/posix/src/lockstep_scheduler/CMakeLists.txt
@@ -6,6 +6,8 @@ if(NOT PROJECT_NAME STREQUAL "px4")
 	set (CMAKE_CXX_STANDARD 11)
 
+	add_definitions(-DUNIT_TESTS)
+
 	add_library(lockstep_scheduler
 		src/lockstep_scheduler.cpp
 	)
 
diff --git a/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h b/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h
index 8462368529007783c8d57fa0f1a05d00cd923eac..69a0ee34ed189f0b8e46e4155dc4587eb8efb4d2 100644
--- a/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h
+++ b/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h
@@ -10,6 +10,8 @@
 class LockstepScheduler
 {
 public:
+	~LockstepScheduler();
+
 	void set_absolute_time(uint64_t time_us);
 	inline uint64_t get_absolute_time() const { return time_us_; }
 	int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us);
@@ -19,13 +21,28 @@ private:
 	std::atomic<uint64_t> time_us_{0};
 
 	struct TimedWait {
+		~TimedWait()
+		{
+			// If a thread quickly exits after a cond_timedwait(), the
+			// thread_local object can still be in the linked list. In that case
+			// we need to wait until it's removed.
+			while (!removed) {
+#ifndef UNIT_TESTS // unit tests don't define system_usleep and execute faster w/o sleeping here
+				system_usleep(5000);
+#endif
+			}
+		}
+
 		pthread_cond_t *passed_cond{nullptr};
 		pthread_mutex_t *passed_lock{nullptr};
 		uint64_t time_us{0};
 		bool timeout{false};
 		std::atomic<bool> done{false};
+		std::atomic<bool> removed{true};
+
+		TimedWait *next{nullptr}; ///< linked list
 	};
-	std::vector<std::shared_ptr<TimedWait>> timed_waits_{};
-	std::mutex timed_waits_mutex_{};
+	TimedWait *timed_waits_{nullptr}; ///< head of linked list
+	std::mutex timed_waits_mutex_;
 	std::atomic<bool> setting_time_{false}; ///< true if set_absolute_time() is currently being executed
 };
diff --git a/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp b/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp
index c12f49799cedb66257685a77feb1955beff2abf1..08b76f0272eba41fa1b6c59f2addbbcac4fb78c6 100644
--- a/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp
+++ b/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp
@@ -1,5 +1,16 @@
 #include "lockstep_scheduler/lockstep_scheduler.h"
 
+LockstepScheduler::~LockstepScheduler()
+{
+	// Clean up the linked list
+	std::unique_lock<std::mutex> lock_timed_waits(timed_waits_mutex_);
+
+	while (timed_waits_) {
+		TimedWait *tmp = timed_waits_;
+		timed_waits_ = timed_waits_->next;
+		tmp->removed = true;
+	}
+}
 
 void LockstepScheduler::set_absolute_time(uint64_t time_us)
 {
@@ -9,29 +20,38 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
 	std::unique_lock<std::mutex> lock_timed_waits(timed_waits_mutex_);
 	setting_time_ = true;
 
-	auto it = std::begin(timed_waits_);
-
-	while (it != std::end(timed_waits_)) {
-
-		std::shared_ptr<TimedWait> temp_timed_wait = *it;
+	TimedWait *timed_wait = timed_waits_;
+	TimedWait *timed_wait_prev = nullptr;
 
+	while (timed_wait) {
 		// Clean up the ones that are already done from last iteration.
-		if (temp_timed_wait->done) {
-			it = timed_waits_.erase(it);
+		if (timed_wait->done) {
+			// Erase from the linked list
+			if (timed_wait_prev) {
+				timed_wait_prev->next = timed_wait->next;
+
+			} else {
+				timed_waits_ = timed_wait->next;
+			}
+
+			TimedWait *tmp = timed_wait;
+			timed_wait = timed_wait->next;
+			tmp->removed = true;
 			continue;
 		}
 
-		if (temp_timed_wait->time_us <= time_us &&
-		    !temp_timed_wait->timeout) {
+		if (timed_wait->time_us <= time_us &&
+		    !timed_wait->timeout) {
 			// We are abusing the condition here to signal that the time
 			// has passed.
-			pthread_mutex_lock(temp_timed_wait->passed_lock);
-			temp_timed_wait->timeout = true;
-			pthread_cond_broadcast(temp_timed_wait->passed_cond);
-			pthread_mutex_unlock(temp_timed_wait->passed_lock);
+			pthread_mutex_lock(timed_wait->passed_lock);
+			timed_wait->timeout = true;
+			pthread_cond_broadcast(timed_wait->passed_cond);
+			pthread_mutex_unlock(timed_wait->passed_lock);
 		}
 
-		++it;
+		timed_wait_prev = timed_wait;
+		timed_wait = timed_wait->next;
 	}
 
 	setting_time_ = false;
@@ -40,7 +60,9 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us)
 
 int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us)
 {
-	std::shared_ptr<TimedWait> new_timed_wait;
+	// A TimedWait object might still be in timed_waits_ after we return, so its lifetime
+	// needs to outlast this method call. Using thread_local is also more efficient than malloc.
+	static thread_local TimedWait timed_wait;
 	{
 		std::lock_guard<std::mutex> lock_timed_waits(timed_waits_mutex_);
 
@@ -49,22 +71,29 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc
 			return ETIMEDOUT;
 		}
 
-		new_timed_wait = std::make_shared<TimedWait>();
-		new_timed_wait->time_us = time_us;
-		new_timed_wait->passed_cond = cond;
-		new_timed_wait->passed_lock = lock;
-		timed_waits_.push_back(new_timed_wait);
+		timed_wait.time_us = time_us;
+		timed_wait.passed_cond = cond;
+		timed_wait.passed_lock = lock;
+		timed_wait.timeout = false;
+		timed_wait.done = false;
+
+		// Add to the linked list if not already in it (otherwise just re-use the still-linked object)
+		if (timed_wait.removed) {
+			timed_wait.removed = false;
+			timed_wait.next = timed_waits_;
+			timed_waits_ = &timed_wait;
+		}
 	}
 
 	int result = pthread_cond_wait(cond, lock);
 
-	const bool timeout = new_timed_wait->timeout;
+	const bool timeout = timed_wait.timeout;
 
 	if (result == 0 && timeout) {
 		result = ETIMEDOUT;
 	}
 
-	new_timed_wait->done = true;
+	timed_wait.done = true;
 
 	if (!timeout && setting_time_) {
 		// This is where it gets tricky: the timeout has not been triggered yet,
diff --git a/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp b/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp
index f06edcbb159353147adaf26ff8f8297fb911fbfb..e486710bf24bd8e035e16670daa03c2bad1311a6 100644
--- a/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp
+++ b/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp
@@ -4,7 +4,43 @@
 #include <atomic>
 #include <random>
 #include <iostream>
+#include <functional>
+#include <chrono>
 
+class TestThread
+{
+public:
+	TestThread(const std::function<void()> &f)
+		: _f(f)
+	{
+		_thread = std::thread(std::bind(&TestThread::execute, this));
+	}
+
+	void join(LockstepScheduler &ls)
+	{
+		// The unit tests do not reflect the real usage, where
+		// set_absolute_time() is called regularly and can do some
+		// cleanup tasks. We simulate that here by waiting until
+		// the given task returns (which is expected to happen quickly),
+		// then calling set_absolute_time(), which can do the cleanup,
+		// so that _thread can exit as well.
+		while (!_done) {
+			std::this_thread::yield(); // usleep is too slow here
+		}
+
+		ls.set_absolute_time(ls.get_absolute_time());
+		_thread.join();
+	}
+
+private:
+	void execute()
+	{
+		_f();
+		_done = true;
+	}
+	std::function<void()> _f;
+	std::atomic<bool> _done{false};
+	std::thread _thread;
+};
 
 constexpr uint64_t some_time_us = 12345678;
 
@@ -33,7 +69,7 @@ void test_condition_timing_out()
 
 	// Use a thread to wait for condition while we already have the lock.
 	// This ensures the synchronization happens in the right order.
-	std::thread thread([&ls, &cond, &lock, &should_have_timed_out]() {
+	TestThread thread([&ls, &cond, &lock, &should_have_timed_out]() {
 		assert(ls.cond_timedwait(&cond, &lock, some_time_us + 1000) == ETIMEDOUT);
 		assert(should_have_timed_out);
 		// It should be re-locked afterwards, so we should be able to unlock it.
@@ -44,7 +80,7 @@
 	should_have_timed_out = true;
 	ls.set_absolute_time(some_time_us + 1500);
 
-	thread.join();
+	thread.join(ls);
 
 	pthread_mutex_destroy(&lock);
 	pthread_cond_destroy(&cond);
@@ -67,7 +103,7 @@ void test_locked_semaphore_getting_unlocked()
 	pthread_mutex_lock(&lock);
 	// Use a thread to wait for condition while we already have the lock.
 	// This ensures the synchronization happens in the right order.
-	std::thread thread([&ls, &cond, &lock]() {
+	TestThread thread([&ls, &cond, &lock]() {
 
 		ls.set_absolute_time(some_time_us + 500);
 		assert(ls.cond_timedwait(&cond, &lock, some_time_us + 1000) == 0);
@@ -79,7 +115,7 @@
 	pthread_cond_broadcast(&cond);
 	pthread_mutex_unlock(&lock);
 
-	thread.join();
+	thread.join(ls);
 
 	pthread_mutex_destroy(&lock);
 	pthread_cond_destroy(&cond);
@@ -107,7 +143,7 @@ public:
 	void run()
 	{
 		pthread_mutex_lock(&lock_);
-		thread_ = std::make_shared<std::thread>([this]() {
+		thread_ = std::make_shared<TestThread>([this]() {
 			result_ = ls_.cond_timedwait(&cond_, &lock_, timeout_);
 			pthread_mutex_unlock(&lock_);
 		});
@@ -131,13 +167,13 @@ public:
 			pthread_mutex_unlock(&lock_);
 			is_done_ = true;
 			// We can be sure that this triggers.
-			thread_->join();
+			thread_->join(ls_);
 			assert(result_ == 0);
 
 		} else if (timeout_reached) {
 			is_done_ = true;
-			thread_->join();
+			thread_->join(ls_);
 			assert(result_ == ETIMEDOUT);
 		}
 	}
@@ -151,7 +187,7 @@ private:
 	LockstepScheduler &ls_;
 	std::atomic<bool> is_done_{false};
 	std::atomic<int> result_ {INITIAL_RESULT};
-	std::shared_ptr<std::thread> thread_{};
+	std::shared_ptr<TestThread> thread_{};
 };
 
 int random_number(int min, int max)
@@ -250,7 +286,7 @@ void test_usleep()
 
 	std::atomic<Step> step{Step::Init};
 
-	std::thread thread([&step, &ls]() {
+	TestThread thread([&step, &ls]() {
 		step = Step::ThreadStarted;
 
 		WAIT_FOR(step == Step::BeforeUsleep);
@@ -268,7 +304,7 @@
 	assert(ls.usleep_until(some_time_us + 1000) == 0);
 	assert(step == Step::UsleepTriggered);
 
-	thread.join();
+	thread.join(ls);
 }
 
 int main(int /*argc*/, char ** /*argv*/)
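
Reviewer note (not part of the patch): the change relies on a lifetime contract between the thread_local TimedWait and the scheduler's intrusive list. A node is linked at most once per thread, only set_absolute_time() (or the LockstepScheduler destructor) unlinks it, and ~TimedWait() blocks thread exit until the node has been unlinked. The standalone sketch below distills that contract; all names (Node, Registry, enter(), cleanup()) are illustrative only, and the condition-variable wait of the real code is elided.

// Standalone illustration (hypothetical names) of the thread_local +
// intrusive-list lifetime pattern introduced above.
#include <atomic>
#include <mutex>
#include <thread>

struct Node {
	std::atomic<bool> done{false};
	std::atomic<bool> removed{true}; // starts true: not linked yet

	Node *next{nullptr};

	~Node()
	{
		// Mirror of TimedWait::~TimedWait(): block thread exit until the
		// list owner has unlinked this node, so the list never dangles.
		while (!removed) {
			std::this_thread::yield();
		}
	}
};

class Registry
{
public:
	// Worker-thread side, analogous to cond_timedwait(): the node lives in
	// thread_local storage, so repeated calls from one thread re-use one object.
	void enter()
	{
		static thread_local Node node;

		{
			std::lock_guard<std::mutex> guard(mutex_);
			node.done = false;

			if (node.removed) { // only link if not still in the list
				node.removed = false;
				node.next = head_;
				head_ = &node;
			}
		}

		// (the real code blocks on a condition variable here)
		node.done = true;
	}

	// Owner side, analogous to the cleanup loop in set_absolute_time():
	// unlink finished nodes, releasing their owning threads.
	void cleanup()
	{
		std::lock_guard<std::mutex> guard(mutex_);
		Node *node = head_;
		Node *prev = nullptr;

		while (node) {
			if (node->done) {
				(prev ? prev->next : head_) = node->next;
				Node *tmp = node;
				node = node->next;
				tmp->removed = true; // lets ~Node() return
				continue;
			}

			prev = node;
			node = node->next;
		}
	}

private:
	Node *head_{nullptr};
	std::mutex mutex_;
};

int main()
{
	Registry registry;
	std::atomic<bool> entered{false};

	std::thread worker([&] {
		registry.enter();
		entered = true;
	});

	// Same pattern as TestThread::join() above: wait for the task to finish,
	// then run the owner's cleanup so the worker's ~Node() can return.
	while (!entered) {
		std::this_thread::yield();
	}

	registry.cleanup();
	worker.join();
	return 0;
}

The removed flag deliberately starts out true so that the very first enter() links the node; afterwards the node only toggles between linked and unlinked without ever being reallocated. That is what lets cond_timedwait() avoid a heap allocation per call, and what lets a node from an exited thread linger harmlessly in the list until the next cleanup pass.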