Send/Receive multiple messages as part of one P2P message in CSigSharesManager (#2729)

* Return bool in ProcessMessageXXX methods to indicate misbehaviour

* Send/Receive multiple messages as part of one P2P message in CSigSharesManager

Many messages, especially QSIGSHARESINV and QGETSIGSHARES, are very small
by nature (5-14 bytes for a 50 members LLMQ). The message headers are
24 bytes, meaning that we produce a lot of overhead for these small messages.
This sums up quite a bit when thousands of signing sessions are happening
in parallel.

This commit changes all related P2P messages to send a vector of messages
instead of a single message.

* Remove bogus lines

Included these by accident

* Unify handling of BanNode in ProcessMessageXXX methods

* Remove bogus check for fMasternodeMode

* Properly use == instead of misleading >= in SendMessages

* Put "didSend = true" near PushMessage
This commit is contained in:
Alexander Block 2019-03-01 08:21:09 +01:00 committed by GitHub
parent d2573c43b6
commit 5c84cab0f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 138 additions and 49 deletions

View File

@ -229,34 +229,76 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma
}
if (strCommand == NetMsgType::QSIGSESANN) {
CSigSesAnn ann;
vRecv >> ann;
ProcessMessageSigSesAnn(pfrom, ann, connman);
std::vector<CSigSesAnn> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
LogPrint("llmq", "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom->id);
BanNode(pfrom->id);
return;
}
for (auto& ann : msgs) {
if (!ProcessMessageSigSesAnn(pfrom, ann, connman)) {
BanNode(pfrom->id);
return;
}
}
} else if (strCommand == NetMsgType::QSIGSHARESINV) {
CSigSharesInv inv;
vRecv >> inv;
ProcessMessageSigSharesInv(pfrom, inv, connman);
std::vector<CSigSharesInv> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
LogPrint("llmq", "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom->id);
BanNode(pfrom->id);
return;
}
for (auto& inv : msgs) {
if (!ProcessMessageSigSharesInv(pfrom, inv, connman)) {
BanNode(pfrom->id);
return;
}
}
} else if (strCommand == NetMsgType::QGETSIGSHARES) {
CSigSharesInv inv;
vRecv >> inv;
ProcessMessageGetSigShares(pfrom, inv, connman);
std::vector<CSigSharesInv> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
LogPrint("llmq", "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom->id);
BanNode(pfrom->id);
return;
}
for (auto& inv : msgs) {
if (!ProcessMessageGetSigShares(pfrom, inv, connman)) {
BanNode(pfrom->id);
return;
}
}
} else if (strCommand == NetMsgType::QBSIGSHARES) {
CBatchedSigShares batchedSigShares;
vRecv >> batchedSigShares;
ProcessMessageBatchedSigShares(pfrom, batchedSigShares, connman);
std::vector<CBatchedSigShares> msgs;
vRecv >> msgs;
size_t totalSigsCount = 0;
for (auto& bs : msgs) {
totalSigsCount += bs.sigShares.size();
}
if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
LogPrint("llmq", "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom->id);
BanNode(pfrom->id);
return;
}
for (auto& bs : msgs) {
if (!ProcessMessageBatchedSigShares(pfrom, bs, connman)) {
BanNode(pfrom->id);
return;
}
}
}
}
void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman)
bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman)
{
auto llmqType = (Consensus::LLMQType)ann.llmqType;
if (!Params().GetConsensus().llmqs.count(llmqType)) {
BanNode(pfrom->id);
return;
return false;
}
if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) {
BanNode(pfrom->id);
return;
return false;
}
LogPrint("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->id);
@ -266,7 +308,7 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn&
// TODO should we ban here?
LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__,
ann.quorumHash.ToString(), pfrom->id);
return;
return true; // let's still try other announcements from the same message
}
auto signHash = CLLMQUtils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash);
@ -279,37 +321,34 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn&
session.recvSessionId = ann.sessionId;
session.quorum = quorum;
nodeState.sessionByRecvId.emplace(ann.sessionId, &session);
return true;
}
bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv)
{
if (!fMasternodeMode || activeMasternodeInfo.proTxHash.IsNull()) {
return false;
}
size_t quorumSize = (size_t)Params().GetConsensus().llmqs.at(llmqType).size;
if (inv.inv.size() != quorumSize) {
BanNode(from);
return false;
}
return true;
}
void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->id, inv.sessionId, sessionInfo)) {
return;
return true;
}
if (!VerifySigSharesInv(pfrom->id, sessionInfo.llmqType, inv)) {
return;
return false;
}
// TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) {
return;
return true;
}
LogPrint("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__,
@ -319,26 +358,27 @@ void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigShare
auto& nodeState = nodeStates[pfrom->id];
auto session = nodeState.GetSessionByRecvId(inv.sessionId);
if (!session) {
return;
return true;
}
session->announced.Merge(inv);
session->knows.Merge(inv);
return true;
}
void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->id, inv.sessionId, sessionInfo)) {
return;
return true;
}
if (!VerifySigSharesInv(pfrom->id, sessionInfo.llmqType, inv)) {
return;
return false;
}
// TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) {
return;
return true;
}
LogPrint("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__,
@ -348,26 +388,23 @@ void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigShare
auto& nodeState = nodeStates[pfrom->id];
auto session = nodeState.GetSessionByRecvId(inv.sessionId);
if (!session) {
return;
return true;
}
session->requested.Merge(inv);
session->knows.Merge(inv);
return true;
}
void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman)
bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->id, batchedSigShares.sessionId, sessionInfo)) {
return;
return true;
}
bool ban = false;
if (!PreVerifyBatchedSigShares(pfrom->id, sessionInfo, batchedSigShares, ban)) {
if (ban) {
BanNode(pfrom->id);
return;
}
return;
return ban;
}
std::vector<CSigShare> sigShares;
@ -402,7 +439,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc
sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->id);
if (sigShares.empty()) {
return;
return true;
}
LOCK(cs);
@ -410,6 +447,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc
for (auto& s : sigShares) {
nodeState.pendingIncomingSigShares.Add(s.GetKey(), s);
}
return true;
}
bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan)
@ -1012,47 +1050,90 @@ bool CSigSharesManager::SendMessages()
auto it1 = sigSessionAnnouncements.find(pnode->id);
if (it1 != sigSessionAnnouncements.end()) {
std::vector<CSigSesAnn> msgs;
msgs.reserve(it1->second.size());
for (auto& sigSesAnn : it1->second) {
LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n",
CLLMQUtils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, sigSesAnn), false);
msgs.emplace_back(sigSesAnn);
if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs), false);
didSend = true;
}
}
auto it = sigSharesToRequest.find(pnode->id);
if (it != sigSharesToRequest.end()) {
std::vector<CSigSharesInv> msgs;
for (auto& p : it->second) {
assert(p.second.CountSet() != 0);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second), false);
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs), false);
didSend = true;
}
}
auto jt = sigSharesToSend.find(pnode->id);
if (jt != sigSharesToSend.end()) {
size_t totalSigsCount = 0;
std::vector<CBatchedSigShares> msgs;
for (auto& p : jt->second) {
assert(!p.second.sigShares.empty());
if (LogAcceptCategory("llmq")) {
LOCK(cs);
auto session = nodeStates[pnode->id].GetSessionBySignHash(p.first);
assert(session);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->id);
}
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second), false);
if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs), false);
msgs.clear();
totalSigsCount = 0;
didSend = true;
}
totalSigsCount += p.second.sigShares.size();
msgs.emplace_back(std::move(p.second));
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)), false);
didSend = true;
}
}
auto kt = sigSharesToAnnounce.find(pnode->id);
if (kt != sigSharesToAnnounce.end()) {
std::vector<CSigSharesInv> msgs;
for (auto& p : kt->second) {
assert(p.second.CountSet() != 0);
LogPrint("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToString(), pnode->id);
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second), false);
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs), false);
didSend = true;
}
}

View File

@ -337,6 +337,13 @@ class CSigSharesManager : public CRecoveredSigsListener
static const int64_t SESSION_TOTAL_TIMEOUT = 5 * 60 * 1000;
static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5 * 1000;
// we try to keep total message size below 10k
const size_t MAX_MSGS_CNT_QSIGSESANN = 100;
const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200;
const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200;
// 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support
const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400;
private:
CCriticalSection cs;
@ -378,10 +385,11 @@ public:
void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig);
private:
void ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman);
void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
// all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages)
bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman);
bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv);
bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan);