mirror of
https://github.com/dashpay/dash.git
synced 2024-12-25 12:02:48 +01:00
4aa197dbdb
fa4632c41714dfaa699bacc6a947d72668a4deef test: Move boost/stdlib includes last (MarcoFalke) fa488f131fd4f5bab0d01376c5a5013306f1abcd scripted-diff: Bump copyright headers (MarcoFalke) fac5c373006a9e4bcbb56843bb85f1aca4d87599 scripted-diff: Sort test includes (MarcoFalke) Pull request description: When writing tests, often includes need to be added or removed. Currently the list of includes is not sorted, so developers that write tests and have `clang-format` installed will either have an unrelated change (sorting) included in their commit or they will have to manually undo the sort. This pull preempts both issues by just sorting all includes in one commit. Please be aware that this is **NOT** a change to policy to enforce clang-format or any other developer guideline or process. Developers are free to use whatever tool they want, see also #18651. Edit: Also includes a commit to bump the copyright headers, so that the touched files don't need to be touched again for that. ACKs for top commit: practicalswift: ACK fa4632c41714dfaa699bacc6a947d72668a4deef jonatack: ACK fa4632c41714dfaa, light review and sanity checks with gcc build and clang fuzz build Tree-SHA512: 130a8d073a379ba556b1e64104d37c46b671425c0aef0ed725fd60156a95e8dc83fb6f0b5330b2f8152cf5daaf3983b4aca5e75812598f2626c39fd12b88b180
256 lines
7.7 KiB
C++
256 lines
7.7 KiB
C++
// Copyright (c) 2012-2020 The Bitcoin Core developers
|
|
// Distributed under the MIT software license, see the accompanying
|
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
#ifndef BITCOIN_CHECKQUEUE_H
|
|
#define BITCOIN_CHECKQUEUE_H
|
|
|
|
#include <sync.h>
|
|
#include <tinyformat.h>
|
|
#include <util/threadnames.h>
|
|
|
|
#include <algorithm>
|
|
#include <vector>
|
|
|
|
template <typename T>
|
|
class CCheckQueueControl;
|
|
|
|
/**
|
|
* Queue for verifications that have to be performed.
|
|
* The verifications are represented by a type T, which must provide an
|
|
* operator(), returning a bool.
|
|
*
|
|
* One thread (the master) is assumed to push batches of verifications
|
|
* onto the queue, where they are processed by N-1 worker threads. When
|
|
* the master is done adding work, it temporarily joins the worker pool
|
|
* as an N'th worker, until all jobs are done.
|
|
*/
|
|
template <typename T>
|
|
class CCheckQueue
|
|
{
|
|
private:
|
|
//! Mutex to protect the inner state
|
|
Mutex m_mutex;
|
|
|
|
//! Worker threads block on this when out of work
|
|
std::condition_variable m_worker_cv;
|
|
|
|
//! Master thread blocks on this when out of work
|
|
std::condition_variable m_master_cv;
|
|
|
|
//! The queue of elements to be processed.
|
|
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
|
|
std::vector<T> queue GUARDED_BY(m_mutex);
|
|
|
|
//! The number of workers (including the master) that are idle.
|
|
int nIdle GUARDED_BY(m_mutex){0};
|
|
|
|
//! The total number of workers (including the master).
|
|
int nTotal GUARDED_BY(m_mutex){0};
|
|
|
|
//! The temporary evaluation result.
|
|
bool fAllOk GUARDED_BY(m_mutex){true};
|
|
|
|
/**
|
|
* Number of verifications that haven't completed yet.
|
|
* This includes elements that are no longer queued, but still in the
|
|
* worker's own batches.
|
|
*/
|
|
unsigned int nTodo GUARDED_BY(m_mutex){0};
|
|
|
|
//! The maximum number of elements to be processed in one batch
|
|
const unsigned int nBatchSize;
|
|
|
|
std::vector<std::thread> m_worker_threads;
|
|
bool m_request_stop GUARDED_BY(m_mutex){false};
|
|
|
|
/** Internal function that does bulk of the verification work. */
|
|
bool Loop(bool fMaster)
|
|
{
|
|
std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
|
|
std::vector<T> vChecks;
|
|
vChecks.reserve(nBatchSize);
|
|
unsigned int nNow = 0;
|
|
bool fOk = true;
|
|
do {
|
|
{
|
|
WAIT_LOCK(m_mutex, lock);
|
|
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
|
|
if (nNow) {
|
|
fAllOk &= fOk;
|
|
nTodo -= nNow;
|
|
if (nTodo == 0 && !fMaster)
|
|
// We processed the last element; inform the master it can exit and return the result
|
|
m_master_cv.notify_one();
|
|
} else {
|
|
// first iteration
|
|
nTotal++;
|
|
}
|
|
// logically, the do loop starts here
|
|
while (queue.empty() && !m_request_stop) {
|
|
if (fMaster && nTodo == 0) {
|
|
nTotal--;
|
|
bool fRet = fAllOk;
|
|
// reset the status for new work later
|
|
fAllOk = true;
|
|
// return the current status
|
|
return fRet;
|
|
}
|
|
nIdle++;
|
|
cond.wait(lock); // wait
|
|
nIdle--;
|
|
}
|
|
if (m_request_stop) {
|
|
return false;
|
|
}
|
|
|
|
// Decide how many work units to process now.
|
|
// * Do not try to do everything at once, but aim for increasingly smaller batches so
|
|
// all workers finish approximately simultaneously.
|
|
// * Try to account for idle jobs which will instantly start helping.
|
|
// * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
|
|
nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
|
|
vChecks.resize(nNow);
|
|
for (unsigned int i = 0; i < nNow; i++) {
|
|
// We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
|
|
// queue to the local batch vector instead of copying.
|
|
vChecks[i].swap(queue.back());
|
|
queue.pop_back();
|
|
}
|
|
// Check whether we need to do work at all
|
|
fOk = fAllOk;
|
|
}
|
|
// execute work
|
|
for (T& check : vChecks)
|
|
if (fOk)
|
|
fOk = check();
|
|
vChecks.clear();
|
|
} while (true);
|
|
}
|
|
|
|
public:
|
|
//! Mutex to ensure only one concurrent CCheckQueueControl
|
|
Mutex m_control_mutex;
|
|
|
|
//! Create a new check queue
|
|
explicit CCheckQueue(unsigned int nBatchSizeIn)
|
|
: nBatchSize(nBatchSizeIn)
|
|
{
|
|
}
|
|
|
|
//! Create a pool of new worker threads.
|
|
void StartWorkerThreads(const int threads_num)
|
|
{
|
|
{
|
|
LOCK(m_mutex);
|
|
nIdle = 0;
|
|
nTotal = 0;
|
|
fAllOk = true;
|
|
}
|
|
assert(m_worker_threads.empty());
|
|
for (int n = 0; n < threads_num; ++n) {
|
|
m_worker_threads.emplace_back([this, n]() {
|
|
util::ThreadRename(strprintf("scriptch.%i", n));
|
|
Loop(false /* worker thread */);
|
|
});
|
|
}
|
|
}
|
|
|
|
//! Wait until execution finishes, and return whether all evaluations were successful.
|
|
bool Wait()
|
|
{
|
|
return Loop(true /* master thread */);
|
|
}
|
|
|
|
//! Add a batch of checks to the queue
|
|
void Add(std::vector<T>& vChecks)
|
|
{
|
|
if (vChecks.empty()) {
|
|
return;
|
|
}
|
|
|
|
{
|
|
LOCK(m_mutex);
|
|
for (T& check : vChecks) {
|
|
queue.emplace_back();
|
|
check.swap(queue.back());
|
|
}
|
|
nTodo += vChecks.size();
|
|
}
|
|
|
|
if (vChecks.size() == 1) {
|
|
m_worker_cv.notify_one();
|
|
} else {
|
|
m_worker_cv.notify_all();
|
|
}
|
|
}
|
|
|
|
//! Stop all of the worker threads.
|
|
void StopWorkerThreads()
|
|
{
|
|
WITH_LOCK(m_mutex, m_request_stop = true);
|
|
m_worker_cv.notify_all();
|
|
for (std::thread& t : m_worker_threads) {
|
|
t.join();
|
|
}
|
|
m_worker_threads.clear();
|
|
WITH_LOCK(m_mutex, m_request_stop = false);
|
|
}
|
|
|
|
~CCheckQueue()
|
|
{
|
|
assert(m_worker_threads.empty());
|
|
}
|
|
|
|
};
|
|
|
|
/**
|
|
* RAII-style controller object for a CCheckQueue that guarantees the passed
|
|
* queue is finished before continuing.
|
|
*/
|
|
template <typename T>
|
|
class CCheckQueueControl
|
|
{
|
|
private:
|
|
CCheckQueue<T> * const pqueue;
|
|
bool fDone;
|
|
|
|
public:
|
|
CCheckQueueControl() = delete;
|
|
CCheckQueueControl(const CCheckQueueControl&) = delete;
|
|
CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
|
|
explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
|
|
{
|
|
// passed queue is supposed to be unused, or nullptr
|
|
if (pqueue != nullptr) {
|
|
ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
|
|
}
|
|
}
|
|
|
|
bool Wait()
|
|
{
|
|
if (pqueue == nullptr)
|
|
return true;
|
|
bool fRet = pqueue->Wait();
|
|
fDone = true;
|
|
return fRet;
|
|
}
|
|
|
|
void Add(std::vector<T>& vChecks)
|
|
{
|
|
if (pqueue != nullptr)
|
|
pqueue->Add(vChecks);
|
|
}
|
|
|
|
~CCheckQueueControl()
|
|
{
|
|
if (!fDone)
|
|
Wait();
|
|
if (pqueue != nullptr) {
|
|
LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
|
|
}
|
|
}
|
|
};
|
|
|
|
#endif // BITCOIN_CHECKQUEUE_H
|