mirror of
https://github.com/dashpay/dash.git
synced 2024-12-28 21:42:47 +01:00
871b8585ca
97d2b09c12
Add helper to wait for validation interface queue to catch up (Matt Corallo)36137497f1
Block ActivateBestChain to empty validationinterface queue (Matt Corallo)5a933cefcc
Add an interface to get the queue depth out of CValidationInterface (Matt Corallo)a99b76f269
Require no cs_main lock for ProcessNewBlock/ActivateBestChain (Matt Corallo)a734896038
Avoid cs_main in net_processing ActivateBestChain calls (Matt Corallo)66aa1d58a1
Refactor ProcessGetData in anticipation of avoiding cs_main for ABC (Matt Corallo)818075adac
Create new mutex for orphans, no cs_main in PLV::BlockConnected (Matt Corallo) Pull request description: This should fix #11822. It ended up bigger than I hoped for, but it's not too gnarly. Note that "Require no cs_main lock for ProcessNewBlock/ActivateBestChain" is mostly pure code-movement. Tree-SHA512: 1127688545926f6099449dca6a4e6609eefc3abbd72f1c66e03d32bd8c7b31e82097d8307822cfd1dec0321703579cfdd82069cab6e17b1024e75eac694122cb
214 lines
7.1 KiB
C++
214 lines
7.1 KiB
C++
// Copyright (c) 2015 The Bitcoin Core developers
|
|
// Distributed under the MIT software license, see the accompanying
|
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
#include <scheduler.h>
|
|
|
|
#include <random.h>
|
|
#include <reverselock.h>
|
|
|
|
#include <assert.h>
|
|
#include <boost/bind.hpp>
|
|
#include <utility>
|
|
|
|
/**
 * Construct an idle scheduler: no thread is servicing the queue yet and
 * neither kind of stop has been requested.
 */
CScheduler::CScheduler()
    : nThreadsServicingQueue(0),
      stopRequested(false),
      stopWhenEmpty(false)
{
}
|
|
|
|
/**
 * Destroy the scheduler. Asserts that no thread is still inside
 * serviceQueue() at this point.
 */
CScheduler::~CScheduler()
{
    assert(nThreadsServicingQueue == 0);
}
|
|
|
|
|
|
#if BOOST_VERSION < 105000
|
|
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
|
|
{
|
|
// Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
|
|
// start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
|
|
return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
|
|
}
|
|
#endif
|
|
|
|
void CScheduler::serviceQueue()
|
|
{
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
++nThreadsServicingQueue;
|
|
|
|
// newTaskMutex is locked throughout this loop EXCEPT
|
|
// when the thread is waiting or when the user's function
|
|
// is called.
|
|
while (!shouldStop()) {
|
|
try {
|
|
if (!shouldStop() && taskQueue.empty()) {
|
|
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
|
|
// Use this chance to get a tiny bit more entropy
|
|
RandAddSeedSleep();
|
|
}
|
|
while (!shouldStop() && taskQueue.empty()) {
|
|
// Wait until there is something to do.
|
|
newTaskScheduled.wait(lock);
|
|
}
|
|
|
|
// Wait until either there is a new task, or until
|
|
// the time of the first item on the queue:
|
|
|
|
// wait_until needs boost 1.50 or later; older versions have timed_wait:
|
|
#if BOOST_VERSION < 105000
|
|
while (!shouldStop() && !taskQueue.empty() &&
|
|
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
|
|
// Keep waiting until timeout
|
|
}
|
|
#else
|
|
// Some boost versions have a conflicting overload of wait_until that returns void.
|
|
// Explicitly use a template here to avoid hitting that overload.
|
|
while (!shouldStop() && !taskQueue.empty()) {
|
|
boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
|
|
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
|
|
break; // Exit loop after timeout, it means we reached the time of the event
|
|
}
|
|
#endif
|
|
// If there are multiple threads, the queue can empty while we're waiting (another
|
|
// thread may service the task we were waiting on).
|
|
if (shouldStop() || taskQueue.empty())
|
|
continue;
|
|
|
|
Function f = taskQueue.begin()->second;
|
|
taskQueue.erase(taskQueue.begin());
|
|
|
|
{
|
|
// Unlock before calling f, so it can reschedule itself or another task
|
|
// without deadlocking:
|
|
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
|
|
f();
|
|
}
|
|
} catch (...) {
|
|
--nThreadsServicingQueue;
|
|
throw;
|
|
}
|
|
}
|
|
--nThreadsServicingQueue;
|
|
newTaskScheduled.notify_one();
|
|
}
|
|
|
|
void CScheduler::stop(bool drain)
|
|
{
|
|
{
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
if (drain)
|
|
stopWhenEmpty = true;
|
|
else
|
|
stopRequested = true;
|
|
}
|
|
newTaskScheduled.notify_all();
|
|
}
|
|
|
|
/**
 * Queue f to be run at time t.
 *
 * The (t, f) entry is inserted into taskQueue under newTaskMutex; one waiter
 * on newTaskScheduled is notified after the mutex has been released.
 */
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
{
    boost::unique_lock<boost::mutex> guard(newTaskMutex);
    taskQueue.insert(std::make_pair(t, f));
    guard.unlock();

    newTaskScheduled.notify_one();
}
|
|
|
|
/**
 * Queue f to be run deltaMilliSeconds milliseconds after the current
 * system_clock time.
 */
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
{
    const boost::chrono::system_clock::time_point due =
        boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds);
    schedule(f, due);
}
|
|
|
|
/**
 * Helper behind scheduleEvery(): call f once, then queue another Repeat of
 * the same f on scheduler s, deltaMilliSeconds from now. The next run is only
 * queued after f() has returned.
 */
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
{
    f();

    CScheduler::Function again = boost::bind(&Repeat, s, f, deltaMilliSeconds);
    s->scheduleFromNow(again, deltaMilliSeconds);
}
|
|
|
|
/**
 * Run f repeatedly, deltaMilliSeconds apart. The first run is queued
 * deltaMilliSeconds from now; each run queues the following one via Repeat().
 */
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds)
{
    CScheduler::Function repeating = boost::bind(&Repeat, this, f, deltaMilliSeconds);
    scheduleFromNow(repeating, deltaMilliSeconds);
}
|
|
|
|
/**
 * Report the number of queued tasks and, when the queue is non-empty, the
 * times of its first and last entries.
 *
 * @param first  set to the time of the first queued task; left untouched if the queue is empty
 * @param last   set to the time of the last queued task; left untouched if the queue is empty
 * @return the number of tasks currently in taskQueue
 */
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
                                boost::chrono::system_clock::time_point &last) const
{
    boost::unique_lock<boost::mutex> lock(newTaskMutex);

    if (taskQueue.empty()) {
        return 0;
    }

    first = taskQueue.begin()->first;
    last = taskQueue.rbegin()->first;
    return taskQueue.size();
}
|
|
|
|
bool CScheduler::AreThreadsServicingQueue() const {
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
return nThreadsServicingQueue;
|
|
}
|
|
|
|
|
|
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
|
|
{
|
|
LOCK(m_cs_callbacks_pending);
|
|
// Try to avoid scheduling too many copies here, but if we
|
|
// accidentally have two ProcessQueue's scheduled at once its
|
|
// not a big deal.
|
|
if (m_are_callbacks_running) return;
|
|
if (m_callbacks_pending.empty()) return;
|
|
}
|
|
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
|
|
}
|
|
|
|
void SingleThreadedSchedulerClient::ProcessQueue() {
|
|
std::function<void (void)> callback;
|
|
{
|
|
LOCK(m_cs_callbacks_pending);
|
|
if (m_are_callbacks_running) return;
|
|
if (m_callbacks_pending.empty()) return;
|
|
m_are_callbacks_running = true;
|
|
|
|
callback = std::move(m_callbacks_pending.front());
|
|
m_callbacks_pending.pop_front();
|
|
}
|
|
|
|
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
|
|
// to ensure both happen safely even if callback() throws.
|
|
struct RAIICallbacksRunning {
|
|
SingleThreadedSchedulerClient* instance;
|
|
explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
|
|
~RAIICallbacksRunning() {
|
|
{
|
|
LOCK(instance->m_cs_callbacks_pending);
|
|
instance->m_are_callbacks_running = false;
|
|
}
|
|
instance->MaybeScheduleProcessQueue();
|
|
}
|
|
} raiicallbacksrunning(this);
|
|
|
|
callback();
|
|
}
|
|
|
|
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
|
|
assert(m_pscheduler);
|
|
|
|
{
|
|
LOCK(m_cs_callbacks_pending);
|
|
m_callbacks_pending.emplace_back(std::move(func));
|
|
}
|
|
MaybeScheduleProcessQueue();
|
|
}
|
|
|
|
void SingleThreadedSchedulerClient::EmptyQueue() {
|
|
assert(!m_pscheduler->AreThreadsServicingQueue());
|
|
bool should_continue = true;
|
|
while (should_continue) {
|
|
ProcessQueue();
|
|
LOCK(m_cs_callbacks_pending);
|
|
should_continue = !m_callbacks_pending.empty();
|
|
}
|
|
}
|
|
|
|
size_t SingleThreadedSchedulerClient::CallbacksPending() {
|
|
LOCK(m_cs_callbacks_pending);
|
|
return m_callbacks_pending.size();
|
|
}
|