merge bitcoin#25100: Switch scheduler to steady_clock

This commit is contained in:
Kittywhiskers Van Gogh 2022-05-10 10:59:18 +02:00
parent cc7d2b8d0a
commit 484447cc86
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
3 changed files with 27 additions and 27 deletions

View File

@ -4,10 +4,10 @@
#include <scheduler.h> #include <scheduler.h>
#include <random.h> #include <sync.h>
#include <util/time.h> #include <util/time.h>
#include <assert.h> #include <cassert>
#include <functional> #include <functional>
#include <utility> #include <utility>
@ -41,7 +41,7 @@ void CScheduler::serviceQueue()
// the time of the first item on the queue: // the time of the first item on the queue:
while (!shouldStop() && !taskQueue.empty()) { while (!shouldStop() && !taskQueue.empty()) {
std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first;
if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
break; // Exit loop after timeout, it means we reached the time of the event break; // Exit loop after timeout, it means we reached the time of the event
} }
@ -70,7 +70,7 @@ void CScheduler::serviceQueue()
newTaskScheduled.notify_one(); newTaskScheduled.notify_one();
} }
void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t)
{ {
{ {
LOCK(newTaskMutex); LOCK(newTaskMutex);
@ -87,7 +87,7 @@ void CScheduler::MockForward(std::chrono::seconds delta_seconds)
LOCK(newTaskMutex); LOCK(newTaskMutex);
// use temp_queue to maintain updated schedule // use temp_queue to maintain updated schedule
std::multimap<std::chrono::system_clock::time_point, Function> temp_queue; std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue;
for (const auto& element : taskQueue) { for (const auto& element : taskQueue) {
temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
@ -112,8 +112,8 @@ void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds
scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta); scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta);
} }
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first, size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first,
std::chrono::system_clock::time_point& last) const std::chrono::steady_clock::time_point& last) const
{ {
LOCK(newTaskMutex); LOCK(newTaskMutex);
size_t result = taskQueue.size(); size_t result = taskQueue.size();
@ -141,7 +141,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
if (m_are_callbacks_running) return; if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return; if (m_callbacks_pending.empty()) return;
} }
m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::system_clock::now()); m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
} }
void SingleThreadedSchedulerClient::ProcessQueue() void SingleThreadedSchedulerClient::ProcessQueue()

View File

@ -46,12 +46,12 @@ public:
typedef std::function<void()> Function; typedef std::function<void()> Function;
/** Call func at/after time t */ /** Call func at/after time t */
void schedule(Function f, std::chrono::system_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
/** Call f once after the delta has passed */ /** Call f once after the delta has passed */
void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
{ {
schedule(std::move(f), std::chrono::system_clock::now() + delta); schedule(std::move(f), std::chrono::steady_clock::now() + delta);
} }
/** /**
@ -93,8 +93,8 @@ public:
* Returns number of tasks waiting to be serviced, * Returns number of tasks waiting to be serviced,
* and first and last task times * and first and last task times
*/ */
size_t getQueueInfo(std::chrono::system_clock::time_point& first, size_t getQueueInfo(std::chrono::steady_clock::time_point& first,
std::chrono::system_clock::time_point& last) const std::chrono::steady_clock::time_point& last) const
EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex);
/** Returns true if there are threads actively running in serviceQueue() */ /** Returns true if there are threads actively running in serviceQueue() */
@ -103,7 +103,7 @@ public:
private: private:
mutable Mutex newTaskMutex; mutable Mutex newTaskMutex;
std::condition_variable newTaskScheduled; std::condition_variable newTaskScheduled;
std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0};
bool stopRequested GUARDED_BY(newTaskMutex){false}; bool stopRequested GUARDED_BY(newTaskMutex){false};
bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; bool stopWhenEmpty GUARDED_BY(newTaskMutex){false};

View File

@ -15,13 +15,13 @@
BOOST_AUTO_TEST_SUITE(scheduler_tests) BOOST_AUTO_TEST_SUITE(scheduler_tests)
static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime) static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
{ {
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
counter += delta; counter += delta;
} }
std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min(); auto noTime = std::chrono::steady_clock::time_point::min();
if (rescheduleTime != noTime) { if (rescheduleTime != noTime) {
CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime); CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
s.schedule(f, rescheduleTime); s.schedule(f, rescheduleTime);
@ -49,15 +49,15 @@ BOOST_AUTO_TEST_CASE(manythreads)
auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000] auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000] auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
std::chrono::system_clock::time_point start = std::chrono::system_clock::now(); auto start = std::chrono::steady_clock::now();
std::chrono::system_clock::time_point now = start; auto now = start;
std::chrono::system_clock::time_point first, last; std::chrono::steady_clock::time_point first, last;
size_t nTasks = microTasks.getQueueInfo(first, last); size_t nTasks = microTasks.getQueueInfo(first, last);
BOOST_CHECK(nTasks == 0); BOOST_CHECK(nTasks == 0);
for (int i = 0; i < 100; ++i) { for (int i = 0; i < 100; ++i) {
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); auto t = now + std::chrono::microseconds(randomMsec(rng));
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng); int whichCounter = zeroToNine(rng);
CScheduler::Function f = std::bind(&microTask, std::ref(microTasks), CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
@ -75,14 +75,14 @@ BOOST_AUTO_TEST_CASE(manythreads)
microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
UninterruptibleSleep(std::chrono::microseconds{600}); UninterruptibleSleep(std::chrono::microseconds{600});
now = std::chrono::system_clock::now(); now = std::chrono::steady_clock::now();
// More threads and more tasks: // More threads and more tasks:
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); auto t = now + std::chrono::microseconds(randomMsec(rng));
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng); int whichCounter = zeroToNine(rng);
CScheduler::Function f = std::bind(&microTask, std::ref(microTasks), CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
@ -112,7 +112,7 @@ BOOST_AUTO_TEST_CASE(wait_until_past)
WAIT_LOCK(mtx, lock); WAIT_LOCK(mtx, lock);
const auto no_wait = [&](const std::chrono::seconds& d) { const auto no_wait = [&](const std::chrono::seconds& d) {
return condvar.wait_until(lock, std::chrono::system_clock::now() - d); return condvar.wait_until(lock, std::chrono::steady_clock::now() - d);
}; };
BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1})); BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
@ -184,7 +184,7 @@ BOOST_AUTO_TEST_CASE(mockforward)
scheduler.scheduleFromNow(dummy, std::chrono::minutes{8}); scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});
// check taskQueue // check taskQueue
std::chrono::system_clock::time_point first, last; std::chrono::steady_clock::time_point first, last;
size_t num_tasks = scheduler.getQueueInfo(first, last); size_t num_tasks = scheduler.getQueueInfo(first, last);
BOOST_CHECK_EQUAL(num_tasks, 3ul); BOOST_CHECK_EQUAL(num_tasks, 3ul);
@ -205,7 +205,7 @@ BOOST_AUTO_TEST_CASE(mockforward)
BOOST_CHECK_EQUAL(counter, 2); BOOST_CHECK_EQUAL(counter, 2);
// check that the time of the remaining job has been updated // check that the time of the remaining job has been updated
std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); auto now = std::chrono::steady_clock::now();
int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count(); int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
// should be between 2 & 3 minutes from now // should be between 2 & 3 minutes from now
BOOST_CHECK(delta > 2*60 && delta < 3*60); BOOST_CHECK(delta > 2*60 && delta < 3*60);