Merge #6443: refactor: move CConnman and PeerManager out of CDeterministicMNManager, LLMQContext member ctors, reduce use in CJContext

7d26061170 refactor: move `CConnman`, `PeerManager` out of `CCoinJoinClientQueueManager` ctor (Kittywhiskers Van Gogh)
953ba96ac9 refactor: move `CConnman` out of `CoinJoinWalletManager` ctor (Kittywhiskers Van Gogh)
ac930a84d8 refactor: remove unused `CConnman` from `CDeterministicMNManager` ctor (Kittywhiskers Van Gogh)
a14e604064 refactor: remove `CConnman`, `PeerManager` from `LLMQContext` ctor (Kittywhiskers Van Gogh)
d9e5cc7c9a refactor: move `PeerManager` out of `CInstantSendManager` ctor (Kittywhiskers Van Gogh)
82d1aed1d6 refactor: move `CConnman`, `PeerManager` out of `CSigSharesManager` ctor (Kittywhiskers Van Gogh)
7498a38076 refactor: move `PeerManager` out of `CSigningManager` ctor (Kittywhiskers Van Gogh)
7ebc61e375 refactor: move `CConnman` out of `CQuorumManager` ctor (Kittywhiskers Van Gogh)
c07b522baa refactor: move `CConnman` out of `CDKGSession{,Handler,Manager}` ctor (Kittywhiskers Van Gogh)
01876c7e56 refactor: move `PeerManager` out of `CDKGSession{,Handler,Manager}` ctor (Kittywhiskers Van Gogh)
cc0e771c29 refactor: start BLS thread in `LLMQContext` ctor, move `Start` downwards (Kittywhiskers Van Gogh)

Pull request description:

  ## Additional Information

  * Depends on https://github.com/dashpay/dash/pull/6425
  * Dependency for https://github.com/dashpay/dash/pull/6304
  * In order to reduce the logic used in chainstate initialization (that will be split out of `init.cpp` in [bitcoin#23280](https://github.com/bitcoin/bitcoin/pull/23280)), the spinning up of threads (as done in `LLMQContext::Start()`) needs to be moved down.
    * They were moved up in [dash#5752](https://github.com/dashpay/dash/pull/5752) as `CBLSWorker` is a part of `LLMQContext` and `CBLSWorker` is needed during chainstate verification. As suggested in dash#5752, an alternate fix to the one already merged in was to move `CBLSWorker` `Start()`/`Stop()` to the constructor, which is done here.
      * Another alternate fix is that we move it out of `LLMQContext` entirely and let it remain in `NodeContext` though this approach has not been taken.
    * The reason we cannot retain the status quo is because `bitcoin-chainstate` (the binary introduced in [bitcoin#24304](https://github.com/bitcoin/bitcoin/pull/24304) that's built on the code split off in [bitcoin#23280](https://github.com/bitcoin/bitcoin/pull/23280)) aims to be devoid of P2P logic and this is reflected in the source files used to build it ([source](https://github.com/bitcoin/bitcoin/pull/24304/files#diff-4cb884d03ebb901069e4ee5de5d02538c40dd9b39919c615d8eaa9d364bbbd77R794)). (Also, there's no `NodeContext`, [source](https://github.com/bitcoin/bitcoin/pull/24304/files#diff-4cb884d03ebb901069e4ee5de5d02538c40dd9b39919c615d8eaa9d364bbbd77R795-R798))
      * This means need to separate P2P and validation components from Dash-specific logic in order for the split to work as expected. This PR is a step in that direction by moving P2P elements (`CConnman` and `PeerManager`) out of constructors.
  * As it stands, there are two sources for Dash-specific components to have access to P2P components, initialization (e.g. through `LLMQContext::Start()` or `PeerManagerImpl::ProcessMessage()`).

  ## Breaking Changes

  None expected. While changes are present in initialization order, consensus behaviour should remain unchanged.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas **(note: N/A)**
  - [x] I have added or updated relevant unit/integration/functional/e2e tests **(note: N/A)**
  - [x] I have made corresponding changes to the documentation **(note: N/A)**
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  knst:
    utACK 7d26061170
  UdjinM6:
    utACK 7d26061170

Tree-SHA512: 4f12cbda935cad3a186acb31fed513cea489b0d3a55aa80be8e1336d10cbe1579d6d3db862a78a167134650c9e97816732acaf0c85ab759f6555b1b6be99ec02
This commit is contained in:
pasta 2024-12-09 14:46:45 -06:00
commit ff1ddc952a
No known key found for this signature in database
GPG Key ID: E2F3D7916E722D38
27 changed files with 386 additions and 388 deletions

View File

@ -30,18 +30,20 @@
#include <memory>
#include <univalue.h>
PeerMsgRet CCoinJoinClientQueueManager::ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv)
PeerMsgRet CCoinJoinClientQueueManager::ProcessMessage(const CNode& peer, CConnman& connman, PeerManager& peerman,
std::string_view msg_type, CDataStream& vRecv)
{
if (m_is_masternode) return {};
if (!m_mn_sync.IsBlockchainSynced()) return {};
if (msg_type == NetMsgType::DSQUEUE) {
return CCoinJoinClientQueueManager::ProcessDSQueue(peer, vRecv);
return CCoinJoinClientQueueManager::ProcessDSQueue(peer, connman, peerman, vRecv);
}
return {};
}
PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataStream& vRecv)
PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CConnman& connman, PeerManager& peerman,
CDataStream& vRecv)
{
assert(m_mn_metaman.IsValid());
@ -50,7 +52,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
{
LOCK(cs_main);
Assert(peerman)->EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash()));
peerman.EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash()));
}
if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
@ -102,8 +104,9 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
}
// if the queue is ready, submit if we can
if (dsq.fReady && m_walletman.ForAnyCJClientMan([this, &dmn](std::unique_ptr<CCoinJoinClientManager>& clientman) {
return clientman->TrySubmitDenominate(dmn->pdmnState->addr, this->connman);
if (dsq.fReady &&
m_walletman.ForAnyCJClientMan([this, &connman, &dmn](std::unique_ptr<CCoinJoinClientManager>& clientman) {
return clientman->TrySubmitDenominate(dmn->pdmnState->addr, connman);
})) {
LogPrint(BCLog::COINJOIN, "DSQUEUE -- CoinJoin queue (%s) is ready on masternode %s\n", dsq.ToString(),
dmn->pdmnState->addr.ToStringAddrPort());
@ -132,7 +135,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq));
}
} // cs_ProcessDSQueue
peerman->RelayDSQ(dsq);
peerman.RelayDSQ(dsq);
return {};
}
@ -1917,11 +1920,11 @@ void CoinJoinWalletManager::Add(const std::shared_ptr<CWallet>& wallet)
g_wallet_init_interface.InitCoinJoinSettings(*this);
}
void CoinJoinWalletManager::DoMaintenance()
void CoinJoinWalletManager::DoMaintenance(CConnman& connman)
{
LOCK(cs_wallet_manager_map);
for (auto& [_, clientman] : m_wallet_manager_map) {
clientman->DoMaintenance(m_chainman, m_connman, m_mempool);
clientman->DoMaintenance(m_chainman, connman, m_mempool);
}
}

View File

@ -76,11 +76,10 @@ public:
using wallet_name_cjman_map = std::map<const std::string, std::unique_ptr<CCoinJoinClientManager>>;
public:
CoinJoinWalletManager(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, const CTxMemPool& mempool, const CMasternodeSync& mn_sync,
CoinJoinWalletManager(ChainstateManager& chainman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman,
const CTxMemPool& mempool, const CMasternodeSync& mn_sync,
const std::unique_ptr<CCoinJoinClientQueueManager>& queueman, bool is_masternode) :
m_chainman(chainman),
m_connman(connman),
m_dmnman(dmnman),
m_mn_metaman(mn_metaman),
m_mempool(mempool),
@ -97,7 +96,7 @@ public:
}
void Add(const std::shared_ptr<CWallet>& wallet);
void DoMaintenance();
void DoMaintenance(CConnman& connman);
void Remove(const std::string& name);
void Flush(const std::string& name);
@ -122,7 +121,6 @@ public:
private:
ChainstateManager& m_chainman;
CConnman& m_connman;
CDeterministicMNManager& m_dmnman;
CMasternodeMetaMan& m_mn_metaman;
const CTxMemPool& m_mempool;
@ -234,8 +232,6 @@ public:
class CCoinJoinClientQueueManager : public CCoinJoinBaseManager
{
private:
CConnman& connman;
std::unique_ptr<PeerManager>& peerman;
CoinJoinWalletManager& m_walletman;
CDeterministicMNManager& m_dmnman;
CMasternodeMetaMan& m_mn_metaman;
@ -245,20 +241,18 @@ private:
const bool m_is_masternode;
public:
explicit CCoinJoinClientQueueManager(CConnman& _connman, std::unique_ptr<PeerManager>& _peerman,
CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman,
explicit CCoinJoinClientQueueManager(CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, const CMasternodeSync& mn_sync,
bool is_masternode) :
connman(_connman),
peerman(_peerman),
m_walletman(walletman),
m_dmnman(dmnman),
m_mn_metaman(mn_metaman),
m_mn_sync(mn_sync),
m_is_masternode{is_masternode} {};
PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue);
PeerMsgRet ProcessDSQueue(const CNode& peer, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& peer, CConnman& connman, PeerManager& peerman, std::string_view msg_type,
CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue);
PeerMsgRet ProcessDSQueue(const CNode& peer, CConnman& connman, PeerManager& peerman, CDataStream& vRecv);
void DoMaintenance();
};

View File

@ -15,11 +15,10 @@ CJContext::CJContext(ChainstateManager& chainman, CConnman& connman, CDeterminis
std::unique_ptr<PeerManager>& peerman, bool relay_txes) :
dstxman{std::make_unique<CDSTXManager>()},
#ifdef ENABLE_WALLET
walletman{std::make_unique<CoinJoinWalletManager>(chainman, connman, dmnman, mn_metaman, mempool, mn_sync, queueman,
/* is_masternode = */ mn_activeman != nullptr)},
queueman{relay_txes
? std::make_unique<CCoinJoinClientQueueManager>(connman, peerman, *walletman, dmnman, mn_metaman,
mn_sync, /* is_masternode = */ mn_activeman != nullptr)
walletman{std::make_unique<CoinJoinWalletManager>(chainman, dmnman, mn_metaman, mempool, mn_sync, queueman,
/*is_masternode=*/mn_activeman != nullptr)},
queueman{relay_txes ? std::make_unique<CCoinJoinClientQueueManager>(*walletman, dmnman, mn_metaman, mn_sync,
/*is_masternode=*/mn_activeman != nullptr)
: nullptr},
#endif // ENABLE_WALLET
server{std::make_unique<CCoinJoinServer>(chainman, connman, dmnman, *dstxman, mn_metaman, mempool, mn_activeman,

View File

@ -93,7 +93,7 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
m_llmq_ctx->isman->UpdatedBlockTip(pindexNew);
m_llmq_ctx->clhandler->UpdatedBlockTip();
m_llmq_ctx->qman->UpdatedBlockTip(pindexNew, fInitialDownload);
m_llmq_ctx->qman->UpdatedBlockTip(pindexNew, m_connman, fInitialDownload);
m_llmq_ctx->qdkgsman->UpdatedBlockTip(pindexNew, fInitialDownload);
m_llmq_ctx->ehfSignalsHandler->UpdatedBlockTip(pindexNew, /* is_masternode = */ m_mn_activeman != nullptr);
@ -107,7 +107,7 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
{
assert(m_cj_ctx && m_llmq_ctx);
m_llmq_ctx->isman->TransactionAddedToMempool(ptx);
m_llmq_ctx->isman->TransactionAddedToMempool(m_peerman, ptx);
m_llmq_ctx->clhandler->TransactionAddedToMempool(ptx, nAcceptTime);
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
}

View File

@ -30,7 +30,6 @@ class CBlock;
class CBlockIndex;
class CChainState;
class CCoinsViewCache;
class CConnman;
class CEvoDB;
class TxValidationState;
@ -583,7 +582,6 @@ private:
std::atomic<int> to_cleanup {0};
CChainState& m_chainstate;
CConnman& connman;
CEvoDB& m_evoDb;
std::unordered_map<uint256, CDeterministicMNList, StaticSaltedHasher> mnListsCache GUARDED_BY(cs);
@ -592,8 +590,11 @@ private:
const CBlockIndex* m_initial_snapshot_index GUARDED_BY(cs) {nullptr};
public:
explicit CDeterministicMNManager(CChainState& chainstate, CConnman& _connman, CEvoDB& evoDb) :
m_chainstate(chainstate), connman(_connman), m_evoDb(evoDb) {}
explicit CDeterministicMNManager(CChainState& chainstate, CEvoDB& evoDb) :
m_chainstate(chainstate),
m_evoDb(evoDb)
{
}
~CDeterministicMNManager() = default;
bool ProcessBlock(const CBlock& block, gsl::not_null<const CBlockIndex*> pindex, BlockValidationState& state,

View File

@ -1893,7 +1893,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// Same logic as above with pblocktree
node.dmnman.reset();
node.dmnman = std::make_unique<CDeterministicMNManager>(chainman.ActiveChainstate(), *node.connman, *node.evodb);
node.dmnman = std::make_unique<CDeterministicMNManager>(chainman.ActiveChainstate(), *node.evodb);
node.mempool->ConnectManagers(node.dmnman.get());
node.cpoolman.reset();
@ -1907,12 +1907,10 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
node.llmq_ctx->Stop();
}
node.llmq_ctx.reset();
node.llmq_ctx = std::make_unique<LLMQContext>(chainman, *node.connman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman,
*node.mempool, node.mn_activeman.get(), *node.mn_sync, node.peerman, /* unit_tests = */ false, /* wipe = */ fReset || fReindexChainState);
node.llmq_ctx = std::make_unique<LLMQContext>(chainman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman,
*node.mempool, node.mn_activeman.get(), *node.mn_sync, /*unit_tests=*/false, /*wipe=*/fReset || fReindexChainState);
// Enable CMNHFManager::{Process, Undo}Block
node.mnhf_manager->ConnectManagers(node.chainman.get(), node.llmq_ctx->qman.get());
// Have to start it early to let VerifyDB check ChainLock signatures in coinbase
node.llmq_ctx->Start();
node.chain_helper.reset();
node.chain_helper = std::make_unique<CChainstateHelper>(*node.cpoolman, *node.dmnman, *node.mnhf_manager, *node.govman, *(node.llmq_ctx->quorum_block_processor), *node.chainman,
@ -2287,6 +2285,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// ********************************************************* Step 10a: schedule Dash-specific tasks
node.llmq_ctx->Start(*node.connman, *node.peerman);
node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1});
node.scheduler->scheduleEvery(std::bind(&CMasternodeSync::DoMaintenance, std::ref(*node.mn_sync), std::cref(*node.peerman), std::cref(*node.govman)), std::chrono::seconds{1});
node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), std::ref(*node.cj_ctx)), std::chrono::minutes{1});
@ -2302,7 +2302,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
#ifdef ENABLE_WALLET
} else if (!ignores_incoming_txs) {
node.scheduler->scheduleEvery(std::bind(&CCoinJoinClientQueueManager::DoMaintenance, std::ref(*node.cj_ctx->queueman)), std::chrono::seconds{1});
node.scheduler->scheduleEvery(std::bind(&CoinJoinWalletManager::DoMaintenance, std::ref(*node.cj_ctx->walletman)), std::chrono::seconds{1});
node.scheduler->scheduleEvery(std::bind(&CoinJoinWalletManager::DoMaintenance, std::ref(*node.cj_ctx->walletman), std::ref(*node.connman)), std::chrono::seconds{1});
#endif // ENABLE_WALLET
}

View File

@ -18,24 +18,22 @@
#include <llmq/signing.h>
#include <llmq/signing_shares.h>
LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman,
CEvoDB& evo_db, CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman,
LLMQContext::LLMQContext(ChainstateManager& chainman, CDeterministicMNManager& dmnman, CEvoDB& evo_db,
CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman,
CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman,
const CMasternodeSync& mn_sync, const std::unique_ptr<PeerManager>& peerman, bool unit_tests,
bool wipe) :
const CMasternodeSync& mn_sync, bool unit_tests, bool wipe) :
is_masternode{mn_activeman != nullptr},
bls_worker{std::make_shared<CBLSWorker>()},
dkg_debugman{std::make_unique<llmq::CDKGDebugManager>()},
quorum_block_processor{std::make_unique<llmq::CQuorumBlockProcessor>(chainman.ActiveChainstate(), dmnman, evo_db)},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainman.ActiveChainstate(), connman, dmnman,
*dkg_debugman, mn_metaman, *quorum_block_processor,
mn_activeman, sporkman, peerman, unit_tests, wipe)},
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, *qdkgsman,
evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman,
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainman.ActiveChainstate(), dmnman, *dkg_debugman,
mn_metaman, *quorum_block_processor, mn_activeman, sporkman,
unit_tests, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, peerman,
unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db,
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(*sigman, mn_activeman, *qman, sporkman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr);
llmq::chainLocksHandler = std::make_unique<llmq::CChainLocksHandler>(chainman.ActiveChainstate(), *qman,
@ -48,15 +46,19 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler,
chainman.ActiveChainstate(), *qman,
*sigman, *shareman, sporkman,
mempool, mn_sync, peerman,
is_masternode, unit_tests, wipe);
mempool, mn_sync, is_masternode,
unit_tests, wipe);
return llmq::quorumInstantSendManager.get();
}()},
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainman, mnhfman, *sigman, *shareman, *qman)}
{
// Have to start it early to let VerifyDB check ChainLock signatures in coinbase
bls_worker->Start();
}
LLMQContext::~LLMQContext() {
bls_worker->Stop();
// LLMQContext doesn't own these objects, but still need to care of them for consistency:
llmq::quorumInstantSendManager.reset();
llmq::chainLocksHandler.reset();
@ -70,21 +72,21 @@ void LLMQContext::Interrupt() {
llmq::quorumInstantSendManager->InterruptWorkerThread();
}
void LLMQContext::Start() {
void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
{
assert(clhandler == llmq::chainLocksHandler.get());
assert(isman == llmq::quorumInstantSendManager.get());
bls_worker->Start();
if (is_masternode) {
qdkgsman->StartThreads();
qdkgsman->StartThreads(connman, peerman);
}
qman->Start();
shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread();
sigman->StartWorkerThread();
shareman->StartWorkerThread(connman, peerman);
sigman->StartWorkerThread(peerman);
llmq::chainLocksHandler->Start();
llmq::quorumInstantSendManager->Start();
llmq::quorumInstantSendManager->Start(peerman);
}
void LLMQContext::Stop() {
@ -101,5 +103,4 @@ void LLMQContext::Stop() {
if (is_masternode) {
qdkgsman->StopThreads();
}
bls_worker->Stop();
}

View File

@ -39,14 +39,14 @@ private:
public:
LLMQContext() = delete;
LLMQContext(const LLMQContext&) = delete;
LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterministicMNManager& dmnman, CEvoDB& evo_db,
LLMQContext(ChainstateManager& chainman, CDeterministicMNManager& dmnman, CEvoDB& evo_db,
CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool unit_tests, bool wipe);
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, bool unit_tests,
bool wipe);
~LLMQContext();
void Interrupt();
void Start();
void Start(CConnman& connman, PeerManager& peerman);
void Stop();
/** Guaranteed if LLMQContext is initialized then all members are valid too

View File

@ -72,14 +72,12 @@ CDKGMember::CDKGMember(const CDeterministicMNCPtr& _dmn, size_t _idx) :
}
CDKGSession::CDKGSession(const CBlockIndex* pQuorumBaseBlockIndex, const Consensus::LLMQParams& _params,
CBLSWorker& _blsWorker, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager,
CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman) :
CBLSWorker& _blsWorker, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman) :
params(_params),
blsWorker(_blsWorker),
cache(_blsWorker),
connman(_connman),
m_dmnman(dmnman),
dkgManager(_dkgManager),
dkgDebugManager(_dkgDebugManager),
@ -156,7 +154,7 @@ bool CDKGSession::Init(const uint256& _myProTxHash, int _quorumIndex)
return true;
}
void CDKGSession::Contribute(CDKGPendingMessages& pendingMessages)
void CDKGSession::Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
CDKGLogger logger(*this, __func__, __LINE__);
@ -174,10 +172,10 @@ void CDKGSession::Contribute(CDKGPendingMessages& pendingMessages)
logger.Batch("generated contributions. time=%d", t1.count());
logger.Flush();
SendContributions(pendingMessages);
SendContributions(pendingMessages, peerman);
}
void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages)
void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
CDKGLogger logger(*this, __func__, __LINE__);
@ -226,7 +224,7 @@ void CDKGSession::SendContributions(CDKGPendingMessages& pendingMessages)
return true;
});
pendingMessages.PushPendingMessage(-1, nullptr, qc);
pendingMessages.PushPendingMessage(-1, qc, peerman);
}
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
@ -417,7 +415,7 @@ void CDKGSession::VerifyPendingContributions()
pendingContributionVerifications.clear();
}
void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages)
void CDKGSession::VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
if (!AreWeMember()) {
return;
@ -453,12 +451,12 @@ void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages)
logger.Batch("verified contributions. time=%d", t1.count());
logger.Flush();
VerifyConnectionAndMinProtoVersions();
VerifyConnectionAndMinProtoVersions(connman);
SendComplaint(pendingMessages);
SendComplaint(pendingMessages, peerman);
}
void CDKGSession::VerifyConnectionAndMinProtoVersions() const
void CDKGSession::VerifyConnectionAndMinProtoVersions(CConnman& connman) const
{
assert(m_mn_metaman.IsValid());
@ -499,7 +497,7 @@ void CDKGSession::VerifyConnectionAndMinProtoVersions() const
}
}
void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages)
void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
CDKGLogger logger(*this, __func__, __LINE__);
@ -538,7 +536,7 @@ void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages)
return true;
});
pendingMessages.PushPendingMessage(-1, nullptr, qc);
pendingMessages.PushPendingMessage(-1, qc, peerman);
}
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
@ -645,7 +643,7 @@ std::optional<CInv> CDKGSession::ReceiveMessage(const CDKGComplaint& qc)
return inv;
}
void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages)
void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
if (!AreWeMember()) {
return;
@ -682,11 +680,12 @@ void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages)
logger.Flush();
if (!justifyFor.empty()) {
SendJustification(pendingMessages, justifyFor);
SendJustification(pendingMessages, peerman, justifyFor);
}
}
void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const std::set<uint256>& forMembers)
void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman,
const std::set<uint256>& forMembers)
{
CDKGLogger logger(*this, __func__, __LINE__);
@ -731,7 +730,7 @@ void CDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, const
return true;
});
pendingMessages.PushPendingMessage(-1, nullptr, qj);
pendingMessages.PushPendingMessage(-1, qj, peerman);
}
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification
@ -885,7 +884,7 @@ std::optional<CInv> CDKGSession::ReceiveMessage(const CDKGJustification& qj)
return inv;
}
void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages)
void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
if (!AreWeMember()) {
return;
@ -927,10 +926,10 @@ void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages)
logger.Flush();
SendCommitment(pendingMessages);
SendCommitment(pendingMessages, peerman);
}
void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages)
void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman)
{
CDKGLogger logger(*this, __func__, __LINE__);
@ -1041,7 +1040,7 @@ void CDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages)
return true;
});
pendingMessages.PushPendingMessage(-1, nullptr, qc);
pendingMessages.PushPendingMessage(-1, qc, peerman);
}
// only performs cheap verifications, but not the signature of the message. this is checked with batched verification

View File

@ -22,6 +22,7 @@ class CConnman;
class CDeterministicMN;
class CMasternodeMetaMan;
class CSporkManager;
class PeerManager;
using CDeterministicMNCPtr = std::shared_ptr<const CDeterministicMN>;
@ -279,7 +280,6 @@ private:
CBLSWorker& blsWorker;
CBLSWorkerCache cache;
CConnman& connman;
CDeterministicMNManager& m_dmnman;
CDKGSessionManager& dkgManager;
CDKGDebugManager& dkgDebugManager;
@ -326,9 +326,9 @@ private:
public:
CDKGSession(const CBlockIndex* pQuorumBaseBlockIndex, const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker,
CConnman& _connman, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman);
CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager,
CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman);
// TODO: remove Init completely
bool Init(const uint256& _myProTxHash, int _quorumIndex);
@ -349,28 +349,28 @@ public:
*/
// Phase 1: contribution
void Contribute(CDKGPendingMessages& pendingMessages);
void SendContributions(CDKGPendingMessages& pendingMessages);
void Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman);
void SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman);
bool PreVerifyMessage(const CDKGContribution& qc, bool& retBan) const;
std::optional<CInv> ReceiveMessage(const CDKGContribution& qc);
void VerifyPendingContributions() EXCLUSIVE_LOCKS_REQUIRED(cs_pending);
// Phase 2: complaint
void VerifyAndComplain(CDKGPendingMessages& pendingMessages);
void VerifyConnectionAndMinProtoVersions() const;
void SendComplaint(CDKGPendingMessages& pendingMessages);
void VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman);
void VerifyConnectionAndMinProtoVersions(CConnman& connman) const;
void SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman);
bool PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const;
std::optional<CInv> ReceiveMessage(const CDKGComplaint& qc);
// Phase 3: justification
void VerifyAndJustify(CDKGPendingMessages& pendingMessages);
void SendJustification(CDKGPendingMessages& pendingMessages, const std::set<uint256>& forMembers);
void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman);
void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const std::set<uint256>& forMembers);
bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const;
std::optional<CInv> ReceiveMessage(const CDKGJustification& qj);
// Phase 4: commit
void VerifyAndCommit(CDKGPendingMessages& pendingMessages);
void SendCommitment(CDKGPendingMessages& pendingMessages);
void VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman);
void SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman);
bool PreVerifyMessage(const CDKGPrematureCommitment& qc, bool& retBan) const;
std::optional<CInv> ReceiveMessage(const CDKGPrematureCommitment& qc);

View File

@ -24,16 +24,13 @@
namespace llmq
{
CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman,
CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager,
CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman,
CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman,
CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager,
CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman,
const Consensus::LLMQParams& _params, int _quorumIndex) :
blsWorker(_blsWorker),
m_chainstate(chainstate),
connman(_connman),
m_dmnman(dmnman),
dkgDebugManager(_dkgDebugManager),
dkgManager(_dkgManager),
@ -41,11 +38,10 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
quorumBlockProcessor(_quorumBlockProcessor),
m_mn_activeman(mn_activeman),
m_sporkman(sporkman),
m_peerman(peerman),
params(_params),
quorumIndex(_quorumIndex),
curSession(std::make_unique<CDKGSession>(nullptr, _params, _blsWorker, _connman, dmnman, _dkgManager,
_dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)),
curSession(std::make_unique<CDKGSession>(nullptr, _params, _blsWorker, dmnman, _dkgManager, _dkgDebugManager,
m_mn_metaman, m_mn_activeman, sporkman)),
pendingContributions(
(size_t)_params.size * 2,
MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
@ -60,18 +56,8 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
CDKGSessionHandler::~CDKGSessionHandler() = default;
void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv)
void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, PeerManager& peerman)
{
// if peer is not -1 we should always pass valid peerman
assert(from == -1 || peerman != nullptr);
if (peerman != nullptr) {
if (m_peerman == nullptr) {
m_peerman = peerman;
}
// we should never use one different PeerManagers for same queue
assert(m_peerman == peerman);
}
// this will also consume the data, even if we bail out early
auto pm = std::make_shared<CDataStream>(std::move(vRecv));
@ -80,7 +66,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,
uint256 hash = hw.GetHash();
if (from != -1) {
WITH_LOCK(::cs_main, Assert(m_peerman.load())->EraseObjectRequest(from, CInv(invType, hash)));
WITH_LOCK(::cs_main, peerman.EraseObjectRequest(from, CInv(invType, hash)));
}
LOCK(cs_messages);
@ -119,10 +105,10 @@ bool CDKGPendingMessages::HasSeen(const uint256& hash) const
return seenMessages.count(hash) != 0;
}
void CDKGPendingMessages::Misbehaving(const NodeId from, const int score)
void CDKGPendingMessages::Misbehaving(const NodeId from, const int score, PeerManager& peerman)
{
if (from == -1) return;
m_peerman.load()->Misbehaving(from, score);
peerman.Misbehaving(from, score);
}
void CDKGPendingMessages::Clear()
@ -162,28 +148,30 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, ToUnderlying(oldPhase), ToUnderlying(phase));
}
void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv)
void CDKGSessionHandler::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type,
CDataStream& vRecv)
{
// We don't handle messages in the calling thread as deserialization/processing of these would block everything
if (msg_type == NetMsgType::QCONTRIB) {
pendingContributions.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
pendingContributions.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
} else if (msg_type == NetMsgType::QCOMPLAINT) {
pendingComplaints.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
pendingComplaints.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
} else if (msg_type == NetMsgType::QJUSTIFICATION) {
pendingJustifications.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
pendingJustifications.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
} else if (msg_type == NetMsgType::QPCOMMITMENT) {
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), peerman, vRecv);
pendingPrematureCommitments.PushPendingMessage(pfrom.GetId(), vRecv, peerman);
}
}
void CDKGSessionHandler::StartThread()
void CDKGSessionHandler::StartThread(CConnman& connman, PeerManager& peerman)
{
if (phaseHandlerThread.joinable()) {
throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread.");
}
m_thread_name = strprintf("llmq-%d-%d", ToUnderlying(params.type), quorumIndex);
phaseHandlerThread = std::thread(util::TraceThread, m_thread_name.c_str(), [this] { PhaseHandlerThread(); });
phaseHandlerThread = std::thread(&util::TraceThread, m_thread_name.c_str(),
[this, &connman, &peerman] { PhaseHandlerThread(connman, peerman); });
}
void CDKGSessionHandler::StopThread()
@ -200,7 +188,7 @@ bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex)
return false;
}
curSession = std::make_unique<CDKGSession>(pQuorumBaseBlockIndex, params, blsWorker, connman, m_dmnman, dkgManager,
curSession = std::make_unique<CDKGSession>(pQuorumBaseBlockIndex, params, blsWorker, m_dmnman, dkgManager,
dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman);
if (!curSession->Init(m_mn_activeman->GetProTxHash(), quorumIndex)) {
@ -445,7 +433,8 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
return ret;
}
static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman, PeerManager& peerman, const CInv& inv)
static void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManager& peerman,
const CInv& inv)
{
CDKGLogger logger(session, __func__, __LINE__);
std::stringstream ss;
@ -476,9 +465,10 @@ static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman
logger.Batch("forMember[%s] NOTrelayMembers[%s]", session.ProTx().ToString().substr(0, 4), ss2.str());
logger.Flush();
}
template <typename Message, int MessageType>
bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSession& session,
CDKGPendingMessages& pendingMessages, size_t maxCount)
bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages,
PeerManager& peerman, size_t maxCount)
{
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
if (msgs.empty()) {
@ -493,7 +483,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes
if (!p.second) {
LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId);
{
pendingMessages.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100, peerman);
}
continue;
}
@ -502,7 +492,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes
if (ban) {
LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId);
{
pendingMessages.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100, peerman);
}
}
LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId);
@ -519,7 +509,7 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes
LOCK(cs_main);
for (auto nodeId : badNodes) {
LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId);
pendingMessages.Misbehaving(nodeId, 100);
pendingMessages.Misbehaving(nodeId, 100, peerman);
}
}
@ -529,15 +519,15 @@ bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSes
continue;
}
const std::optional<CInv> inv = session.ReceiveMessage(*p.second);
if (inv && peerman) {
RelayInvToParticipants(session, connman, *peerman, *inv);
if (inv) {
RelayInvToParticipants(session, connman, peerman, *inv);
}
}
return true;
}
void CDKGSessionHandler::HandleDKGRound()
void CDKGSessionHandler::HandleDKGRound(CConnman& connman, PeerManager& peerman)
{
WaitForNextPhase(std::nullopt, QuorumPhase::Initialized);
@ -570,60 +560,53 @@ void CDKGSessionHandler::HandleDKGRound()
WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash);
// Contribute
auto fContributeStart = [this]() {
curSession->Contribute(pendingContributions);
};
auto fContributeWait = [this] {
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(connman, m_peerman.get(), *curSession,
pendingContributions, 8);
auto fContributeStart = [this, &peerman]() { curSession->Contribute(pendingContributions, peerman); };
auto fContributeWait = [this, &connman, &peerman] {
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(connman, *curSession,
pendingContributions, peerman, 8);
};
HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);
// Complain
auto fComplainStart = [this]() {
curSession->VerifyAndComplain(pendingComplaints);
auto fComplainStart = [this, &connman, &peerman]() {
curSession->VerifyAndComplain(connman, pendingComplaints, peerman);
};
auto fComplainWait = [this] {
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(connman, m_peerman.get(), *curSession,
pendingComplaints, 8);
auto fComplainWait = [this, &connman, &peerman] {
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(connman, *curSession, pendingComplaints,
peerman, 8);
};
HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);
// Justify
auto fJustifyStart = [this]() {
curSession->VerifyAndJustify(pendingJustifications);
};
auto fJustifyWait = [this] {
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(connman, m_peerman.get(),
*curSession,
pendingJustifications, 8);
auto fJustifyStart = [this, &peerman]() { curSession->VerifyAndJustify(pendingJustifications, peerman); };
auto fJustifyWait = [this, &connman, &peerman] {
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(connman, *curSession,
pendingJustifications, peerman, 8);
};
HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);
// Commit
auto fCommitStart = [this]() {
curSession->VerifyAndCommit(pendingPrematureCommitments);
};
auto fCommitWait = [this] {
auto fCommitStart = [this, &peerman]() { curSession->VerifyAndCommit(pendingPrematureCommitments, peerman); };
auto fCommitWait = [this, &connman, &peerman] {
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(
connman, m_peerman.get(), *curSession, pendingPrematureCommitments, 8);
connman, *curSession, pendingPrematureCommitments, peerman, 8);
};
HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);
auto finalCommitments = curSession->FinalizeCommitments();
for (const auto& fqc : finalCommitments) {
if (auto inv_opt = quorumBlockProcessor.AddMineableCommitment(fqc); inv_opt.has_value()) {
Assert(m_peerman.get())->RelayInv(inv_opt.value());
peerman.RelayInv(inv_opt.value());
}
}
}
void CDKGSessionHandler::PhaseHandlerThread()
void CDKGSessionHandler::PhaseHandlerThread(CConnman& connman, PeerManager& peerman)
{
while (!stopRequested) {
try {
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionHandler::%s -- %s qi[%d] - starting HandleDKGRound\n", __func__, params.name, quorumIndex);
HandleDKGRound();
HandleDKGRound(connman, peerman);
} catch (AbortPhaseException& e) {
dkgDebugManager.UpdateLocalSessionStatus(params.type, quorumIndex, [&](CDKGDebugSessionStatus& status) {
status.statusBits.aborted = true;

View File

@ -7,8 +7,6 @@
#include <net.h> // for NodeId
#include <gsl/pointers.h>
#include <atomic>
#include <list>
#include <map>
@ -65,7 +63,6 @@ public:
using BinaryMessage = std::pair<NodeId, std::shared_ptr<CDataStream>>;
private:
std::atomic<PeerManager*> m_peerman{nullptr};
const int invType;
const size_t maxMessagesPerNode;
mutable Mutex cs_messages;
@ -77,18 +74,18 @@ public:
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {};
void PushPendingMessage(NodeId from, PeerManager* peerman, CDataStream& vRecv);
void PushPendingMessage(NodeId from, CDataStream& vRecv, PeerManager& peerman);
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
bool HasSeen(const uint256& hash) const;
void Misbehaving(NodeId from, int score);
void Misbehaving(NodeId from, int score, PeerManager& peerman);
void Clear();
template<typename Message>
void PushPendingMessage(NodeId from, PeerManager* peerman, Message& msg)
template <typename Message>
void PushPendingMessage(NodeId from, Message& msg, PeerManager& peerman)
{
CDataStream ds(SER_NETWORK, PROTOCOL_VERSION);
ds << msg;
PushPendingMessage(from, peerman, ds);
PushPendingMessage(from, ds, peerman);
}
// Might return nullptr messages, which indicates that deserialization failed for some reason
@ -132,7 +129,6 @@ private:
CBLSWorker& blsWorker;
CChainState& m_chainstate;
CConnman& connman;
CDeterministicMNManager& m_dmnman;
CDKGDebugManager& dkgDebugManager;
CDKGSessionManager& dkgManager;
@ -140,7 +136,6 @@ private:
CQuorumBlockProcessor& quorumBlockProcessor;
const CActiveMasternodeManager* const m_mn_activeman;
const CSporkManager& m_sporkman;
const std::unique_ptr<PeerManager>& m_peerman;
const Consensus::LLMQParams params;
const int quorumIndex;
@ -160,16 +155,16 @@ private:
CDKGPendingMessages pendingPrematureCommitments;
public:
CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman,
CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman, const Consensus::LLMQParams& _params, int _quorumIndex);
const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex);
~CDKGSessionHandler();
void UpdatedBlockTip(const CBlockIndex *pindexNew);
void ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv);
void ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv);
void StartThread();
void StartThread(CConnman& connman, PeerManager& peerman);
void StopThread();
bool GetContribution(const uint256& hash, CDKGContribution& ret) const;
@ -194,8 +189,8 @@ private:
void WaitForNewQuorum(const uint256& oldQuorumHash) const;
void SleepBeforePhase(QuorumPhase curPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting) const;
void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting);
void HandleDKGRound();
void PhaseHandlerThread();
void HandleDKGRound(CConnman& connman, PeerManager& peerman);
void PhaseHandlerThread(CConnman& connman, PeerManager& peerman);
};
} // namespace llmq

View File

@ -29,14 +29,14 @@ static const std::string DB_VVEC = "qdkg_V";
static const std::string DB_SKCONTRIB = "qdkg_S";
static const std::string DB_ENC_CONTRIB = "qdkg_E";
CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman,
const std::unique_ptr<PeerManager>& peerman, bool unitTests, bool fWipe) :
CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman,
CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, bool unitTests, bool fWipe) :
db(std::make_unique<CDBWrapper>(unitTests ? "" : (gArgs.GetDataDirNet() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)),
blsWorker(_blsWorker),
m_chainstate(chainstate),
connman(_connman),
m_dmnman(dmnman),
dkgDebugManager(_dkgDebugManager),
quorumBlockProcessor(_quorumBlockProcessor),
@ -51,19 +51,20 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai
for (const auto& params : consensus_params.llmqs) {
auto session_count = (params.useRotation) ? params.signingActiveQuorumCount : 1;
for (const auto i : irange::range(session_count)) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(params.type, i),
std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman,
quorumBlockProcessor, mn_activeman, spork_manager, peerman, params, i));
dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(params.type, i),
std::forward_as_tuple(blsWorker, m_chainstate, dmnman, dkgDebugManager, *this,
mn_metaman, quorumBlockProcessor, mn_activeman,
spork_manager, params, i));
}
}
}
CDKGSessionManager::~CDKGSessionManager() = default;
void CDKGSessionManager::StartThreads()
void CDKGSessionManager::StartThreads(CConnman& connman, PeerManager& peerman)
{
for (auto& it : dkgSessionHandlers) {
it.second.StartThread();
it.second.StartThread(connman, peerman);
}
}
@ -90,7 +91,8 @@ void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fIni
}
}
PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager* peerman, bool is_masternode, const std::string& msg_type, CDataStream& vRecv)
PeerMsgRet CDKGSessionManager::ProcessMessage(CNode& pfrom, PeerManager& peerman, bool is_masternode,
const std::string& msg_type, CDataStream& vRecv)
{
static Mutex cs_indexedQuorumsCache;
static std::map<Consensus::LLMQType, unordered_lru_cache<uint256, int, StaticSaltedHasher>> indexedQuorumsCache GUARDED_BY(cs_indexedQuorumsCache);

View File

@ -42,7 +42,6 @@ private:
CBLSWorker& blsWorker;
CChainState& m_chainstate;
CConnman& connman;
CDeterministicMNManager& m_dmnman;
CDKGDebugManager& dkgDebugManager;
CQuorumBlockProcessor& quorumBlockProcessor;
@ -71,18 +70,19 @@ private:
mutable std::map<ContributionsCacheKey, ContributionsCacheEntry> contributionsCache GUARDED_BY(contributionsCacheCs);
public:
CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman,
const std::unique_ptr<PeerManager>& peerman, bool unitTests, bool fWipe);
CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman,
CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, bool unitTests, bool fWipe);
~CDKGSessionManager();
void StartThreads();
void StartThreads(CConnman& connman, PeerManager& peerman);
void StopThreads();
void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload);
PeerMsgRet ProcessMessage(CNode& pfrom, PeerManager* peerman, bool is_masternode, const std::string& msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(CNode& pfrom, PeerManager& peerman, bool is_masternode, const std::string& msg_type,
CDataStream& vRecv);
bool AlreadyHave(const CInv& inv) const;
bool GetContribution(const uint256& hash, CDKGContribution& ret) const;
bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const;

View File

@ -430,14 +430,14 @@ std::vector<uint256> CInstantSendDb::RemoveChainedInstantSendLocks(const uint256
////////////////
void CInstantSendManager::Start()
void CInstantSendManager::Start(PeerManager& peerman)
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}
workThread = std::thread(&util::TraceThread, "isman", [this] { WorkThreadMain(); });
workThread = std::thread(&util::TraceThread, "isman", [this, &peerman] { WorkThreadMain(peerman); });
sigman.RegisterRecoveredSigsListener(this);
}
@ -732,21 +732,23 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco
pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock));
}
PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv)
PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type,
CDataStream& vRecv)
{
if (IsInstantSendEnabled() && msg_type == NetMsgType::ISDLOCK) {
const auto islock = std::make_shared<CInstantSendLock>();
vRecv >> *islock;
return ProcessMessageInstantSendLock(pfrom, islock);
return ProcessMessageInstantSendLock(pfrom, peerman, islock);
}
return {};
}
PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, const llmq::CInstantSendLockPtr& islock)
PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman,
const llmq::CInstantSendLockPtr& islock)
{
auto hash = ::SerializeHash(*islock);
WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash)));
WITH_LOCK(::cs_main, peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash)));
if (!islock->TriviallyValid()) {
return tl::unexpected{100};
@ -800,7 +802,7 @@ bool CInstantSendLock::TriviallyValid() const
return true;
}
bool CInstantSendManager::ProcessPendingInstantSendLocks()
bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
{
decltype(pendingInstantSendLocks) pend;
bool fMoreWork{false};
@ -845,7 +847,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
auto dkgInterval = llmq_params.dkgInterval;
// First check against the current active set and don't ban
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, 0, pend, false);
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false);
if (!badISLocks.empty()) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);
@ -858,13 +860,15 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
}
}
// Now check against the previous active set and perform banning if this fails
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, pend, true);
ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true);
}
return fMoreWork;
}
std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
{
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs;
@ -936,7 +940,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
for (const auto& nodeId : batchVerifier.badSources) {
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
// does not validate anymore due to changed quorums
Assert(m_peerman)->Misbehaving(nodeId, 20);
peerman.Misbehaving(nodeId, 20);
}
}
for (const auto& p : pend) {
@ -951,7 +955,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
continue;
}
ProcessInstantSendLock(nodeId, hash, islock);
ProcessInstantSendLock(nodeId, peerman, hash, islock);
// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
// double-verification of the sig.
@ -969,7 +973,8 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
return badISLocks;
}
void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock)
void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash,
const CInstantSendLockPtr& islock)
{
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", __func__,
islock->txid.ToString(), hash.ToString(), from);
@ -1030,28 +1035,28 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
CInv inv(MSG_ISDLOCK, hash);
if (tx != nullptr) {
Assert(m_peerman)->RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
} else {
// we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce
// with the TX taken into account.
Assert(m_peerman)->RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
}
ResolveBlockConflicts(hash, *islock);
if (tx != nullptr) {
RemoveMempoolConflictsForLock(hash, *islock);
RemoveMempoolConflictsForLock(peerman, hash, *islock);
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__,
hash.ToString(), tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock);
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
mempool.AddTransactionsUpdated(1);
} else {
m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode);
peerman.AskPeersForTransaction(islock->txid, m_is_masternode);
}
}
void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
void CInstantSendManager::TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx)
{
if (!IsInstantSendEnabled() || !m_mn_sync.IsBlockchainSynced() || tx->vin.empty()) {
return;
@ -1080,7 +1085,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
// TX is not locked, so make sure it is tracked
AddNonLockedTx(tx, nullptr);
} else {
RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock);
RemoveMempoolConflictsForLock(peerman, ::SerializeHash(*islock), *islock);
}
}
@ -1292,7 +1297,8 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
}
}
void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
void CInstantSendManager::RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash,
const CInstantSendLock& islock)
{
std::unordered_map<uint256, CTransactionRef, StaticSaltedHasher> toDelete;
@ -1321,7 +1327,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
for (const auto& p : toDelete) {
RemoveConflictedTx(*p.second);
}
m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode);
peerman.AskPeersForTransaction(islock.txid, m_is_masternode);
}
}
@ -1583,10 +1589,10 @@ size_t CInstantSendManager::GetInstantSendLockCount() const
return db.GetInstantSendLockCount();
}
void CInstantSendManager::WorkThreadMain()
void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
{
while (!workInterrupt) {
bool fMoreWork = ProcessPendingInstantSendLocks();
bool fMoreWork = ProcessPendingInstantSendLocks(peerman);
ProcessPendingRetryLockTxs();
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {

View File

@ -206,7 +206,6 @@ private:
CSporkManager& spork_manager;
CTxMemPool& mempool;
const CMasternodeSync& m_mn_sync;
const std::unique_ptr<PeerManager>& m_peerman;
const bool m_is_masternode;
@ -256,9 +255,8 @@ private:
public:
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests,
bool fWipe) :
CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool is_masternode,
bool unitTests, bool fWipe) :
db(unitTests, fWipe),
clhandler(_clhandler),
m_chainstate(chainstate),
@ -268,14 +266,13 @@ public:
spork_manager(sporkman),
mempool(_mempool),
m_mn_sync(mn_sync),
m_peerman(peerman),
m_is_masternode{is_masternode}
{
workInterrupt.reset();
}
~CInstantSendManager() = default;
void Start();
void Start(PeerManager& peerman);
void Stop();
void InterruptWorkerThread() { workInterrupt(); };
@ -295,18 +292,15 @@ private:
const Consensus::Params& params) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests);
void TrySignInstantSendLock(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating);
PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, const CInstantSendLockPtr& islock);
bool ProcessPendingInstantSendLocks()
PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman, const CInstantSendLockPtr& islock);
bool ProcessPendingInstantSendLocks(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params,
int signOffset,
const std::unordered_map<uint256,
std::pair<NodeId, CInstantSendLockPtr>,
StaticSaltedHasher>& pend,
bool ban)
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock)
void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const CInstantSendLockPtr& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
@ -318,14 +312,14 @@ private:
void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests);
void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
void RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessPendingRetryLockTxs()
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
void WorkThreadMain()
void WorkThreadMain(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void HandleFullyConfirmedBlock(const CBlockIndex* pindex)
@ -339,9 +333,9 @@ public:
[[nodiscard]] MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_pendingLocks);
PeerMsgRet ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv);
void TransactionAddedToMempool(const CTransactionRef& tx)
void TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void TransactionRemovedFromMempool(const CTransactionRef& tx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)

View File

@ -210,8 +210,8 @@ bool CQuorum::ReadContributions(const CDBWrapper& db)
return true;
}
CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman,
CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CEvoDB& _evoDb,
CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman,
CDKGSessionManager& _dkgManager, CEvoDB& _evoDb,
CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const CSporkManager& sporkman, bool unit_tests, bool wipe) :
@ -219,7 +219,6 @@ CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate,
unit_tests, wipe)),
blsWorker(_blsWorker),
m_chainstate(chainstate),
connman(_connman),
m_dmnman(dmnman),
dkgManager(_dkgManager),
quorumBlockProcessor(_quorumBlockProcessor),
@ -249,7 +248,7 @@ void CQuorumManager::Stop()
workerPool.stop(true);
}
void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const
void CQuorumManager::TriggerQuorumDataRecoveryThreads(CConnman& connman, const CBlockIndex* pIndex) const
{
if ((m_mn_activeman == nullptr && !IsWatchQuorumsEnabled()) || !QuorumDataRecoveryEnabled() || pIndex == nullptr) {
return;
@ -296,17 +295,17 @@ void CQuorumManager::TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex)
}
// Finally start the thread which triggers the requests for this quorum
StartQuorumDataRecoveryThread(pQuorum, pIndex, nDataMask);
StartQuorumDataRecoveryThread(connman, pQuorum, pIndex, nDataMask);
}
}
}
void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const
void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, CConnman& connman, bool fInitialDownload) const
{
if (!m_mn_sync.IsBlockchainSynced()) return;
for (const auto& params : Params().GetConsensus().llmqs) {
CheckQuorumConnections(params, pindexNew);
CheckQuorumConnections(connman, params, pindexNew);
}
if (m_mn_activeman != nullptr || IsWatchQuorumsEnabled()) {
@ -322,11 +321,12 @@ void CQuorumManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitial
}
}
TriggerQuorumDataRecoveryThreads(pindexNew);
TriggerQuorumDataRecoveryThreads(connman, pindexNew);
StartCleanupOldQuorumDataThread(pindexNew);
}
void CQuorumManager::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, const CBlockIndex* pindexNew) const
void CQuorumManager::CheckQuorumConnections(CConnman& connman, const Consensus::LLMQParams& llmqParams,
const CBlockIndex* pindexNew) const
{
if (m_mn_activeman == nullptr && !IsWatchQuorumsEnabled()) return;
@ -469,7 +469,9 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockP
return quorum_block_processor.HasMinedCommitment(llmqType, quorumHash);
}
bool CQuorumManager::RequestQuorumData(CNode* pfrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask, const uint256& proTxHash) const
bool CQuorumManager::RequestQuorumData(CNode* pfrom, CConnman& connman, Consensus::LLMQType llmqType,
const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask,
const uint256& proTxHash) const
{
if (pfrom == nullptr) {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid pfrom: nullptr\n", __func__);
@ -697,7 +699,7 @@ size_t CQuorumManager::GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, c
return nIndex % pQuorum->qc->validMembers.size();
}
PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& connman, const std::string& msg_type, CDataStream& vRecv)
{
auto strFunc = __func__;
auto errorHandler = [&](const std::string& strError, int nScore = 10) -> PeerMsgRet {
@ -709,7 +711,6 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t
};
if (msg_type == NetMsgType::QGETDATA) {
if (m_mn_activeman == nullptr || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) {
return errorHandler("Not a verified masternode or a qwatch connection");
}
@ -912,7 +913,8 @@ void CQuorumManager::StartCachePopulatorThread(const CQuorumCPtr pQuorum) const
});
}
void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMaskIn) const
void CQuorumManager::StartQuorumDataRecoveryThread(CConnman& connman, const CQuorumCPtr pQuorum,
const CBlockIndex* pIndex, uint16_t nDataMaskIn) const
{
assert(m_mn_activeman);
@ -922,7 +924,7 @@ void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, co
}
pQuorum->fQuorumDataRecoveryThreadRunning = true;
workerPool.push([pQuorum, pIndex, nDataMaskIn, this](int threadId) {
workerPool.push([&connman, pQuorum, pIndex, nDataMaskIn, this](int threadId) {
size_t nTries{0};
uint16_t nDataMask{nDataMaskIn};
int64_t nTimeLastSuccess{0};
@ -958,7 +960,6 @@ void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, co
printLog("Try to request");
while (nDataMask > 0 && !quorumThreadInterrupt) {
if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR &&
pQuorum->HasVerificationVector()) {
nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR;
@ -1005,7 +1006,8 @@ void CQuorumManager::StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, co
return;
}
if (RequestQuorumData(pNode, pQuorum->qc->llmqType, pQuorum->m_quorum_base_block_index, nDataMask, proTxHash)) {
if (RequestQuorumData(pNode, connman, pQuorum->qc->llmqType, pQuorum->m_quorum_base_block_index,
nDataMask, proTxHash)) {
nTimeLastSuccess = GetTime<std::chrono::seconds>().count();
printLog("Requested");
} else {

View File

@ -238,7 +238,6 @@ private:
CBLSWorker& blsWorker;
CChainState& m_chainstate;
CConnman& connman;
CDeterministicMNManager& m_dmnman;
CDKGSessionManager& dkgManager;
CQuorumBlockProcessor& quorumBlockProcessor;
@ -261,7 +260,7 @@ private:
mutable CThreadInterrupt quorumThreadInterrupt;
public:
CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CDeterministicMNManager& dmnman,
CDKGSessionManager& _dkgManager, CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const CSporkManager& sporkman, bool unit_tests, bool wipe);
@ -270,15 +269,17 @@ public:
void Start();
void Stop();
void TriggerQuorumDataRecoveryThreads(const CBlockIndex* pIndex) const;
void TriggerQuorumDataRecoveryThreads(CConnman& connman, const CBlockIndex* pIndex) const;
void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload) const;
void UpdatedBlockTip(const CBlockIndex* pindexNew, CConnman& connman, bool fInitialDownload) const;
PeerMsgRet ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(CNode& pfrom, CConnman& connman, const std::string& msg_type, CDataStream& vRecv);
static bool HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockProcessor& quorum_block_processor, const uint256& quorumHash);
bool RequestQuorumData(CNode* pfrom, Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask, const uint256& proTxHash = uint256()) const;
bool RequestQuorumData(CNode* pfrom, CConnman& connman, Consensus::LLMQType llmqType,
const CBlockIndex* pQuorumBaseBlockIndex, uint16_t nDataMask,
const uint256& proTxHash = uint256()) const;
// all these methods will lock cs_main for a short period of time
CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
@ -289,7 +290,8 @@ public:
private:
// all private methods here are cs_main-free
void CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, const CBlockIndex *pindexNew) const;
void CheckQuorumConnections(CConnman& connman, const Consensus::LLMQParams& llmqParams,
const CBlockIndex* pindexNew) const;
CQuorumPtr BuildQuorumFromCommitment(Consensus::LLMQType llmqType, gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, bool populate_cache) const;
bool BuildQuorumContributions(const CFinalCommitmentPtr& fqc, const std::shared_ptr<CQuorum>& quorum) const;
@ -301,7 +303,8 @@ private:
size_t GetQuorumRecoveryStartOffset(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex) const;
void StartCachePopulatorThread(const CQuorumCPtr pQuorum) const;
void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const;
void StartQuorumDataRecoveryThread(CConnman& connman, const CQuorumCPtr pQuorum, const CBlockIndex* pIndex,
uint16_t nDataMask) const;
void StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex) const;
void MigrateOldQuorumDB(CEvoDB& evoDb) const;

View File

@ -349,8 +349,11 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge)
//////////////////
CSigningManager::CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe) :
db(fMemory, fWipe), m_mn_activeman(mn_activeman), m_chainstate(chainstate), qman(_qman), m_peerman(peerman)
const CQuorumManager& _qman, bool fMemory, bool fWipe) :
db(fMemory, fWipe),
m_mn_activeman(mn_activeman),
m_chainstate(chainstate),
qman(_qman)
{
}
@ -381,13 +384,14 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS
return true;
}
PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type,
CDataStream& vRecv)
{
if (msg_type == NetMsgType::QSIGREC) {
auto recoveredSig = std::make_shared<CRecoveredSig>();
vRecv >> *recoveredSig;
return ProcessMessageRecoveredSig(pfrom, recoveredSig);
return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig);
}
return {};
}
@ -416,10 +420,11 @@ static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CR
return true;
}
PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig)
PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman,
const std::shared_ptr<const CRecoveredSig>& recoveredSig)
{
WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(),
CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash())));
WITH_LOCK(::cs_main,
peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash())));
bool ban = false;
if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) {
@ -517,22 +522,22 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
}
}
void CSigningManager::ProcessPendingReconstructedRecoveredSigs()
void CSigningManager::ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman)
{
decltype(pendingReconstructedRecoveredSigs) m;
WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs));
for (const auto& p : m) {
ProcessRecoveredSig(p.second);
ProcessRecoveredSig(p.second, peerman);
}
}
bool CSigningManager::ProcessPendingRecoveredSigs()
bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman)
{
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
ProcessPendingReconstructedRecoveredSigs();
ProcessPendingReconstructedRecoveredSigs(peerman);
const size_t nMaxBatchSize{32};
CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums);
@ -575,7 +580,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
if (batchVerifier.badSources.count(nodeId)) {
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
Assert(m_peerman)->Misbehaving(nodeId, 100);
peerman.Misbehaving(nodeId, 100);
continue;
}
@ -584,7 +589,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
continue;
}
ProcessRecoveredSig(recSig);
ProcessRecoveredSig(recSig, peerman);
}
}
@ -592,7 +597,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
}
// signature must be verified already
void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig)
void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig, PeerManager& peerman)
{
auto llmqType = recoveredSig->getLlmqType();
@ -631,12 +636,12 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash()));
if (m_mn_activeman != nullptr) {
Assert(m_peerman)->RelayRecoveredSig(recoveredSig->GetHash());
peerman.RelayRecoveredSig(recoveredSig->GetHash());
}
auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners);
for (auto& l : listeners) {
Assert(m_peerman)->PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig));
peerman.PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig));
}
GetMainSignals().NotifyRecoveredSig(recoveredSig);
@ -799,14 +804,14 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256&
return db.GetVoteForId(llmqType, id, msgHashRet);
}
void CSigningManager::StartWorkerThread()
void CSigningManager::StartWorkerThread(PeerManager& peerman)
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}
workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
workThread = std::thread(&util::TraceThread, "sigshares", [this, &peerman] { WorkThreadMain(peerman); });
}
void CSigningManager::StopWorkerThread()
@ -826,10 +831,10 @@ void CSigningManager::InterruptWorkerThread()
workInterrupt();
}
void CSigningManager::WorkThreadMain()
void CSigningManager::WorkThreadMain(PeerManager& peerman)
{
while (!workInterrupt) {
bool fMoreWork = ProcessPendingRecoveredSigs();
bool fMoreWork = ProcessPendingRecoveredSigs(peerman);
Cleanup();

View File

@ -162,7 +162,6 @@ private:
const CActiveMasternodeManager* const m_mn_activeman;
const CChainState& m_chainstate;
const CQuorumManager& qman;
const std::unique_ptr<PeerManager>& m_peerman;
mutable Mutex cs_pending;
// Incoming and not verified yet
@ -178,12 +177,12 @@ private:
public:
CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe);
const CQuorumManager& _qman, bool fMemory, bool fWipe);
bool AlreadyHave(const CInv& inv) const;
bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const;
PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& pnode, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv);
// This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid
// This is the case for example when a signature appears as part of InstantSend or ChainLocks
@ -196,16 +195,18 @@ public:
void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id);
private:
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig);
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman,
const std::shared_ptr<const CRecoveredSig>& recoveredSig);
void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
void ProcessPendingReconstructedRecoveredSigs();
bool ProcessPendingRecoveredSigs(); // called from the worker thread of CSigSharesManager
void ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman);
bool ProcessPendingRecoveredSigs(PeerManager& peerman); // called from the worker thread of CSigSharesManager
public:
// TODO - should not be public!
void ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig);
void ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig, PeerManager& peerman);
private:
void Cleanup(); // called from the worker thread of CSigSharesManager
@ -228,10 +229,10 @@ public:
private:
std::thread workThread;
CThreadInterrupt workInterrupt;
void WorkThreadMain();
void WorkThreadMain(PeerManager& peerman);
public:
void StartWorkerThread();
void StartWorkerThread(PeerManager& peerman);
void StopWorkerThread();
void InterruptWorkerThread();
};

View File

@ -178,14 +178,15 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash)
//////////////////////
void CSigSharesManager::StartWorkerThread()
void CSigSharesManager::StartWorkerThread(CConnman& connman, PeerManager& peerman)
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}
workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
workThread = std::thread(&util::TraceThread, "sigshares",
[this, &connman, &peerman] { WorkThreadMain(connman, peerman); });
}
void CSigSharesManager::StopWorkerThread()
@ -215,7 +216,8 @@ void CSigSharesManager::InterruptWorkerThread()
workInterrupt();
}
void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv)
void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const CSporkManager& sporkman,
const std::string& msg_type, CDataStream& vRecv)
{
// non-masternodes are not interested in sigshares
if (m_mn_activeman == nullptr) return;
@ -227,12 +229,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
for (const auto& sigShare : receivedSigShares) {
ProcessMessageSigShare(pfrom.GetId(), sigShare);
ProcessMessageSigShare(pfrom.GetId(), peerman, sigShare);
}
}
@ -241,12 +243,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
} else if (msg_type == NetMsgType::QSIGSHARESINV) {
@ -254,12 +256,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
} else if (msg_type == NetMsgType::QGETSIGSHARES) {
@ -267,12 +269,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
} else if (msg_type == NetMsgType::QBSIGSHARES) {
@ -284,12 +286,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
}
if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
}
@ -454,7 +456,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const
return true;
}
void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare)
void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare)
{
assert(m_mn_activeman);
@ -479,12 +481,12 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s
if (sigShare.getQuorumMember() >= quorum->members.size()) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
BanNode(fromId);
BanNode(fromId, peerman);
return;
}
if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__);
BanNode(fromId);
BanNode(fromId, peerman);
return;
}
@ -620,7 +622,7 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify(
return true;
}
bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
bool CSigSharesManager::ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman)
{
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
@ -646,7 +648,7 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
// we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
// deserialization in the message thread
if (!sigShare.sigShare.Get().IsValid()) {
BanNode(nodeId);
BanNode(nodeId, peerman);
// don't process any additional shares from this node
break;
}
@ -678,25 +680,26 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n",
__func__, nodeId);
// this will also cause re-requesting of the shares that were sent by this node
BanNode(nodeId);
BanNode(nodeId, peerman);
continue;
}
ProcessPendingSigShares(v, quorums, connman);
ProcessPendingSigShares(v, quorums, peerman, connman);
}
return sigSharesByNodes.size() >= nMaxBatchSize;
}
// It's ensured that no duplicates are passed to this method
void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess,
void CSigSharesManager::ProcessPendingSigShares(
const std::vector<CSigShare>& sigSharesToProcess,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
const CConnman& connman)
PeerManager& peerman, const CConnman& connman)
{
cxxtimer::Timer t(true);
for (const auto& sigShare : sigSharesToProcess) {
auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash());
ProcessSigShare(sigShare, connman, quorums.at(quorumKey));
ProcessSigShare(peerman, sigShare, connman, quorums.at(quorumKey));
}
t.stop();
@ -705,7 +708,8 @@ void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& si
}
// sig shares are already verified when entering this method
void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum)
void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman,
const CQuorumCPtr& quorum)
{
assert(m_mn_activeman);
@ -754,11 +758,12 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma
}
if (canTryRecovery) {
TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash());
TryRecoverSig(peerman, quorum, sigShare.getId(), sigShare.getMsgHash());
}
}
void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
void CSigSharesManager::TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id,
const uint256& msgHash)
{
if (sigman.HasRecoveredSigForId(quorum->params.type, id)) {
return;
@ -817,7 +822,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
}
}
sigman.ProcessRecoveredSig(rs);
sigman.ProcessRecoveredSig(rs, peerman);
}
CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)
@ -1027,7 +1032,9 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
}
}
void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
void CSigSharesManager::CollectSigSharesToAnnounce(
const CConnman& connman,
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
{
AssertLockHeld(cs);
@ -1035,8 +1042,8 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation
sigSharesQueuedToAnnounce.ForEach([this, &quorumNodesMap, &sigSharesToAnnounce](const SigShareKey& sigShareKey,
bool) NO_THREAD_SAFETY_ANALYSIS {
sigSharesQueuedToAnnounce.ForEach([this, &connman, &quorumNodesMap,
&sigSharesToAnnounce](const SigShareKey& sigShareKey, bool) NO_THREAD_SAFETY_ANALYSIS {
AssertLockHeld(cs);
const auto& signHash = sigShareKey.first;
auto quorumMember = sigShareKey.second;
@ -1084,7 +1091,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
sigSharesQueuedToAnnounce.Clear();
}
bool CSigSharesManager::SendMessages()
bool CSigSharesManager::SendMessages(CConnman& connman)
{
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend;
@ -1113,7 +1120,7 @@ bool CSigSharesManager::SendMessages()
LOCK(cs);
CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigShareBatchesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce);
CollectSigSharesToAnnounce(connman, sigSharesToAnnounce);
CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes());
for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
@ -1254,7 +1261,7 @@ CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionI
return sigShare;
}
void CSigSharesManager::Cleanup()
void CSigSharesManager::Cleanup(const CConnman& connman)
{
int64_t now = GetTime<std::chrono::seconds>().count();
if (now - lastCleanupTime < 5) {
@ -1407,13 +1414,13 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
timeSeenForSessions.erase(signHash);
}
void CSigSharesManager::RemoveBannedNodeStates()
void CSigSharesManager::RemoveBannedNodeStates(PeerManager& peerman)
{
// Called regularly to cleanup local node states for banned nodes
LOCK(cs);
for (auto it = nodeStates.begin(); it != nodeStates.end();) {
if (Assert(m_peerman)->IsBanned(it->first)) {
if (peerman.IsBanned(it->first)) {
// re-request sigshares from other nodes
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation
@ -1428,23 +1435,21 @@ void CSigSharesManager::RemoveBannedNodeStates()
}
}
void CSigSharesManager::BanNode(NodeId nodeId)
void CSigSharesManager::BanNode(NodeId nodeId, PeerManager& peerman)
{
if (nodeId == -1) {
return;
}
{
Assert(m_peerman)->Misbehaving(nodeId, 100);
}
peerman.Misbehaving(nodeId, 100);
LOCK(cs);
auto it = nodeStates.find(nodeId);
if (it == nodeStates.end()) {
return;
}
auto& nodeState = it->second;
auto& nodeState = it->second;
// Whatever we requested from him, let's request it from someone else now
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation
@ -1453,26 +1458,25 @@ void CSigSharesManager::BanNode(NodeId nodeId)
sigSharesRequested.Erase(k);
});
nodeState.requestedSigShares.Clear();
nodeState.banned = true;
}
void CSigSharesManager::WorkThreadMain()
void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman)
{
int64_t lastSendTime = 0;
while (!workInterrupt) {
RemoveBannedNodeStates();
RemoveBannedNodeStates(peerman);
bool fMoreWork = ProcessPendingSigShares(connman);
SignPendingSigShares();
bool fMoreWork = ProcessPendingSigShares(peerman, connman);
SignPendingSigShares(connman, peerman);
if (GetTimeMillis() - lastSendTime > 100) {
SendMessages();
SendMessages(connman);
lastSendTime = GetTimeMillis();
}
Cleanup();
Cleanup(connman);
// TODO Wakeup when pending signing is needed?
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
@ -1487,7 +1491,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
pendingSigns.emplace_back(quorum, id, msgHash);
}
void CSigSharesManager::SignPendingSigShares()
void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManager& peerman)
{
std::vector<PendingSignatureData> v;
WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns));
@ -1497,7 +1501,7 @@ void CSigSharesManager::SignPendingSigShares()
if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
auto sigShare = *opt_sigShare;
ProcessSigShare(sigShare, connman, pQuorum);
ProcessSigShare(peerman, sigShare, connman, pQuorum);
if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) {
LOCK(cs);

View File

@ -404,34 +404,35 @@ private:
FastRandomContext rnd GUARDED_BY(cs);
CConnman& connman;
CSigningManager& sigman;
const CActiveMasternodeManager* const m_mn_activeman;
const CQuorumManager& qman;
const CSporkManager& m_sporkman;
const std::unique_ptr<PeerManager>& m_peerman;
int64_t lastCleanupTime{0};
std::atomic<uint32_t> recoveredSigsCounter{0};
public:
explicit CSigSharesManager(CConnman& _connman, CSigningManager& _sigman, const CActiveMasternodeManager* const mn_activeman,
const CQuorumManager& _qman, const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman) :
connman(_connman), sigman(_sigman), m_mn_activeman(mn_activeman), qman(_qman), m_sporkman(sporkman), m_peerman(peerman)
explicit CSigSharesManager(CSigningManager& _sigman, const CActiveMasternodeManager* const mn_activeman,
const CQuorumManager& _qman, const CSporkManager& sporkman) :
sigman(_sigman),
m_mn_activeman(mn_activeman),
qman(_qman),
m_sporkman(sporkman)
{
workInterrupt.reset();
};
CSigSharesManager() = delete;
~CSigSharesManager() override = default;
void StartWorkerThread();
void StartWorkerThread(CConnman& connman, PeerManager& peerman);
void StopWorkerThread();
void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener();
void InterruptWorkerThread();
void ProcessMessage(const CNode& pnode, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv);
void ProcessMessage(const CNode& pnode, PeerManager& peerman, const CSporkManager& sporkman,
const std::string& msg_type, CDataStream& vRecv);
void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
std::optional<CSigShare> CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const;
@ -447,7 +448,7 @@ private:
bool ProcessMessageSigSharesInv(const CNode& pfrom, const CSigSharesInv& inv);
bool ProcessMessageGetSigShares(const CNode& pfrom, const CSigSharesInv& inv);
bool ProcessMessageBatchedSigShares(const CNode& pfrom, const CBatchedSigShares& batchedSigShares);
void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare);
void ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare);
static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv);
static bool PreVerifyBatchedSigShares(const CActiveMasternodeManager& mn_activeman, const CQuorumManager& quorum_manager,
@ -456,31 +457,36 @@ private:
bool CollectPendingSigSharesToVerify(
size_t maxUniqueSessions, std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingSigShares(const CConnman& connman);
bool ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman);
void ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess,
void ProcessPendingSigShares(
const std::vector<CSigShare>& sigSharesToProcess,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
const CConnman& connman);
PeerManager& peerman, const CConnman& connman);
void ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum);
void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
void ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman,
const CQuorumCPtr& quorum);
void TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo);
static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair<uint16_t, CBLSLazySignature>& in);
void Cleanup();
void Cleanup(const CConnman& connman);
void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs);
void RemoveBannedNodeStates();
void RemoveBannedNodeStates(PeerManager& peerman);
void BanNode(NodeId nodeId);
void BanNode(NodeId nodeId, PeerManager& peerman);
bool SendMessages();
bool SendMessages(CConnman& connman);
void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs);
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs);
void SignPendingSigShares();
void WorkThreadMain();
void CollectSigSharesToAnnounce(
const CConnman& connman,
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
EXCLUSIVE_LOCKS_REQUIRED(cs);
void SignPendingSigShares(const CConnman& connman, PeerManager& peerman);
void WorkThreadMain(CConnman& connman, PeerManager& peerman);
};
} // namespace llmq

View File

@ -4651,13 +4651,13 @@ void CConnman::SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, con
});
}
bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash)
bool CConnman::HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const
{
LOCK(cs_vPendingMasternodes);
return masternodeQuorumNodes.count(std::make_pair(llmqType, quorumHash));
}
std::set<uint256> CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType)
std::set<uint256> CConnman::GetMasternodeQuorums(Consensus::LLMQType llmqType) const
{
LOCK(cs_vPendingMasternodes);
std::set<uint256> result;
@ -4700,7 +4700,7 @@ void CConnman::RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const u
masternodeQuorumRelayMembers.erase(std::make_pair(llmqType, quorumHash));
}
bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list)
bool CConnman::IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const
{
// Let's see if this is an outgoing connection to an address that is known to be a masternode
// We however only need to know this if the node did not authenticate itself as a MN yet

View File

@ -1503,12 +1503,12 @@ public:
bool AddPendingMasternode(const uint256& proTxHash);
void SetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
void SetMasternodeQuorumRelayMembers(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::set<uint256>& proTxHashes);
bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash);
std::set<uint256> GetMasternodeQuorums(Consensus::LLMQType llmqType);
bool HasMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
std::set<uint256> GetMasternodeQuorums(Consensus::LLMQType llmqType) const;
// also returns QWATCH nodes
std::set<NodeId> GetMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash) const;
void RemoveMasternodeQuorumNodes(Consensus::LLMQType llmqType, const uint256& quorumHash);
bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list);
bool IsMasternodeQuorumNode(const CNode* pnode, const CDeterministicMNList& tip_mn_list) const;
bool IsMasternodeQuorumRelayMember(const uint256& protxHash);
void AddPendingProbeConnections(const std::set<uint256>& proTxHashes);

View File

@ -5251,7 +5251,7 @@ void PeerManagerImpl::ProcessMessage(
{
//probably one the extensions
#ifdef ENABLE_WALLET
ProcessPeerMsgRet(m_cj_ctx->queueman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_cj_ctx->queueman->ProcessMessage(pfrom, m_connman, *this, msg_type, vRecv), pfrom);
m_cj_ctx->walletman->ForEachCJClientMan([this, &pfrom, &msg_type, &vRecv](std::unique_ptr<CCoinJoinClientManager>& clientman) {
clientman->ProcessMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv);
});
@ -5262,10 +5262,10 @@ void PeerManagerImpl::ProcessMessage(
ProcessPeerMsgRet(m_govman.ProcessMessage(pfrom, m_connman, *this, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, m_mn_activeman, m_chainman.ActiveChain(), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom);
PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId());
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, this, is_masternode, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv);
ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom);
m_llmq_ctx->shareman->ProcessMessage(pfrom, *this, m_sporkman, msg_type, vRecv);
ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);
if (msg_type == NetMsgType::CLSIG) {
if (llmq::AreChainLocksEnabled(m_sporkman)) {
@ -5278,7 +5278,7 @@ void PeerManagerImpl::ProcessMessage(
return; // CLSIG
}
ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);
return;
}

View File

@ -818,7 +818,7 @@ static RPCHelpMan quorum_getdata()
const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(cs_main, return chainman.m_blockman.LookupBlockIndex(quorumHash));
return connman.ForNode(nodeId, [&](CNode* pNode) {
return llmq_ctx.qman->RequestQuorumData(pNode, llmqType, pQuorumBaseBlockIndex, nDataMask, proTxHash);
return llmq_ctx.qman->RequestQuorumData(pNode, connman, llmqType, pQuorumBaseBlockIndex, nDataMask, proTxHash);
});
},
};

View File

@ -107,16 +107,16 @@ void DashTestSetup(NodeContext& node, const CChainParams& chainparams)
{
CChainState& chainstate = Assert(node.chainman)->ActiveChainstate();
node.dmnman = std::make_unique<CDeterministicMNManager>(chainstate, *node.connman, *node.evodb);
node.dmnman = std::make_unique<CDeterministicMNManager>(chainstate, *node.evodb);
node.mempool->ConnectManagers(node.dmnman.get());
node.cj_ctx = std::make_unique<CJContext>(*node.chainman, *node.connman, *node.dmnman, *node.mn_metaman, *node.mempool,
/* mn_activeman = */ nullptr, *node.mn_sync, node.peerman, /* relay_txes = */ true);
/*mn_activeman=*/nullptr, *node.mn_sync, node.peerman, /*relay_txes=*/true);
#ifdef ENABLE_WALLET
node.coinjoin_loader = interfaces::MakeCoinJoinLoader(*node.cj_ctx->walletman);
#endif // ENABLE_WALLET
node.llmq_ctx = std::make_unique<LLMQContext>(*node.chainman, *node.connman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, *node.mempool,
/* mn_activeman = */ nullptr, *node.mn_sync, node.peerman, /* unit_tests = */ true, /* wipe = */ false);
node.llmq_ctx = std::make_unique<LLMQContext>(*node.chainman, *node.dmnman, *node.evodb, *node.mn_metaman, *node.mnhf_manager, *node.sporkman, *node.mempool,
/*mn_activeman=*/nullptr, *node.mn_sync, /*unit_tests=*/true, /*wipe=*/false);
Assert(node.mnhf_manager)->ConnectManagers(node.chainman.get(), node.llmq_ctx->qman.get());
node.chain_helper = std::make_unique<CChainstateHelper>(*node.cpoolman, *node.dmnman, *node.mnhf_manager, *node.govman, *(node.llmq_ctx->quorum_block_processor), *node.chainman,
chainparams.GetConsensus(), *node.mn_sync, *node.sporkman, *(node.llmq_ctx->clhandler), *(node.llmq_ctx->qman));