refactor: misc refactoring in signing_shares.* (#4522)

* refactor: introduce PendingSignatureData

* refactor: explicitly capture in lambdas

* refactor: use structured bindings

* refactor: use more const

* refactor: remove unused include header

* refactor: use if-init

* refactor: add nodiscard

* refactor: initialize llmqType

* refactor: add override

* refactor: remove redundant specifiers

* refactor: prevent shadowing

* refactor: use try_emplace where possible

* refactor: use more accurate name
This commit is contained in:
PastaPastaPasta 2021-10-15 14:38:56 -04:00 committed by GitHub
parent ea8d4c26b6
commit 1d457affaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 162 deletions

View File

@ -15,7 +15,6 @@
#include <net_processing.h>
#include <netmessagemaker.h>
#include <spork.h>
#include <validation.h>
#include <cxxtimer.hpp>
@ -174,8 +173,7 @@ bool CSigSharesNodeState::GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo
void CSigSharesNodeState::RemoveSession(const uint256& signHash)
{
auto it = sessions.find(signHash);
if (it != sessions.end()) {
if (const auto it = sessions.find(signHash); it != sessions.end()) {
sessionByRecvId.erase(it->second.recvSessionId);
sessions.erase(it);
}
@ -231,7 +229,7 @@ void CSigSharesManager::InterruptWorkerThread()
workInterrupt();
}
void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv)
void CSigSharesManager::ProcessMessage(const CNode* pfrom, const std::string& strCommand, CDataStream& vRecv)
{
// non-masternodes are not interested in sigshares
if (!fMasternodeMode || WITH_LOCK(activeMasternodeInfoCs, return activeMasternodeInfo.proTxHash.IsNull())) {
@ -239,16 +237,16 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma
}
if (sporkManager.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED) && strCommand == NetMsgType::QSIGSHARE) {
std::vector<CSigShare> sigShares;
vRecv >> sigShares;
std::vector<CSigShare> receivedSigShares;
vRecv >> receivedSigShares;
if (sigShares.size() > MAX_MSGS_SIG_SHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, sigShares.size(), MAX_MSGS_SIG_SHARES, pfrom->GetId());
if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom->GetId());
BanNode(pfrom->GetId());
return;
}
for (const auto& sigShare : sigShares) {
for (const auto& sigShare : receivedSigShares) {
ProcessMessageSigShare(pfrom->GetId(), sigShare);
}
}
@ -316,7 +314,7 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma
}
}
bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann)
bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode* pfrom, const CSigSesAnn& ann)
{
auto llmqType = ann.llmqType;
if (!Params().GetConsensus().llmqs.count(llmqType)) {
@ -343,7 +341,7 @@ bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn&
nodeState.sessionByRecvId.erase(ann.sessionId);
session.recvSessionId = ann.sessionId;
session.quorum = quorum;
nodeState.sessionByRecvId.emplace(ann.sessionId, &session);
nodeState.sessionByRecvId.try_emplace(ann.sessionId, &session);
return true;
}
@ -353,7 +351,7 @@ bool CSigSharesManager::VerifySigSharesInv(Consensus::LLMQType llmqType, const C
return inv.inv.size() == GetLLMQParams(llmqType).size;
}
bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv)
bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode* pfrom, const CSigSharesInv& inv)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) {
@ -390,7 +388,7 @@ bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigShare
return true;
}
bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv)
bool CSigSharesManager::ProcessMessageGetSigShares(const CNode* pfrom, const CSigSharesInv& inv)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) {
@ -420,20 +418,19 @@ bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigShare
return true;
}
bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares)
bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode* pfrom, const CBatchedSigShares& batchedSigShares)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) {
return true;
}
bool ban = false;
if (!PreVerifyBatchedSigShares(sessionInfo, batchedSigShares, ban)) {
if (bool ban{false}; !PreVerifyBatchedSigShares(sessionInfo, batchedSigShares, ban)) {
return !ban;
}
std::vector<CSigShare> sigShares;
sigShares.reserve(batchedSigShares.sigShares.size());
std::vector<CSigShare> sigSharesToProcess;
sigSharesToProcess.reserve(batchedSigShares.sigShares.size());
{
LOCK(cs);
@ -447,7 +444,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc
// It's important to only skip seen *valid* sig shares here. If a node sends us a
// batch of mostly valid sig shares with a single invalid one and thus batched
// verification fails, we'd skip the valid ones in the future if received from other nodes
if (this->sigShares.Has(sigShare.GetKey())) {
if (sigShares.Has(sigShare.GetKey())) {
continue;
}
@ -456,20 +453,20 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc
continue;
}
sigShares.emplace_back(sigShare);
sigSharesToProcess.emplace_back(sigShare);
}
}
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__,
sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInvString(), pfrom->GetId());
sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigSharesToProcess.size(), batchedSigShares.ToInvString(), pfrom->GetId());
if (sigShares.empty()) {
if (sigSharesToProcess.empty()) {
return true;
}
LOCK(cs);
auto& nodeState = nodeStates[pfrom->GetId()];
for (const auto& s : sigShares) {
for (const auto& s : sigSharesToProcess) {
nodeState.pendingIncomingSigShares.Add(s.GetKey(), s);
}
return true;
@ -547,8 +544,7 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(const CSigSharesNodeState::Ses
std::unordered_set<uint16_t> dupMembers;
for (const auto& sigShare : batchedSigShares.sigShares) {
auto quorumMember = sigShare.first;
for (const auto& [quorumMember, _] : batchedSigShares.sigShares) {
if (!dupMembers.emplace(quorumMember).second) {
retBan = true;
return false;
@ -595,8 +591,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify(
auto& sigShare = *ns.pendingIncomingSigShares.GetFirst();
AssertLockHeld(cs);
bool alreadyHave = this->sigShares.Has(sigShare.GetKey());
if (!alreadyHave) {
if (const bool alreadyHave = this->sigShares.Has(sigShare.GetKey()); !alreadyHave) {
uniqueSignHashes.emplace(nodeId, sigShare.GetSignHash());
retSigShares[nodeId].emplace_back(sigShare);
}
@ -611,8 +606,8 @@ void CSigSharesManager::CollectPendingSigSharesToVerify(
// For the convenience of the caller, also build a map of quorumHash -> quorum
for (auto& p : retSigShares) {
for (auto& sigShare : p.second) {
for (const auto& [_, vecSigShares] : retSigShares) {
for (const auto& sigShare : vecSigShares) {
auto llmqType = sigShare.llmqType;
auto k = std::make_pair(llmqType, sigShare.quorumHash);
@ -622,7 +617,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify(
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, sigShare.quorumHash);
assert(quorum != nullptr);
retQuorums.emplace(k, quorum);
retQuorums.try_emplace(k, quorum);
}
}
}
@ -644,11 +639,8 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
cxxtimer::Timer prepareTimer(true);
size_t verifyCount = 0;
for (auto& p : sigSharesByNodes) {
auto nodeId = p.first;
auto& v = p.second;
for (auto& sigShare : v) {
for (const auto& [nodeId, v] : sigSharesByNodes) {
for (const auto& sigShare : v) {
if (quorumSigningManager->HasRecoveredSigForId(sigShare.llmqType, sigShare.id)) {
continue;
}
@ -683,10 +675,7 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- verified sig shares. count=%d, pt=%d, vt=%d, nodes=%d\n", __func__, verifyCount, prepareTimer.count(), verifyTimer.count(), sigSharesByNodes.size());
for (const auto& p : sigSharesByNodes) {
auto nodeId = p.first;
const auto& v = p.second;
for (const auto& [nodeId, v] : sigSharesByNodes) {
if (batchVerifier.badSources.count(nodeId)) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n",
__func__, nodeId);
@ -702,19 +691,19 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
}
// It's ensured that no duplicates are passed to this method
void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& sigShares,
void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
const CConnman& connman)
{
cxxtimer::Timer t(true);
for (auto& sigShare : sigShares) {
for (auto& sigShare : sigSharesToProcess) {
auto quorumKey = std::make_pair(sigShare.llmqType, sigShare.quorumHash);
ProcessSigShare(sigShare, connman, quorums.at(quorumKey));
}
t.stop();
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- processed sigShare batch. shares=%d, time=%ds\n", __func__,
sigShares.size(), t.count());
sigSharesToProcess.size(), t.count());
}
// sig shares are already verified when entering this method
@ -782,14 +771,14 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
LOCK(cs);
auto signHash = CLLMQUtils::BuildSignHash(quorum->params.type, quorum->qc->quorumHash, id, msgHash);
auto sigShares = this->sigShares.GetAllForSignHash(signHash);
if (!sigShares) {
auto sigSharesForSignHash = sigShares.GetAllForSignHash(signHash);
if (!sigSharesForSignHash) {
return;
}
sigSharesForRecovery.reserve((size_t) quorum->params.threshold);
idsForRecovery.reserve((size_t) quorum->params.threshold);
for (auto it = sigShares->begin(); it != sigShares->end() && sigSharesForRecovery.size() < quorum->params.threshold; ++it) {
for (auto it = sigSharesForSignHash->begin(); it != sigSharesForSignHash->end() && sigSharesForRecovery.size() < quorum->params.threshold; ++it) {
auto& sigShare = it->second;
sigSharesForRecovery.emplace_back(sigShare.sigShare.Get());
idsForRecovery.emplace_back(quorum->members[sigShare.quorumMember]->proTxHash);
@ -863,11 +852,11 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std
// avoid requesting from same nodes all the time
std::vector<NodeId> shuffledNodeIds;
shuffledNodeIds.reserve(nodeStates.size());
for (const auto& p : nodeStates) {
if (p.second.sessions.empty()) {
for (const auto& [nodeId, nodeState] : nodeStates) {
if (nodeState.sessions.empty()) {
continue;
}
shuffledNodeIds.emplace_back(p.first);
shuffledNodeIds.emplace_back(nodeId);
}
Shuffle(shuffledNodeIds.begin(), shuffledNodeIds.end(), rnd);
@ -878,7 +867,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std
continue;
}
nodeState.requestedSigShares.EraseIf([&](const SigShareKey& k, int64_t t) {
nodeState.requestedSigShares.EraseIf([&now, &nodeId](const SigShareKey& k, int64_t t) {
if (now - t >= SIG_SHARE_REQUEST_TIMEOUT) {
// timeout while waiting for this one, so retry it with another node
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::CollectSigSharesToRequest -- timeout while waiting for %s-%d, node=%d\n",
@ -890,10 +879,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std
decltype(sigSharesToRequest.begin()->second)* invMap = nullptr;
for (auto& p2 : nodeState.sessions) {
auto& signHash = p2.first;
auto& session = p2.second;
for (auto& [signHash, session] : nodeState.sessions) {
if (CLLMQUtils::IsAllMembersConnectedEnabled(session.llmqType)) {
continue;
}
@ -916,8 +902,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std
// too many pending requests for this node
break;
}
auto p = sigSharesRequested.Get(k);
if (p) {
if (const auto p = sigSharesRequested.Get(k)) {
if (now - p->second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != p->first) {
// other node timed out, re-request from this node
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- other node timeout while waiting for %s-%d, re-request from=%d, node=%d\n", __func__,
@ -956,20 +941,14 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::u
{
AssertLockHeld(cs);
for (auto& p : nodeStates) {
auto nodeId = p.first;
auto& nodeState = p.second;
for (auto& [nodeId, nodeState] : nodeStates) {
if (nodeState.banned) {
continue;
}
decltype(sigSharesToSend.begin()->second)* sigSharesToSend2 = nullptr;
for (auto& p2 : nodeState.sessions) {
auto& signHash = p2.first;
auto& session = p2.second;
for (auto& [signHash, session] : nodeState.sessions) {
if (CLLMQUtils::IsAllMembersConnectedEnabled(session.llmqType)) {
continue;
}
@ -1002,7 +981,7 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::u
// only create the map if we actually add a batched sig
sigSharesToSend2 = &sigSharesToSend[nodeId];
}
(*sigSharesToSend2).emplace(signHash, std::move(batchedSigShares));
(*sigSharesToSend2).try_emplace(signHash, std::move(batchedSigShares));
}
}
}
@ -1018,29 +997,29 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
if (verifiedProRegTxHash.IsNull()) {
continue;
}
proTxToNode.emplace(verifiedProRegTxHash, pnode);
proTxToNode.try_emplace(verifiedProRegTxHash, pnode);
}
auto curTime = GetTime<std::chrono::milliseconds>().count();
for (auto& p : signedSessions) {
if (!CLLMQUtils::IsAllMembersConnectedEnabled(p.second.quorum->params.type)) {
for (auto& [_, signedSession] : signedSessions) {
if (!CLLMQUtils::IsAllMembersConnectedEnabled(signedSession.quorum->params.type)) {
continue;
}
if (p.second.attempt > p.second.quorum->params.recoveryMembers) {
if (signedSession.attempt > signedSession.quorum->params.recoveryMembers) {
continue;
}
if (curTime >= p.second.nextAttemptTime) {
int64_t waitTime = exp2(p.second.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT;
if (curTime >= signedSession.nextAttemptTime) {
int64_t waitTime = exp2(signedSession.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT;
waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime);
p.second.nextAttemptTime = curTime + waitTime;
auto dmn = SelectMemberForRecovery(p.second.quorum, p.second.sigShare.id, p.second.attempt);
p.second.attempt++;
signedSession.nextAttemptTime = curTime + waitTime;
auto dmn = SelectMemberForRecovery(signedSession.quorum, signedSession.sigShare.id, signedSession.attempt);
signedSession.attempt++;
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, sending to %s, attempt=%d\n", __func__,
p.second.sigShare.GetSignHash().ToString(), dmn->proTxHash.ToString(), p.second.attempt);
signedSession.sigShare.GetSignHash().ToString(), dmn->proTxHash.ToString(), signedSession.attempt);
auto it = proTxToNode.find(dmn->proTxHash);
if (it == proTxToNode.end()) {
@ -1048,7 +1027,7 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
}
auto& m = sigSharesToSend[it->second->GetId()];
m.emplace_back(p.second.sigShare);
m.emplace_back(signedSession.sigShare);
}
}
}
@ -1059,7 +1038,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::unordered_set<NodeId>, StaticSaltedHasher> quorumNodesMap;
sigSharesQueuedToAnnounce.ForEach([&](const SigShareKey& sigShareKey, bool) {
sigSharesQueuedToAnnounce.ForEach([this, &quorumNodesMap, &sigSharesToAnnounce](const SigShareKey& sigShareKey, bool) {
AssertLockHeld(cs);
const auto& signHash = sigShareKey.first;
auto quorumMember = sigShareKey.second;
@ -1142,19 +1121,19 @@ bool CSigSharesManager::SendMessages()
CollectSigSharesToAnnounce(sigSharesToAnnounce);
CollectSigSharesToSendConcentrated(sigSharesToSend, vNodesCopy);
for (auto& p : sigSharesToRequest) {
for (auto& p2 : p.second) {
p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
for (auto& [hash, sigShareInv] : sigShareMap) {
sigShareInv.sessionId = addSigSesAnnIfNeeded(nodeId, hash);
}
}
for (auto& p : sigShareBatchesToSend) {
for (auto& p2 : p.second) {
p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
for (auto& [nodeId, sigShareBatchesMap] : sigShareBatchesToSend) {
for (auto& [hash, sigShareBatch] : sigShareBatchesMap) {
sigShareBatch.sessionId = addSigSesAnnIfNeeded(nodeId, hash);
}
}
for (auto& p : sigSharesToAnnounce) {
for (auto& p2 : p.second) {
p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
for (auto& [nodeId, sigShareMap] : sigSharesToAnnounce) {
for (auto& [hash, sigShareInv] : sigShareMap) {
sigShareInv.sessionId = addSigSesAnnIfNeeded(nodeId, hash);
}
}
}
@ -1164,8 +1143,7 @@ bool CSigSharesManager::SendMessages()
for (auto& pnode : vNodesCopy) {
CNetMsgMaker msgMaker(pnode->GetSendVersion());
auto it1 = sigSessionAnnouncements.find(pnode->GetId());
if (it1 != sigSessionAnnouncements.end()) {
if (const auto it1 = sigSessionAnnouncements.find(pnode->GetId()); it1 != sigSessionAnnouncements.end()) {
std::vector<CSigSesAnn> msgs;
msgs.reserve(it1->second.size());
for (auto& sigSesAnn : it1->second) {
@ -1184,14 +1162,13 @@ bool CSigSharesManager::SendMessages()
}
}
auto it = sigSharesToRequest.find(pnode->GetId());
if (it != sigSharesToRequest.end()) {
if (const auto it = sigSharesToRequest.find(pnode->GetId()); it != sigSharesToRequest.end()) {
std::vector<CSigSharesInv> msgs;
for (auto& p : it->second) {
assert(p.second.CountSet() != 0);
for (const auto& [signHash, inv] : it->second) {
assert(inv.CountSet() != 0);
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
signHash.ToString(), inv.ToString(), pnode->GetId());
msgs.emplace_back(inv);
if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
msgs.clear();
@ -1204,22 +1181,21 @@ bool CSigSharesManager::SendMessages()
}
}
auto jt = sigShareBatchesToSend.find(pnode->GetId());
if (jt != sigShareBatchesToSend.end()) {
if (const auto jt = sigShareBatchesToSend.find(pnode->GetId()); jt != sigShareBatchesToSend.end()) {
size_t totalSigsCount = 0;
std::vector<CBatchedSigShares> msgs;
for (auto& p : jt->second) {
assert(!p.second.sigShares.empty());
for (const auto& [signHash, inv] : jt->second) {
assert(!inv.sigShares.empty());
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToInvString(), pnode->GetId());
if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
signHash.ToString(), inv.ToInvString(), pnode->GetId());
if (totalSigsCount + inv.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs));
msgs.clear();
totalSigsCount = 0;
didSend = true;
}
totalSigsCount += p.second.sigShares.size();
msgs.emplace_back(std::move(p.second));
totalSigsCount += inv.sigShares.size();
msgs.emplace_back(inv);
}
if (!msgs.empty()) {
@ -1228,14 +1204,13 @@ bool CSigSharesManager::SendMessages()
}
}
auto kt = sigSharesToAnnounce.find(pnode->GetId());
if (kt != sigSharesToAnnounce.end()) {
if (const auto kt = sigSharesToAnnounce.find(pnode->GetId()); kt != sigSharesToAnnounce.end()) {
std::vector<CSigSharesInv> msgs;
for (auto& p : kt->second) {
assert(p.second.CountSet() != 0);
for (const auto& [signHash, inv] : kt->second) {
assert(inv.CountSet() != 0);
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
signHash.ToString(), inv.ToString(), pnode->GetId());
msgs.emplace_back(inv);
if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
msgs.clear();
@ -1283,14 +1258,14 @@ bool CSigSharesManager::GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId
CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx)
{
assert(idx < batchedSigShares.sigShares.size());
const auto& s = batchedSigShares.sigShares[idx];
const auto& [member, sig] = batchedSigShares.sigShares[idx];
CSigShare sigShare;
sigShare.llmqType = session.llmqType;
sigShare.quorumHash = session.quorumHash;
sigShare.quorumMember = s.first;
sigShare.quorumMember = member;
sigShare.id = session.id;
sigShare.msgHash = session.msgHash;
sigShare.sigShare = s.second;
sigShare.sigShare = sig;
sigShare.UpdateKey();
return sigShare;
}
@ -1310,8 +1285,8 @@ void CSigSharesManager::Cleanup()
{
LOCK(cs);
sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
quorums.emplace(std::make_pair(sigShare.llmqType, sigShare.quorumHash), nullptr);
sigShares.ForEach([&quorums](const SigShareKey&, const CSigShare& sigShare) {
quorums.try_emplace(std::make_pair(sigShare.llmqType, sigShare.quorumHash), nullptr);
});
}
@ -1329,7 +1304,7 @@ void CSigSharesManager::Cleanup()
// Now delete sessions which are for inactive quorums
LOCK(cs);
std::unordered_set<uint256, StaticSaltedHasher> inactiveQuorumSessions;
sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
sigShares.ForEach([&quorums, &inactiveQuorumSessions](const SigShareKey&, const CSigShare& sigShare) {
if (!quorums.count(std::make_pair(sigShare.llmqType, sigShare.quorumHash))) {
inactiveQuorumSessions.emplace(sigShare.GetSignHash());
}
@ -1344,7 +1319,7 @@ void CSigSharesManager::Cleanup()
// Remove sessions which were successfully recovered
std::unordered_set<uint256, StaticSaltedHasher> doneSessions;
sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
sigShares.ForEach([&doneSessions](const SigShareKey&, const CSigShare& sigShare) {
if (doneSessions.count(sigShare.GetSignHash())) {
return;
}
@ -1358,18 +1333,14 @@ void CSigSharesManager::Cleanup()
// Remove sessions which timed out
std::unordered_set<uint256, StaticSaltedHasher> timeoutSessions;
for (const auto& p : timeSeenForSessions) {
auto& signHash = p.first;
int64_t lastSeenTime = p.second;
for (const auto& [signHash, lastSeenTime] : timeSeenForSessions) {
if (now - lastSeenTime >= SESSION_NEW_SHARES_TIMEOUT) {
timeoutSessions.emplace(signHash);
}
}
for (auto& signHash : timeoutSessions) {
size_t count = sigShares.CountForSignHash(signHash);
if (count > 0) {
if (const size_t count = sigShares.CountForSignHash(signHash); count > 0) {
auto m = sigShares.GetAllForSignHash(signHash);
assert(m);
@ -1377,8 +1348,7 @@ void CSigSharesManager::Cleanup()
std::string strMissingMembers;
if (LogAcceptCategory(BCLog::LLMQ_SIGS)) {
auto quorumIt = quorums.find(std::make_pair(oneSigShare.llmqType, oneSigShare.quorumHash));
if (quorumIt != quorums.end()) {
if (const auto quorumIt = quorums.find(std::make_pair(oneSigShare.llmqType, oneSigShare.quorumHash)); quorumIt != quorums.end()) {
const auto& quorum = quorumIt->second;
for (size_t i = 0; i < quorum->members.size(); i++) {
if (!m->count((uint16_t)i)) {
@ -1403,11 +1373,11 @@ void CSigSharesManager::Cleanup()
std::unordered_set<NodeId> nodeStatesToDelete;
{
LOCK(cs);
for (const auto& p : nodeStates) {
nodeStatesToDelete.emplace(p.first);
for (const auto& [nodeId, _] : nodeStates) {
nodeStatesToDelete.emplace(nodeId);
}
}
g_connman->ForEachNode([&](CNode* pnode) {
g_connman->ForEachNode([&nodeStatesToDelete](const CNode* pnode) {
nodeStatesToDelete.erase(pnode->GetId());
});
@ -1419,7 +1389,7 @@ void CSigSharesManager::Cleanup()
continue;
}
// remove global requested state to force a re-request from another node
it->second.requestedSigShares.ForEach([&](const SigShareKey& k, bool) {
it->second.requestedSigShares.ForEach([this](const SigShareKey& k, bool) {
AssertLockHeld(cs);
sigSharesRequested.Erase(k);
});
@ -1433,9 +1403,8 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
{
AssertLockHeld(cs);
for (auto& p : nodeStates) {
auto& ns = p.second;
ns.RemoveSession(signHash);
for (auto& [_, nodeState] : nodeStates) {
nodeState.RemoveSession(signHash);
}
sigSharesRequested.EraseAllForSignHash(signHash);
@ -1453,7 +1422,7 @@ void CSigSharesManager::RemoveBannedNodeStates()
for (auto it = nodeStates.begin(); it != nodeStates.end();) {
if (IsBanned(it->first)) {
// re-request sigshares from other nodes
it->second.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) {
it->second.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) {
AssertLockHeld(cs);
sigSharesRequested.Erase(k);
});
@ -1483,7 +1452,7 @@ void CSigSharesManager::BanNode(NodeId nodeId)
auto& nodeState = it->second;
// Whatever we requested from him, let's request it from someone else now
nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) {
nodeState.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) {
AssertLockHeld(cs);
sigSharesRequested.Erase(k);
});
@ -1534,15 +1503,14 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
void CSigSharesManager::SignPendingSigShares()
{
std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v;
std::vector<PendingSignatureData> v;
{
LOCK(cs);
v = std::move(pendingSigns);
}
for (auto& t : v) {
const CQuorumCPtr pQuorum = std::get<0>(t);
CSigShare sigShare = CreateSigShare(pQuorum, std::get<1>(t), std::get<2>(t));
for (const auto& [pQuorum, id, msgHash] : v) {
CSigShare sigShare = CreateSigShare(pQuorum, id, msgHash);
if (sigShare.sigShare.Get().IsValid()) {
@ -1613,15 +1581,13 @@ void CSigSharesManager::ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus
LOCK(cs);
auto signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->qc->quorumHash, id, msgHash);
auto sigs = sigShares.GetAllForSignHash(signHash);
if (sigs) {
for (auto& p : *sigs) {
if (const auto sigs = sigShares.GetAllForSignHash(signHash)) {
for (const auto& [quorumMemberIndex, _] : *sigs) {
// re-announce every sigshare to every node
sigSharesQueuedToAnnounce.Add(std::make_pair(signHash, p.first), true);
sigSharesQueuedToAnnounce.Add(std::make_pair(signHash, quorumMemberIndex), true);
}
}
for (auto& p : nodeStates) {
CSigSharesNodeState& nodeState = p.second;
for (auto& [_, nodeState] : nodeStates) {
auto session = nodeState.GetSessionBySignHash(signHash);
if (!session) {
continue;

View File

@ -15,6 +15,7 @@
#include <thread>
#include <unordered_map>
#include <utility>
class CEvoDB;
class CScheduler;
@ -64,7 +65,7 @@ class CSigSesAnn
{
public:
uint32_t sessionId{UNINITIALIZED_SESSION_ID};
Consensus::LLMQType llmqType;
Consensus::LLMQType llmqType{Consensus::LLMQType::LLMQ_NONE};
uint256 quorumHash;
uint256 id;
uint256 msgHash;
@ -74,7 +75,7 @@ public:
READWRITE(VARINT(obj.sessionId), obj.llmqType, obj.quorumHash, obj.id, obj.msgHash);
}
std::string ToString() const;
[[nodiscard]] std::string ToString() const;
};
class CSigSharesInv
@ -94,13 +95,13 @@ public:
}
void Init(size_t size);
bool IsSet(uint16_t quorumMember) const;
[[nodiscard]] bool IsSet(uint16_t quorumMember) const;
void Set(uint16_t quorumMember, bool v);
void SetAll(bool v);
void Merge(const CSigSharesInv& inv2);
size_t CountSet() const;
std::string ToString() const;
[[nodiscard]] size_t CountSet() const;
[[nodiscard]] std::string ToString() const;
};
// sent through the message QBSIGSHARES as a vector of multiple batches
@ -116,7 +117,7 @@ public:
READWRITE(VARINT(obj.sessionId), obj.sigShares);
}
std::string ToInvString() const;
[[nodiscard]] std::string ToInvString() const;
};
template<typename T>
@ -149,7 +150,7 @@ public:
internalMap.clear();
}
bool Has(const SigShareKey& k) const
[[nodiscard]] bool Has(const SigShareKey& k) const
{
auto it = internalMap.find(k.first);
if (it == internalMap.end()) {
@ -191,7 +192,7 @@ public:
return &internalMap.begin()->second.begin()->second;
}
size_t Size() const
[[nodiscard]] size_t Size() const
{
size_t s = 0;
for (auto& p : internalMap) {
@ -200,7 +201,7 @@ public:
return s;
}
size_t CountForSignHash(const uint256& signHash) const
[[nodiscard]] size_t CountForSignHash(const uint256& signHash) const
{
auto it = internalMap.find(signHash);
if (it == internalMap.end()) {
@ -209,7 +210,7 @@ public:
return it->second.size();
}
bool Empty() const
[[nodiscard]] bool Empty() const
{
return internalMap.empty();
}
@ -327,6 +328,7 @@ public:
class CSigSharesManager : public CRecoveredSigsListener
{
private:
static constexpr int64_t SESSION_NEW_SHARES_TIMEOUT{60};
static constexpr int64_t SIG_SHARE_REQUEST_TIMEOUT{5};
@ -341,7 +343,6 @@ class CSigSharesManager : public CRecoveredSigsListener
static constexpr int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT{10000};
static constexpr size_t MAX_MSGS_SIG_SHARES{32};
private:
CCriticalSection cs;
std::thread workThread;
@ -357,7 +358,15 @@ private:
SigShareMap<std::pair<NodeId, int64_t>> sigSharesRequested GUARDED_BY(cs);
SigShareMap<bool> sigSharesQueuedToAnnounce GUARDED_BY(cs);
std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> pendingSigns GUARDED_BY(cs);
struct PendingSignatureData {
const CQuorumCPtr quorum;
const uint256 id;
const uint256 msgHash;
PendingSignatureData(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) : quorum(std::move(quorum)), id(id), msgHash(msgHash){}
};
std::vector<PendingSignatureData> pendingSigns GUARDED_BY(cs);
FastRandomContext rnd GUARDED_BY(cs);
@ -366,7 +375,7 @@ private:
public:
CSigSharesManager();
~CSigSharesManager();
~CSigSharesManager() override;
void StartWorkerThread();
void StopWorkerThread();
@ -374,8 +383,7 @@ public:
void UnregisterAsRecoveredSigsListener();
void InterruptWorkerThread();
public:
void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv);
void ProcessMessage(const CNode* pnode, const std::string& strCommand, CDataStream& vRecv);
void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
CSigShare CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) const;
@ -387,10 +395,10 @@ public:
private:
// all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages)
bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann);
bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv);
bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv);
bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares);
bool ProcessMessageSigSesAnn(const CNode* pfrom, const CSigSesAnn& ann);
bool ProcessMessageSigSharesInv(const CNode* pfrom, const CSigSharesInv& inv);
bool ProcessMessageGetSigShares(const CNode* pfrom, const CSigSharesInv& inv);
bool ProcessMessageBatchedSigShares(const CNode* pfrom, const CBatchedSigShares& batchedSigShares);
void ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare);
static bool VerifySigSharesInv(Consensus::LLMQType llmqType, const CSigSharesInv& inv);
@ -401,14 +409,13 @@ private:
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingSigShares(const CConnman& connman);
void ProcessPendingSigShares(const std::vector<CSigShare>& sigShares,
void ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
const CConnman& connman);
void ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum);
void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash);
private:
bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo);
static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx);