From 6836f8c38b9d6545c7b99f5717cece522f768a0b Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Thu, 24 May 2018 16:14:55 +0200 Subject: [PATCH] Implement LLMQ DKG --- src/Makefile.am | 14 +- src/chainparams.cpp | 24 + src/consensus/params.h | 6 + src/dsnotificationinterface.cpp | 2 + src/init.cpp | 2 + src/llmq/quorums_dkgsession.cpp | 1259 ++++++++++++++++++++++++ src/llmq/quorums_dkgsession.h | 350 +++++++ src/llmq/quorums_dkgsessionhandler.cpp | 538 ++++++++++ src/llmq/quorums_dkgsessionhandler.h | 138 +++ src/llmq/quorums_dkgsessionmgr.cpp | 279 ++++++ src/llmq/quorums_dkgsessionmgr.h | 76 ++ src/llmq/quorums_init.cpp | 6 + src/llmq/quorums_init.h | 3 + src/llmq/quorums_utils.cpp | 55 ++ src/llmq/quorums_utils.h | 3 + src/net_processing.cpp | 43 +- src/protocol.cpp | 11 + src/protocol.h | 9 +- 18 files changed, 2812 insertions(+), 6 deletions(-) create mode 100644 src/llmq/quorums_dkgsession.cpp create mode 100644 src/llmq/quorums_dkgsession.h create mode 100644 src/llmq/quorums_dkgsessionhandler.cpp create mode 100644 src/llmq/quorums_dkgsessionhandler.h create mode 100644 src/llmq/quorums_dkgsessionmgr.cpp create mode 100644 src/llmq/quorums_dkgsessionmgr.h diff --git a/src/Makefile.am b/src/Makefile.am index 2d160190b..be2464264 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -141,10 +141,13 @@ BITCOIN_CORE_H = \ keystore.h \ dbwrapper.h \ limitedmap.h \ - llmq/quorums_commitment.h \ llmq/quorums_blockprocessor.h \ - llmq/quorums_utils.h \ + llmq/quorums_commitment.h \ + llmq/quorums_dkgsessionhandler.h \ + llmq/quorums_dkgsessionmgr.h \ + llmq/quorums_dkgsession.h \ llmq/quorums_init.h \ + llmq/quorums_utils.h \ masternode-meta.h \ masternode-payments.h \ masternode-sync.h \ @@ -249,10 +252,13 @@ libdash_server_a_SOURCES = \ governance-validators.cpp \ governance-vote.cpp \ governance-votedb.cpp \ - llmq/quorums_commitment.cpp \ llmq/quorums_blockprocessor.cpp \ - llmq/quorums_utils.cpp \ + llmq/quorums_commitment.cpp \ + llmq/quorums_dkgsessionhandler.cpp \ + llmq/quorums_dkgsessionmgr.cpp \ + llmq/quorums_dkgsession.cpp \ llmq/quorums_init.cpp \ + llmq/quorums_utils.cpp \ masternode-meta.cpp \ masternode-payments.cpp \ masternode-sync.cpp \ diff --git a/src/chainparams.cpp b/src/chainparams.cpp index e8cff0692..278a73f13 100644 --- a/src/chainparams.cpp +++ b/src/chainparams.cpp @@ -118,6 +118,12 @@ static Consensus::LLMQParams llmq10_60 = { .dkgPhaseBlocks = 2, .dkgMiningWindowStart = 10, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowEnd = 18, + .dkgRndSleepTime = 0, + .dkgBadVotesThreshold = 8, + + .neighborConnections = 2, + .diagonalConnections = 2, + .keepOldConnections = 24, }; static Consensus::LLMQParams llmq50_60 = { @@ -131,6 +137,12 @@ static Consensus::LLMQParams llmq50_60 = { .dkgPhaseBlocks = 2, .dkgMiningWindowStart = 10, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowEnd = 18, + .dkgRndSleepTime = 1 * 60 * 1000, + .dkgBadVotesThreshold = 40, + + .neighborConnections = 2, + .diagonalConnections = 2, + .keepOldConnections = 24, }; static Consensus::LLMQParams llmq400_60 = { @@ -144,6 +156,12 @@ static Consensus::LLMQParams llmq400_60 = { .dkgPhaseBlocks = 4, .dkgMiningWindowStart = 20, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowEnd = 28, + .dkgRndSleepTime = 2 * 60 * 1000, + .dkgBadVotesThreshold = 300, + + .neighborConnections = 4, + .diagonalConnections = 4, + .keepOldConnections = 4, }; // Used for deployment and min-proto-version signalling, so it needs a higher threshold @@ -158,6 +176,12 @@ static Consensus::LLMQParams llmq400_85 = { .dkgPhaseBlocks = 4, .dkgMiningWindowStart = 20, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowEnd = 48, // give it a larger mining window to make sure it is mined + .dkgRndSleepTime = 2 * 60 * 1000, + .dkgBadVotesThreshold = 300, + + .neighborConnections = 4, + .diagonalConnections = 4, + .keepOldConnections = 4, }; diff --git a/src/consensus/params.h b/src/consensus/params.h index aeca8e519..d452c3278 100644 --- a/src/consensus/params.h +++ b/src/consensus/params.h @@ -97,6 +97,12 @@ struct LLMQParams { // should at the same time not be too large so that not too much space is wasted with null commitments in case a DKG // session failed. int dkgMiningWindowEnd; + int dkgRndSleepTime; + int dkgBadVotesThreshold; + + int neighborConnections; + int diagonalConnections; + int keepOldConnections; }; /** diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index a13a8d3c3..8450cc26b 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -9,6 +9,7 @@ #include "masternode-payments.h" #include "masternode-sync.h" #include "privatesend.h" +#include "llmq/quorums_dkgsessionmgr.h" #ifdef ENABLE_WALLET #include "privatesend-client.h" #endif // ENABLE_WALLET @@ -59,6 +60,7 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con #endif // ENABLE_WALLET instantsend.UpdatedBlockTip(pindexNew); governance.UpdatedBlockTip(pindexNew, connman); + llmq::quorumDKGSessionManager->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); } void CDSNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock) diff --git a/src/init.cpp b/src/init.cpp index 4735d4cb2..76ce689f0 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -67,6 +67,7 @@ #include "warnings.h" #include "evo/deterministicmns.h" +#include "llmq/quorums_init.h" #include "llmq/quorums_init.h" @@ -543,6 +544,7 @@ std::string HelpMessage(HelpMessageMode mode) strUsage += HelpMessageOpt("-limitdescendantcount=", strprintf("Do not accept transactions if any ancestor would have or more in-mempool descendants (default: %u)", DEFAULT_DESCENDANT_LIMIT)); strUsage += HelpMessageOpt("-limitdescendantsize=", strprintf("Do not accept transactions if any ancestor would have more than kilobytes of in-mempool descendants (default: %u).", DEFAULT_DESCENDANT_SIZE_LIMIT)); strUsage += HelpMessageOpt("-bip9params=deployment:start:end", "Use given start/end times for specified BIP9 deployment (regtest-only)"); + strUsage += HelpMessageOpt("-watchquorums=", strprintf("Watch and validate quorum communication (default: %u)", llmq::DEFAULT_WATCH_QUORUMS)); } std::string debugCategories = "addrman, alert, bench, cmpctblock, coindb, db, http, leveldb, libevent, lock, mempool, mempoolrej, net, proxy, prune, rand, reindex, rpc, selectcoins, tor, zmq, " "dash (or specifically: gobject, instantsend, keepass, masternode, mnpayments, mnsync, privatesend, spork)"; // Don't translate these and qt below diff --git a/src/llmq/quorums_dkgsession.cpp b/src/llmq/quorums_dkgsession.cpp new file mode 100644 index 000000000..268f587bf --- /dev/null +++ b/src/llmq/quorums_dkgsession.cpp @@ -0,0 +1,1259 @@ +// Copyright (c) 2018 The Dash Core developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "quorums_dkgsession.h" + +#include "quorums_commitment.h" +#include "quorums_dkgsessionmgr.h" +#include "quorums_utils.h" + +#include "evo/specialtx.h" + +#include "activemasternode.h" +#include "chainparams.h" +#include "init.h" +#include "net.h" +#include "netmessagemaker.h" +#include "spork.h" +#include "univalue.h" +#include "validation.h" + +#include "cxxtimer.hpp" + +namespace llmq +{ + +double contributionOmitRate = 0; +double contributionLieRate = 0; +double complainLieRate = 0; +double justifyOmitRate = 0; +double justifyLieRate = 0; +double commitOmitRate = 0; +double commitLieRate = 0; + +static bool RandBool(double rate) +{ + const uint64_t v = 100000000; + uint64_t r = GetRand(v + 1); + if (r <= v * rate) + return true; + return false; +} + +CDKGLogger::CDKGLogger(CDKGSession& _quorumDkg, const std::string& _func) : + CDKGLogger(_quorumDkg.params.type, _quorumDkg.quorumHash, _quorumDkg.height, _quorumDkg.AreWeMember(), _func) +{ +} + +CDKGLogger::CDKGLogger(Consensus::LLMQType _llmqType, const uint256& _quorumHash, int _height, bool _areWeMember, const std::string& _func) : + CBatchedLogger(strprintf("QuorumDKG(type=%d, height=%d, member=%d, func=%s)", _llmqType, _height, _areWeMember, _func)) +{ +} + + +CDKGComplaint::CDKGComplaint(const Consensus::LLMQParams& params) : + badMembers((size_t)params.size), complainForMembers((size_t)params.size) +{ +} + +CDKGPrematureCommitment::CDKGPrematureCommitment(const Consensus::LLMQParams& params) : + validMembers((size_t)params.size) +{ +} + +CDKGMember::CDKGMember(CDeterministicMNCPtr _dmn, size_t _idx) : + dmn(_dmn), + idx(_idx), + id(CBLSId::FromHash(_dmn->proTxHash)) +{ + +} + +bool CDKGSession::Init(int _height, const uint256& _quorumHash, const std::vector& mns, const uint256& _myProTxHash) +{ + if (mns.size() < params.minSize) { + return false; + } + + height = _height; + quorumHash = _quorumHash; + + members.resize(mns.size()); + memberIds.resize(members.size()); + receivedVvecs.resize(members.size()); + receivedSkContributions.resize(members.size()); + + for (size_t i = 0; i < mns.size(); i++) { + members[i] = std::unique_ptr(new CDKGMember(mns[i], i)); + membersMap.emplace(members[i]->dmn->proTxHash, i); + memberIds[i] = members[i]->id; + } + + if (!_myProTxHash.IsNull()) { + for (size_t i = 0; i < members.size(); i++) { + auto& m = members[i]; + if (m->dmn->proTxHash == _myProTxHash) { + myIdx = i; + myProTxHash = _myProTxHash; + myId = m->id; + break; + } + } + } + + CDKGLogger logger(*this, __func__); + + if (myProTxHash.IsNull()) { + logger.Printf("initialized as observer. mns=%d\n", mns.size()); + } else { + logger.Printf("initialized as member. mns=%d\n", mns.size()); + } + + return true; +} + +void CDKGSession::Contribute() +{ + CDKGLogger logger(*this, __func__); + + if (!AreWeMember()) { + return; + } + + cxxtimer::Timer t1(true); + logger.Printf("generating contributions\n"); + if (!blsWorker.GenerateContributions(params.threshold, memberIds, vvecContribution, skContributions)) { + // this should never happen actually + logger.Printf("GenerateContributions failed\n"); + return; + } + logger.Printf("generated contributions. time=%d\n", t1.count()); + + SendContributions(); +} + +void CDKGSession::SendContributions() +{ + CDKGLogger logger(*this, __func__); + + assert(AreWeMember()); + + logger.Printf("sending contributions\n"); + + if (RandBool(contributionOmitRate)) { + logger.Printf("omitting\n"); + return; + } + + CDKGContribution qc; + qc.llmqType = (uint8_t)params.type; + qc.quorumHash = quorumHash; + qc.proTxHash = myProTxHash; + qc.vvec = vvecContribution; + + cxxtimer::Timer t1(true); + qc.contributions = std::make_shared>(); + qc.contributions->InitEncrypt(members.size()); + + for (size_t i = 0; i < members.size(); i++) { + auto& m = members[i]; + CBLSSecretKey skContrib = skContributions[i]; + + if (RandBool(contributionLieRate)) { + logger.Printf("lying for %s\n", m->dmn->proTxHash.ToString()); + skContrib.MakeNewKey(); + } + + if (!qc.contributions->Encrypt(i, m->dmn->pdmnState->pubKeyOperator, skContrib, PROTOCOL_VERSION)) { + logger.Printf("failed to encrypt contribution for %s\n", m->dmn->proTxHash.ToString()); + return; + } + } + + logger.Printf("encrypted contributions. time=%d\n", t1.count()); + + qc.sig = activeMasternodeInfo.blsKeyOperator->Sign(qc.GetSignHash()); + + logger.Flush(); + + uint256 hash = ::SerializeHash(qc); + bool ban = false; + if (PreVerifyMessage(hash, qc, ban)) { + ReceiveMessage(hash, qc, ban); + } +} + +// only performs cheap verifications, but not the signature of the message. this is checked with batched verification +bool CDKGSession::PreVerifyMessage(const uint256& hash, const CDKGContribution& qc, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + cxxtimer::Timer t1(true); + + retBan = false; + + if (qc.quorumHash != quorumHash) { + logger.Printf("contribution for wrong quorum, rejecting\n"); + return false; + } + + if (Seen(hash)) { + return false; + } + + auto member = GetMember(qc.proTxHash); + if (!member) { + logger.Printf("contributor not a member of this quorum, rejecting contribution\n"); + retBan = true; + return false; + } + + if (qc.contributions->blobs.size() != members.size()) { + logger.Printf("invalid contributions count\n"); + retBan = true; + return false; + } + if (qc.vvec->size() != params.threshold) { + logger.Printf("invalid verification vector length\n"); + retBan = true; + return false; + } + + if (!blsWorker.VerifyVerificationVector(*qc.vvec)) { + logger.Printf("invalid verification vector\n"); + retBan = true; + return false; + } + + if (member->contributions.size() >= 2) { + // don't do any further processing if we got more than 1 valid contributions already + // this is a DoS protection against members sending multiple contributions with valid signatures to us + // we must bail out before any expensive BLS verification happens + logger.Printf("dropping contribution from %s as we already got %d contributions\n", member->dmn->proTxHash.ToString(), member->contributions.size()); + return false; + } + + return true; +} + +void CDKGSession::ReceiveMessage(const uint256& hash, const CDKGContribution& qc, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + retBan = false; + + auto member = GetMember(qc.proTxHash); + + cxxtimer::Timer t1(true); + logger.Printf("received contribution from %s\n", qc.proTxHash.ToString()); + + { + // relay, no matter if further verification fails + // This ensures the whole quorum sees the bad behavior + LOCK(invCs); + + if (member->contributions.size() >= 2) { + // only relay up to 2 contributions, that's enough to let the other members know about his bad behavior + return; + } + + contributions.emplace(hash, qc); + member->contributions.emplace(hash); + + CInv inv(MSG_QUORUM_CONTRIB, hash); + invSet.emplace(inv); + RelayInvToParticipants(inv); + + if (member->contributions.size() > 1) { + // don't do any further processing if we got more than 1 contribution. we already relayed it, + // so others know about his bad behavior + MarkBadMember(member->idx); + logger.Printf("%s did send multiple contributions\n", member->dmn->proTxHash.ToString()); + return; + } + } + + receivedVvecs[member->idx] = qc.vvec; + + int receivedCount = 0; + for (auto& m : members) { + if (!m->contributions.empty()) { + receivedCount++; + } + } + + logger.Printf("received and relayed contribution. received=%d/%d, time=%d\n", receivedCount, members.size(), t1.count()); + + cxxtimer::Timer t2(true); + + if (!AreWeMember()) { + // can't further validate + return; + } + + dkgManager.WriteVerifiedVvecContribution(params.type, qc.quorumHash, qc.proTxHash, qc.vvec); + + bool complain = false; + CBLSSecretKey skContribution; + if (!qc.contributions->Decrypt(myIdx, *activeMasternodeInfo.blsKeyOperator, skContribution, PROTOCOL_VERSION)) { + logger.Printf("contribution from %s could not be decrypted\n", member->dmn->proTxHash.ToString()); + complain = true; + } else if (RandBool(complainLieRate)) { + logger.Printf("lying/complaining for %s\n", member->dmn->proTxHash.ToString()); + complain = true; + } + + if (complain) { + member->weComplain = true; + return; + } + + logger.Printf("decrypted our contribution share. time=%d\n", t2.count()); + + bool verifyPending = false; + receivedSkContributions[member->idx] = skContribution; + pendingContributionVerifications.emplace_back(member->idx); + if (pendingContributionVerifications.size() >= 32) { + verifyPending = true; + } + + if (verifyPending) { + VerifyPendingContributions(); + } +} + +// Verifies all pending secret key contributions in one batch +// This is done by aggregating the verification vectors belonging to the secret key contributions +// The resulting aggregated vvec is then used to recover a public key share +// The public key share must match the public key belonging to the aggregated secret key contributions +// See CBLSWorker::VerifyContributionShares for more details. +void CDKGSession::VerifyPendingContributions() +{ + CDKGLogger logger(*this, __func__); + + cxxtimer::Timer t1(true); + + std::vector pend = std::move(pendingContributionVerifications); + if (pend.empty()) { + return; + } + + std::vector memberIndexes; + std::vector vvecs; + BLSSecretKeyVector skContributions; + + for (auto& idx : pend) { + auto& m = members[idx]; + if (m->bad || m->weComplain) { + continue; + } + memberIndexes.emplace_back(idx); + vvecs.emplace_back(receivedVvecs[idx]); + skContributions.emplace_back(receivedSkContributions[idx]); + } + + auto result = blsWorker.VerifyContributionShares(myId, vvecs, skContributions); + if (result.size() != memberIndexes.size()) { + logger.Printf("VerifyContributionShares returned result of size %d but size %d was expected, something is wrong\n", result.size(), memberIndexes.size()); + return; + } + + for (size_t i = 0; i < memberIndexes.size(); i++) { + if (!result[i]) { + auto& m = members[memberIndexes[i]]; + logger.Printf("invalid contribution from %s. will complain later\n", m->dmn->proTxHash.ToString()); + m->weComplain = true; + } else { + size_t memberIdx = memberIndexes[i]; + dkgManager.WriteVerifiedSkContribution(params.type, quorumHash, members[memberIdx]->dmn->proTxHash, skContributions[i]); + } + } + + logger.Printf("verified %d pending contributions. time=%d\n", pend.size(), t1.count()); +} + +void CDKGSession::VerifyAndComplain() +{ + if (!AreWeMember()) { + return; + } + + VerifyPendingContributions(); + + CDKGLogger logger(*this, __func__); + + // we check all members if they sent us their contributions + // we consider members as bad if they missed to send anything or if they sent multiple + // in both cases we won't give him a second chance as he is either down, buggy or an adversary + // we assume that such a participant will be marked as bad by the whole network in most cases, + // as propagation will ensure that all nodes see the same vvecs/contributions. In case nodes come to + // different conclusions, the aggregation phase will handle this (most voted quorum key wins) + + cxxtimer::Timer t1(true); + + for (auto& m : members) { + if (m->bad) { + continue; + } + if (m->contributions.empty()) { + logger.Printf("%s did not send any contribution\n", m->dmn->proTxHash.ToString()); + MarkBadMember(m->idx); + continue; + } + } + + logger.Printf("verified contributions. time=%d\n", t1.count()); + logger.Flush(); + + SendComplaint(); +} + +void CDKGSession::SendComplaint() +{ + CDKGLogger logger(*this, __func__); + + assert(AreWeMember()); + + CDKGComplaint qc(params); + qc.llmqType = (uint8_t)params.type; + qc.quorumHash = quorumHash; + qc.proTxHash = myProTxHash; + + int badCount = 0; + int complaintCount = 0; + for (size_t i = 0; i < members.size(); i++) { + auto& m = members[i]; + if (m->bad) { + qc.badMembers[i] = true; + badCount++; + } else if (m->weComplain) { + qc.complainForMembers[i] = true; + complaintCount++; + } + } + + if (badCount == 0 && complaintCount == 0) { + return; + } + + logger.Printf("sending complaint. badCount=%d, complaintCount=%d\n", badCount, complaintCount); + + qc.sig = activeMasternodeInfo.blsKeyOperator->Sign(qc.GetSignHash()); + + logger.Flush(); + + uint256 hash = ::SerializeHash(qc); + bool ban = false; + if (PreVerifyMessage(hash, qc, ban)) { + ReceiveMessage(hash, qc, ban); + } +} + +// only performs cheap verifications, but not the signature of the message. this is checked with batched verification +bool CDKGSession::PreVerifyMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + retBan = false; + + if (qc.quorumHash != quorumHash) { + logger.Printf("complaint for wrong quorum, rejecting\n"); + return false; + } + + if (Seen(hash)) { + return false; + } + + auto member = GetMember(qc.proTxHash); + if (!member) { + logger.Printf("complainer not a member of this quorum, rejecting complaint\n"); + retBan = true; + return false; + } + + if (qc.badMembers.size() != (size_t)params.size) { + logger.Printf("invalid badMembers bitset size\n"); + retBan = true; + return false; + } + + if (qc.complainForMembers.size() != (size_t)params.size) { + logger.Printf("invalid complainForMembers bitset size\n"); + retBan = true; + return false; + } + + if (member->complaints.size() >= 2) { + // don't do any further processing if we got more than 1 valid complaints already + // this is a DoS protection against members sending multiple complaints with valid signatures to us + // we must bail out before any expensive BLS verification happens + logger.Printf("dropping complaint from %s as we already got %d complaints\n", + member->dmn->proTxHash.ToString(), member->complaints.size()); + return false; + } + + return true; +} + +void CDKGSession::ReceiveMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + retBan = false; + + logger.Printf("received complaint from %s\n", qc.proTxHash.ToString()); + + auto member = GetMember(qc.proTxHash); + + { + LOCK(invCs); + + if (member->complaints.size() >= 2) { + // only relay up to 2 complaints, that's enough to let the other members know about his bad behavior + return; + } + + complaints.emplace(hash, qc); + member->complaints.emplace(hash); + + CInv inv(MSG_QUORUM_COMPLAINT, hash); + invSet.emplace(inv); + RelayInvToParticipants(inv); + + if (member->complaints.size() > 1) { + // don't do any further processing if we got more than 1 complaint. we already relayed it, + // so others know about his bad behavior + MarkBadMember(member->idx); + logger.Printf("%s did send multiple complaints\n", member->dmn->proTxHash.ToString()); + return; + } + } + + int receivedCount = 0; + for (size_t i = 0; i < members.size(); i++) { + auto& m = members[i]; + if (qc.badMembers[i]) { + logger.Printf("%s voted for %s to be bad\n", member->dmn->proTxHash.ToString(), m->dmn->proTxHash.ToString()); + m->badMemberVotes.emplace(qc.proTxHash); + if (AreWeMember() && i == myIdx) { + logger.Printf("%s voted for us to be bad\n", member->dmn->proTxHash.ToString()); + } + } + if (qc.complainForMembers[i]) { + m->complaintsFromOthers.emplace(qc.proTxHash); + m->someoneComplain = true; + if (AreWeMember() && i == myIdx) { + logger.Printf("%s complained about us\n", member->dmn->proTxHash.ToString()); + } + } + if (!m->complaints.empty()) { + receivedCount++; + } + } + + logger.Printf("received and relayed complaint. received=%d\n", receivedCount); +} + +void CDKGSession::VerifyAndJustify() +{ + if (!AreWeMember()) { + return; + } + + CDKGLogger logger(*this, __func__); + + std::set justifyFor; + + for (auto& m : members) { + if (m->bad) { + continue; + } + if (m->badMemberVotes.size() >= params.dkgBadVotesThreshold) { + logger.Printf("%s marked as bad as %d other members voted for this\n", m->dmn->proTxHash.ToString(), m->badMemberVotes.size()); + MarkBadMember(m->idx); + continue; + } + if (m->complaints.empty()) { + continue; + } + if (m->complaints.size() != 1) { + logger.Printf("%s sent multiple complaints\n", m->dmn->proTxHash.ToString()); + MarkBadMember(m->idx); + continue; + } + + auto& qc = complaints.at(*m->complaints.begin()); + if (qc.complainForMembers[myIdx]) { + justifyFor.emplace(qc.proTxHash); + } + } + + logger.Flush(); + if (!justifyFor.empty()) { + SendJustification(justifyFor); + } +} + +void CDKGSession::SendJustification(const std::set& forMembers) +{ + CDKGLogger logger(*this, __func__); + + assert(AreWeMember()); + + logger.Printf("sending justification for %d members\n", forMembers.size()); + + CDKGJustification qj; + qj.llmqType = (uint8_t)params.type; + qj.quorumHash = quorumHash; + qj.proTxHash = myProTxHash; + qj.contributions.reserve(forMembers.size()); + + for (size_t i = 0; i < members.size(); i++) { + auto& m = members[i]; + if (!forMembers.count(m->dmn->proTxHash)) { + continue; + } + logger.Printf("justifying for %s\n", m->dmn->proTxHash.ToString()); + + CBLSSecretKey skContribution = skContributions[i]; + + if (RandBool(justifyLieRate)) { + logger.Printf("lying for %s\n", m->dmn->proTxHash.ToString()); + skContribution.MakeNewKey(); + } + + qj.contributions.emplace_back(i, skContribution); + } + + if (RandBool(justifyOmitRate)) { + logger.Printf("omitting\n"); + return; + } + + qj.sig = activeMasternodeInfo.blsKeyOperator->Sign(qj.GetSignHash()); + + logger.Flush(); + + uint256 hash = ::SerializeHash(qj); + bool ban = false; + if (PreVerifyMessage(hash, qj, ban)) { + ReceiveMessage(hash, qj, ban); + } +} + +// only performs cheap verifications, but not the signature of the message. this is checked with batched verification +bool CDKGSession::PreVerifyMessage(const uint256& hash, const CDKGJustification& qj, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + retBan = false; + + if (qj.quorumHash != quorumHash) { + logger.Printf("justification for wrong quorum, rejecting\n"); + return false; + } + + if (Seen(hash)) { + return false; + } + + auto member = GetMember(qj.proTxHash); + if (!member) { + logger.Printf("justifier not a member of this quorum, rejecting justification\n"); + retBan = true; + return false; + } + + if (qj.contributions.empty()) { + logger.Printf("justification with no contributions\n"); + retBan = true; + return false; + } + + std::set contributionsSet; + for (const auto& p : qj.contributions) { + if (p.first > members.size()) { + logger.Printf("invalid contribution index\n"); + retBan = true; + return false; + } + + if (!contributionsSet.emplace(p.first).second) { + logger.Printf("duplicate contribution index\n"); + retBan = true; + return false; + } + + auto& skShare = p.second; + if (!skShare.IsValid()) { + logger.Printf("invalid contribution\n"); + retBan = true; + return false; + } + } + + if (member->justifications.size() >= 2) { + // don't do any further processing if we got more than 1 valid justification already + // this is a DoS protection against members sending multiple justifications with valid signatures to us + // we must bail out before any expensive BLS verification happens + logger.Printf("dropping justification from %s as we already got %d justifications\n", + member->dmn->proTxHash.ToString(), member->justifications.size()); + return false; + } + + return true; +} + +void CDKGSession::ReceiveMessage(const uint256& hash, const CDKGJustification& qj, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + retBan = false; + + logger.Printf("received justification from %s\n", qj.proTxHash.ToString()); + + auto member = GetMember(qj.proTxHash); + + { + LOCK(invCs); + + if (member->justifications.size() >= 2) { + // only relay up to 2 justifications, that's enough to let the other members know about his bad behavior + return; + } + + justifications.emplace(hash, qj); + member->justifications.emplace(hash); + + // we always relay, even if further verification fails + CInv inv(MSG_QUORUM_JUSTIFICATION, hash); + invSet.emplace(inv); + RelayInvToParticipants(inv); + + if (member->justifications.size() > 1) { + // don't do any further processing if we got more than 1 justification. we already relayed it, + // so others know about his bad behavior + logger.Printf("%s did send multiple justifications\n", member->dmn->proTxHash.ToString()); + MarkBadMember(member->idx); + return; + } + + if (member->bad) { + // we locally determined him to be bad (sent none or more then one contributions) + // don't give him a second chance (but we relay the justification in case other members disagree) + return; + } + } + + for (const auto& p : qj.contributions) { + auto& member2 = members[p.first]; + + if (!member->complaintsFromOthers.count(member2->dmn->proTxHash)) { + logger.Printf("got justification from %s for %s even though he didn't complain\n", + member->dmn->proTxHash.ToString(), member2->dmn->proTxHash.ToString()); + MarkBadMember(member->idx); + } + } + if (member->bad) { + return; + } + + cxxtimer::Timer t1(true); + + std::list> futures; + for (const auto& p : qj.contributions) { + auto& member2 = members[p.first]; + auto& skContribution = p.second; + + // watch out to not bail out before these async calls finish (they rely on valid references) + futures.emplace_back(blsWorker.AsyncVerifyContributionShare(member2->id, receivedVvecs[member->idx], skContribution)); + } + auto resultIt = futures.begin(); + for (const auto& p : qj.contributions) { + auto& member2 = members[p.first]; + auto& skContribution = p.second; + + bool result = (resultIt++)->get(); + if (!result) { + logger.Printf(" %s did send an invalid justification for %s\n", member->dmn->proTxHash.ToString(), member2->dmn->proTxHash.ToString()); + MarkBadMember(member->idx); + } else { + logger.Printf(" %s justified for %s\n", member->dmn->proTxHash.ToString(), member2->dmn->proTxHash.ToString()); + if (AreWeMember() && member2->id == myId) { + receivedSkContributions[member->idx] = skContribution; + member->weComplain = false; + + dkgManager.WriteVerifiedSkContribution(params.type, quorumHash, member->dmn->proTxHash, skContribution); + } + member->complaintsFromOthers.erase(member2->dmn->proTxHash); + } + } + + int receivedCount = 0; + int expectedCount = 0; + + for (auto& m : members) { + if (!m->justifications.empty()) { + receivedCount++; + } + + if (m->someoneComplain) { + expectedCount++; + } + } + + logger.Printf("verified justification: received=%d/%d time=%d\n", receivedCount, expectedCount, t1.count()); +} + +void CDKGSession::VerifyAndCommit() +{ + if (!AreWeMember()) { + return; + } + + CDKGLogger logger(*this, __func__); + + std::vector badMembers; + std::vector openComplaintMembers; + + for (auto& m : members) { + if (m->bad) { + badMembers.emplace_back(m->idx); + continue; + } + if (!m->complaintsFromOthers.empty()) { + MarkBadMember(m->idx); + openComplaintMembers.emplace_back(m->idx); + } + } + + if (!badMembers.empty() || !openComplaintMembers.empty()) { + logger.Printf("verification result:\n"); + } + if (!badMembers.empty()) { + logger.Printf(" members previously determined as bad:\n"); + for (auto& idx : badMembers) { + logger.Printf(" %s\n", members[idx]->dmn->proTxHash.ToString()); + } + } + if (!openComplaintMembers.empty()) { + logger.Printf(" members with open complaints and now marked as bad:\n"); + for (auto& idx : openComplaintMembers) { + logger.Printf(" %s\n", members[idx]->dmn->proTxHash.ToString()); + } + } + + logger.Flush(); + + SendCommitment(); +} + +void CDKGSession::SendCommitment() +{ + CDKGLogger logger(*this, __func__); + + assert(AreWeMember()); + + logger.Printf("sending commitment\n"); + + CDKGPrematureCommitment qc(params); + qc.llmqType = (uint8_t)params.type; + qc.quorumHash = quorumHash; + qc.proTxHash = myProTxHash; + + for (size_t i = 0; i < members.size(); i++) { + auto& m = members[i]; + if (!m->bad) { + qc.validMembers[i] = true; + } + } + + if (qc.CountValidMembers() < params.minSize) { + logger.Printf("not enough valid members. not sending commitment\n"); + return; + } + + if (RandBool(commitOmitRate)) { + logger.Printf("omitting\n"); + return; + } + + cxxtimer::Timer timerTotal(true); + + cxxtimer::Timer t1(true); + std::vector memberIndexes; + std::vector vvecs; + BLSSecretKeyVector skContributions; + if (!dkgManager.GetVerifiedContributions(params.type, quorumHash, qc.validMembers, memberIndexes, vvecs, skContributions)) { + logger.Printf("failed to get valid contributions\n"); + return; + } + + BLSVerificationVectorPtr vvec = cache.BuildQuorumVerificationVector(::SerializeHash(memberIndexes), vvecs); + if (vvec == nullptr) { + logger.Printf("failed to build quorum verification vector\n"); + return; + } + t1.stop(); + + cxxtimer::Timer t2(true); + CBLSSecretKey skShare = cache.AggregateSecretKeys(::SerializeHash(memberIndexes), skContributions); + if (!skShare.IsValid()) { + logger.Printf("failed to build own secret share\n"); + return; + } + t2.stop(); + + logger.Printf("skShare=%s, pubKeyShare=%s\n", skShare.ToString(), skShare.GetPublicKey().ToString()); + + cxxtimer::Timer t3(true); + qc.quorumPublicKey = (*vvec)[0]; + qc.quorumVvecHash = ::SerializeHash(*vvec); + + int lieType = -1; + if (RandBool(commitLieRate)) { + lieType = GetRandInt(5); + logger.Printf("lying on commitment. lieType=%d\n", lieType); + } + + if (lieType == 0) { + CBLSSecretKey k; + k.MakeNewKey(); + qc.quorumPublicKey = k.GetPublicKey(); + } else if (lieType == 1) { + (*qc.quorumVvecHash.begin())++; + } + + uint256 commitmentHash = CLLMQUtils::BuildCommitmentHash(qc.llmqType, qc.quorumHash, qc.validMembers, qc.quorumPublicKey, qc.quorumVvecHash); + + if (lieType == 2) { + (*commitmentHash.begin())++; + } + + qc.sig = activeMasternodeInfo.blsKeyOperator->Sign(commitmentHash); + qc.quorumSig = skShare.Sign(commitmentHash); + + if (lieType == 3) { + std::vector buf; + qc.sig.GetBuf(buf); + buf[5]++; + qc.sig.SetBuf(buf); + } else if (lieType == 4) { + std::vector buf; + qc.quorumSig.GetBuf(buf); + buf[5]++; + qc.quorumSig.SetBuf(buf); + } + + t3.stop(); + timerTotal.stop(); + + logger.Printf("built premature commitment. time1=%d, time2=%d, time3=%d, totalTime=%d\n", + t1.count(), t2.count(), t3.count(), timerTotal.count()); + + + logger.Flush(); + + uint256 hash = ::SerializeHash(qc); + bool ban = false; + if (PreVerifyMessage(hash, qc, ban)) { + ReceiveMessage(hash, qc, ban); + } +} + +// only performs cheap verifications, but not the signature of the message. this is checked with batched verification +bool CDKGSession::PreVerifyMessage(const uint256& hash, const CDKGPrematureCommitment& qc, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + cxxtimer::Timer t1(true); + + retBan = false; + + if (qc.quorumHash != quorumHash) { + logger.Printf("commitment for wrong quorum, rejecting\n"); + return false; + } + + if (Seen(hash)) { + logger.Printf("already received premature commitment\n"); + return false; + } + + auto member = GetMember(qc.proTxHash); + if (!member) { + logger.Printf("committer not a member of this quorum, rejecting premature commitment\n"); + retBan = true; + return false; + } + + if (qc.validMembers.size() != (size_t)params.size) { + logger.Printf("invalid validMembers bitset size\n"); + retBan = true; + return false; + } + + if (qc.CountValidMembers() < params.minSize) { + logger.Printf("invalid validMembers count. validMembersCount=%d\n", qc.CountValidMembers()); + retBan = true; + return false; + } + if (!qc.sig.IsValid()) { + logger.Printf("invalid membersSig\n"); + retBan = true; + return false; + } + if (!qc.quorumSig.IsValid()) { + logger.Printf("invalid quorumSig\n"); + retBan = true; + return false; + } + + for (size_t i = members.size(); i < params.size; i++) { + if (qc.validMembers[i]) { + retBan = true; + logger.Printf("invalid validMembers bitset. bit %d should not be set\n", i); + return false; + } + } + + if (member->prematureCommitments.size() >= 2) { + // don't do any further processing if we got more than 1 valid commitment already + // this is a DoS protection against members sending multiple commitments with valid signatures to us + // we must bail out before any expensive BLS verification happens + logger.Printf("dropping commitment from %s as we already got %d commitments\n", + member->dmn->proTxHash.ToString(), member->prematureCommitments.size()); + return false; + } + + return true; +} + +void CDKGSession::ReceiveMessage(const uint256& hash, const CDKGPrematureCommitment& qc, bool& retBan) +{ + CDKGLogger logger(*this, __func__); + + retBan = false; + + cxxtimer::Timer t1(true); + + logger.Printf("received premature commitment from %s. validMembers=%d\n", qc.proTxHash.ToString(), qc.CountValidMembers()); + + auto member = GetMember(qc.proTxHash); + + { + LOCK(invCs); + + // keep track of ALL commitments but only relay valid ones (or if we couldn't build the vvec) + // relaying is done further down + prematureCommitments.emplace(hash, qc); + member->prematureCommitments.emplace(hash); + } + + std::vector memberIndexes; + std::vector vvecs; + BLSSecretKeyVector skContributions; + BLSVerificationVectorPtr quorumVvec; + if (dkgManager.GetVerifiedContributions(params.type, qc.quorumHash, qc.validMembers, memberIndexes, vvecs, skContributions)) { + quorumVvec = cache.BuildQuorumVerificationVector(::SerializeHash(memberIndexes), vvecs); + } + + if (quorumVvec == nullptr) { + logger.Printf("failed to build quorum verification vector. skipping full verification\n"); + // we might be the unlucky one who didn't receive all contributions, but we still have to relay + // the premature commitment as others might be luckier + } else { + // we got all information that is needed to verify everything (even though we might not be a member of the quorum) + // if any of this verification fails, we won't relay this message. This ensures that invalid messages are lost + // in the network. Nodes relaying such invalid messages to us are not punished as they might have not known + // all contributions. We only handle up to 2 commitments per member, so a DoS shouldn't be possible + + if ((*quorumVvec)[0] != qc.quorumPublicKey) { + logger.Printf("calculated quorum public key does not match\n"); + return; + } + uint256 vvecHash = ::SerializeHash(*quorumVvec); + if (qc.quorumVvecHash != vvecHash) { + logger.Printf("calculated quorum vvec hash does not match\n"); + return; + } + + CBLSPublicKey pubKeyShare = cache.BuildPubKeyShare(::SerializeHash(std::make_pair(memberIndexes, member->id)), quorumVvec, member->id); + if (!pubKeyShare.IsValid()) { + logger.Printf("failed to calculate public key share\n"); + return; + } + + if (!qc.quorumSig.VerifyInsecure(pubKeyShare, qc.GetSignHash())) { + logger.Printf("failed to verify quorumSig\n"); + return; + } + } + + LOCK(invCs); + validCommitments.emplace(hash); + + CInv inv(MSG_QUORUM_PREMATURE_COMMITMENT, hash); + invSet.emplace(inv); + RelayInvToParticipants(inv); + + int receivedCount = 0; + for (auto& m : members) { + if (!m->prematureCommitments.empty()) { + receivedCount++; + } + } + + t1.stop(); + + logger.Printf("verified premature commitment. received=%d/%d, time=%d\n", receivedCount, members.size(), t1.count()); +} + +std::vector CDKGSession::FinalizeCommitments() +{ + if (!AreWeMember()) { + return {}; + } + + CDKGLogger logger(*this, __func__); + + cxxtimer::Timer totalTimer(true); + + typedef std::vector Key; + std::map> commitmentsMap; + + for (auto& p : prematureCommitments) { + auto& qc = p.second; + if (!validCommitments.count(p.first)) { + continue; + } + + // should have been verified before + assert(qc.CountValidMembers() >= params.minSize); + + auto it = commitmentsMap.find(qc.validMembers); + if (it == commitmentsMap.end()) { + it = commitmentsMap.emplace(qc.validMembers, std::vector()).first; + } + + it->second.emplace_back(qc); + } + + std::vector finalCommitments; + for (const auto& p : commitmentsMap) { + auto& cvec = p.second; + if (cvec.size() < params.minSize) { + // commitment was signed by a minority + continue; + } + + std::vector signerIds; + std::vector thresholdSigs; + + auto& first = cvec[0]; + + CFinalCommitment fqc(params, first.quorumHash); + fqc.validMembers = first.validMembers; + fqc.quorumPublicKey = first.quorumPublicKey; + fqc.quorumVvecHash = first.quorumVvecHash; + + uint256 commitmentHash = CLLMQUtils::BuildCommitmentHash(fqc.llmqType, fqc.quorumHash, fqc.validMembers, fqc.quorumPublicKey, fqc.quorumVvecHash); + + std::vector aggSigs; + std::vector aggPks; + aggSigs.reserve(cvec.size()); + aggPks.reserve(cvec.size()); + + for (size_t i = 0; i < cvec.size(); i++) { + auto& qc = cvec[i]; + + if (qc.quorumPublicKey != first.quorumPublicKey || qc.quorumVvecHash != first.quorumVvecHash) { + logger.Printf("quorumPublicKey or quorumVvecHash does not match, skipping"); + continue; + } + + size_t signerIndex = membersMap[qc.proTxHash]; + const auto& m = members[signerIndex]; + + fqc.signers[signerIndex] = true; + aggSigs.emplace_back(qc.sig); + aggPks.emplace_back(m->dmn->pdmnState->pubKeyOperator); + + signerIds.emplace_back(m->id); + thresholdSigs.emplace_back(qc.quorumSig); + } + + cxxtimer::Timer t1(true); + fqc.membersSig = CBLSSignature::AggregateSecure(aggSigs, aggPks, commitmentHash); + t1.stop(); + + cxxtimer::Timer t2(true); + if (!fqc.quorumSig.Recover(thresholdSigs, signerIds)) { + logger.Printf("failed to recover quorum sig\n"); + continue; + } + t2.stop(); + + finalCommitments.emplace_back(fqc); + + logger.Printf("final commitment: validMembers=%d, signers=%d, quorumPublicKey=%s, time1=%d, time2=%d\n", + fqc.CountValidMembers(), fqc.CountSigners(), fqc.quorumPublicKey.ToString(), + t1.count(), t2.count()); + } + + logger.Flush(); + + return finalCommitments; +} + +CDKGMember* CDKGSession::GetMember(const uint256& proTxHash) +{ + auto it = membersMap.find(proTxHash); + if (it == membersMap.end()) { + return nullptr; + } + return members[it->second].get(); +} + +void CDKGSession::MarkBadMember(size_t idx) +{ + auto member = members.at(idx).get(); + if (member->bad) { + return; + } + member->bad = true; +} + +bool CDKGSession::Seen(const uint256& msgHash) +{ + return !seenMessages.emplace(msgHash).second; +} + +void CDKGSession::AddParticipatingNode(NodeId nodeId) +{ + LOCK(invCs); + g_connman->ForNode(nodeId, [&](CNode* pnode) { + if (!participatingNodes.emplace(pnode->addr).second) { + return true; + } + + for (auto& inv : invSet) { + pnode->PushInventory(inv); + } + return true; + }); +} + +void CDKGSession::RelayInvToParticipants(const CInv& inv) +{ + LOCK(invCs); + g_connman->ForEachNode([&](CNode* pnode) { + if (participatingNodes.count(pnode->addr)) { + pnode->PushInventory(inv); + } + }); +} + +} \ No newline at end of file diff --git a/src/llmq/quorums_dkgsession.h b/src/llmq/quorums_dkgsession.h new file mode 100644 index 000000000..7620bcf06 --- /dev/null +++ b/src/llmq/quorums_dkgsession.h @@ -0,0 +1,350 @@ +// Copyright (c) 2018 The Dash Core developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef DASH_QUORUMS_DKGSESSION_H +#define DASH_QUORUMS_DKGSESSION_H + +#include "consensus/params.h" +#include "net.h" +#include "batchedlogger.h" + +#include "bls/bls_ies.h" +#include "bls/bls_worker.h" + +#include "evo/deterministicmns.h" +#include "evo/evodb.h" + +#include "llmq/quorums_utils.h" + +class UniValue; + +namespace llmq +{ + +class CFinalCommitment; +class CDKGSession; +class CDKGSessionManager; + +class CDKGLogger : public CBatchedLogger +{ +public: + CDKGLogger(CDKGSession& _quorumDkg, const std::string& _func); + CDKGLogger(Consensus::LLMQType _llmqType, const uint256& _quorumHash, int _height, bool _areWeMember, const std::string& _func); +}; + +class CDKGContribution +{ +public: + uint8_t llmqType; + uint256 quorumHash; + uint256 proTxHash; + BLSVerificationVectorPtr vvec; + std::shared_ptr> contributions; + CBLSSignature sig; + +public: + template + inline void SerializeWithoutSig(Stream& s) const + { + s << llmqType; + s << quorumHash; + s << proTxHash; + s << *vvec; + s << *contributions; + } + template + inline void Serialize(Stream& s) const + { + SerializeWithoutSig(s); + s << sig; + } + template + inline void Unserialize(Stream& s) + { + BLSVerificationVector tmp1; + CBLSIESMultiRecipientObjects tmp2; + + s >> llmqType; + s >> quorumHash; + s >> proTxHash; + s >> tmp1; + s >> tmp2; + s >> sig; + + vvec = std::make_shared(std::move(tmp1)); + contributions = std::make_shared>(std::move(tmp2)); + } + + uint256 GetSignHash() const + { + CHashWriter hw(SER_GETHASH, 0); + SerializeWithoutSig(hw); + hw << CBLSSignature(); + return hw.GetHash(); + } +}; + +class CDKGComplaint +{ +public: + uint8_t llmqType; + uint256 quorumHash; + uint256 proTxHash; + std::vector badMembers; + std::vector complainForMembers; + CBLSSignature sig; + +public: + CDKGComplaint() {} + CDKGComplaint(const Consensus::LLMQParams& params); + + ADD_SERIALIZE_METHODS + + template + inline void SerializationOp(Stream& s, Operation ser_action) + { + READWRITE(llmqType); + READWRITE(quorumHash); + READWRITE(proTxHash); + READWRITE(DYNBITSET(badMembers)); + READWRITE(DYNBITSET(complainForMembers)); + READWRITE(sig); + } + + uint256 GetSignHash() const + { + CDKGComplaint tmp(*this); + tmp.sig = CBLSSignature(); + return ::SerializeHash(tmp); + } +}; + +class CDKGJustification +{ +public: + uint8_t llmqType; + uint256 quorumHash; + uint256 proTxHash; + std::vector> contributions; + CBLSSignature sig; + +public: + ADD_SERIALIZE_METHODS + + template + inline void SerializationOp(Stream& s, Operation ser_action) + { + READWRITE(llmqType); + READWRITE(quorumHash); + READWRITE(proTxHash); + READWRITE(contributions); + READWRITE(sig); + } + + uint256 GetSignHash() const + { + CDKGJustification tmp(*this); + tmp.sig = CBLSSignature(); + return ::SerializeHash(tmp); + } +}; + +// each member commits to a single set of valid members with this message +// then each node aggregate all received premature commitments +// into a single CFinalCommitment, which is only valid if +// enough (>=minSize) premature commitments were aggregated +class CDKGPrematureCommitment +{ +public: + uint8_t llmqType; + uint256 quorumHash; + uint256 proTxHash; + std::vector validMembers; + + CBLSPublicKey quorumPublicKey; + uint256 quorumVvecHash; + + CBLSSignature quorumSig; // threshold sig share of quorumHash+validMembers+pubKeyHash+vvecHash + CBLSSignature sig; // single member sig of quorumHash+validMembers+pubKeyHash+vvecHash + +public: + CDKGPrematureCommitment() {} + CDKGPrematureCommitment(const Consensus::LLMQParams& params); + + int CountValidMembers() const + { + return (int)std::count(validMembers.begin(), validMembers.end(), true); + } + +public: + ADD_SERIALIZE_METHODS + + template + inline void SerializationOp(Stream& s, Operation ser_action) + { + READWRITE(llmqType); + READWRITE(quorumHash); + READWRITE(proTxHash); + READWRITE(DYNBITSET(validMembers)); + READWRITE(quorumPublicKey); + READWRITE(quorumVvecHash); + READWRITE(quorumSig); + READWRITE(sig); + } + + uint256 GetSignHash() const + { + return CLLMQUtils::BuildCommitmentHash(llmqType, quorumHash, validMembers, quorumPublicKey, quorumVvecHash); + } +}; + +class CDKGMember +{ +public: + CDKGMember(CDeterministicMNCPtr _dmn, size_t _idx); + + CDeterministicMNCPtr dmn; + size_t idx; + CBLSId id; + + std::set contributions; + std::set complaints; + std::set justifications; + std::set prematureCommitments; + + std::set badMemberVotes; + std::set complaintsFromOthers; + + bool bad{false}; + bool weComplain{false}; + bool someoneComplain{false}; +}; + +/** + * The DKG session is a single instance of the DKG process. It is owned and called by CDKGSessionHandler, which passes + * received DKG messages to the session. The session is not persistent and will loose it's state (the whole object is + * discarded) when it finishes (after the mining phase) or is aborted. + * + * When incoming contributions are received and the verification vector is valid, it is passed to CDKGSessionManager + * which will store it in the evo DB. Secret key contributions which are meant for the local member are also passed + * to CDKGSessionManager to store them in the evo DB. If verification of the SK contribution initially fails, it is + * not passed to CDKGSessionManager. If the justification phase later gives a valid SK contribution from the same + * member, it is then passed to CDKGSessionManager and after this handled the same way. + * + * The contributions stored by CDKGSessionManager are then later loaded by the quorum instances and used for signing + * sessions, but only if the local node is a member of the quorum. + */ +class CDKGSession +{ + friend class CDKGSessionHandler; + friend class CDKGSessionManager; + friend class CDKGLogger; + template friend class CDKGMessageHandler; + +private: + const Consensus::LLMQParams& params; + + CEvoDB& evoDb; + CBLSWorker& blsWorker; + CBLSWorkerCache cache; + CDKGSessionManager& dkgManager; + + uint256 quorumHash; + int height{-1}; + +private: + std::vector> members; + std::map membersMap; + BLSVerificationVectorPtr vvecContribution; + BLSSecretKeyVector skContributions; + + BLSIdVector memberIds; + std::vector receivedVvecs; + // these are not necessarily verified yet. Only trust in what was written to the DB + BLSSecretKeyVector receivedSkContributions; + + uint256 myProTxHash; + CBLSId myId; + size_t myIdx{(size_t)-1}; + + // all indexed by msg hash + // we expect to only receive a single vvec and contribution per member, but we must also be able to relay + // conflicting messages as otherwise an attacker might be able to broadcast conflicting (valid+invalid) messages + // and thus split the quorum. Such members are later removed from the quorum. + mutable CCriticalSection invCs; + std::map contributions; + std::map complaints; + std::map justifications; + std::map prematureCommitments; + std::set invSet; + std::set participatingNodes; + + std::set seenMessages; + + std::vector pendingContributionVerifications; + + // filled by ReceivePrematureCommitment and used by FinalizeCommitments + std::set validCommitments; + +public: + CDKGSession(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) : + params(_params), evoDb(_evoDb), blsWorker(_blsWorker), cache(_blsWorker), dkgManager(_dkgManager) {} + + bool Init(int _height, const uint256& _quorumHash, const std::vector& mns, const uint256& _myProTxHash); + + /** + * The following sets of methods are for the first 4 phases handled in the session. The flow of message calls + * is identical for all phases: + * 1. Execute local action (e.g. create/send own contributions) + * 2. PreVerify incoming messages for this phase. Preverification means that everything from the message is checked + * that does not require too much resources for verification. This specifically excludes all CPU intensive BLS + * operations. + * 3. CDKGSessionHandler will collect pre verified messages in batches and perform batched BLS signature verification + * on these. + * 4. ReceiveMessage is called for each pre verified message with a valid signature. ReceiveMessage is also + * responsible for further verification of validity (e.g. validate vvecs and SK contributions). + */ + + // Phase 1: contribution + void Contribute(); + void SendContributions(); + bool PreVerifyMessage(const uint256& hash, const CDKGContribution& qc, bool& retBan); + void ReceiveMessage(const uint256& hash, const CDKGContribution& qc, bool& retBan); + void VerifyPendingContributions(); + + // Phase 2: complaint + void VerifyAndComplain(); + void SendComplaint(); + bool PreVerifyMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan); + void ReceiveMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan); + + // Phase 3: justification + void VerifyAndJustify(); + void SendJustification(const std::set& forMembers); + bool PreVerifyMessage(const uint256& hash, const CDKGJustification& qj, bool& retBan); + void ReceiveMessage(const uint256& hash, const CDKGJustification& qj, bool& retBan); + + // Phase 4: commit + void VerifyAndCommit(); + void SendCommitment(); + bool PreVerifyMessage(const uint256& hash, const CDKGPrematureCommitment& qc, bool& retBan); + void ReceiveMessage(const uint256& hash, const CDKGPrematureCommitment& qc, bool& retBan); + + // Phase 5: aggregate/finalize + std::vector FinalizeCommitments(); + + bool AreWeMember() const { return !myProTxHash.IsNull(); } + void MarkBadMember(size_t idx); + + bool Seen(const uint256& msgHash); + void AddParticipatingNode(NodeId nodeId); + void RelayInvToParticipants(const CInv& inv); + +public: + CDKGMember* GetMember(const uint256& proTxHash); +}; + +} + +#endif //DASH_QUORUMS_DKGSESSION_H diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp new file mode 100644 index 000000000..a74bb5830 --- /dev/null +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -0,0 +1,538 @@ +// Copyright (c) 2018 The Dash Core developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "quorums_dkgsessionhandler.h" +#include "quorums_blockprocessor.h" +#include "quorums_debug.h" +#include "quorums_init.h" +#include "quorums_utils.h" + +#include "activemasternode.h" +#include "chainparams.h" +#include "init.h" +#include "net_processing.h" +#include "validation.h" + +namespace llmq +{ + +CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) : + maxMessagesPerNode(_maxMessagesPerNode) +{ +} + +void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv) +{ + // this will also consume the data, even if we bail out early + auto pm = std::make_shared(std::move(vRecv)); + + { + LOCK(cs); + + if (messagesPerNode[from] >= maxMessagesPerNode) { + // TODO ban? + LogPrint("net", "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from); + return; + } + messagesPerNode[from]++; + } + + CHashWriter hw(SER_GETHASH, 0); + hw.write(pm->data(), pm->size()); + uint256 hash = hw.GetHash(); + + LOCK2(cs_main, cs); + + if (!seenMessages.emplace(hash).second) { + LogPrint("net", "CDKGPendingMessages::%s -- already seen %s, peer=%d", __func__, from); + return; + } + + g_connman->RemoveAskFor(hash); + + pendingMessages.emplace_back(std::make_pair(from, std::move(pm))); +} + +std::list CDKGPendingMessages::PopPendingMessages(size_t maxCount) +{ + LOCK(cs); + + std::list ret; + while (!pendingMessages.empty() && ret.size() < maxCount) { + ret.emplace_back(std::move(pendingMessages.front())); + pendingMessages.pop_front(); + } + + return std::move(ret); +} + +bool CDKGPendingMessages::HasSeen(const uint256& hash) const +{ + LOCK(cs); + return seenMessages.count(hash) != 0; +} + +void CDKGPendingMessages::Clear() +{ + LOCK(cs); + pendingMessages.clear(); + messagesPerNode.clear(); + seenMessages.clear(); +} + +////// + +CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) : + params(_params), + evoDb(_evoDb), + messageHandlerPool(_messageHandlerPool), + blsWorker(_blsWorker), + dkgManager(_dkgManager), + curSession(std::make_shared(_params, _evoDb, _blsWorker, _dkgManager)), + pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) + pendingComplaints((size_t)_params.size * 2), + pendingJustifications((size_t)_params.size * 2), + pendingPrematureCommitments((size_t)_params.size * 2) +{ + phaseHandlerThread = std::thread([this] { + RenameThread(strprintf("quorum-phase-%d", (uint8_t)params.type).c_str()); + PhaseHandlerThread(); + }); +} + +CDKGSessionHandler::~CDKGSessionHandler() +{ + stopRequested = true; + if (phaseHandlerThread.joinable()) { + phaseHandlerThread.join(); + } +} + +void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) +{ + AssertLockHeld(cs_main); + LOCK(cs); + + int quorumStageInt = pindexNew->nHeight % params.dkgInterval; + CBlockIndex* pindexQuorum = chainActive[pindexNew->nHeight - quorumStageInt]; + + quorumHeight = pindexQuorum->nHeight; + quorumHash = pindexQuorum->GetBlockHash(); + + QuorumPhase newPhase = phase; + if (quorumStageInt == 0) { + newPhase = QuorumPhase_Initialized; + } else if (quorumStageInt == params.dkgPhaseBlocks * 1) { + newPhase = QuorumPhase_Contribute; + } else if (quorumStageInt == params.dkgPhaseBlocks * 2) { + newPhase = QuorumPhase_Complain; + } else if (quorumStageInt == params.dkgPhaseBlocks * 3) { + newPhase = QuorumPhase_Justify; + } else if (quorumStageInt == params.dkgPhaseBlocks * 4) { + newPhase = QuorumPhase_Commit; + } else if (quorumStageInt == params.dkgPhaseBlocks * 5) { + newPhase = QuorumPhase_Finalize; + } else if (quorumStageInt == params.dkgPhaseBlocks * 6) { + newPhase = QuorumPhase_Idle; + } + phase = newPhase; +} + +void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman) +{ + // We don't handle messages in the calling thread as deserialization/processing of these would block everything + if (strCommand == NetMsgType::QCONTRIB) { + pendingContributions.PushPendingMessage(pfrom->id, vRecv); + } else if (strCommand == NetMsgType::QCOMPLAINT) { + pendingComplaints.PushPendingMessage(pfrom->id, vRecv); + } else if (strCommand == NetMsgType::QJUSTIFICATION) { + pendingJustifications.PushPendingMessage(pfrom->id, vRecv); + } else if (strCommand == NetMsgType::QPCOMMITMENT) { + pendingPrematureCommitments.PushPendingMessage(pfrom->id, vRecv); + } +} + +bool CDKGSessionHandler::InitNewQuorum(int height, const uint256& quorumHash) +{ + //AssertLockHeld(cs_main); + + const auto& consensus = Params().GetConsensus(); + + curSession = std::make_shared(params, evoDb, blsWorker, dkgManager); + + if (!deterministicMNManager->IsDIP3Active(height)) { + return false; + } + + auto mns = CLLMQUtils::GetAllQuorumMembers(params.type, quorumHash); + + if (!curSession->Init(height, quorumHash, mns, activeMasternodeInfo.proTxHash)) { + LogPrintf("CDKGSessionManager::%s -- quorum initialiation failed\n", __func__); + return false; + } + + return true; +} + +std::pair CDKGSessionHandler::GetPhaseAndQuorumHash() +{ + LOCK(cs); + return std::make_pair(phase, quorumHash); +} + +class AbortPhaseException : public std::exception { +}; + +void CDKGSessionHandler::WaitForNextPhase(QuorumPhase curPhase, + QuorumPhase nextPhase, + uint256& expectedQuorumHash, + const WhileWaitFunc& runWhileWaiting) +{ + while (true) { + if (stopRequested || ShutdownRequested()) { + throw AbortPhaseException(); + } + auto p = GetPhaseAndQuorumHash(); + if (!expectedQuorumHash.IsNull() && p.second != expectedQuorumHash) { + throw AbortPhaseException(); + } + if (p.first == nextPhase) { + expectedQuorumHash = p.second; + return; + } + if (curPhase != QuorumPhase_None && p.first != curPhase) { + throw AbortPhaseException(); + } + if (!runWhileWaiting()) { + MilliSleep(100); + } + } +} + +void CDKGSessionHandler::WaitForNewQuorum(const uint256& oldQuorumHash) +{ + while (true) { + if (stopRequested || ShutdownRequested()) { + throw AbortPhaseException(); + } + auto p = GetPhaseAndQuorumHash(); + if (p.second != oldQuorumHash) { + return; + } + MilliSleep(100); + } +} + +void CDKGSessionHandler::RandomSleep(QuorumPhase curPhase, + uint256& expectedQuorumHash, + double randomSleepFactor, + const WhileWaitFunc& runWhileWaiting) +{ + // Randomly sleep some time to not fully overload the whole network + int64_t endTime = GetTimeMillis() + GetRandInt((int)(params.dkgRndSleepTime * randomSleepFactor)); + while (GetTimeMillis() < endTime) { + if (stopRequested || ShutdownRequested()) { + throw AbortPhaseException(); + } + auto p = GetPhaseAndQuorumHash(); + if (p.first != curPhase || p.second != expectedQuorumHash) { + throw AbortPhaseException(); + } + if (!runWhileWaiting()) { + MilliSleep(100); + } + } +} + +void CDKGSessionHandler::HandlePhase(QuorumPhase curPhase, + QuorumPhase nextPhase, + uint256& expectedQuorumHash, + double randomSleepFactor, + const StartPhaseFunc& startPhaseFunc, + const WhileWaitFunc& runWhileWaiting) +{ + RandomSleep(curPhase, expectedQuorumHash, randomSleepFactor, runWhileWaiting); + startPhaseFunc(); + WaitForNextPhase(curPhase, nextPhase, expectedQuorumHash, runWhileWaiting); +} + +// returns a set of NodeIds which sent invalid messages +template +std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector>>& messages) +{ + if (messages.empty()) { + return {}; + } + + std::set ret; + bool revertToSingleVerification = false; + + CBLSSignature aggSig; + std::vector pubKeys; + std::vector messageHashes; + std::set messageHashesSet; + pubKeys.reserve(messages.size()); + messageHashes.reserve(messages.size()); + bool first = true; + for (const auto& p : messages ) { + const auto& msg = *p.second; + + auto member = session.GetMember(msg.proTxHash); + if (!member) { + // should not happen as it was verified before + ret.emplace(p.first); + continue; + } + + if (first) { + aggSig = msg.sig; + } else { + aggSig.AggregateInsecure(msg.sig); + } + first = false; + + auto msgHash = msg.GetSignHash(); + if (!messageHashesSet.emplace(msgHash).second) { + // can only happen in 2 cases: + // 1. Someone sent us the same message twice but with differing signature, meaning that at least one of them + // must be invalid. In this case, we'd have to revert to single message verification nevertheless + // 2. Someone managed to find a way to create two different binary representations of a message that deserializes + // to the same object representation. This would be some form of malleability. However, this shouldn't be + // possible as only deterministic/unique BLS signatures and very simple data types are involved + revertToSingleVerification = true; + break; + } + + pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator); + messageHashes.emplace_back(msgHash); + } + if (!revertToSingleVerification) { + bool valid = aggSig.VerifyInsecureAggregated(pubKeys, messageHashes); + if (valid) { + // all good + return ret; + } + + // are all messages from the same node? + NodeId firstNodeId; + first = true; + bool nodeIdsAllSame = true; + for (auto it = messages.begin(); it != messages.end(); ++it) { + if (first) { + firstNodeId = it->first; + } else { + first = false; + if (it->first != firstNodeId) { + nodeIdsAllSame = false; + break; + } + } + } + // if yes, take a short path and return a set with only him + if (nodeIdsAllSame) { + ret.emplace(firstNodeId); + return ret; + } + // different nodes, let's figure out who are the bad ones + } + + for (const auto& p : messages) { + if (ret.count(p.first)) { + continue; + } + + const auto& msg = *p.second; + auto member = session.GetMember(msg.proTxHash); + bool valid = msg.sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator, msg.GetSignHash()); + if (!valid) { + ret.emplace(p.first); + } + } + return ret; +} + +template +bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount) +{ + auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); + if (msgs.empty()) { + return false; + } + + std::vector hashes; + std::vector>> preverifiedMessages; + hashes.reserve(msgs.size()); + preverifiedMessages.reserve(msgs.size()); + + for (const auto& p : msgs) { + if (!p.second) { + LogPrint("net", "%s -- failed to deserialize message, peer=%d", __func__, p.first); + { + LOCK(cs_main); + Misbehaving(p.first, 100); + } + continue; + } + const auto& msg = *p.second; + + auto hash = ::SerializeHash(msg); + { + LOCK(cs_main); + g_connman->RemoveAskFor(hash); + } + + bool ban = false; + if (!session.PreVerifyMessage(hash, msg, ban)) { + if (ban) { + LogPrint("net", "%s -- banning node due to failed preverification, peer=%d", __func__, p.first); + { + LOCK(cs_main); + Misbehaving(p.first, 100); + } + } + LogPrint("net", "%s -- skipping message due to failed preverification, peer=%d", __func__, p.first); + continue; + } + hashes.emplace_back(hash); + preverifiedMessages.emplace_back(p); + } + if (preverifiedMessages.empty()) { + return true; + } + + auto badNodes = BatchVerifyMessageSigs(session, preverifiedMessages); + if (!badNodes.empty()) { + LOCK(cs_main); + for (auto nodeId : badNodes) { + LogPrint("net", "%s -- failed to verify signature, peer=%d", __func__, nodeId); + Misbehaving(nodeId, 100); + } + } + + for (size_t i = 0; i < preverifiedMessages.size(); i++) { + NodeId nodeId = preverifiedMessages[i].first; + if (badNodes.count(nodeId)) { + continue; + } + const auto& msg = *preverifiedMessages[i].second; + bool ban = false; + session.ReceiveMessage(hashes[i], msg, ban); + if (ban) { + LogPrint("net", "%s -- banning node after ReceiveMessage failed, peer=%d", __func__, nodeId); + LOCK(cs_main); + Misbehaving(nodeId, 100); + badNodes.emplace(nodeId); + } + } + + for (const auto& p : preverifiedMessages) { + NodeId nodeId = p.first; + if (badNodes.count(nodeId)) { + continue; + } + session.AddParticipatingNode(nodeId); + } + + return true; +} + +void CDKGSessionHandler::HandleDKGRound() +{ + uint256 curQuorumHash; + + WaitForNextPhase(QuorumPhase_None, QuorumPhase_Initialized, curQuorumHash, []{return false;}); + + { + LOCK(cs); + pendingContributions.Clear(); + pendingComplaints.Clear(); + pendingJustifications.Clear(); + pendingPrematureCommitments.Clear(); + } + + if (!InitNewQuorum(quorumHeight, quorumHash)) { + // should actually never happen + WaitForNewQuorum(curQuorumHash); + throw AbortPhaseException(); + } + + if (curSession->AreWeMember() || GetBoolArg("-watchquorums", DEFAULT_WATCH_QUORUMS)) { + std::set connections; + if (curSession->AreWeMember()) { + connections = CLLMQUtils::GetQuorumConnections(params.type, curQuorumHash, curSession->myProTxHash); + } else { + auto cindexes = CLLMQUtils::CalcDeterministicWatchConnections(params.type, curQuorumHash, curSession->members.size(), 1); + for (auto idx : cindexes) { + connections.emplace(curSession->members[idx]->dmn->pdmnState->addr); + } + } + if (!connections.empty()) { + std::string debugMsg = strprintf("CDKGSessionManager::%s -- adding masternodes quorum connections for quorum %s:\n", __func__, curSession->quorumHash.ToString()); + for (auto& c : connections) { + debugMsg += strprintf(" %s\n", c.ToString(false)); + } + LogPrintf(debugMsg); + g_connman->AddMasternodeQuorumNodes(params.type, curQuorumHash, connections); + + LOCK(curSession->invCs); + curSession->participatingNodes = g_connman->GetMasternodeQuorumAddresses(params.type, curQuorumHash); + } + } + + WaitForNextPhase(QuorumPhase_Initialized, QuorumPhase_Contribute, curQuorumHash, []{return false;}); + + // Contribute + auto fContributeStart = [this]() { + curSession->Contribute(); + }; + auto fContributeWait = [this] { + return ProcessPendingMessageBatch(*curSession, pendingContributions, 8); + }; + HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 1, fContributeStart, fContributeWait); + + // Complain + auto fComplainStart = [this]() { + curSession->VerifyAndComplain(); + }; + auto fComplainWait = [this] { + return ProcessPendingMessageBatch(*curSession, pendingComplaints, 8); + }; + HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0, fComplainStart, fComplainWait); + + // Justify + auto fJustifyStart = [this]() { + curSession->VerifyAndJustify(); + }; + auto fJustifyWait = [this] { + return ProcessPendingMessageBatch(*curSession, pendingJustifications, 8); + }; + HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0, fJustifyStart, fJustifyWait); + + // Commit + auto fCommitStart = [this]() { + curSession->VerifyAndCommit(); + }; + auto fCommitWait = [this] { + return ProcessPendingMessageBatch(*curSession, pendingPrematureCommitments, 8); + }; + HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 1, fCommitStart, fCommitWait); + + auto finalCommitments = curSession->FinalizeCommitments(); + for (auto& fqc : finalCommitments) { + quorumBlockProcessor->AddMinableCommitment(fqc); + } +} + +void CDKGSessionHandler::PhaseHandlerThread() +{ + while (!stopRequested && !ShutdownRequested()) { + try { + HandleDKGRound(); + } catch (AbortPhaseException& e) { + LogPrintf("CDKGSessionHandler::%s -- aborted current DKG session\n", __func__); + } + } +} + +} diff --git a/src/llmq/quorums_dkgsessionhandler.h b/src/llmq/quorums_dkgsessionhandler.h new file mode 100644 index 000000000..1d5c6fc9e --- /dev/null +++ b/src/llmq/quorums_dkgsessionhandler.h @@ -0,0 +1,138 @@ +// Copyright (c) 2018 The Dash Core developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef DASH_QUORUMS_DKGSESSIONHANDLER_H +#define DASH_QUORUMS_DKGSESSIONHANDLER_H + +#include "llmq/quorums_dkgsession.h" + +#include "validation.h" + +#include "ctpl.h" + +namespace llmq +{ + +enum QuorumPhase { + QuorumPhase_Idle, + QuorumPhase_Initialized, + QuorumPhase_Contribute, + QuorumPhase_Complain, + QuorumPhase_Justify, + QuorumPhase_Commit, + QuorumPhase_Finalize, + + QuorumPhase_None=-1, +}; + +/** + * Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages + * is too slow to be processed in the main message handler thread. So, instead of processing them directly from the + * main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase + * handler thread. + * + * Each message type has it's own instance of this class. + */ +class CDKGPendingMessages +{ +public: + typedef std::pair> BinaryMessage; + +private: + mutable CCriticalSection cs; + size_t maxMessagesPerNode; + std::list pendingMessages; + std::map messagesPerNode; + std::set seenMessages; + +public: + CDKGPendingMessages(size_t _maxMessagesPerNode); + + void PushPendingMessage(NodeId from, CDataStream& vRecv); + std::list PopPendingMessages(size_t maxCount); + bool HasSeen(const uint256& hash) const; + void Clear(); + + // Might return nullptr messages, which indicates that deserialization failed for some reason + template + std::vector>> PopAndDeserializeMessages(size_t maxCount) + { + auto binaryMessages = PopPendingMessages(maxCount); + if (binaryMessages.empty()) { + return {}; + } + + std::vector>> ret; + ret.reserve(binaryMessages.size()); + for (auto& bm : binaryMessages) { + auto msg = std::make_shared(); + try { + *bm.second >> *msg; + } catch (...) { + msg = nullptr; + } + ret.emplace_back(std::make_pair(bm.first, std::move(msg))); + } + + return std::move(ret); + } +}; + +/** + * Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type. + * + * It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a + * time and waiting for the next phase if necessary. + */ +class CDKGSessionHandler +{ +private: + friend class CDKGSessionManager; + +private: + mutable CCriticalSection cs; + std::atomic stopRequested{false}; + + const Consensus::LLMQParams& params; + CEvoDB& evoDb; + ctpl::thread_pool& messageHandlerPool; + CBLSWorker& blsWorker; + CDKGSessionManager& dkgManager; + + QuorumPhase phase{QuorumPhase_Idle}; + int quorumHeight{-1}; + uint256 quorumHash; + std::shared_ptr curSession; + std::thread phaseHandlerThread; + + CDKGPendingMessages pendingContributions; + CDKGPendingMessages pendingComplaints; + CDKGPendingMessages pendingJustifications; + CDKGPendingMessages pendingPrematureCommitments; + +public: + CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager); + ~CDKGSessionHandler(); + + void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); + void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); + +private: + bool InitNewQuorum(int height, const uint256& quorumHash); + + std::pair GetPhaseAndQuorumHash(); + + typedef std::function StartPhaseFunc; + typedef std::function WhileWaitFunc; + void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, const WhileWaitFunc& runWhileWaiting); + void WaitForNewQuorum(const uint256& oldQuorumHash); + void RandomSleep(QuorumPhase curPhase, uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting); + void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting); + void HandleDKGRound(); + void PhaseHandlerThread(); +}; + +} + +#endif //DASH_QUORUMS_DKGSESSIONHANDLER_H diff --git a/src/llmq/quorums_dkgsessionmgr.cpp b/src/llmq/quorums_dkgsessionmgr.cpp new file mode 100644 index 000000000..f397f8e5a --- /dev/null +++ b/src/llmq/quorums_dkgsessionmgr.cpp @@ -0,0 +1,279 @@ +// Copyright (c) 2018 The Dash Core developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "quorums_dkgsessionmgr.h" +#include "quorums_blockprocessor.h" +#include "quorums_init.h" +#include "quorums_utils.h" + +#include "chainparams.h" +#include "net_processing.h" +#include "spork.h" +#include "validation.h" + +namespace llmq +{ + +CDKGSessionManager* quorumDKGSessionManager; + +static const std::string DB_VVEC = "qdkg_V"; +static const std::string DB_SKCONTRIB = "qdkg_S"; + +CDKGSessionManager::CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker) : + evoDb(_evoDb), + blsWorker(_blsWorker) +{ + for (auto& qt : Params().GetConsensus().llmqs) { + dkgSessionHandlers.emplace(std::piecewise_construct, + std::forward_as_tuple(qt.first), + std::forward_as_tuple(qt.second, _evoDb, messageHandlerPool, blsWorker, *this)); + } + + messageHandlerPool.resize(2); + RenameThreadPool(messageHandlerPool, "quorum-msg"); +} + +CDKGSessionManager::~CDKGSessionManager() +{ + messageHandlerPool.stop(true); +} + +void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) +{ + const auto& consensus = Params().GetConsensus(); + + CleanupCache(); + + if (fInitialDownload) + return; + if (!deterministicMNManager->IsDIP3Active(pindexNew->nHeight)) + return; + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return; + + LOCK(cs_main); + + for (auto& qt : dkgSessionHandlers) { + qt.second.UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); + } +} + +void CDKGSessionManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman) +{ + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return; + + if (strCommand != NetMsgType::QCONTRIB + && strCommand != NetMsgType::QCOMPLAINT + && strCommand != NetMsgType::QJUSTIFICATION + && strCommand != NetMsgType::QPCOMMITMENT + && strCommand != NetMsgType::QWATCH) { + return; + } + + if (strCommand == NetMsgType::QWATCH) { + pfrom->qwatch = true; + for (auto& p : dkgSessionHandlers) { + LOCK2(p.second.cs, p.second.curSession->invCs); + p.second.curSession->participatingNodes.emplace(pfrom->addr); + } + return; + } + + if (vRecv.size() < 1) { + LOCK(cs_main); + Misbehaving(pfrom->id, 100); + return; + } + + // peek into the message and see which LLMQType it is. First byte of all messages is always the LLMQType + Consensus::LLMQType llmqType = (Consensus::LLMQType)*vRecv.begin(); + if (!dkgSessionHandlers.count(llmqType)) { + LOCK(cs_main); + Misbehaving(pfrom->id, 100); + return; + } + + dkgSessionHandlers.at(llmqType).ProcessMessage(pfrom, strCommand, vRecv, connman); +} + +bool CDKGSessionManager::AlreadyHave(const CInv& inv) const +{ + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return false; + + for (auto& p : dkgSessionHandlers) { + auto& dkgType = p.second; + if (dkgType.pendingContributions.HasSeen(inv.hash) + || dkgType.pendingComplaints.HasSeen(inv.hash) + || dkgType.pendingJustifications.HasSeen(inv.hash) + || dkgType.pendingPrematureCommitments.HasSeen(inv.hash)) { + return true; + } + } + return false; +} + +bool CDKGSessionManager::GetContribution(const uint256& hash, CDKGContribution& ret) const +{ + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return false; + + for (auto& p : dkgSessionHandlers) { + auto& dkgType = p.second; + LOCK2(dkgType.cs, dkgType.curSession->invCs); + if (dkgType.phase < QuorumPhase_Initialized || dkgType.phase > QuorumPhase_Contribute) { + continue; + } + auto it = dkgType.curSession->contributions.find(hash); + if (it != dkgType.curSession->contributions.end()) { + ret = it->second; + return true; + } + } + return false; +} + +bool CDKGSessionManager::GetComplaint(const uint256& hash, CDKGComplaint& ret) const +{ + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return false; + + for (auto& p : dkgSessionHandlers) { + auto& dkgType = p.second; + LOCK2(dkgType.cs, dkgType.curSession->invCs); + if (dkgType.phase < QuorumPhase_Contribute || dkgType.phase > QuorumPhase_Complain) { + continue; + } + auto it = dkgType.curSession->complaints.find(hash); + if (it != dkgType.curSession->complaints.end()) { + ret = it->second; + return true; + } + } + return false; +} + +bool CDKGSessionManager::GetJustification(const uint256& hash, CDKGJustification& ret) const +{ + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return false; + + for (auto& p : dkgSessionHandlers) { + auto& dkgType = p.second; + LOCK2(dkgType.cs, dkgType.curSession->invCs); + if (dkgType.phase < QuorumPhase_Complain || dkgType.phase > QuorumPhase_Justify) { + continue; + } + auto it = dkgType.curSession->justifications.find(hash); + if (it != dkgType.curSession->justifications.end()) { + ret = it->second; + return true; + } + } + return false; +} + +bool CDKGSessionManager::GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const +{ + if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED)) + return false; + + for (auto& p : dkgSessionHandlers) { + auto& dkgType = p.second; + LOCK2(dkgType.cs, dkgType.curSession->invCs); + if (dkgType.phase < QuorumPhase_Justify || dkgType.phase > QuorumPhase_Commit) { + continue; + } + auto it = dkgType.curSession->prematureCommitments.find(hash); + if (it != dkgType.curSession->prematureCommitments.end() && dkgType.curSession->validCommitments.count(hash)) { + ret = it->second; + return true; + } + } + return false; +} + +void CDKGSessionManager::WriteVerifiedVvecContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const BLSVerificationVectorPtr& vvec) +{ + evoDb.GetRawDB().Write(std::make_tuple(DB_VVEC, (uint8_t)llmqType, quorumHash, proTxHash), *vvec); +} + +void CDKGSessionManager::WriteVerifiedSkContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const CBLSSecretKey& skContribution) +{ + evoDb.GetRawDB().Write(std::make_tuple(DB_SKCONTRIB, (uint8_t)llmqType, quorumHash, proTxHash), skContribution); +} + +bool CDKGSessionManager::GetVerifiedContributions(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::vector& validMembers, std::vector& memberIndexesRet, std::vector& vvecsRet, BLSSecretKeyVector& skContributionsRet) +{ + auto members = CLLMQUtils::GetAllQuorumMembers(llmqType, quorumHash); + + if (validMembers.size() != members.size()) { + // should never happen as we should always call this method with correct params + return false; + } + + memberIndexesRet.clear(); + vvecsRet.clear(); + skContributionsRet.clear(); + memberIndexesRet.reserve(members.size()); + vvecsRet.reserve(members.size()); + skContributionsRet.reserve(members.size()); + for (size_t i = 0; i < members.size(); i++) { + if (validMembers[i]) { + BLSVerificationVectorPtr vvec; + CBLSSecretKey skContribution; + if (!GetVerifiedContribution(llmqType, quorumHash, members[i]->proTxHash, vvec, skContribution)) { + return false; + } + + memberIndexesRet.emplace_back(i); + vvecsRet.emplace_back(vvec); + skContributionsRet.emplace_back(skContribution); + } + } + return true; +} + +bool CDKGSessionManager::GetVerifiedContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, BLSVerificationVectorPtr& vvecRet, CBLSSecretKey& skContributionRet) +{ + LOCK(contributionsCacheCs); + ContributionsCacheKey cacheKey = {llmqType, quorumHash, proTxHash}; + auto it = contributionsCache.find(cacheKey); + if (it != contributionsCache.end()) { + vvecRet = it->second.vvec; + skContributionRet = it->second.skContribution; + return true; + } + + BLSVerificationVector vvec; + BLSVerificationVectorPtr vvecPtr; + CBLSSecretKey skContribution; + if (evoDb.GetRawDB().Read(std::make_tuple(DB_VVEC, (uint8_t)llmqType, quorumHash, proTxHash), vvec)) { + vvecPtr = std::make_shared(std::move(vvec)); + } + evoDb.GetRawDB().Read(std::make_tuple(DB_SKCONTRIB, (uint8_t)llmqType, quorumHash, proTxHash), skContribution); + + it = contributionsCache.emplace(cacheKey, ContributionsCacheEntry{GetTimeMillis(), vvecPtr, skContribution}).first; + + vvecRet = it->second.vvec; + skContributionRet = it->second.skContribution; + + return true; +} + +void CDKGSessionManager::CleanupCache() +{ + LOCK(contributionsCacheCs); + auto curTime = GetTimeMillis(); + for (auto it = contributionsCache.begin(); it != contributionsCache.end(); ) { + if (curTime - it->second.entryTime > MAX_CONTRIBUTION_CACHE_TIME) { + it = contributionsCache.erase(it); + } else { + ++it; + } + } +} + +} diff --git a/src/llmq/quorums_dkgsessionmgr.h b/src/llmq/quorums_dkgsessionmgr.h new file mode 100644 index 000000000..4b506e07f --- /dev/null +++ b/src/llmq/quorums_dkgsessionmgr.h @@ -0,0 +1,76 @@ +// Copyright (c) 2018 The Dash Core developers +// Distributed under the MIT/X11 software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef DASH_QUORUMS_DKGSESSIONMGR_H +#define DASH_QUORUMS_DKGSESSIONMGR_H + +#include "llmq/quorums_dkgsessionhandler.h" + +#include "validation.h" + +#include "ctpl.h" + +class UniValue; + +namespace llmq +{ + +class CDKGSessionManager +{ + static const int64_t MAX_CONTRIBUTION_CACHE_TIME = 60 * 1000; + +private: + CEvoDB& evoDb; + CBLSWorker& blsWorker; + ctpl::thread_pool messageHandlerPool; + + std::map dkgSessionHandlers; + + CCriticalSection contributionsCacheCs; + struct ContributionsCacheKey { + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 proTxHash; + bool operator<(const ContributionsCacheKey& r) const + { + if (llmqType != r.llmqType) return llmqType < r.llmqType; + if (quorumHash != r.quorumHash) return quorumHash < r.quorumHash; + return proTxHash < r.proTxHash; + } + }; + struct ContributionsCacheEntry { + int64_t entryTime; + BLSVerificationVectorPtr vvec; + CBLSSecretKey skContribution; + }; + std::map contributionsCache; + +public: + CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker); + ~CDKGSessionManager(); + + void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); + + void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); + bool AlreadyHave(const CInv& inv) const; + bool GetContribution(const uint256& hash, CDKGContribution& ret) const; + bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const; + bool GetJustification(const uint256& hash, CDKGJustification& ret) const; + bool GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const; + + // Verified contributions are written while in the DKG + void WriteVerifiedVvecContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const BLSVerificationVectorPtr& vvec); + void WriteVerifiedSkContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const CBLSSecretKey& skContribution); + bool GetVerifiedContributions(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::vector& validMembers, std::vector& memberIndexesRet, std::vector& vvecsRet, BLSSecretKeyVector& skContributionsRet); + bool GetVerifiedContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, BLSVerificationVectorPtr& vvecRet, CBLSSecretKey& skContributionRet); + +private: + void CleanupCache(); +}; + +extern CDKGSessionManager* quorumDKGSessionManager; + +} + +#endif //DASH_QUORUMS_DKGSESSIONMGR_H diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index f6abb5d14..d6dd0f805 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -6,17 +6,23 @@ #include "quorums_blockprocessor.h" #include "quorums_commitment.h" +#include "quorums_dkgsessionmgr.h" namespace llmq { +static CBLSWorker blsWorker; + void InitLLMQSystem(CEvoDB& evoDb) { quorumBlockProcessor = new CQuorumBlockProcessor(evoDb); + quorumDKGSessionManager = new CDKGSessionManager(evoDb, blsWorker); } void DestroyLLMQSystem() { + delete quorumDKGSessionManager; + quorumDKGSessionManager = NULL; delete quorumBlockProcessor; quorumBlockProcessor = nullptr; } diff --git a/src/llmq/quorums_init.h b/src/llmq/quorums_init.h index 137248b88..eb5ca4cd7 100644 --- a/src/llmq/quorums_init.h +++ b/src/llmq/quorums_init.h @@ -10,6 +10,9 @@ class CEvoDB; namespace llmq { +// If true, we will connect to all new quorums and watch their communication +static const bool DEFAULT_WATCH_QUORUMS = false; + void InitLLMQSystem(CEvoDB& evoDb); void DestroyLLMQSystem(); diff --git a/src/llmq/quorums_utils.cpp b/src/llmq/quorums_utils.cpp index 452909e07..b73d15733 100644 --- a/src/llmq/quorums_utils.cpp +++ b/src/llmq/quorums_utils.cpp @@ -29,5 +29,60 @@ uint256 CLLMQUtils::BuildCommitmentHash(uint8_t llmqType, const uint256& blockHa return hw.GetHash(); } +std::set CLLMQUtils::GetQuorumConnections(Consensus::LLMQType llmqType, const uint256& blockHash, const uint256& forMember) +{ + auto& params = Params().GetConsensus().llmqs.at(llmqType); + + auto mns = GetAllQuorumMembers(llmqType, blockHash); + std::set result; + for (size_t i = 0; i < mns.size(); i++) { + auto& dmn = mns[i]; + if (dmn->proTxHash == forMember) { + for (int n = 0; n < params.neighborConnections; n++) { + size_t idx = (i + 1 + n) % mns.size(); + auto& otherDmn = mns[idx]; + if (otherDmn == dmn) { + continue; + } + result.emplace(otherDmn->pdmnState->addr); + } + size_t startIdx = i + mns.size() / 2; + startIdx -= (params.diagonalConnections / 2) * params.neighborConnections; + startIdx %= mns.size(); + for (int n = 0; n < params.diagonalConnections; n++) { + size_t idx = startIdx + n * params.neighborConnections; + idx %= mns.size(); + auto& otherDmn = mns[idx]; + if (otherDmn == dmn) { + continue; + } + result.emplace(otherDmn->pdmnState->addr); + } + } + } + return result; +} + +std::set CLLMQUtils::CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, const uint256& blockHash, size_t memberCount, size_t connectionCount) +{ + static uint256 qwatchConnectionSeed; + static std::atomic qwatchConnectionSeedGenerated{false}; + static CCriticalSection qwatchConnectionSeedCs; + if (!qwatchConnectionSeedGenerated) { + LOCK(qwatchConnectionSeedCs); + if (!qwatchConnectionSeedGenerated) { + qwatchConnectionSeed = GetRandHash(); + qwatchConnectionSeedGenerated = true; + } + } + + std::set result; + uint256 rnd = qwatchConnectionSeed; + for (size_t i = 0; i < connectionCount; i++) { + rnd = ::SerializeHash(std::make_pair(rnd, std::make_pair((uint8_t)llmqType, blockHash))); + result.emplace(rnd.GetUint64(0) % memberCount); + } + return result; +} } diff --git a/src/llmq/quorums_utils.h b/src/llmq/quorums_utils.h index 2aab41628..7935aa9c4 100644 --- a/src/llmq/quorums_utils.h +++ b/src/llmq/quorums_utils.h @@ -21,6 +21,9 @@ public: static std::vector GetAllQuorumMembers(Consensus::LLMQType llmqType, const uint256& blockHash); static uint256 BuildCommitmentHash(uint8_t llmqType, const uint256& blockHash, const std::vector& validMembers, const CBLSPublicKey& pubKey, const uint256& vvecHash); + + static std::set GetQuorumConnections(Consensus::LLMQType llmqType, const uint256& blockHash, const uint256& forMember); + static std::set CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, const uint256& blockHash, size_t memberCount, size_t connectionCount); }; } diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 54221a448..4611c72c5 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -44,8 +44,10 @@ #include "evo/deterministicmns.h" #include "evo/simplifiedmns.h" -#include "llmq/quorums_commitment.h" #include "llmq/quorums_blockprocessor.h" +#include "llmq/quorums_commitment.h" +#include "llmq/quorums_dkgsessionmgr.h" +#include "llmq/quorums_init.h" #include @@ -949,6 +951,11 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) case MSG_QUORUM_FINAL_COMMITMENT: return llmq::quorumBlockProcessor->HasMinableCommitment(inv.hash); + case MSG_QUORUM_CONTRIB: + case MSG_QUORUM_COMPLAINT: + case MSG_QUORUM_JUSTIFICATION: + case MSG_QUORUM_PREMATURE_COMMITMENT: + return llmq::quorumDKGSessionManager->AlreadyHave(inv); } // Don't know what it is, just say we already got one @@ -1218,6 +1225,35 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam } } + if (!push && (inv.type == MSG_QUORUM_CONTRIB)) { + llmq::CDKGContribution o; + if (llmq::quorumDKGSessionManager->GetContribution(inv.hash, o)) { + connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QCONTRIB, o)); + push = true; + } + } + if (!push && (inv.type == MSG_QUORUM_COMPLAINT)) { + llmq::CDKGComplaint o; + if (llmq::quorumDKGSessionManager->GetComplaint(inv.hash, o)) { + connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QCOMPLAINT, o)); + push = true; + } + } + if (!push && (inv.type == MSG_QUORUM_JUSTIFICATION)) { + llmq::CDKGJustification o; + if (llmq::quorumDKGSessionManager->GetJustification(inv.hash, o)) { + connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QJUSTIFICATION, o)); + push = true; + } + } + if (!push && (inv.type == MSG_QUORUM_PREMATURE_COMMITMENT)) { + llmq::CDKGPrematureCommitment o; + if (llmq::quorumDKGSessionManager->GetPrematureCommitment(inv.hash, o)) { + connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QPCOMMITMENT, o)); + push = true; + } + } + if (!push) vNotFound.push_back(inv); } @@ -1556,6 +1592,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); } + if (GetBoolArg("-watchquorums", llmq::DEFAULT_WATCH_QUORUMS)) { + connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QWATCH)); + } + pfrom->fSuccessfullyConnected = true; } @@ -2870,6 +2910,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr masternodeSync.ProcessMessage(pfrom, strCommand, vRecv); governance.ProcessMessage(pfrom, strCommand, vRecv, connman); llmq::quorumBlockProcessor->ProcessMessage(pfrom, strCommand, vRecv, connman); + llmq::quorumDKGSessionManager->ProcessMessage(pfrom, strCommand, vRecv, connman); } else { diff --git a/src/protocol.cpp b/src/protocol.cpp index 299ffdcb5..76611d056 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -60,6 +60,10 @@ const char *GETMNLISTDIFF="getmnlistd"; const char *MNLISTDIFF="mnlistdiff"; const char *QFCOMMITMENT="qfcommit"; const char *QCONTRIB="qcontrib"; +const char *QCOMPLAINT="qcomplaint"; +const char *QJUSTIFICATION="qjustify"; +const char *QPCOMMITMENT="qpcommit"; +const char *QWATCH="qwatch"; }; static const char* ppszTypeName[] = @@ -90,6 +94,9 @@ static const char* ppszTypeName[] = NetMsgType::QFCOMMITMENT, "qdcommit", // was only shortly used on testnet NetMsgType::QCONTRIB, + NetMsgType::QCOMPLAINT, + NetMsgType::QJUSTIFICATION, + NetMsgType::QPCOMMITMENT, }; /** All known message types. Keep this in the same order as the list of @@ -144,6 +151,10 @@ const static std::string allNetMessageTypes[] = { NetMsgType::MNLISTDIFF, NetMsgType::QFCOMMITMENT, NetMsgType::QCONTRIB, + NetMsgType::QCOMPLAINT, + NetMsgType::QJUSTIFICATION, + NetMsgType::QPCOMMITMENT, + NetMsgType::QWATCH, }; const static std::vector allNetMessageTypesVec(allNetMessageTypes, allNetMessageTypes+ARRAYLEN(allNetMessageTypes)); diff --git a/src/protocol.h b/src/protocol.h index 2f608e1e8..5a8ecb831 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -266,6 +266,10 @@ extern const char *GETMNLISTDIFF; extern const char *MNLISTDIFF; extern const char *QFCOMMITMENT; extern const char *QCONTRIB; +extern const char *QCOMPLAINT; +extern const char *QJUSTIFICATION; +extern const char *QPCOMMITMENT; +extern const char *QWATCH; }; /* Get a vector of all valid message types (see above) */ @@ -361,7 +365,10 @@ enum GetDataMsg { MSG_CMPCT_BLOCK = 20, //!< Defined in BIP152 MSG_QUORUM_FINAL_COMMITMENT = 21, /* MSG_QUORUM_DUMMY_COMMITMENT = 22, */ // was shortly used on testnet/devnet/regtest - /* MSG_QUORUM_DUMMY_CONTRIBUTION = 23, */ // not a valid contribution and only allowed on testnet/devnet/regtest. Will later be replaced with the real contribution + MSG_QUORUM_CONTRIB = 23, + MSG_QUORUM_COMPLAINT = 24, + MSG_QUORUM_JUSTIFICATION = 25, + MSG_QUORUM_PREMATURE_COMMITMENT = 26, }; /** inv message data */