diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index a2c0659c9c..8c14a1f608 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -36,7 +36,7 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests, bool f quorumSigSharesManager = new CSigSharesManager(); quorumSigningManager = new CSigningManager(*llmqDb, unitTests); chainLocksHandler = new CChainLocksHandler(scheduler); - quorumInstantSendManager = new CInstantSendManager(scheduler, *llmqDb); + quorumInstantSendManager = new CInstantSendManager(*llmqDb); } void DestroyLLMQSystem() @@ -84,14 +84,14 @@ void StartLLMQSystem() chainLocksHandler->Start(); } if (quorumInstantSendManager) { - quorumInstantSendManager->RegisterAsRecoveredSigsListener(); + quorumInstantSendManager->Start(); } } void StopLLMQSystem() { if (quorumInstantSendManager) { - quorumInstantSendManager->UnregisterAsRecoveredSigsListener(); + quorumInstantSendManager->Stop(); } if (chainLocksHandler) { chainLocksHandler->Stop(); @@ -113,6 +113,9 @@ void InterruptLLMQSystem() if (quorumSigSharesManager) { quorumSigSharesManager->InterruptWorkerThread(); } + if (quorumInstantSendManager) { + quorumInstantSendManager->InterruptWorkerThread(); + } } } diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index fee9f57ff2..b8115e4f52 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -12,7 +12,6 @@ #include "txmempool.h" #include "masternode-sync.h" #include "net_processing.h" -#include "scheduler.h" #include "spork.h" #include "validation.h" @@ -24,6 +23,7 @@ #include "instantx.h" #include +#include namespace llmq { @@ -208,24 +208,45 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& o //////////////// -CInstantSendManager::CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb) : - scheduler(_scheduler), +CInstantSendManager::CInstantSendManager(CDBWrapper& _llmqDb) : db(_llmqDb) { + workInterrupt.reset(); } CInstantSendManager::~CInstantSendManager() { } -void CInstantSendManager::RegisterAsRecoveredSigsListener() +void CInstantSendManager::Start() { + // can't start new thread if we have one running already + if (workThread.joinable()) { + assert(false); + } + + workThread = std::thread(&TraceThread >, "instantsend", std::function(std::bind(&CInstantSendManager::WorkThreadMain, this))); + quorumSigningManager->RegisterRecoveredSigsListener(this); } -void CInstantSendManager::UnregisterAsRecoveredSigsListener() +void CInstantSendManager::Stop() { quorumSigningManager->UnregisterRecoveredSigsListener(this); + + // make sure to call InterruptWorkerThread() first + if (!workInterrupt) { + assert(false); + } + + if (workThread.joinable()) { + workThread.join(); + } +} + +void CInstantSendManager::InterruptWorkerThread() +{ + workInterrupt(); } bool CInstantSendManager::ProcessTx(const CTransaction& tx, const Consensus::Params& params) @@ -552,13 +573,6 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq islock.txid.ToString(), hash.ToString(), pfrom->id); pendingInstantSendLocks.emplace(hash, std::make_pair(pfrom->id, std::move(islock))); - - if (!hasScheduledProcessPending) { - hasScheduledProcessPending = true; - scheduler->scheduleFromNow([&] { - ProcessPendingInstantSendLocks(); - }, 100); - } } bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CInstantSendLock& islock, bool& retBan) @@ -581,7 +595,7 @@ bool CInstantSendManager::PreVerifyInstantSendLock(NodeId nodeId, const llmq::CI return true; } -void CInstantSendManager::ProcessPendingInstantSendLocks() +bool CInstantSendManager::ProcessPendingInstantSendLocks() { auto llmqType = Params().GetConsensus().llmqForInstantSend; @@ -589,12 +603,15 @@ void CInstantSendManager::ProcessPendingInstantSendLocks() { LOCK(cs); - hasScheduledProcessPending = false; pend = std::move(pendingInstantSendLocks); } + if (pend.empty()) { + return false; + } + if (!IsNewInstantSendEnabled()) { - return; + return false; } int tipHeight; @@ -621,7 +638,7 @@ void CInstantSendManager::ProcessPendingInstantSendLocks() auto quorum = quorumSigningManager->SelectQuorumForSigning(llmqType, tipHeight, id); if (!quorum) { // should not happen, but if one fails to select, all others will also fail to select - return; + return false; } uint256 signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->qc.quorumHash, id, islock.txid); batchVerifier.PushMessage(nodeId, hash, signHash, islock.sig, quorum->qc.quorumPublicKey); @@ -679,6 +696,8 @@ void CInstantSendManager::ProcessPendingInstantSendLocks() } } } + + return true; } void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock) @@ -1052,6 +1071,21 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC return false; } +void CInstantSendManager::WorkThreadMain() +{ + while (!workInterrupt) { + bool didWork = false; + + didWork |= ProcessPendingInstantSendLocks(); + + if (!didWork) { + if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) { + return; + } + } + } +} + bool IsOldInstantSendEnabled() { return sporkManager.IsSporkActive(SPORK_2_INSTANTSEND_ENABLED) && !sporkManager.IsSporkActive(SPORK_20_INSTANTSEND_LLMQ_BASED); diff --git a/src/llmq/quorums_instantsend.h b/src/llmq/quorums_instantsend.h index 2355b984a9..b3b847408e 100644 --- a/src/llmq/quorums_instantsend.h +++ b/src/llmq/quorums_instantsend.h @@ -14,8 +14,6 @@ #include #include -class CScheduler; - namespace llmq { @@ -72,9 +70,11 @@ class CInstantSendManager : public CRecoveredSigsListener { private: CCriticalSection cs; - CScheduler* scheduler; CInstantSendDb db; + std::thread workThread; + CThreadInterrupt workInterrupt; + /** * Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an * in-progress input lock. @@ -92,14 +92,14 @@ private: // Incoming and not verified yet std::unordered_map> pendingInstantSendLocks; - bool hasScheduledProcessPending{false}; public: - CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb); + CInstantSendManager(CDBWrapper& _llmqDb); ~CInstantSendManager(); - void RegisterAsRecoveredSigsListener(); - void UnregisterAsRecoveredSigsListener(); + void Start(); + void Stop(); + void InterruptWorkerThread(); public: bool ProcessTx(const CTransaction& tx, const Consensus::Params& params); @@ -118,7 +118,7 @@ public: void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); void ProcessMessageInstantSendLock(CNode* pfrom, const CInstantSendLock& islock, CConnman& connman); bool PreVerifyInstantSendLock(NodeId nodeId, const CInstantSendLock& islock, bool& retBan); - void ProcessPendingInstantSendLocks(); + bool ProcessPendingInstantSendLocks(); void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock); void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx); @@ -133,6 +133,8 @@ public: bool AlreadyHave(const CInv& inv); bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret); + + void WorkThreadMain(); }; extern CInstantSendManager* quorumInstantSendManager;