refactor: remove dependency of CDKGSession on PeerManager

This commit is contained in:
Konstantin Akimov 2024-09-20 13:53:38 +07:00
parent fb78b0cc94
commit c82672af94
No known key found for this signature in database
GPG Key ID: 2176C4A5D01EA524
4 changed files with 101 additions and 89 deletions

View File

@ -20,7 +20,6 @@
#include <logging.h> #include <logging.h>
#include <masternode/meta.h> #include <masternode/meta.h>
#include <masternode/node.h> #include <masternode/node.h>
#include <net_processing.h>
#include <netmessagemaker.h> #include <netmessagemaker.h>
#include <validation.h> #include <validation.h>
#include <util/irange.h> #include <util/irange.h>
@ -33,14 +32,15 @@
namespace llmq namespace llmq
{ {
class CDKGLogger : public CBatchedLogger CDKGLogger::CDKGLogger(const CDKGSession& _quorumDkg, std::string_view _func, int source_line) :
CDKGLogger(_quorumDkg.params.name, _quorumDkg.quorumIndex, _quorumDkg.m_quorum_base_block_index->GetBlockHash(), _quorumDkg.m_quorum_base_block_index->nHeight, _quorumDkg.AreWeMember(), _func, source_line)
{ {
public: }
CDKGLogger(const CDKGSession& _quorumDkg, std::string_view _func, int source_line) :
CDKGLogger(_quorumDkg.params.name, _quorumDkg.quorumIndex, _quorumDkg.m_quorum_base_block_index->GetBlockHash(), _quorumDkg.m_quorum_base_block_index->nHeight, _quorumDkg.AreWeMember(), _func, source_line){}; CDKGLogger::CDKGLogger(std::string_view _llmqTypeName, int _quorumIndex, const uint256& _quorumHash, int _height, bool _areWeMember, std::string_view _func, int source_line) :
CDKGLogger(std::string_view _llmqTypeName, int _quorumIndex, const uint256& _quorumHash, int _height, bool _areWeMember, std::string_view _func, int source_line) : CBatchedLogger(BCLog::LLMQ_DKG, strprintf("QuorumDKG(type=%s, qIndex=%d, h=%d, member=%d)", _llmqTypeName, _quorumIndex, _height, _areWeMember), __FILE__, source_line)
CBatchedLogger(BCLog::LLMQ_DKG, strprintf("QuorumDKG(type=%s, qIndex=%d, h=%d, member=%d)", _llmqTypeName, _quorumIndex, _height, _areWeMember), __FILE__, source_line){}; {
}; }
static std::array<std::atomic<double>, ToUnderlying(DKGError::type::_COUNT)> simDkgErrorMap{}; static std::array<std::atomic<double>, ToUnderlying(DKGError::type::_COUNT)> simDkgErrorMap{};
@ -259,7 +259,7 @@ bool CDKGSession::PreVerifyMessage(const CDKGContribution& qc, bool& retBan) con
return true; return true;
} }
void CDKGSession::ReceiveMessage(const CDKGContribution& qc) std::optional<CInv> CDKGSession::ReceiveMessage(const CDKGContribution& qc)
{ {
CDKGLogger logger(*this, __func__, __LINE__); CDKGLogger logger(*this, __func__, __LINE__);
@ -273,7 +273,7 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc)
if (member->contributions.size() >= 2) { if (member->contributions.size() >= 2) {
// only relay up to 2 contributions, that's enough to let the other members know about his bad behavior // only relay up to 2 contributions, that's enough to let the other members know about his bad behavior
return; return std::nullopt;
} }
const uint256 hash = ::SerializeHash(qc); const uint256 hash = ::SerializeHash(qc);
@ -281,7 +281,6 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc)
member->contributions.emplace(hash); member->contributions.emplace(hash);
CInv inv(MSG_QUORUM_CONTRIB, hash); CInv inv(MSG_QUORUM_CONTRIB, hash);
RelayInvToParticipants(inv);
dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) {
status.statusBits.receivedContribution = true; status.statusBits.receivedContribution = true;
@ -293,7 +292,7 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc)
// so others know about his bad behavior // so others know about his bad behavior
MarkBadMember(member->idx); MarkBadMember(member->idx);
logger.Batch("%s did send multiple contributions", member->dmn->proTxHash.ToString()); logger.Batch("%s did send multiple contributions", member->dmn->proTxHash.ToString());
return; return inv;
} }
receivedVvecs[member->idx] = qc.vvec; receivedVvecs[member->idx] = qc.vvec;
@ -306,7 +305,7 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc)
if (!AreWeMember()) { if (!AreWeMember()) {
// can't further validate // can't further validate
return; return inv;
} }
dkgManager.WriteVerifiedVvecContribution(params.type, m_quorum_base_block_index, qc.proTxHash, qc.vvec); dkgManager.WriteVerifiedVvecContribution(params.type, m_quorum_base_block_index, qc.proTxHash, qc.vvec);
@ -327,7 +326,7 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc)
status.statusBits.weComplain = true; status.statusBits.weComplain = true;
return true; return true;
}); });
return; return inv;
} }
logger.Batch("decrypted our contribution share. time=%d", t2.count()); logger.Batch("decrypted our contribution share. time=%d", t2.count());
@ -339,6 +338,7 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc)
if (pendingContributionVerifications.size() >= 32) { if (pendingContributionVerifications.size() >= 32) {
VerifyPendingContributions(); VerifyPendingContributions();
} }
return inv;
} }
// Verifies all pending secret key contributions in one batch // Verifies all pending secret key contributions in one batch
@ -567,7 +567,7 @@ bool CDKGSession::PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const
return true; return true;
} }
void CDKGSession::ReceiveMessage(const CDKGComplaint& qc) std::optional<CInv> CDKGSession::ReceiveMessage(const CDKGComplaint& qc)
{ {
CDKGLogger logger(*this, __func__, __LINE__); CDKGLogger logger(*this, __func__, __LINE__);
@ -577,7 +577,7 @@ void CDKGSession::ReceiveMessage(const CDKGComplaint& qc)
if (member->complaints.size() >= 2) { if (member->complaints.size() >= 2) {
// only relay up to 2 complaints, that's enough to let the other members know about his bad behavior // only relay up to 2 complaints, that's enough to let the other members know about his bad behavior
return; return std::nullopt;
} }
const uint256 hash = ::SerializeHash(qc); const uint256 hash = ::SerializeHash(qc);
@ -585,7 +585,6 @@ void CDKGSession::ReceiveMessage(const CDKGComplaint& qc)
member->complaints.emplace(hash); member->complaints.emplace(hash);
CInv inv(MSG_QUORUM_COMPLAINT, hash); CInv inv(MSG_QUORUM_COMPLAINT, hash);
RelayInvToParticipants(inv);
dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) {
status.statusBits.receivedComplaint = true; status.statusBits.receivedComplaint = true;
@ -597,7 +596,7 @@ void CDKGSession::ReceiveMessage(const CDKGComplaint& qc)
// so others know about his bad behavior // so others know about his bad behavior
MarkBadMember(member->idx); MarkBadMember(member->idx);
logger.Batch("%s did send multiple complaints", member->dmn->proTxHash.ToString()); logger.Batch("%s did send multiple complaints", member->dmn->proTxHash.ToString());
return; return inv;
} }
int receivedCount = 0; int receivedCount = 0;
@ -626,6 +625,7 @@ void CDKGSession::ReceiveMessage(const CDKGComplaint& qc)
} }
logger.Batch("received and relayed complaint. received=%d", receivedCount); logger.Batch("received and relayed complaint. received=%d", receivedCount);
return inv;
} }
void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages) void CDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages)
@ -776,7 +776,7 @@ bool CDKGSession::PreVerifyMessage(const CDKGJustification& qj, bool& retBan) co
return true; return true;
} }
void CDKGSession::ReceiveMessage(const CDKGJustification& qj) std::optional<CInv> CDKGSession::ReceiveMessage(const CDKGJustification& qj)
{ {
CDKGLogger logger(*this, __func__, __LINE__); CDKGLogger logger(*this, __func__, __LINE__);
@ -786,7 +786,7 @@ void CDKGSession::ReceiveMessage(const CDKGJustification& qj)
if (member->justifications.size() >= 2) { if (member->justifications.size() >= 2) {
// only relay up to 2 justifications, that's enough to let the other members know about his bad behavior // only relay up to 2 justifications, that's enough to let the other members know about his bad behavior
return; return std::nullopt;
} }
const uint256 hash = ::SerializeHash(qj); const uint256 hash = ::SerializeHash(qj);
@ -795,7 +795,6 @@ void CDKGSession::ReceiveMessage(const CDKGJustification& qj)
// we always relay, even if further verification fails // we always relay, even if further verification fails
CInv inv(MSG_QUORUM_JUSTIFICATION, hash); CInv inv(MSG_QUORUM_JUSTIFICATION, hash);
RelayInvToParticipants(inv);
dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) {
status.statusBits.receivedJustification = true; status.statusBits.receivedJustification = true;
@ -807,13 +806,13 @@ void CDKGSession::ReceiveMessage(const CDKGJustification& qj)
// so others know about his bad behavior // so others know about his bad behavior
logger.Batch("%s did send multiple justifications", member->dmn->proTxHash.ToString()); logger.Batch("%s did send multiple justifications", member->dmn->proTxHash.ToString());
MarkBadMember(member->idx); MarkBadMember(member->idx);
return; return inv;
} }
if (member->bad) { if (member->bad) {
// we locally determined him to be bad (sent none or more then one contributions) // we locally determined him to be bad (sent none or more then one contributions)
// don't give him a second chance (but we relay the justification in case other members disagree) // don't give him a second chance (but we relay the justification in case other members disagree)
return; return inv;
} }
for (const auto& p : qj.contributions) { for (const auto& p : qj.contributions) {
@ -826,7 +825,7 @@ void CDKGSession::ReceiveMessage(const CDKGJustification& qj)
} }
} }
if (member->bad) { if (member->bad) {
return; return inv;
} }
cxxtimer::Timer t1(true); cxxtimer::Timer t1(true);
@ -866,6 +865,7 @@ void CDKGSession::ReceiveMessage(const CDKGJustification& qj)
}); });
logger.Batch("verified justification: received=%d/%d time=%d", receivedCount, expectedCount, t1.count()); logger.Batch("verified justification: received=%d/%d time=%d", receivedCount, expectedCount, t1.count());
return inv;
} }
void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages) void CDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages)
@ -1089,7 +1089,7 @@ bool CDKGSession::PreVerifyMessage(const CDKGPrematureCommitment& qc, bool& retB
return true; return true;
} }
void CDKGSession::ReceiveMessage(const CDKGPrematureCommitment& qc) std::optional<CInv> CDKGSession::ReceiveMessage(const CDKGPrematureCommitment& qc)
{ {
CDKGLogger logger(*this, __func__, __LINE__); CDKGLogger logger(*this, __func__, __LINE__);
@ -1129,30 +1129,29 @@ void CDKGSession::ReceiveMessage(const CDKGPrematureCommitment& qc)
if ((*quorumVvec)[0] != qc.quorumPublicKey) { if ((*quorumVvec)[0] != qc.quorumPublicKey) {
logger.Batch("calculated quorum public key does not match"); logger.Batch("calculated quorum public key does not match");
return; return std::nullopt;
} }
uint256 vvecHash = ::SerializeHash(*quorumVvec); uint256 vvecHash = ::SerializeHash(*quorumVvec);
if (qc.quorumVvecHash != vvecHash) { if (qc.quorumVvecHash != vvecHash) {
logger.Batch("calculated quorum vvec hash does not match"); logger.Batch("calculated quorum vvec hash does not match");
return; return std::nullopt;
} }
CBLSPublicKey pubKeyShare = cache.BuildPubKeyShare(::SerializeHash(std::make_pair(memberIndexes, member->id)), quorumVvec, member->id); CBLSPublicKey pubKeyShare = cache.BuildPubKeyShare(::SerializeHash(std::make_pair(memberIndexes, member->id)), quorumVvec, member->id);
if (!pubKeyShare.IsValid()) { if (!pubKeyShare.IsValid()) {
logger.Batch("failed to calculate public key share"); logger.Batch("failed to calculate public key share");
return; return std::nullopt;
} }
if (!qc.quorumSig.VerifyInsecure(pubKeyShare, qc.GetSignHash())) { if (!qc.quorumSig.VerifyInsecure(pubKeyShare, qc.GetSignHash())) {
logger.Batch("failed to verify quorumSig"); logger.Batch("failed to verify quorumSig");
return; return std::nullopt;
} }
} }
WITH_LOCK(invCs, validCommitments.emplace(hash)); WITH_LOCK(invCs, validCommitments.emplace(hash));
CInv inv(MSG_QUORUM_PREMATURE_COMMITMENT, hash); CInv inv(MSG_QUORUM_PREMATURE_COMMITMENT, hash);
RelayInvToParticipants(inv);
dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) { dkgDebugManager.UpdateLocalMemberStatus(params.type, quorumIndex, member->idx, [&](CDKGDebugMemberStatus& status) {
status.statusBits.receivedPrematureCommitment = true; status.statusBits.receivedPrematureCommitment = true;
@ -1164,6 +1163,7 @@ void CDKGSession::ReceiveMessage(const CDKGPrematureCommitment& qc)
t1.stop(); t1.stop();
logger.Batch("verified premature commitment. received=%d/%d, time=%d", receivedCount, members.size(), t1.count()); logger.Batch("verified premature commitment. received=%d/%d, time=%d", receivedCount, members.size(), t1.count());
return inv;
} }
std::vector<CFinalCommitment> CDKGSession::FinalizeCommitments() std::vector<CFinalCommitment> CDKGSession::FinalizeCommitments()
@ -1296,41 +1296,5 @@ void CDKGSession::MarkBadMember(size_t idx)
member->bad = true; member->bad = true;
} }
void CDKGSession::RelayInvToParticipants(const CInv& inv) const
{
CDKGLogger logger(*this, __func__, __LINE__);
std::stringstream ss;
for (const auto& r : relayMembers) {
ss << r.ToString().substr(0, 4) << " | ";
}
logger.Batch("RelayInvToParticipants inv[%s] relayMembers[%d] GetNodeCount[%d] GetNetworkActive[%d] HasMasternodeQuorumNodes[%d] for quorumHash[%s] forMember[%s] relayMembers[%s]",
inv.ToString(),
relayMembers.size(),
connman.GetNodeCount(ConnectionDirection::Both),
connman.GetNetworkActive(),
connman.HasMasternodeQuorumNodes(params.type, m_quorum_base_block_index->GetBlockHash()),
m_quorum_base_block_index->GetBlockHash().ToString(),
myProTxHash.ToString().substr(0, 4), ss.str());
std::stringstream ss2;
connman.ForEachNode([&](const CNode* pnode) {
if (pnode->qwatch ||
(!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) {
Assert(m_peerman)->PushInventory(pnode->GetId(), inv);
}
if (pnode->GetVerifiedProRegTxHash().IsNull()) {
logger.Batch("node[%d:%s] not mn",
pnode->GetId(),
pnode->m_addr_name);
} else if (relayMembers.count(pnode->GetVerifiedProRegTxHash()) == 0) {
ss2 << pnode->GetVerifiedProRegTxHash().ToString().substr(0, 4) << " | ";
}
});
logger.Batch("forMember[%s] NOTrelayMembers[%s]",
myProTxHash.ToString().substr(0, 4),
ss2.str());
logger.Flush();
}
} // namespace llmq } // namespace llmq

View File

@ -5,13 +5,14 @@
#ifndef BITCOIN_LLMQ_DKGSESSION_H #ifndef BITCOIN_LLMQ_DKGSESSION_H
#define BITCOIN_LLMQ_DKGSESSION_H #define BITCOIN_LLMQ_DKGSESSION_H
#include <llmq/commitment.h>
#include <batchedlogger.h>
#include <bls/bls.h> #include <bls/bls.h>
#include <bls/bls_ies.h> #include <bls/bls_ies.h>
#include <bls/bls_worker.h> #include <bls/bls_worker.h>
#include <llmq/commitment.h>
#include <util/underlying.h>
#include <sync.h> #include <sync.h>
#include <util/underlying.h>
#include <optional> #include <optional>
@ -22,7 +23,6 @@ class CDeterministicMN;
class CMasternodeMetaMan; class CMasternodeMetaMan;
class CSporkManager; class CSporkManager;
class UniValue; class UniValue;
class PeerManager;
using CDeterministicMNCPtr = std::shared_ptr<const CDeterministicMN>; using CDeterministicMNCPtr = std::shared_ptr<const CDeterministicMN>;
@ -249,6 +249,13 @@ public:
} }
}; };
class CDKGLogger : public CBatchedLogger
{
public:
CDKGLogger(const CDKGSession& _quorumDkg, std::string_view _func, int source_line);
CDKGLogger(std::string_view _llmqTypeName, int _quorumIndex, const uint256& _quorumHash, int _height, bool _areWeMember, std::string_view _func, int source_line);
};
/** /**
* The DKG session is a single instance of the DKG process. It is owned and called by CDKGSessionHandler, which passes * The DKG session is a single instance of the DKG process. It is owned and called by CDKGSessionHandler, which passes
* received DKG messages to the session. The session is not persistent and will loose it's state (the whole object is * received DKG messages to the session. The session is not persistent and will loose it's state (the whole object is
@ -281,7 +288,6 @@ private:
CMasternodeMetaMan& m_mn_metaman; CMasternodeMetaMan& m_mn_metaman;
const CActiveMasternodeManager* const m_mn_activeman; const CActiveMasternodeManager* const m_mn_activeman;
const CSporkManager& m_sporkman; const CSporkManager& m_sporkman;
const std::unique_ptr<PeerManager>& m_peerman;
const CBlockIndex* m_quorum_base_block_index{nullptr}; const CBlockIndex* m_quorum_base_block_index{nullptr};
int quorumIndex{0}; int quorumIndex{0};
@ -324,11 +330,12 @@ public:
CDKGSession(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CConnman& _connman, CDKGSession(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CConnman& _connman,
CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager,
CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman) : const CSporkManager& sporkman) :
params(_params), blsWorker(_blsWorker), cache(_blsWorker), connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager), params(_params), blsWorker(_blsWorker), cache(_blsWorker), connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager),
dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman), dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman)
m_peerman(peerman) {} {}
// TODO: remove Init completely
bool Init(gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, Span<CDeterministicMNCPtr> mns, const uint256& _myProTxHash, int _quorumIndex); bool Init(gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, Span<CDeterministicMNCPtr> mns, const uint256& _myProTxHash, int _quorumIndex);
[[nodiscard]] std::optional<size_t> GetMyMemberIndex() const { return myIdx; } [[nodiscard]] std::optional<size_t> GetMyMemberIndex() const { return myIdx; }
@ -350,7 +357,7 @@ public:
void Contribute(CDKGPendingMessages& pendingMessages); void Contribute(CDKGPendingMessages& pendingMessages);
void SendContributions(CDKGPendingMessages& pendingMessages); void SendContributions(CDKGPendingMessages& pendingMessages);
bool PreVerifyMessage(const CDKGContribution& qc, bool& retBan) const; bool PreVerifyMessage(const CDKGContribution& qc, bool& retBan) const;
void ReceiveMessage(const CDKGContribution& qc); std::optional<CInv> ReceiveMessage(const CDKGContribution& qc);
void VerifyPendingContributions() EXCLUSIVE_LOCKS_REQUIRED(cs_pending); void VerifyPendingContributions() EXCLUSIVE_LOCKS_REQUIRED(cs_pending);
// Phase 2: complaint // Phase 2: complaint
@ -358,19 +365,19 @@ public:
void VerifyConnectionAndMinProtoVersions() const; void VerifyConnectionAndMinProtoVersions() const;
void SendComplaint(CDKGPendingMessages& pendingMessages); void SendComplaint(CDKGPendingMessages& pendingMessages);
bool PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const; bool PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const;
void ReceiveMessage(const CDKGComplaint& qc); std::optional<CInv> ReceiveMessage(const CDKGComplaint& qc);
// Phase 3: justification // Phase 3: justification
void VerifyAndJustify(CDKGPendingMessages& pendingMessages); void VerifyAndJustify(CDKGPendingMessages& pendingMessages);
void SendJustification(CDKGPendingMessages& pendingMessages, const std::set<uint256>& forMembers); void SendJustification(CDKGPendingMessages& pendingMessages, const std::set<uint256>& forMembers);
bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const; bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const;
void ReceiveMessage(const CDKGJustification& qj); std::optional<CInv> ReceiveMessage(const CDKGJustification& qj);
// Phase 4: commit // Phase 4: commit
void VerifyAndCommit(CDKGPendingMessages& pendingMessages); void VerifyAndCommit(CDKGPendingMessages& pendingMessages);
void SendCommitment(CDKGPendingMessages& pendingMessages); void SendCommitment(CDKGPendingMessages& pendingMessages);
bool PreVerifyMessage(const CDKGPrematureCommitment& qc, bool& retBan) const; bool PreVerifyMessage(const CDKGPrematureCommitment& qc, bool& retBan) const;
void ReceiveMessage(const CDKGPrematureCommitment& qc); std::optional<CInv> ReceiveMessage(const CDKGPrematureCommitment& qc);
// Phase 5: aggregate/finalize // Phase 5: aggregate/finalize
std::vector<CFinalCommitment> FinalizeCommitments(); std::vector<CFinalCommitment> FinalizeCommitments();
@ -378,10 +385,12 @@ public:
[[nodiscard]] bool AreWeMember() const { return !myProTxHash.IsNull(); } [[nodiscard]] bool AreWeMember() const { return !myProTxHash.IsNull(); }
void MarkBadMember(size_t idx); void MarkBadMember(size_t idx);
void RelayInvToParticipants(const CInv& inv) const;
public: public:
[[nodiscard]] CDKGMember* GetMember(const uint256& proTxHash) const; [[nodiscard]] CDKGMember* GetMember(const uint256& proTxHash) const;
[[nodiscard]] const std::set<uint256>& RelayMembers() const { return relayMembers; }
[[nodiscard]] const CBlockIndex* BlockIndex() const { return m_quorum_base_block_index; }
[[nodiscard]] const uint256& ProTx() const { return myProTxHash; }
[[nodiscard]] const Consensus::LLMQParams GetParams() const { return params; }
private: private:
[[nodiscard]] bool ShouldSimulateError(DKGError::type type) const; [[nodiscard]] bool ShouldSimulateError(DKGError::type type) const;

View File

@ -41,7 +41,7 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
m_peerman(peerman), m_peerman(peerman),
params(_params), params(_params),
quorumIndex(_quorumIndex), quorumIndex(_quorumIndex),
curSession(std::make_unique<CDKGSession>(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman, peerman)), curSession(std::make_unique<CDKGSession>(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)),
pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT), pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT),
pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION), pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION),
@ -190,7 +190,7 @@ void CDKGSessionHandler::StopThread()
bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex) bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex)
{ {
curSession = std::make_unique<CDKGSession>(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman, m_peerman); curSession = std::make_unique<CDKGSession>(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman);
if (!DeploymentDIP0003Enforced(pQuorumBaseBlockIndex->nHeight, Params().GetConsensus())) { if (!DeploymentDIP0003Enforced(pQuorumBaseBlockIndex->nHeight, Params().GetConsensus())) {
return false; return false;
@ -438,8 +438,45 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
return ret; return ret;
} }
static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman, PeerManager& peerman, const CInv& inv)
{
CDKGLogger logger(session, __func__, __LINE__);
std::stringstream ss;
const auto& relayMembers = session.RelayMembers();
for (const auto& r : relayMembers) {
ss << r.ToString().substr(0, 4) << " | ";
}
logger.Batch("RelayInvToParticipants inv[%s] relayMembers[%d] GetNodeCount[%d] GetNetworkActive[%d] HasMasternodeQuorumNodes[%d] for quorumHash[%s] forMember[%s] relayMembers[%s]",
inv.ToString(),
relayMembers.size(),
connman.GetNodeCount(ConnectionDirection::Both),
connman.GetNetworkActive(),
connman.HasMasternodeQuorumNodes(session.GetParams().type, session.BlockIndex()->GetBlockHash()),
session.BlockIndex()->GetBlockHash().ToString(),
session.ProTx().ToString().substr(0, 4), ss.str());
std::stringstream ss2;
connman.ForEachNode([&](const CNode* pnode) {
if (pnode->qwatch ||
(!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) {
peerman.PushInventory(pnode->GetId(), inv);
}
if (pnode->GetVerifiedProRegTxHash().IsNull()) {
logger.Batch("node[%d:%s] not mn",
pnode->GetId(),
pnode->m_addr_name);
} else if (relayMembers.count(pnode->GetVerifiedProRegTxHash()) == 0) {
ss2 << pnode->GetVerifiedProRegTxHash().ToString().substr(0, 4) << " | ";
}
});
logger.Batch("forMember[%s] NOTrelayMembers[%s]",
session.ProTx().ToString().substr(0, 4),
ss2.str());
logger.Flush();
}
template<typename Message, int MessageType> template<typename Message, int MessageType>
bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount) bool ProcessPendingMessageBatch(CConnman& connman, PeerManager* peerman, CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
{ {
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount); auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
if (msgs.empty()) { if (msgs.empty()) {
@ -489,7 +526,10 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi
if (badNodes.count(nodeId)) { if (badNodes.count(nodeId)) {
continue; continue;
} }
session.ReceiveMessage(*p.second); const std::optional<CInv> inv = session.ReceiveMessage(*p.second);
if (inv && peerman) {
RelayInvToParticipants(session, connman, *peerman, *inv);
}
} }
return true; return true;
@ -532,7 +572,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->Contribute(pendingContributions); curSession->Contribute(pendingContributions);
}; };
auto fContributeWait = [this] { auto fContributeWait = [this] {
return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(*curSession, pendingContributions, 8); return ProcessPendingMessageBatch<CDKGContribution, MSG_QUORUM_CONTRIB>(connman, m_peerman.get(), *curSession, pendingContributions, 8);
}; };
HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait);
@ -541,7 +581,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndComplain(pendingComplaints); curSession->VerifyAndComplain(pendingComplaints);
}; };
auto fComplainWait = [this] { auto fComplainWait = [this] {
return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(*curSession, pendingComplaints, 8); return ProcessPendingMessageBatch<CDKGComplaint, MSG_QUORUM_COMPLAINT>(connman, m_peerman.get(), *curSession, pendingComplaints, 8);
}; };
HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait);
@ -550,7 +590,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndJustify(pendingJustifications); curSession->VerifyAndJustify(pendingJustifications);
}; };
auto fJustifyWait = [this] { auto fJustifyWait = [this] {
return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(*curSession, pendingJustifications, 8); return ProcessPendingMessageBatch<CDKGJustification, MSG_QUORUM_JUSTIFICATION>(connman, m_peerman.get(), *curSession, pendingJustifications, 8);
}; };
HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait);
@ -559,7 +599,7 @@ void CDKGSessionHandler::HandleDKGRound()
curSession->VerifyAndCommit(pendingPrematureCommitments); curSession->VerifyAndCommit(pendingPrematureCommitments);
}; };
auto fCommitWait = [this] { auto fCommitWait = [this] {
return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(*curSession, pendingPrematureCommitments, 8); return ProcessPendingMessageBatch<CDKGPrematureCommitment, MSG_QUORUM_PREMATURE_COMMITMENT>(connman, m_peerman.get(), *curSession, pendingPrematureCommitments, 8);
}; };
HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait);

View File

@ -93,7 +93,6 @@ EXPECTED_CIRCULAR_DEPENDENCIES=(
"llmq/context -> llmq/ehf_signals -> net_processing -> llmq/context" "llmq/context -> llmq/ehf_signals -> net_processing -> llmq/context"
"llmq/blockprocessor -> net_processing -> llmq/blockprocessor" "llmq/blockprocessor -> net_processing -> llmq/blockprocessor"
"llmq/chainlocks -> net_processing -> llmq/chainlocks" "llmq/chainlocks -> net_processing -> llmq/chainlocks"
"llmq/dkgsession -> net_processing -> llmq/quorums -> llmq/dkgsession"
"net_processing -> spork -> net_processing" "net_processing -> spork -> net_processing"
"evo/simplifiedmns -> llmq/blockprocessor -> net_processing -> evo/simplifiedmns" "evo/simplifiedmns -> llmq/blockprocessor -> net_processing -> evo/simplifiedmns"
"governance/governance -> net_processing -> governance/governance" "governance/governance -> net_processing -> governance/governance"