refactor: remove dependency of QuorumBlockProcessor on PeerManager

This commit is contained in:
Konstantin Akimov 2024-09-26 23:13:21 +07:00
parent f1c6d17879
commit 1d13f010d0
No known key found for this signature in database
GPG Key ID: 2176C4A5D01EA524
7 changed files with 44 additions and 51 deletions

View File

@ -16,7 +16,6 @@
#include <consensus/validation.h>
#include <deploymentstatus.h>
#include <net.h>
#include <net_processing.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <saltedhasher.h>
@ -44,14 +43,16 @@ static const std::string DB_MINED_COMMITMENT_BY_INVERSED_HEIGHT_Q_INDEXED = "q_m
static const std::string DB_BEST_BLOCK_UPGRADE = "q_bbu2";
CQuorumBlockProcessor::CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb,
const std::unique_ptr<PeerManager>& peerman) :
m_chainstate(chainstate), m_dmnman(dmnman), m_evoDb(evoDb), m_peerman(peerman)
CQuorumBlockProcessor::CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb) :
m_chainstate(chainstate),
m_dmnman(dmnman),
m_evoDb(evoDb)
{
utils::InitQuorumsCache(mapHasMinedCommitmentCache);
}
PeerMsgRet CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv)
MessageProcessingResult CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_view msg_type,
CDataStream& vRecv)
{
if (msg_type != NetMsgType::QFCOMMITMENT) {
return {};
@ -60,19 +61,21 @@ PeerMsgRet CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_
CFinalCommitment qc;
vRecv >> qc;
WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(peer.GetId(),
CInv(MSG_QUORUM_FINAL_COMMITMENT, ::SerializeHash(qc))));
MessageProcessingResult ret;
ret.m_to_erase = CInv{MSG_QUORUM_FINAL_COMMITMENT, ::SerializeHash(qc)};
if (qc.IsNull()) {
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- null commitment from peer=%d\n", __func__, peer.GetId());
return tl::unexpected{100};
ret.m_error = MisbehavingError{100};
return ret;
}
const auto& llmq_params_opt = Params().GetLLMQ(qc.llmqType);
if (!llmq_params_opt.has_value()) {
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- invalid commitment type %d from peer=%d\n", __func__,
ToUnderlying(qc.llmqType), peer.GetId());
return tl::unexpected{100};
ret.m_error = MisbehavingError{100};
return ret;
}
auto type = qc.llmqType;
@ -86,32 +89,33 @@ PeerMsgRet CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_
qc.quorumHash.ToString(), peer.GetId());
// can't really punish the node here, as we might simply be the one that is on the wrong chain or not
// fully synced
return {};
return ret;
}
if (m_chainstate.m_chain.Tip()->GetAncestor(pQuorumBaseBlockIndex->nHeight) != pQuorumBaseBlockIndex) {
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- block %s not in active chain, peer=%d\n", __func__,
qc.quorumHash.ToString(), peer.GetId());
// same, can't punish
return {};
return ret;
}
if (int quorumHeight = pQuorumBaseBlockIndex->nHeight - (pQuorumBaseBlockIndex->nHeight % llmq_params_opt->dkgInterval) + int(qc.quorumIndex);
quorumHeight != pQuorumBaseBlockIndex->nHeight) {
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- block %s is not the first block in the DKG interval, peer=%d\n", __func__,
qc.quorumHash.ToString(), peer.GetId());
return tl::unexpected{100};
ret.m_error = MisbehavingError{100};
return ret;
}
if (pQuorumBaseBlockIndex->nHeight < (m_chainstate.m_chain.Height() - llmq_params_opt->dkgInterval)) {
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- block %s is too old, peer=%d\n", __func__,
qc.quorumHash.ToString(), peer.GetId());
// TODO: enable punishment in some future version when all/most nodes are running with this fix
// return tl::unexpected{100};
return {};
// ret.m_error = MisbehavingError{100};
return ret;
}
if (HasMinedCommitment(type, qc.quorumHash)) {
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- commitment for quorum hash[%s], type[%d], quorumIndex[%d] is already mined, peer=%d\n",
__func__, qc.quorumHash.ToString(), ToUnderlying(type), qc.quorumIndex, peer.GetId());
// NOTE: do not punish here
return {};
return ret;
}
}
@ -124,7 +128,7 @@ PeerMsgRet CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_
if (it != minableCommitmentsByQuorum.end()) {
auto jt = minableCommitments.find(it->second);
if (jt != minableCommitments.end() && jt->second.CountSigners() <= qc.CountSigners()) {
return {};
return ret;
}
}
}
@ -133,14 +137,15 @@ PeerMsgRet CQuorumBlockProcessor::ProcessMessage(const CNode& peer, std::string_
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- commitment for quorum %s:%d is not valid quorumIndex[%d] nversion[%d], peer=%d\n",
__func__, qc.quorumHash.ToString(),
ToUnderlying(qc.llmqType), qc.quorumIndex, qc.nVersion, peer.GetId());
return tl::unexpected{100};
ret.m_error = MisbehavingError{100};
return ret;
}
LogPrint(BCLog::LLMQ, "CQuorumBlockProcessor::%s -- received commitment for quorum %s:%d, validMembers=%d, signers=%d, peer=%d\n", __func__,
qc.quorumHash.ToString(), ToUnderlying(qc.llmqType), qc.CountValidMembers(), qc.CountSigners(), peer.GetId());
AddMineableCommitmentAndRelay(qc);
return {};
ret.m_inventory = AddMineableCommitment(qc);
return ret;
}
bool CQuorumBlockProcessor::ProcessBlock(const CBlock& block, gsl::not_null<const CBlockIndex*> pindex, BlockValidationState& state, bool fJustCheck, bool fBLSChecks)
@ -637,7 +642,7 @@ bool CQuorumBlockProcessor::HasMineableCommitment(const uint256& hash) const
return minableCommitments.count(hash) != 0;
}
std::optional<uint256> CQuorumBlockProcessor::AddMineableCommitment(const CFinalCommitment& fqc)
std::optional<CInv> CQuorumBlockProcessor::AddMineableCommitment(const CFinalCommitment& fqc)
{
const uint256 commitmentHash = ::SerializeHash(fqc);
@ -663,17 +668,7 @@ std::optional<uint256> CQuorumBlockProcessor::AddMineableCommitment(const CFinal
return false;
}();
return relay ? std::make_optional(commitmentHash) : std::nullopt;
}
void CQuorumBlockProcessor::AddMineableCommitmentAndRelay(const CFinalCommitment& fqc)
{
const auto commitmentHashOpt = AddMineableCommitment(fqc);
// We only relay the new commitment if it's new or better then the old one
if (commitmentHashOpt) {
CInv inv(MSG_QUORUM_FINAL_COMMITMENT, *commitmentHashOpt);
Assert(m_peerman)->RelayInv(inv);
}
return relay ? std::make_optional(CInv{MSG_QUORUM_FINAL_COMMITMENT, commitmentHash}) : std::nullopt;
}
bool CQuorumBlockProcessor::GetMineableCommitmentByHash(const uint256& commitmentHash, llmq::CFinalCommitment& ret) const

View File

@ -26,7 +26,6 @@ class CDataStream;
class CDeterministicMNManager;
class CEvoDB;
class CNode;
class PeerManager;
extern RecursiveMutex cs_main;
@ -42,7 +41,6 @@ private:
CChainState& m_chainstate;
CDeterministicMNManager& m_dmnman;
CEvoDB& m_evoDb;
const std::unique_ptr<PeerManager>& m_peerman;
mutable Mutex minableCommitmentsCs;
std::map<std::pair<Consensus::LLMQType, uint256>, uint256> minableCommitmentsByQuorum GUARDED_BY(minableCommitmentsCs);
@ -51,15 +49,15 @@ private:
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, bool, StaticSaltedHasher>> mapHasMinedCommitmentCache GUARDED_BY(minableCommitmentsCs);
public:
explicit CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb,
const std::unique_ptr<PeerManager>& peerman);
explicit CQuorumBlockProcessor(CChainState& chainstate, CDeterministicMNManager& dmnman, CEvoDB& evoDb);
PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv);
MessageProcessingResult ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv);
bool ProcessBlock(const CBlock& block, gsl::not_null<const CBlockIndex*> pindex, BlockValidationState& state, bool fJustCheck, bool fBLSChecks) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
bool UndoBlock(const CBlock& block, gsl::not_null<const CBlockIndex*> pindex) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
void AddMineableCommitmentAndRelay(const CFinalCommitment& fqc);
//! it returns hash of commitment if it should be relay, otherwise nullopt
std::optional<CInv> AddMineableCommitment(const CFinalCommitment& fqc);
bool HasMineableCommitment(const uint256& hash) const;
bool GetMineableCommitmentByHash(const uint256& commitmentHash, CFinalCommitment& ret) const;
std::optional<std::vector<CFinalCommitment>> GetMineableCommitments(const Consensus::LLMQParams& llmqParams, int nHeight) const EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@ -74,8 +72,6 @@ public:
std::vector<std::pair<int, const CBlockIndex*>> GetLastMinedCommitmentsPerQuorumIndexUntilBlock(Consensus::LLMQType llmqType, const CBlockIndex* pindex, size_t cycle) const;
std::optional<const CBlockIndex*> GetLastMinedCommitmentsByQuorumIndexUntilBlock(Consensus::LLMQType llmqType, const CBlockIndex* pindex, int quorumIndex, size_t cycle) const;
private:
//! it returns hash of commitment if it should be relay, otherwise nullopt
std::optional<uint256> AddMineableCommitment(const CFinalCommitment& fqc);
static bool GetCommitmentsFromBlock(const CBlock& block, gsl::not_null<const CBlockIndex*> pindex, std::multimap<Consensus::LLMQType, CFinalCommitment>& ret, BlockValidationState& state) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
bool ProcessCommitment(int nHeight, const uint256& blockHash, const CFinalCommitment& qc, BlockValidationState& state, bool fJustCheck, bool fBLSChecks) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
static bool IsMiningPhase(const Consensus::LLMQParams& llmqParams, const CChain& active_chain, int nHeight) EXCLUSIVE_LOCKS_REQUIRED(cs_main);

View File

@ -25,7 +25,7 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CDeterminis
is_masternode{mn_activeman != nullptr},
bls_worker{std::make_shared<CBLSWorker>()},
dkg_debugman{std::make_unique<llmq::CDKGDebugManager>()},
quorum_block_processor{std::make_unique<llmq::CQuorumBlockProcessor>(chainstate, dmnman, evo_db, peerman)},
quorum_block_processor{std::make_unique<llmq::CQuorumBlockProcessor>(chainstate, dmnman, evo_db)},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, dmnman, *dkg_debugman,
mn_metaman, *quorum_block_processor, mn_activeman, sporkman,
peerman, unit_tests, wipe)},

View File

@ -612,7 +612,9 @@ void CDKGSessionHandler::HandleDKGRound()
auto finalCommitments = curSession->FinalizeCommitments();
for (const auto& fqc : finalCommitments) {
quorumBlockProcessor.AddMineableCommitmentAndRelay(fqc);
if (auto inv_opt = quorumBlockProcessor.AddMineableCommitment(fqc); inv_opt.has_value()) {
Assert(m_peerman.get())->RelayInv(inv_opt.value());
}
}
}

View File

@ -3310,13 +3310,16 @@ void PeerManagerImpl::ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom)
if (!ret) Misbehaving(pfrom.GetId(), ret.error().score, ret.error().message);
}
void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& ret, NodeId node)
void PeerManagerImpl::PostProcessMessage(MessageProcessingResult&& result, NodeId node)
{
if (ret.m_error) {
Misbehaving(node, ret.m_error->score, ret.m_error->message);
if (result.m_error) {
Misbehaving(node, result.m_error->score, result.m_error->message);
}
if (ret.m_inventory) {
RelayInv(ret.m_inventory.value(), MIN_PEER_PROTO_VERSION);
if (result.m_to_erase) {
WITH_LOCK(cs_main, EraseObjectRequest(node, result.m_to_erase.value()));
}
if (result.m_inventory) {
RelayInv(result.m_inventory.value(), MIN_PEER_PROTO_VERSION);
}
}
@ -4994,7 +4997,7 @@ void PeerManagerImpl::ProcessMessage(
m_mn_sync.ProcessMessage(pfrom, msg_type, vRecv);
ProcessPeerMsgRet(m_govman.ProcessMessage(pfrom, m_connman, *this, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, m_mn_activeman, m_chainman.ActiveChain(), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId());
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, this, is_masternode, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv);

View File

@ -585,6 +585,7 @@ struct MessageProcessingResult
{
std::optional<MisbehavingError> m_error;
std::optional<CInv> m_inventory;
std::optional<CInv> m_to_erase;
MessageProcessingResult() = default;
MessageProcessingResult(MisbehavingError error) :

View File

@ -89,13 +89,9 @@ EXPECTED_CIRCULAR_DEPENDENCIES=(
"coinjoin/context -> coinjoin/server -> net_processing -> coinjoin/context"
"coinjoin/server -> net_processing -> coinjoin/server"
"llmq/context -> llmq/ehf_signals -> net_processing -> llmq/context"
"llmq/blockprocessor -> net_processing -> llmq/blockprocessor"
"llmq/chainlocks -> llmq/instantsend -> net_processing -> llmq/chainlocks"
"net_processing -> spork -> net_processing"
"evo/simplifiedmns -> llmq/blockprocessor -> net_processing -> evo/simplifiedmns"
"governance/governance -> net_processing -> governance/governance"
"llmq/blockprocessor -> net_processing -> llmq/context -> llmq/blockprocessor"
"llmq/blockprocessor -> net_processing -> llmq/quorums -> llmq/blockprocessor"
"rpc/blockchain -> rpc/server -> rpc/blockchain"
)