dash/src/scheduler.cpp
Konstantin Akimov 4aa197dbdb Merge #18673: scripted-diff: Sort test includes
fa4632c41714dfaa699bacc6a947d72668a4deef test: Move boost/stdlib includes last (MarcoFalke)
fa488f131fd4f5bab0d01376c5a5013306f1abcd scripted-diff: Bump copyright headers (MarcoFalke)
fac5c373006a9e4bcbb56843bb85f1aca4d87599 scripted-diff: Sort test includes (MarcoFalke)

Pull request description:

  When writing tests, often includes need to be added or removed. Currently the list of includes is not sorted, so developers that write tests and have `clang-format` installed will either have an unrelated change (sorting) included in their commit or they will have to manually undo the sort.

  This pull preempts both issues by just sorting all includes in one commit.

  Please be aware that this is **NOT** a change to policy to enforce clang-format or any other developer guideline or process. Developers are free to use whatever tool they want, see also #18651.

  Edit: Also includes a commit to bump the copyright headers, so that the touched files don't need to be touched again for that.

ACKs for top commit:
  practicalswift:
    ACK fa4632c41714dfaa699bacc6a947d72668a4deef
  jonatack:
    ACK fa4632c41714dfaa, light review and sanity checks with gcc build and clang fuzz build

Tree-SHA512: 130a8d073a379ba556b1e64104d37c46b671425c0aef0ed725fd60156a95e8dc83fb6f0b5330b2f8152cf5daaf3983b4aca5e75812598f2626c39fd12b88b180
2023-08-29 22:00:59 -05:00

204 lines
5.8 KiB
C++

// Copyright (c) 2015-2020 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <scheduler.h>
#include <random.h>
#include <util/time.h>
#include <assert.h>
#include <utility>
// The scheduler needs no construction logic of its own; members are
// default-initialized.
CScheduler::CScheduler() = default;
/**
 * Teardown sanity checks: no thread may still be counted as running
 * serviceQueue(), and when stopWhenEmpty is set the task queue must have
 * been drained.
 */
CScheduler::~CScheduler()
{
    assert(nThreadsServicingQueue == 0);
    if (stopWhenEmpty) {
        assert(taskQueue.empty());
    }
}
// Worker loop of the scheduler: repeatedly waits until the earliest entry in
// taskQueue is due, removes it from the queue and runs it, and keeps doing so
// until shouldStop() returns true. nThreadsServicingQueue is incremented on
// entry and decremented on every exit path (normal or via exception).
void CScheduler::serviceQueue()
{
WAIT_LOCK(newTaskMutex, lock);
++nThreadsServicingQueue;
// newTaskMutex is locked throughout this loop EXCEPT
// when the thread is waiting or when the user's function
// is called.
while (!shouldStop()) {
try {
while (!shouldStop() && taskQueue.empty()) {
// Wait until there is something to do.
newTaskScheduled.wait(lock);
}
// Wait until either there is a new task, or until
// the time of the first item on the queue:
while (!shouldStop() && !taskQueue.empty()) {
// The head of the queue is re-read on every pass: a wakeup before the
// deadline may mean an entry with an earlier time is now at the front.
std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
break; // Exit loop after timeout, it means we reached the time of the event
}
}
// If there are multiple threads, the queue can empty while we're waiting (another
// thread may service the task we were waiting on).
if (shouldStop() || taskQueue.empty())
continue;
// Copy the task out and remove it from the queue before running it.
Function f = taskQueue.begin()->second;
taskQueue.erase(taskQueue.begin());
{
// Unlock before calling f, so it can reschedule itself or another task
// without deadlocking:
// NOTE(review): REVERSE_LOCK presumably re-acquires the lock when this
// scope ends, including when f() throws -- confirm in sync.h.
REVERSE_LOCK(lock);
f();
}
} catch (...) {
// Keep the servicing-thread count accurate, then propagate the exception.
--nThreadsServicingQueue;
throw;
}
}
--nThreadsServicingQueue;
// Wake one waiter on the way out.
// NOTE(review): presumably so another servicing thread re-checks shouldStop() -- confirm.
newTaskScheduled.notify_one();
}
/**
 * Queue a task for execution at a given point in time.
 *
 * @param f  Task to run. Taken by value and moved into the queue.
 * @param t  Time point used as the key in taskQueue; serviceQueue() waits
 *           until the earliest key before running the associated task.
 */
void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
{
    {
        LOCK(newTaskMutex);
        // f is already this function's private copy, so move it into the
        // queue entry instead of copying the callable a second time.
        taskQueue.insert(std::make_pair(t, std::move(f)));
    }
    // Signal after the lock scope has ended so that a woken thread does not
    // immediately block on newTaskMutex.
    newTaskScheduled.notify_one();
}
void CScheduler::MockForward(std::chrono::seconds delta_seconds)
{
assert(delta_seconds > 0s && delta_seconds <= 1h);
{
LOCK(newTaskMutex);
// use temp_queue to maintain updated schedule
std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
for (const auto& element : taskQueue) {
temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
}
// point taskQueue to temp_queue
taskQueue = std::move(temp_queue);
}
// notify that the taskQueue needs to be processed
newTaskScheduled.notify_one();
}
/**
 * Run f once, then ask scheduler s to call Repeat again after delta.
 * The follow-up is only queued after f() has returned, so an exception
 * thrown by f ends the repetition.
 */
static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
{
    f();

    // The scheduler is captured by reference; the task and the interval by copy.
    auto run_again = [&s, f, delta] { Repeat(s, f, delta); };
    s.scheduleFromNow(std::move(run_again), delta);
}
/**
 * Arrange for f to be run repeatedly through Repeat(), which re-queues
 * itself with the same delta after each run of f.
 *
 * @param f      Task to run on every round.
 * @param delta  Interval passed to scheduleFromNow() for the first and all
 *               subsequent rounds.
 */
void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
{
    // Name every capture explicitly, including the scheduler itself.
    scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta);
}
/**
 * Report how many tasks are queued and the time range they cover.
 *
 * @param[out] first  Set to the earliest queued time; left untouched when
 *                    the queue is empty.
 * @param[out] last   Set to the latest queued time; left untouched when the
 *                    queue is empty.
 * @return Number of entries in the task queue.
 */
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
                                std::chrono::system_clock::time_point& last) const
{
    LOCK(newTaskMutex);

    if (taskQueue.empty()) {
        return 0;
    }

    first = taskQueue.begin()->first;
    last = taskQueue.rbegin()->first;
    return taskQueue.size();
}
/** @return true when at least one thread is currently counted as running serviceQueue(). */
bool CScheduler::AreThreadsServicingQueue() const
{
    LOCK(newTaskMutex);
    return nThreadsServicingQueue != 0;
}
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
{
{
LOCK(m_callbacks_mutex);
// Try to avoid scheduling too many copies here, but if we
// accidentally have two ProcessQueue's scheduled at once its
// not a big deal.
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
}
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
}
void SingleThreadedSchedulerClient::ProcessQueue()
{
std::function<void()> callback;
{
LOCK(m_callbacks_mutex);
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
m_are_callbacks_running = true;
callback = std::move(m_callbacks_pending.front());
m_callbacks_pending.pop_front();
}
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
SingleThreadedSchedulerClient* instance;
explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
~RAIICallbacksRunning()
{
{
LOCK(instance->m_callbacks_mutex);
instance->m_are_callbacks_running = false;
}
instance->MaybeScheduleProcessQueue();
}
} raiicallbacksrunning(this);
callback();
}
/**
 * Append a callback to the pending list and, if appropriate, schedule its
 * processing.
 *
 * @param func  Callback to run later; moved into the pending list.
 */
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
{
    assert(m_pscheduler);

    {
        LOCK(m_callbacks_mutex);
        m_callbacks_pending.push_back(std::move(func));
    }

    // Runs after the lock scope has ended; MaybeScheduleProcessQueue() takes
    // m_callbacks_mutex itself.
    MaybeScheduleProcessQueue();
}
/**
 * Run pending callbacks on the calling thread until none are left.
 * Asserts that no scheduler thread is servicing the queue when called.
 */
void SingleThreadedSchedulerClient::EmptyQueue()
{
    assert(!m_pscheduler->AreThreadsServicingQueue());

    bool more_pending;
    do {
        // ProcessQueue() runs at most one callback per call and is always
        // invoked at least once, even if nothing is pending.
        ProcessQueue();

        LOCK(m_callbacks_mutex);
        more_pending = !m_callbacks_pending.empty();
    } while (more_pending);
}
/** @return Number of callbacks in the pending list, read under m_callbacks_mutex. */
size_t SingleThreadedSchedulerClient::CallbacksPending()
{
    LOCK(m_callbacks_mutex);
    const size_t pending_count = m_callbacks_pending.size();
    return pending_count;
}