Move processing of InstantSend locks into its own worker thread (#2857)

* Let ProcessPendingInstantSendLocks return true when it did some work

* Introduce own worker thread for CInstantSendManager

Instead of using the scheduler.

* Remove scheduler from CInstantSendManager

* Add missing reset() call for workInterrupt
This commit is contained in:
Alexander Block 2019-04-11 14:43:00 +02:00 committed by UdjinM6
parent ae78360e5d
commit 90b1b71967
3 changed files with 66 additions and 27 deletions

View File

@ -36,7 +36,7 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests, bool f
quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(*llmqDb, unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler);
quorumInstantSendManager = new CInstantSendManager(scheduler, *llmqDb);
quorumInstantSendManager = new CInstantSendManager(*llmqDb);
}
void DestroyLLMQSystem()
@ -84,14 +84,14 @@ void StartLLMQSystem()
chainLocksHandler->Start();
}
if (quorumInstantSendManager) {
quorumInstantSendManager->RegisterAsRecoveredSigsListener();
quorumInstantSendManager->Start();
}
}
void StopLLMQSystem()
{
if (quorumInstantSendManager) {
quorumInstantSendManager->UnregisterAsRecoveredSigsListener();
quorumInstantSendManager->Stop();
}
if (chainLocksHandler) {
chainLocksHandler->Stop();
@ -113,6 +113,9 @@ void InterruptLLMQSystem()
if (quorumSigSharesManager) {
quorumSigSharesManager->InterruptWorkerThread();
}
if (quorumInstantSendManager) {
quorumInstantSendManager->InterruptWorkerThread();
}
}
}

View File

@ -12,7 +12,6 @@
#include "txmempool.h"
#include "masternode-sync.h"
#include "net_processing.h"
#include "scheduler.h"
#include "spork.h"
#include "validation.h"
@ -24,6 +23,7 @@
#include "instantx.h"
#include <boost/algorithm/string/replace.hpp>
#include <boost/thread.hpp>
namespace llmq
{
@ -208,24 +208,45 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& o
////////////////
CInstantSendManager::CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb) :
scheduler(_scheduler),
CInstantSendManager::CInstantSendManager(CDBWrapper& _llmqDb) :
db(_llmqDb)
{
workInterrupt.reset();
}
CInstantSendManager::~CInstantSendManager()
{
}
void CInstantSendManager::RegisterAsRecoveredSigsListener()
void CInstantSendManager::Start()
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}
workThread = std::thread(&TraceThread<std::function<void()> >, "instantsend", std::function<void()>(std::bind(&CInstantSendManager::WorkThreadMain, this)));
quorumSigningManager->RegisterRecoveredSigsListener(this);
}
void CInstantSendManager::UnregisterAsRecoveredSigsListener()
void CInstantSendManager::Stop()
{
quorumSigningManager->UnregisterRecoveredSigsListener(this);
// make sure to call InterruptWorkerThread() first
if (!workInterrupt) {
assert(false);
}
if (workThread.joinable()) {
workThread.join();
}
}
void CInstantSendManager::InterruptWorkerThread()
{
workInterrupt();
}
bool CInstantSendManager::ProcessTx(const CTransaction& tx, const Consensus::Params& params)
@ -552,13 +573,6 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq
islock.txid.ToString(), hash.ToString(), pfrom->id);
pendingInstantSendLocks.emplace(hash, std::make_pair(pfrom->id, std::move(islock)));
if (!hasScheduledProcessPending) {
hasScheduledProcessPending = true;
scheduler->scheduleFromNow([&] {
ProcessPendingInstantSendLocks();
}, 100);
}
}
bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CInstantSendLock& islock, bool& retBan)
@ -581,7 +595,7 @@ bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CI
return true;
}
void CInstantSendManager::ProcessPendingInstantSendLocks()
bool CInstantSendManager::ProcessPendingInstantSendLocks()
{
auto llmqType = Params().GetConsensus().llmqForInstantSend;
@ -589,12 +603,15 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
{
LOCK(cs);
hasScheduledProcessPending = false;
pend = std::move(pendingInstantSendLocks);
}
if (pend.empty()) {
return false;
}
if (!IsNewInstantSendEnabled()) {
return;
return false;
}
int tipHeight;
@ -621,7 +638,7 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
auto quorum = quorumSigningManager->SelectQuorumForSigning(llmqType, tipHeight, id);
if (!quorum) {
// should not happen, but if one fails to select, all others will also fail to select
return;
return false;
}
uint256 signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->qc.quorumHash, id, islock.txid);
batchVerifier.PushMessage(nodeId, hash, signHash, islock.sig, quorum->qc.quorumPublicKey);
@ -679,6 +696,8 @@ void CInstantSendManager::ProcessPendingInstantSendLocks()
}
}
}
return true;
}
void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock)
@ -1052,6 +1071,21 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC
return false;
}
void CInstantSendManager::WorkThreadMain()
{
while (!workInterrupt) {
bool didWork = false;
didWork |= ProcessPendingInstantSendLocks();
if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
}
}
}
bool IsOldInstantSendEnabled()
{
return sporkManager.IsSporkActive(SPORK_2_INSTANTSEND_ENABLED) && !sporkManager.IsSporkActive(SPORK_20_INSTANTSEND_LLMQ_BASED);

View File

@ -14,8 +14,6 @@
#include <unordered_map>
#include <unordered_set>
class CScheduler;
namespace llmq
{
@ -72,9 +70,11 @@ class CInstantSendManager : public CRecoveredSigsListener
{
private:
CCriticalSection cs;
CScheduler* scheduler;
CInstantSendDb db;
std::thread workThread;
CThreadInterrupt workInterrupt;
/**
* Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an
* in-progress input lock.
@ -92,14 +92,14 @@ private:
// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
bool hasScheduledProcessPending{false};
public:
CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb);
CInstantSendManager(CDBWrapper& _llmqDb);
~CInstantSendManager();
void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener();
void Start();
void Stop();
void InterruptWorkerThread();
public:
bool ProcessTx(const CTransaction& tx, const Consensus::Params& params);
@ -118,7 +118,7 @@ public:
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
void ProcessMessageInstantSendLock(CNode* pfrom, const CInstantSendLock& islock, CConnman& connman);
bool PreVerifyInstantSendLock(NodeId nodeId, const CInstantSendLock& islock, bool& retBan);
void ProcessPendingInstantSendLocks();
bool ProcessPendingInstantSendLocks();
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock);
void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx);
@ -133,6 +133,8 @@ public:
bool AlreadyHave(const CInv& inv);
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);
void WorkThreadMain();
};
extern CInstantSendManager* quorumInstantSendManager;