Merge #6423: refactor: create helper function RelayRecoveredSig inside peerman

86e92c376a refactor: drop unused CConnman from CSigningManager (Konstantin Akimov)
4668db60a2 refactor: create helper function RelayRecoveredSig inside peerman (pasta)

Pull request description:

  ## Issue being fixed or feature implemented
  High m_nodes_mutex lock contention during high load

  ## What was done?
  this commit should have a few benefits:
  1. previous logic using ForEachNode results in locking m_nodes_mutex, a highly contended RecursiveMutex, AND m_peer_mutex(in GetPeerRef)
  2. prior also resulted in calling .find over the m_peer_map for each node. Basically old logic was (probably) O(n(nlogn) the new logic results in acquiring m_peer_mutex once and looping over the list of peers, (probably) O(n)
  3. Moves networking logic out of llmq/ and into actual net_processing.cpp

  ## How Has This Been Tested?
  Hasn't really yet; it builds, but I need to run tests / maybe deploy to testnet mn

  ## Breaking Changes

  ## Checklist:
    _Go over all the following points, and put an `x` in all the boxes that apply._
  - [ ] I have performed a self-review of my own code
  - [ ] I have commented my code, particularly in hard-to-understand areas
  - [ ] I have added or updated relevant unit/integration/functional/e2e tests
  - [ ] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  knst:
    utACK 86e92c376a
  UdjinM6:
    utACK 86e92c376a

Tree-SHA512: ca9d6ac22f8b72b117188147044c499ae62722283c6291633067b99726e6a6abc52e5c8cf3bdcd0d8fed0ad8d9086b000f628c9a932dfe89153e912b563eda5a
This commit is contained in:
pasta 2024-11-22 16:26:30 -06:00
commit f9d044d5ec
No known key found for this signature in database
GPG Key ID: E2F3D7916E722D38
7 changed files with 24 additions and 15 deletions

View File

@ -33,7 +33,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, *qdkgsman, qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), connman, dmnman, *qdkgsman,
evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman, evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman,
unit_tests, wipe)}, unit_tests, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(connman, mn_activeman, chainman.ActiveChainstate(), *qman, peerman, sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, peerman,
unit_tests, wipe)}, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)}, shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const { clhandler{[&]() -> llmq::CChainLocksHandler* const {

View File

@ -22,6 +22,7 @@
class CBlockIndex; class CBlockIndex;
class CChainState; class CChainState;
class CConnman;
class CDBWrapper; class CDBWrapper;
class CMasternodeSync; class CMasternodeSync;
class CSporkManager; class CSporkManager;

View File

@ -348,9 +348,9 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge)
////////////////// //////////////////
CSigningManager::CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate, CSigningManager::CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe) : const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe) :
db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), m_chainstate(chainstate), qman(_qman), m_peerman(peerman) db(fMemory, fWipe), m_mn_activeman(mn_activeman), m_chainstate(chainstate), qman(_qman), m_peerman(peerman)
{ {
} }
@ -633,12 +633,7 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash())); WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash()));
if (m_mn_activeman != nullptr) { if (m_mn_activeman != nullptr) {
CInv inv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash()); Assert(m_peerman)->RelayRecoveredSig(recoveredSig->GetHash());
connman.ForEachNode([&](const CNode* pnode) {
if (pnode->fSendRecSigs) {
Assert(m_peerman)->PushInventory(pnode->GetId(), inv);
}
});
} }
auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners); auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners);

View File

@ -19,7 +19,6 @@
class CActiveMasternodeManager; class CActiveMasternodeManager;
class CChainState; class CChainState;
class CConnman;
class CDataStream; class CDataStream;
class CDBBatch; class CDBBatch;
class CDBWrapper; class CDBWrapper;
@ -160,7 +159,6 @@ class CSigningManager
private: private:
CRecoveredSigsDb db; CRecoveredSigsDb db;
CConnman& connman;
const CActiveMasternodeManager* const m_mn_activeman; const CActiveMasternodeManager* const m_mn_activeman;
const CChainState& m_chainstate; const CChainState& m_chainstate;
const CQuorumManager& qman; const CQuorumManager& qman;
@ -179,7 +177,7 @@ private:
std::vector<CRecoveredSigsListener*> recoveredSigsListeners GUARDED_BY(cs_listeners); std::vector<CRecoveredSigsListener*> recoveredSigsListeners GUARDED_BY(cs_listeners);
public: public:
CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate, CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe); const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe);
bool AlreadyHave(const CInv& inv) const; bool AlreadyHave(const CInv& inv) const;

View File

@ -976,8 +976,6 @@ public:
// If true, we will send him CoinJoin queue messages // If true, we will send him CoinJoin queue messages
std::atomic<bool> fSendDSQueue{false}; std::atomic<bool> fSendDSQueue{false};
// If true, we will announce/send him plain recovered sigs (usually true for full nodes)
std::atomic<bool> fSendRecSigs{false};
// If true, we will send him all quorum related messages, even if he is not a member of our quorums // If true, we will send him all quorum related messages, even if he is not a member of our quorums
std::atomic<bool> qwatch{false}; std::atomic<bool> qwatch{false};

View File

@ -271,6 +271,8 @@ struct Peer {
std::atomic<std::chrono::microseconds> m_ping_start{0us}; std::atomic<std::chrono::microseconds> m_ping_start{0us};
/** Whether a ping has been requested by the user */ /** Whether a ping has been requested by the user */
std::atomic<bool> m_ping_queued{false}; std::atomic<bool> m_ping_queued{false};
/** Whether the peer has requested to receive llmq recovered signatures */
std::atomic<bool> m_wants_recsigs{false};
struct TxRelay { struct TxRelay {
mutable RecursiveMutex m_bloom_filter_mutex; mutable RecursiveMutex m_bloom_filter_mutex;
@ -607,6 +609,7 @@ public:
void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void RelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayRecoveredSig(const uint256& sigHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void SetBestHeight(int height) override { m_best_height = height; }; void SetBestHeight(int height) override { m_best_height = height; };
void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
@ -2339,6 +2342,17 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid)
}; };
} }
void PeerManagerImpl::RelayRecoveredSig(const uint256& sigHash)
{
const CInv inv{MSG_QUORUM_RECOVERED_SIG, sigHash};
LOCK(m_peer_mutex);
for (const auto& [_, peer] : m_peer_map) {
if (peer->m_wants_recsigs) {
PushInv(*peer, inv);
}
}
}
void PeerManagerImpl::RelayAddress(NodeId originator, void PeerManagerImpl::RelayAddress(NodeId originator,
const CAddress& addr, const CAddress& addr,
bool fReachable) bool fReachable)
@ -3948,7 +3962,7 @@ void PeerManagerImpl::ProcessMessage(
if (msg_type == NetMsgType::QSENDRECSIGS) { if (msg_type == NetMsgType::QSENDRECSIGS) {
bool b; bool b;
vRecv >> b; vRecv >> b;
pfrom.fSendRecSigs = b; peer->m_wants_recsigs = b;
return; return;
} }

View File

@ -109,6 +109,9 @@ public:
virtual void RelayTransaction(const uint256& txid) virtual void RelayTransaction(const uint256& txid)
EXCLUSIVE_LOCKS_REQUIRED(cs_main) = 0; EXCLUSIVE_LOCKS_REQUIRED(cs_main) = 0;
/** Relay recovered sigs to all interested peers */
virtual void RelayRecoveredSig(const uint256& sigHash) = 0;
/** Set the best height */ /** Set the best height */
virtual void SetBestHeight(int height) = 0; virtual void SetBestHeight(int height) = 0;