Implement LLMQ DKG

This commit is contained in:
Alexander Block 2018-05-24 16:14:55 +02:00
parent 318b598139
commit 6836f8c38b
18 changed files with 2812 additions and 6 deletions

View File

@ -141,10 +141,13 @@ BITCOIN_CORE_H = \
keystore.h \ keystore.h \
dbwrapper.h \ dbwrapper.h \
limitedmap.h \ limitedmap.h \
llmq/quorums_commitment.h \
llmq/quorums_blockprocessor.h \ llmq/quorums_blockprocessor.h \
llmq/quorums_utils.h \ llmq/quorums_commitment.h \
llmq/quorums_dkgsessionhandler.h \
llmq/quorums_dkgsessionmgr.h \
llmq/quorums_dkgsession.h \
llmq/quorums_init.h \ llmq/quorums_init.h \
llmq/quorums_utils.h \
masternode-meta.h \ masternode-meta.h \
masternode-payments.h \ masternode-payments.h \
masternode-sync.h \ masternode-sync.h \
@ -249,10 +252,13 @@ libdash_server_a_SOURCES = \
governance-validators.cpp \ governance-validators.cpp \
governance-vote.cpp \ governance-vote.cpp \
governance-votedb.cpp \ governance-votedb.cpp \
llmq/quorums_commitment.cpp \
llmq/quorums_blockprocessor.cpp \ llmq/quorums_blockprocessor.cpp \
llmq/quorums_utils.cpp \ llmq/quorums_commitment.cpp \
llmq/quorums_dkgsessionhandler.cpp \
llmq/quorums_dkgsessionmgr.cpp \
llmq/quorums_dkgsession.cpp \
llmq/quorums_init.cpp \ llmq/quorums_init.cpp \
llmq/quorums_utils.cpp \
masternode-meta.cpp \ masternode-meta.cpp \
masternode-payments.cpp \ masternode-payments.cpp \
masternode-sync.cpp \ masternode-sync.cpp \

View File

@ -118,6 +118,12 @@ static Consensus::LLMQParams llmq10_60 = {
.dkgPhaseBlocks = 2, .dkgPhaseBlocks = 2,
.dkgMiningWindowStart = 10, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowStart = 10, // dkgPhaseBlocks * 5 = after finalization
.dkgMiningWindowEnd = 18, .dkgMiningWindowEnd = 18,
.dkgRndSleepTime = 0,
.dkgBadVotesThreshold = 8,
.neighborConnections = 2,
.diagonalConnections = 2,
.keepOldConnections = 24,
}; };
static Consensus::LLMQParams llmq50_60 = { static Consensus::LLMQParams llmq50_60 = {
@ -131,6 +137,12 @@ static Consensus::LLMQParams llmq50_60 = {
.dkgPhaseBlocks = 2, .dkgPhaseBlocks = 2,
.dkgMiningWindowStart = 10, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowStart = 10, // dkgPhaseBlocks * 5 = after finalization
.dkgMiningWindowEnd = 18, .dkgMiningWindowEnd = 18,
.dkgRndSleepTime = 1 * 60 * 1000,
.dkgBadVotesThreshold = 40,
.neighborConnections = 2,
.diagonalConnections = 2,
.keepOldConnections = 24,
}; };
static Consensus::LLMQParams llmq400_60 = { static Consensus::LLMQParams llmq400_60 = {
@ -144,6 +156,12 @@ static Consensus::LLMQParams llmq400_60 = {
.dkgPhaseBlocks = 4, .dkgPhaseBlocks = 4,
.dkgMiningWindowStart = 20, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowStart = 20, // dkgPhaseBlocks * 5 = after finalization
.dkgMiningWindowEnd = 28, .dkgMiningWindowEnd = 28,
.dkgRndSleepTime = 2 * 60 * 1000,
.dkgBadVotesThreshold = 300,
.neighborConnections = 4,
.diagonalConnections = 4,
.keepOldConnections = 4,
}; };
// Used for deployment and min-proto-version signalling, so it needs a higher threshold // Used for deployment and min-proto-version signalling, so it needs a higher threshold
@ -158,6 +176,12 @@ static Consensus::LLMQParams llmq400_85 = {
.dkgPhaseBlocks = 4, .dkgPhaseBlocks = 4,
.dkgMiningWindowStart = 20, // dkgPhaseBlocks * 5 = after finalization .dkgMiningWindowStart = 20, // dkgPhaseBlocks * 5 = after finalization
.dkgMiningWindowEnd = 48, // give it a larger mining window to make sure it is mined .dkgMiningWindowEnd = 48, // give it a larger mining window to make sure it is mined
.dkgRndSleepTime = 2 * 60 * 1000,
.dkgBadVotesThreshold = 300,
.neighborConnections = 4,
.diagonalConnections = 4,
.keepOldConnections = 4,
}; };

View File

@ -97,6 +97,12 @@ struct LLMQParams {
// should at the same time not be too large so that not too much space is wasted with null commitments in case a DKG // should at the same time not be too large so that not too much space is wasted with null commitments in case a DKG
// session failed. // session failed.
int dkgMiningWindowEnd; int dkgMiningWindowEnd;
int dkgRndSleepTime;
int dkgBadVotesThreshold;
int neighborConnections;
int diagonalConnections;
int keepOldConnections;
}; };
/** /**

View File

@ -9,6 +9,7 @@
#include "masternode-payments.h" #include "masternode-payments.h"
#include "masternode-sync.h" #include "masternode-sync.h"
#include "privatesend.h" #include "privatesend.h"
#include "llmq/quorums_dkgsessionmgr.h"
#ifdef ENABLE_WALLET #ifdef ENABLE_WALLET
#include "privatesend-client.h" #include "privatesend-client.h"
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
@ -59,6 +60,7 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
instantsend.UpdatedBlockTip(pindexNew); instantsend.UpdatedBlockTip(pindexNew);
governance.UpdatedBlockTip(pindexNew, connman); governance.UpdatedBlockTip(pindexNew, connman);
llmq::quorumDKGSessionManager->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
} }
void CDSNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock) void CDSNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock)

View File

@ -67,6 +67,7 @@
#include "warnings.h" #include "warnings.h"
#include "evo/deterministicmns.h" #include "evo/deterministicmns.h"
#include "llmq/quorums_init.h"
#include "llmq/quorums_init.h" #include "llmq/quorums_init.h"
@ -543,6 +544,7 @@ std::string HelpMessage(HelpMessageMode mode)
strUsage += HelpMessageOpt("-limitdescendantcount=<n>", strprintf("Do not accept transactions if any ancestor would have <n> or more in-mempool descendants (default: %u)", DEFAULT_DESCENDANT_LIMIT)); strUsage += HelpMessageOpt("-limitdescendantcount=<n>", strprintf("Do not accept transactions if any ancestor would have <n> or more in-mempool descendants (default: %u)", DEFAULT_DESCENDANT_LIMIT));
strUsage += HelpMessageOpt("-limitdescendantsize=<n>", strprintf("Do not accept transactions if any ancestor would have more than <n> kilobytes of in-mempool descendants (default: %u).", DEFAULT_DESCENDANT_SIZE_LIMIT)); strUsage += HelpMessageOpt("-limitdescendantsize=<n>", strprintf("Do not accept transactions if any ancestor would have more than <n> kilobytes of in-mempool descendants (default: %u).", DEFAULT_DESCENDANT_SIZE_LIMIT));
strUsage += HelpMessageOpt("-bip9params=deployment:start:end", "Use given start/end times for specified BIP9 deployment (regtest-only)"); strUsage += HelpMessageOpt("-bip9params=deployment:start:end", "Use given start/end times for specified BIP9 deployment (regtest-only)");
strUsage += HelpMessageOpt("-watchquorums=<n>", strprintf("Watch and validate quorum communication (default: %u)", llmq::DEFAULT_WATCH_QUORUMS));
} }
std::string debugCategories = "addrman, alert, bench, cmpctblock, coindb, db, http, leveldb, libevent, lock, mempool, mempoolrej, net, proxy, prune, rand, reindex, rpc, selectcoins, tor, zmq, " std::string debugCategories = "addrman, alert, bench, cmpctblock, coindb, db, http, leveldb, libevent, lock, mempool, mempoolrej, net, proxy, prune, rand, reindex, rpc, selectcoins, tor, zmq, "
"dash (or specifically: gobject, instantsend, keepass, masternode, mnpayments, mnsync, privatesend, spork)"; // Don't translate these and qt below "dash (or specifically: gobject, instantsend, keepass, masternode, mnpayments, mnsync, privatesend, spork)"; // Don't translate these and qt below

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,350 @@
// Copyright (c) 2018 The Dash Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef DASH_QUORUMS_DKGSESSION_H
#define DASH_QUORUMS_DKGSESSION_H
#include "consensus/params.h"
#include "net.h"
#include "batchedlogger.h"
#include "bls/bls_ies.h"
#include "bls/bls_worker.h"
#include "evo/deterministicmns.h"
#include "evo/evodb.h"
#include "llmq/quorums_utils.h"
class UniValue;
namespace llmq
{
class CFinalCommitment;
class CDKGSession;
class CDKGSessionManager;
class CDKGLogger : public CBatchedLogger
{
public:
CDKGLogger(CDKGSession& _quorumDkg, const std::string& _func);
CDKGLogger(Consensus::LLMQType _llmqType, const uint256& _quorumHash, int _height, bool _areWeMember, const std::string& _func);
};
class CDKGContribution
{
public:
uint8_t llmqType;
uint256 quorumHash;
uint256 proTxHash;
BLSVerificationVectorPtr vvec;
std::shared_ptr<CBLSIESMultiRecipientObjects<CBLSSecretKey>> contributions;
CBLSSignature sig;
public:
template<typename Stream>
inline void SerializeWithoutSig(Stream& s) const
{
s << llmqType;
s << quorumHash;
s << proTxHash;
s << *vvec;
s << *contributions;
}
template<typename Stream>
inline void Serialize(Stream& s) const
{
SerializeWithoutSig(s);
s << sig;
}
template<typename Stream>
inline void Unserialize(Stream& s)
{
BLSVerificationVector tmp1;
CBLSIESMultiRecipientObjects<CBLSSecretKey> tmp2;
s >> llmqType;
s >> quorumHash;
s >> proTxHash;
s >> tmp1;
s >> tmp2;
s >> sig;
vvec = std::make_shared<BLSVerificationVector>(std::move(tmp1));
contributions = std::make_shared<CBLSIESMultiRecipientObjects<CBLSSecretKey>>(std::move(tmp2));
}
uint256 GetSignHash() const
{
CHashWriter hw(SER_GETHASH, 0);
SerializeWithoutSig(hw);
hw << CBLSSignature();
return hw.GetHash();
}
};
class CDKGComplaint
{
public:
uint8_t llmqType;
uint256 quorumHash;
uint256 proTxHash;
std::vector<bool> badMembers;
std::vector<bool> complainForMembers;
CBLSSignature sig;
public:
CDKGComplaint() {}
CDKGComplaint(const Consensus::LLMQParams& params);
ADD_SERIALIZE_METHODS
template<typename Stream, typename Operation>
inline void SerializationOp(Stream& s, Operation ser_action)
{
READWRITE(llmqType);
READWRITE(quorumHash);
READWRITE(proTxHash);
READWRITE(DYNBITSET(badMembers));
READWRITE(DYNBITSET(complainForMembers));
READWRITE(sig);
}
uint256 GetSignHash() const
{
CDKGComplaint tmp(*this);
tmp.sig = CBLSSignature();
return ::SerializeHash(tmp);
}
};
class CDKGJustification
{
public:
uint8_t llmqType;
uint256 quorumHash;
uint256 proTxHash;
std::vector<std::pair<uint32_t, CBLSSecretKey>> contributions;
CBLSSignature sig;
public:
ADD_SERIALIZE_METHODS
template<typename Stream, typename Operation>
inline void SerializationOp(Stream& s, Operation ser_action)
{
READWRITE(llmqType);
READWRITE(quorumHash);
READWRITE(proTxHash);
READWRITE(contributions);
READWRITE(sig);
}
uint256 GetSignHash() const
{
CDKGJustification tmp(*this);
tmp.sig = CBLSSignature();
return ::SerializeHash(tmp);
}
};
// each member commits to a single set of valid members with this message
// then each node aggregate all received premature commitments
// into a single CFinalCommitment, which is only valid if
// enough (>=minSize) premature commitments were aggregated
class CDKGPrematureCommitment
{
public:
uint8_t llmqType;
uint256 quorumHash;
uint256 proTxHash;
std::vector<bool> validMembers;
CBLSPublicKey quorumPublicKey;
uint256 quorumVvecHash;
CBLSSignature quorumSig; // threshold sig share of quorumHash+validMembers+pubKeyHash+vvecHash
CBLSSignature sig; // single member sig of quorumHash+validMembers+pubKeyHash+vvecHash
public:
CDKGPrematureCommitment() {}
CDKGPrematureCommitment(const Consensus::LLMQParams& params);
int CountValidMembers() const
{
return (int)std::count(validMembers.begin(), validMembers.end(), true);
}
public:
ADD_SERIALIZE_METHODS
template<typename Stream, typename Operation>
inline void SerializationOp(Stream& s, Operation ser_action)
{
READWRITE(llmqType);
READWRITE(quorumHash);
READWRITE(proTxHash);
READWRITE(DYNBITSET(validMembers));
READWRITE(quorumPublicKey);
READWRITE(quorumVvecHash);
READWRITE(quorumSig);
READWRITE(sig);
}
uint256 GetSignHash() const
{
return CLLMQUtils::BuildCommitmentHash(llmqType, quorumHash, validMembers, quorumPublicKey, quorumVvecHash);
}
};
class CDKGMember
{
public:
CDKGMember(CDeterministicMNCPtr _dmn, size_t _idx);
CDeterministicMNCPtr dmn;
size_t idx;
CBLSId id;
std::set<uint256> contributions;
std::set<uint256> complaints;
std::set<uint256> justifications;
std::set<uint256> prematureCommitments;
std::set<uint256> badMemberVotes;
std::set<uint256> complaintsFromOthers;
bool bad{false};
bool weComplain{false};
bool someoneComplain{false};
};
/**
* The DKG session is a single instance of the DKG process. It is owned and called by CDKGSessionHandler, which passes
* received DKG messages to the session. The session is not persistent and will loose it's state (the whole object is
* discarded) when it finishes (after the mining phase) or is aborted.
*
* When incoming contributions are received and the verification vector is valid, it is passed to CDKGSessionManager
* which will store it in the evo DB. Secret key contributions which are meant for the local member are also passed
* to CDKGSessionManager to store them in the evo DB. If verification of the SK contribution initially fails, it is
* not passed to CDKGSessionManager. If the justification phase later gives a valid SK contribution from the same
* member, it is then passed to CDKGSessionManager and after this handled the same way.
*
* The contributions stored by CDKGSessionManager are then later loaded by the quorum instances and used for signing
* sessions, but only if the local node is a member of the quorum.
*/
class CDKGSession
{
friend class CDKGSessionHandler;
friend class CDKGSessionManager;
friend class CDKGLogger;
template<typename Message> friend class CDKGMessageHandler;
private:
const Consensus::LLMQParams& params;
CEvoDB& evoDb;
CBLSWorker& blsWorker;
CBLSWorkerCache cache;
CDKGSessionManager& dkgManager;
uint256 quorumHash;
int height{-1};
private:
std::vector<std::unique_ptr<CDKGMember>> members;
std::map<uint256, size_t> membersMap;
BLSVerificationVectorPtr vvecContribution;
BLSSecretKeyVector skContributions;
BLSIdVector memberIds;
std::vector<BLSVerificationVectorPtr> receivedVvecs;
// these are not necessarily verified yet. Only trust in what was written to the DB
BLSSecretKeyVector receivedSkContributions;
uint256 myProTxHash;
CBLSId myId;
size_t myIdx{(size_t)-1};
// all indexed by msg hash
// we expect to only receive a single vvec and contribution per member, but we must also be able to relay
// conflicting messages as otherwise an attacker might be able to broadcast conflicting (valid+invalid) messages
// and thus split the quorum. Such members are later removed from the quorum.
mutable CCriticalSection invCs;
std::map<uint256, CDKGContribution> contributions;
std::map<uint256, CDKGComplaint> complaints;
std::map<uint256, CDKGJustification> justifications;
std::map<uint256, CDKGPrematureCommitment> prematureCommitments;
std::set<CInv> invSet;
std::set<CService> participatingNodes;
std::set<uint256> seenMessages;
std::vector<size_t> pendingContributionVerifications;
// filled by ReceivePrematureCommitment and used by FinalizeCommitments
std::set<uint256> validCommitments;
public:
CDKGSession(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
params(_params), evoDb(_evoDb), blsWorker(_blsWorker), cache(_blsWorker), dkgManager(_dkgManager) {}
bool Init(int _height, const uint256& _quorumHash, const std::vector<CDeterministicMNCPtr>& mns, const uint256& _myProTxHash);
/**
* The following sets of methods are for the first 4 phases handled in the session. The flow of message calls
* is identical for all phases:
* 1. Execute local action (e.g. create/send own contributions)
* 2. PreVerify incoming messages for this phase. Preverification means that everything from the message is checked
* that does not require too much resources for verification. This specifically excludes all CPU intensive BLS
* operations.
* 3. CDKGSessionHandler will collect pre verified messages in batches and perform batched BLS signature verification
* on these.
* 4. ReceiveMessage is called for each pre verified message with a valid signature. ReceiveMessage is also
* responsible for further verification of validity (e.g. validate vvecs and SK contributions).
*/
// Phase 1: contribution
void Contribute();
void SendContributions();
bool PreVerifyMessage(const uint256& hash, const CDKGContribution& qc, bool& retBan);
void ReceiveMessage(const uint256& hash, const CDKGContribution& qc, bool& retBan);
void VerifyPendingContributions();
// Phase 2: complaint
void VerifyAndComplain();
void SendComplaint();
bool PreVerifyMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan);
void ReceiveMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan);
// Phase 3: justification
void VerifyAndJustify();
void SendJustification(const std::set<uint256>& forMembers);
bool PreVerifyMessage(const uint256& hash, const CDKGJustification& qj, bool& retBan);
void ReceiveMessage(const uint256& hash, const CDKGJustification& qj, bool& retBan);
// Phase 4: commit
void VerifyAndCommit();
void SendCommitment();
bool PreVerifyMessage(const uint256& hash, const CDKGPrematureCommitment& qc, bool& retBan);
void ReceiveMessage(const uint256& hash, const CDKGPrematureCommitment& qc, bool& retBan);
// Phase 5: aggregate/finalize
std::vector<CFinalCommitment> FinalizeCommitments();
bool AreWeMember() const { return !myProTxHash.IsNull(); }
void MarkBadMember(size_t idx);
bool Seen(const uint256& msgHash);
void AddParticipatingNode(NodeId nodeId);
void RelayInvToParticipants(const CInv& inv);
public:
CDKGMember* GetMember(const uint256& proTxHash);
};
}
#endif //DASH_QUORUMS_DKGSESSION_H

View File

@ -0,0 +1,538 @@
// Copyright (c) 2018 The Dash Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "quorums_dkgsessionhandler.h"
#include "quorums_blockprocessor.h"
#include "quorums_debug.h"
#include "quorums_init.h"
#include "quorums_utils.h"
#include "activemasternode.h"
#include "chainparams.h"
#include "init.h"
#include "net_processing.h"
#include "validation.h"
namespace llmq
{
CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) :
maxMessagesPerNode(_maxMessagesPerNode)
{
}
void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv)
{
// this will also consume the data, even if we bail out early
auto pm = std::make_shared<CDataStream>(std::move(vRecv));
{
LOCK(cs);
if (messagesPerNode[from] >= maxMessagesPerNode) {
// TODO ban?
LogPrint("net", "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from);
return;
}
messagesPerNode[from]++;
}
CHashWriter hw(SER_GETHASH, 0);
hw.write(pm->data(), pm->size());
uint256 hash = hw.GetHash();
LOCK2(cs_main, cs);
if (!seenMessages.emplace(hash).second) {
LogPrint("net", "CDKGPendingMessages::%s -- already seen %s, peer=%d", __func__, from);
return;
}
g_connman->RemoveAskFor(hash);
pendingMessages.emplace_back(std::make_pair(from, std::move(pm)));
}
std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
{
LOCK(cs);
std::list<BinaryMessage> ret;
while (!pendingMessages.empty() && ret.size() < maxCount) {
ret.emplace_back(std::move(pendingMessages.front()));
pendingMessages.pop_front();
}
return std::move(ret);
}
bool CDKGPendingMessages::HasSeen(const uint256& hash) const
{
LOCK(cs);
return seenMessages.count(hash) != 0;
}
void CDKGPendingMessages::Clear()
{
LOCK(cs);
pendingMessages.clear();
messagesPerNode.clear();
seenMessages.clear();
}
//////
CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
params(_params),
evoDb(_evoDb),
messageHandlerPool(_messageHandlerPool),
blsWorker(_blsWorker),
dkgManager(_dkgManager),
curSession(std::make_shared<CDKGSession>(_params, _evoDb, _blsWorker, _dkgManager)),
pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
pendingComplaints((size_t)_params.size * 2),
pendingJustifications((size_t)_params.size * 2),
pendingPrematureCommitments((size_t)_params.size * 2)
{
phaseHandlerThread = std::thread([this] {
RenameThread(strprintf("quorum-phase-%d", (uint8_t)params.type).c_str());
PhaseHandlerThread();
});
}
CDKGSessionHandler::~CDKGSessionHandler()
{
stopRequested = true;
if (phaseHandlerThread.joinable()) {
phaseHandlerThread.join();
}
}
void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload)
{
AssertLockHeld(cs_main);
LOCK(cs);
int quorumStageInt = pindexNew->nHeight % params.dkgInterval;
CBlockIndex* pindexQuorum = chainActive[pindexNew->nHeight - quorumStageInt];
quorumHeight = pindexQuorum->nHeight;
quorumHash = pindexQuorum->GetBlockHash();
QuorumPhase newPhase = phase;
if (quorumStageInt == 0) {
newPhase = QuorumPhase_Initialized;
} else if (quorumStageInt == params.dkgPhaseBlocks * 1) {
newPhase = QuorumPhase_Contribute;
} else if (quorumStageInt == params.dkgPhaseBlocks * 2) {
newPhase = QuorumPhase_Complain;
} else if (quorumStageInt == params.dkgPhaseBlocks * 3) {
newPhase = QuorumPhase_Justify;
} else if (quorumStageInt == params.dkgPhaseBlocks * 4) {
newPhase = QuorumPhase_Commit;
} else if (quorumStageInt == params.dkgPhaseBlocks * 5) {
newPhase = QuorumPhase_Finalize;
} else if (quorumStageInt == params.dkgPhaseBlocks * 6) {
newPhase = QuorumPhase_Idle;
}
phase = newPhase;
}
void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{
// We don't handle messages in the calling thread as deserialization/processing of these would block everything
if (strCommand == NetMsgType::QCONTRIB) {
pendingContributions.PushPendingMessage(pfrom->id, vRecv);
} else if (strCommand == NetMsgType::QCOMPLAINT) {
pendingComplaints.PushPendingMessage(pfrom->id, vRecv);
} else if (strCommand == NetMsgType::QJUSTIFICATION) {
pendingJustifications.PushPendingMessage(pfrom->id, vRecv);
} else if (strCommand == NetMsgType::QPCOMMITMENT) {
pendingPrematureCommitments.PushPendingMessage(pfrom->id, vRecv);
}
}
bool CDKGSessionHandler::InitNewQuorum(int height, const uint256& quorumHash)
{
//AssertLockHeld(cs_main);
const auto& consensus = Params().GetConsensus();
curSession = std::make_shared<CDKGSession>(params, evoDb, blsWorker, dkgManager);
if (!deterministicMNManager->IsDIP3Active(height)) {
return false;
}
auto mns = CLLMQUtils::GetAllQuorumMembers(params.type, quorumHash);
if (!curSession->Init(height, quorumHash, mns, activeMasternodeInfo.proTxHash)) {
LogPrintf("CDKGSessionManager::%s -- quorum initialiation failed\n", __func__);
return false;
}
return true;
}
std::pair<QuorumPhase, uint256> CDKGSessionHandler::GetPhaseAndQuorumHash()
{
LOCK(cs);
return std::make_pair(phase, quorumHash);
}
class AbortPhaseException : public std::exception {
};
void CDKGSessionHandler::WaitForNextPhase(QuorumPhase curPhase,
QuorumPhase nextPhase,
uint256& expectedQuorumHash,
const WhileWaitFunc& runWhileWaiting)
{
while (true) {
if (stopRequested || ShutdownRequested()) {
throw AbortPhaseException();
}
auto p = GetPhaseAndQuorumHash();
if (!expectedQuorumHash.IsNull() && p.second != expectedQuorumHash) {
throw AbortPhaseException();
}
if (p.first == nextPhase) {
expectedQuorumHash = p.second;
return;
}
if (curPhase != QuorumPhase_None && p.first != curPhase) {
throw AbortPhaseException();
}
if (!runWhileWaiting()) {
MilliSleep(100);
}
}
}
void CDKGSessionHandler::WaitForNewQuorum(const uint256& oldQuorumHash)
{
while (true) {
if (stopRequested || ShutdownRequested()) {
throw AbortPhaseException();
}
auto p = GetPhaseAndQuorumHash();
if (p.second != oldQuorumHash) {
return;
}
MilliSleep(100);
}
}
void CDKGSessionHandler::RandomSleep(QuorumPhase curPhase,
uint256& expectedQuorumHash,
double randomSleepFactor,
const WhileWaitFunc& runWhileWaiting)
{
// Randomly sleep some time to not fully overload the whole network
int64_t endTime = GetTimeMillis() + GetRandInt((int)(params.dkgRndSleepTime * randomSleepFactor));
while (GetTimeMillis() < endTime) {
if (stopRequested || ShutdownRequested()) {
throw AbortPhaseException();
}
auto p = GetPhaseAndQuorumHash();
if (p.first != curPhase || p.second != expectedQuorumHash) {
throw AbortPhaseException();
}
if (!runWhileWaiting()) {
MilliSleep(100);
}
}
}
void CDKGSessionHandler::HandlePhase(QuorumPhase curPhase,
QuorumPhase nextPhase,
uint256& expectedQuorumHash,
double randomSleepFactor,
const StartPhaseFunc& startPhaseFunc,
const WhileWaitFunc& runWhileWaiting)
{
RandomSleep(curPhase, expectedQuorumHash, randomSleepFactor, runWhileWaiting);
startPhaseFunc();
WaitForNextPhase(curPhase, nextPhase, expectedQuorumHash, runWhileWaiting);
}
// returns a set of NodeIds which sent invalid messages
template<typename Message>
std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<std::pair<NodeId, std::shared_ptr<Message>>>& messages)
{
if (messages.empty()) {
return {};
}
std::set<NodeId> ret;
bool revertToSingleVerification = false;
CBLSSignature aggSig;
std::vector<CBLSPublicKey> pubKeys;
std::vector<uint256> messageHashes;
std::set<uint256> messageHashesSet;
pubKeys.reserve(messages.size());
messageHashes.reserve(messages.size());
bool first = true;
for (const auto& p : messages ) {
const auto& msg = *p.second;
auto member = session.GetMember(msg.proTxHash);
if (!member) {
// should not happen as it was verified before
ret.emplace(p.first);
continue;
}
if (first) {
aggSig = msg.sig;
} else {
aggSig.AggregateInsecure(msg.sig);
}
first = false;
auto msgHash = msg.GetSignHash();
if (!messageHashesSet.emplace(msgHash).second) {
// can only happen in 2 cases:
// 1. Someone sent us the same message twice but with differing signature, meaning that at least one of them
// must be invalid. In this case, we'd have to revert to single message verification nevertheless
// 2. Someone managed to find a way to create two different binary representations of a message that deserializes
// to the same object representation. This would be some form of malleability. However, this shouldn't be
// possible as only deterministic/unique BLS signatures and very simple data types are involved
revertToSingleVerification = true;
break;
}
pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator);
messageHashes.emplace_back(msgHash);
}
if (!revertToSingleVerification) {
bool valid = aggSig.VerifyInsecureAggregated(pubKeys, messageHashes);
if (valid) {
// all good
return ret;
}
// are all messages from the same node?
NodeId firstNodeId;
first = true;
bool nodeIdsAllSame = true;
for (auto it = messages.begin(); it != messages.end(); ++it) {
if (first) {
firstNodeId = it->first;
} else {
first = false;
if (it->first != firstNodeId) {
nodeIdsAllSame = false;
break;
}
}
}
// if yes, take a short path and return a set with only him
if (nodeIdsAllSame) {
ret.emplace(firstNodeId);
return ret;
}
// different nodes, let's figure out who are the bad ones
}
for (const auto& p : messages) {
if (ret.count(p.first)) {
continue;
}
const auto& msg = *p.second;
auto member = session.GetMember(msg.proTxHash);
bool valid = msg.sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator, msg.GetSignHash());
if (!valid) {
ret.emplace(p.first);
}
}
return ret;
}
template<typename Message>
bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount)
{
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
if (msgs.empty()) {
return false;
}
std::vector<uint256> hashes;
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> preverifiedMessages;
hashes.reserve(msgs.size());
preverifiedMessages.reserve(msgs.size());
for (const auto& p : msgs) {
if (!p.second) {
LogPrint("net", "%s -- failed to deserialize message, peer=%d", __func__, p.first);
{
LOCK(cs_main);
Misbehaving(p.first, 100);
}
continue;
}
const auto& msg = *p.second;
auto hash = ::SerializeHash(msg);
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash);
}
bool ban = false;
if (!session.PreVerifyMessage(hash, msg, ban)) {
if (ban) {
LogPrint("net", "%s -- banning node due to failed preverification, peer=%d", __func__, p.first);
{
LOCK(cs_main);
Misbehaving(p.first, 100);
}
}
LogPrint("net", "%s -- skipping message due to failed preverification, peer=%d", __func__, p.first);
continue;
}
hashes.emplace_back(hash);
preverifiedMessages.emplace_back(p);
}
if (preverifiedMessages.empty()) {
return true;
}
auto badNodes = BatchVerifyMessageSigs(session, preverifiedMessages);
if (!badNodes.empty()) {
LOCK(cs_main);
for (auto nodeId : badNodes) {
LogPrint("net", "%s -- failed to verify signature, peer=%d", __func__, nodeId);
Misbehaving(nodeId, 100);
}
}
for (size_t i = 0; i < preverifiedMessages.size(); i++) {
NodeId nodeId = preverifiedMessages[i].first;
if (badNodes.count(nodeId)) {
continue;
}
const auto& msg = *preverifiedMessages[i].second;
bool ban = false;
session.ReceiveMessage(hashes[i], msg, ban);
if (ban) {
LogPrint("net", "%s -- banning node after ReceiveMessage failed, peer=%d", __func__, nodeId);
LOCK(cs_main);
Misbehaving(nodeId, 100);
badNodes.emplace(nodeId);
}
}
for (const auto& p : preverifiedMessages) {
NodeId nodeId = p.first;
if (badNodes.count(nodeId)) {
continue;
}
session.AddParticipatingNode(nodeId);
}
return true;
}
void CDKGSessionHandler::HandleDKGRound()
{
uint256 curQuorumHash;
WaitForNextPhase(QuorumPhase_None, QuorumPhase_Initialized, curQuorumHash, []{return false;});
{
LOCK(cs);
pendingContributions.Clear();
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
}
if (!InitNewQuorum(quorumHeight, quorumHash)) {
// should actually never happen
WaitForNewQuorum(curQuorumHash);
throw AbortPhaseException();
}
if (curSession->AreWeMember() || GetBoolArg("-watchquorums", DEFAULT_WATCH_QUORUMS)) {
std::set<CService> connections;
if (curSession->AreWeMember()) {
connections = CLLMQUtils::GetQuorumConnections(params.type, curQuorumHash, curSession->myProTxHash);
} else {
auto cindexes = CLLMQUtils::CalcDeterministicWatchConnections(params.type, curQuorumHash, curSession->members.size(), 1);
for (auto idx : cindexes) {
connections.emplace(curSession->members[idx]->dmn->pdmnState->addr);
}
}
if (!connections.empty()) {
std::string debugMsg = strprintf("CDKGSessionManager::%s -- adding masternodes quorum connections for quorum %s:\n", __func__, curSession->quorumHash.ToString());
for (auto& c : connections) {
debugMsg += strprintf(" %s\n", c.ToString(false));
}
LogPrintf(debugMsg);
g_connman->AddMasternodeQuorumNodes(params.type, curQuorumHash, connections);
LOCK(curSession->invCs);
curSession->participatingNodes = g_connman->GetMasternodeQuorumAddresses(params.type, curQuorumHash);
}
}
WaitForNextPhase(QuorumPhase_Initialized, QuorumPhase_Contribute, curQuorumHash, []{return false;});
// Contribute
auto fContributeStart = [this]() {
curSession->Contribute();
};
auto fContributeWait = [this] {
return ProcessPendingMessageBatch<CDKGContribution>(*curSession, pendingContributions, 8);
};
HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 1, fContributeStart, fContributeWait);
// Complain
auto fComplainStart = [this]() {
curSession->VerifyAndComplain();
};
auto fComplainWait = [this] {
return ProcessPendingMessageBatch<CDKGComplaint>(*curSession, pendingComplaints, 8);
};
HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0, fComplainStart, fComplainWait);
// Justify
auto fJustifyStart = [this]() {
curSession->VerifyAndJustify();
};
auto fJustifyWait = [this] {
return ProcessPendingMessageBatch<CDKGJustification>(*curSession, pendingJustifications, 8);
};
HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0, fJustifyStart, fJustifyWait);
// Commit
auto fCommitStart = [this]() {
curSession->VerifyAndCommit();
};
auto fCommitWait = [this] {
return ProcessPendingMessageBatch<CDKGPrematureCommitment>(*curSession, pendingPrematureCommitments, 8);
};
HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 1, fCommitStart, fCommitWait);
auto finalCommitments = curSession->FinalizeCommitments();
for (auto& fqc : finalCommitments) {
quorumBlockProcessor->AddMinableCommitment(fqc);
}
}
void CDKGSessionHandler::PhaseHandlerThread()
{
while (!stopRequested && !ShutdownRequested()) {
try {
HandleDKGRound();
} catch (AbortPhaseException& e) {
LogPrintf("CDKGSessionHandler::%s -- aborted current DKG session\n", __func__);
}
}
}
}

View File

@ -0,0 +1,138 @@
// Copyright (c) 2018 The Dash Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef DASH_QUORUMS_DKGSESSIONHANDLER_H
#define DASH_QUORUMS_DKGSESSIONHANDLER_H
#include "llmq/quorums_dkgsession.h"
#include "validation.h"
#include "ctpl.h"
namespace llmq
{
enum QuorumPhase {
QuorumPhase_Idle,
QuorumPhase_Initialized,
QuorumPhase_Contribute,
QuorumPhase_Complain,
QuorumPhase_Justify,
QuorumPhase_Commit,
QuorumPhase_Finalize,
QuorumPhase_None=-1,
};
/**
* Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages
* is too slow to be processed in the main message handler thread. So, instead of processing them directly from the
* main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase
* handler thread.
*
* Each message type has it's own instance of this class.
*/
class CDKGPendingMessages
{
public:
typedef std::pair<NodeId, std::shared_ptr<CDataStream>> BinaryMessage;
private:
mutable CCriticalSection cs;
size_t maxMessagesPerNode;
std::list<BinaryMessage> pendingMessages;
std::map<NodeId, size_t> messagesPerNode;
std::set<uint256> seenMessages;
public:
CDKGPendingMessages(size_t _maxMessagesPerNode);
void PushPendingMessage(NodeId from, CDataStream& vRecv);
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
bool HasSeen(const uint256& hash) const;
void Clear();
// Might return nullptr messages, which indicates that deserialization failed for some reason
template<typename Message>
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> PopAndDeserializeMessages(size_t maxCount)
{
auto binaryMessages = PopPendingMessages(maxCount);
if (binaryMessages.empty()) {
return {};
}
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> ret;
ret.reserve(binaryMessages.size());
for (auto& bm : binaryMessages) {
auto msg = std::make_shared<Message>();
try {
*bm.second >> *msg;
} catch (...) {
msg = nullptr;
}
ret.emplace_back(std::make_pair(bm.first, std::move(msg)));
}
return std::move(ret);
}
};
/**
* Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type.
*
* It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a
* time and waiting for the next phase if necessary.
*/
class CDKGSessionHandler
{
private:
friend class CDKGSessionManager;
private:
mutable CCriticalSection cs;
std::atomic<bool> stopRequested{false};
const Consensus::LLMQParams& params;
CEvoDB& evoDb;
ctpl::thread_pool& messageHandlerPool;
CBLSWorker& blsWorker;
CDKGSessionManager& dkgManager;
QuorumPhase phase{QuorumPhase_Idle};
int quorumHeight{-1};
uint256 quorumHash;
std::shared_ptr<CDKGSession> curSession;
std::thread phaseHandlerThread;
CDKGPendingMessages pendingContributions;
CDKGPendingMessages pendingComplaints;
CDKGPendingMessages pendingJustifications;
CDKGPendingMessages pendingPrematureCommitments;
public:
CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
~CDKGSessionHandler();
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
private:
bool InitNewQuorum(int height, const uint256& quorumHash);
std::pair<QuorumPhase, uint256> GetPhaseAndQuorumHash();
typedef std::function<void()> StartPhaseFunc;
typedef std::function<bool()> WhileWaitFunc;
void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, const WhileWaitFunc& runWhileWaiting);
void WaitForNewQuorum(const uint256& oldQuorumHash);
void RandomSleep(QuorumPhase curPhase, uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting);
void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting);
void HandleDKGRound();
void PhaseHandlerThread();
};
}
#endif //DASH_QUORUMS_DKGSESSIONHANDLER_H

View File

@ -0,0 +1,279 @@
// Copyright (c) 2018 The Dash Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "quorums_dkgsessionmgr.h"
#include "quorums_blockprocessor.h"
#include "quorums_init.h"
#include "quorums_utils.h"
#include "chainparams.h"
#include "net_processing.h"
#include "spork.h"
#include "validation.h"
namespace llmq
{
CDKGSessionManager* quorumDKGSessionManager;
static const std::string DB_VVEC = "qdkg_V";
static const std::string DB_SKCONTRIB = "qdkg_S";
CDKGSessionManager::CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker) :
evoDb(_evoDb),
blsWorker(_blsWorker)
{
for (auto& qt : Params().GetConsensus().llmqs) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(qt.first),
std::forward_as_tuple(qt.second, _evoDb, messageHandlerPool, blsWorker, *this));
}
messageHandlerPool.resize(2);
RenameThreadPool(messageHandlerPool, "quorum-msg");
}
CDKGSessionManager::~CDKGSessionManager()
{
messageHandlerPool.stop(true);
}
void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload)
{
const auto& consensus = Params().GetConsensus();
CleanupCache();
if (fInitialDownload)
return;
if (!deterministicMNManager->IsDIP3Active(pindexNew->nHeight))
return;
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return;
LOCK(cs_main);
for (auto& qt : dkgSessionHandlers) {
qt.second.UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
}
}
void CDKGSessionManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return;
if (strCommand != NetMsgType::QCONTRIB
&& strCommand != NetMsgType::QCOMPLAINT
&& strCommand != NetMsgType::QJUSTIFICATION
&& strCommand != NetMsgType::QPCOMMITMENT
&& strCommand != NetMsgType::QWATCH) {
return;
}
if (strCommand == NetMsgType::QWATCH) {
pfrom->qwatch = true;
for (auto& p : dkgSessionHandlers) {
LOCK2(p.second.cs, p.second.curSession->invCs);
p.second.curSession->participatingNodes.emplace(pfrom->addr);
}
return;
}
if (vRecv.size() < 1) {
LOCK(cs_main);
Misbehaving(pfrom->id, 100);
return;
}
// peek into the message and see which LLMQType it is. First byte of all messages is always the LLMQType
Consensus::LLMQType llmqType = (Consensus::LLMQType)*vRecv.begin();
if (!dkgSessionHandlers.count(llmqType)) {
LOCK(cs_main);
Misbehaving(pfrom->id, 100);
return;
}
dkgSessionHandlers.at(llmqType).ProcessMessage(pfrom, strCommand, vRecv, connman);
}
bool CDKGSessionManager::AlreadyHave(const CInv& inv) const
{
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return false;
for (auto& p : dkgSessionHandlers) {
auto& dkgType = p.second;
if (dkgType.pendingContributions.HasSeen(inv.hash)
|| dkgType.pendingComplaints.HasSeen(inv.hash)
|| dkgType.pendingJustifications.HasSeen(inv.hash)
|| dkgType.pendingPrematureCommitments.HasSeen(inv.hash)) {
return true;
}
}
return false;
}
bool CDKGSessionManager::GetContribution(const uint256& hash, CDKGContribution& ret) const
{
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return false;
for (auto& p : dkgSessionHandlers) {
auto& dkgType = p.second;
LOCK2(dkgType.cs, dkgType.curSession->invCs);
if (dkgType.phase < QuorumPhase_Initialized || dkgType.phase > QuorumPhase_Contribute) {
continue;
}
auto it = dkgType.curSession->contributions.find(hash);
if (it != dkgType.curSession->contributions.end()) {
ret = it->second;
return true;
}
}
return false;
}
bool CDKGSessionManager::GetComplaint(const uint256& hash, CDKGComplaint& ret) const
{
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return false;
for (auto& p : dkgSessionHandlers) {
auto& dkgType = p.second;
LOCK2(dkgType.cs, dkgType.curSession->invCs);
if (dkgType.phase < QuorumPhase_Contribute || dkgType.phase > QuorumPhase_Complain) {
continue;
}
auto it = dkgType.curSession->complaints.find(hash);
if (it != dkgType.curSession->complaints.end()) {
ret = it->second;
return true;
}
}
return false;
}
bool CDKGSessionManager::GetJustification(const uint256& hash, CDKGJustification& ret) const
{
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return false;
for (auto& p : dkgSessionHandlers) {
auto& dkgType = p.second;
LOCK2(dkgType.cs, dkgType.curSession->invCs);
if (dkgType.phase < QuorumPhase_Complain || dkgType.phase > QuorumPhase_Justify) {
continue;
}
auto it = dkgType.curSession->justifications.find(hash);
if (it != dkgType.curSession->justifications.end()) {
ret = it->second;
return true;
}
}
return false;
}
bool CDKGSessionManager::GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const
{
if (!sporkManager.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED))
return false;
for (auto& p : dkgSessionHandlers) {
auto& dkgType = p.second;
LOCK2(dkgType.cs, dkgType.curSession->invCs);
if (dkgType.phase < QuorumPhase_Justify || dkgType.phase > QuorumPhase_Commit) {
continue;
}
auto it = dkgType.curSession->prematureCommitments.find(hash);
if (it != dkgType.curSession->prematureCommitments.end() && dkgType.curSession->validCommitments.count(hash)) {
ret = it->second;
return true;
}
}
return false;
}
void CDKGSessionManager::WriteVerifiedVvecContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const BLSVerificationVectorPtr& vvec)
{
evoDb.GetRawDB().Write(std::make_tuple(DB_VVEC, (uint8_t)llmqType, quorumHash, proTxHash), *vvec);
}
void CDKGSessionManager::WriteVerifiedSkContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const CBLSSecretKey& skContribution)
{
evoDb.GetRawDB().Write(std::make_tuple(DB_SKCONTRIB, (uint8_t)llmqType, quorumHash, proTxHash), skContribution);
}
bool CDKGSessionManager::GetVerifiedContributions(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::vector<bool>& validMembers, std::vector<uint16_t>& memberIndexesRet, std::vector<BLSVerificationVectorPtr>& vvecsRet, BLSSecretKeyVector& skContributionsRet)
{
auto members = CLLMQUtils::GetAllQuorumMembers(llmqType, quorumHash);
if (validMembers.size() != members.size()) {
// should never happen as we should always call this method with correct params
return false;
}
memberIndexesRet.clear();
vvecsRet.clear();
skContributionsRet.clear();
memberIndexesRet.reserve(members.size());
vvecsRet.reserve(members.size());
skContributionsRet.reserve(members.size());
for (size_t i = 0; i < members.size(); i++) {
if (validMembers[i]) {
BLSVerificationVectorPtr vvec;
CBLSSecretKey skContribution;
if (!GetVerifiedContribution(llmqType, quorumHash, members[i]->proTxHash, vvec, skContribution)) {
return false;
}
memberIndexesRet.emplace_back(i);
vvecsRet.emplace_back(vvec);
skContributionsRet.emplace_back(skContribution);
}
}
return true;
}
bool CDKGSessionManager::GetVerifiedContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, BLSVerificationVectorPtr& vvecRet, CBLSSecretKey& skContributionRet)
{
LOCK(contributionsCacheCs);
ContributionsCacheKey cacheKey = {llmqType, quorumHash, proTxHash};
auto it = contributionsCache.find(cacheKey);
if (it != contributionsCache.end()) {
vvecRet = it->second.vvec;
skContributionRet = it->second.skContribution;
return true;
}
BLSVerificationVector vvec;
BLSVerificationVectorPtr vvecPtr;
CBLSSecretKey skContribution;
if (evoDb.GetRawDB().Read(std::make_tuple(DB_VVEC, (uint8_t)llmqType, quorumHash, proTxHash), vvec)) {
vvecPtr = std::make_shared<BLSVerificationVector>(std::move(vvec));
}
evoDb.GetRawDB().Read(std::make_tuple(DB_SKCONTRIB, (uint8_t)llmqType, quorumHash, proTxHash), skContribution);
it = contributionsCache.emplace(cacheKey, ContributionsCacheEntry{GetTimeMillis(), vvecPtr, skContribution}).first;
vvecRet = it->second.vvec;
skContributionRet = it->second.skContribution;
return true;
}
void CDKGSessionManager::CleanupCache()
{
LOCK(contributionsCacheCs);
auto curTime = GetTimeMillis();
for (auto it = contributionsCache.begin(); it != contributionsCache.end(); ) {
if (curTime - it->second.entryTime > MAX_CONTRIBUTION_CACHE_TIME) {
it = contributionsCache.erase(it);
} else {
++it;
}
}
}
}

View File

@ -0,0 +1,76 @@
// Copyright (c) 2018 The Dash Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef DASH_QUORUMS_DKGSESSIONMGR_H
#define DASH_QUORUMS_DKGSESSIONMGR_H
#include "llmq/quorums_dkgsessionhandler.h"
#include "validation.h"
#include "ctpl.h"
class UniValue;
namespace llmq
{
class CDKGSessionManager
{
static const int64_t MAX_CONTRIBUTION_CACHE_TIME = 60 * 1000;
private:
CEvoDB& evoDb;
CBLSWorker& blsWorker;
ctpl::thread_pool messageHandlerPool;
std::map<Consensus::LLMQType, CDKGSessionHandler> dkgSessionHandlers;
CCriticalSection contributionsCacheCs;
struct ContributionsCacheKey {
Consensus::LLMQType llmqType;
uint256 quorumHash;
uint256 proTxHash;
bool operator<(const ContributionsCacheKey& r) const
{
if (llmqType != r.llmqType) return llmqType < r.llmqType;
if (quorumHash != r.quorumHash) return quorumHash < r.quorumHash;
return proTxHash < r.proTxHash;
}
};
struct ContributionsCacheEntry {
int64_t entryTime;
BLSVerificationVectorPtr vvec;
CBLSSecretKey skContribution;
};
std::map<ContributionsCacheKey, ContributionsCacheEntry> contributionsCache;
public:
CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker);
~CDKGSessionManager();
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
bool AlreadyHave(const CInv& inv) const;
bool GetContribution(const uint256& hash, CDKGContribution& ret) const;
bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const;
bool GetJustification(const uint256& hash, CDKGJustification& ret) const;
bool GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const;
// Verified contributions are written while in the DKG
void WriteVerifiedVvecContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const BLSVerificationVectorPtr& vvec);
void WriteVerifiedSkContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, const CBLSSecretKey& skContribution);
bool GetVerifiedContributions(Consensus::LLMQType llmqType, const uint256& quorumHash, const std::vector<bool>& validMembers, std::vector<uint16_t>& memberIndexesRet, std::vector<BLSVerificationVectorPtr>& vvecsRet, BLSSecretKeyVector& skContributionsRet);
bool GetVerifiedContribution(Consensus::LLMQType llmqType, const uint256& quorumHash, const uint256& proTxHash, BLSVerificationVectorPtr& vvecRet, CBLSSecretKey& skContributionRet);
private:
void CleanupCache();
};
extern CDKGSessionManager* quorumDKGSessionManager;
}
#endif //DASH_QUORUMS_DKGSESSIONMGR_H

View File

@ -6,17 +6,23 @@
#include "quorums_blockprocessor.h" #include "quorums_blockprocessor.h"
#include "quorums_commitment.h" #include "quorums_commitment.h"
#include "quorums_dkgsessionmgr.h"
namespace llmq namespace llmq
{ {
static CBLSWorker blsWorker;
void InitLLMQSystem(CEvoDB& evoDb) void InitLLMQSystem(CEvoDB& evoDb)
{ {
quorumBlockProcessor = new CQuorumBlockProcessor(evoDb); quorumBlockProcessor = new CQuorumBlockProcessor(evoDb);
quorumDKGSessionManager = new CDKGSessionManager(evoDb, blsWorker);
} }
void DestroyLLMQSystem() void DestroyLLMQSystem()
{ {
delete quorumDKGSessionManager;
quorumDKGSessionManager = NULL;
delete quorumBlockProcessor; delete quorumBlockProcessor;
quorumBlockProcessor = nullptr; quorumBlockProcessor = nullptr;
} }

View File

@ -10,6 +10,9 @@ class CEvoDB;
namespace llmq namespace llmq
{ {
// If true, we will connect to all new quorums and watch their communication
static const bool DEFAULT_WATCH_QUORUMS = false;
void InitLLMQSystem(CEvoDB& evoDb); void InitLLMQSystem(CEvoDB& evoDb);
void DestroyLLMQSystem(); void DestroyLLMQSystem();

View File

@ -29,5 +29,60 @@ uint256 CLLMQUtils::BuildCommitmentHash(uint8_t llmqType, const uint256& blockHa
return hw.GetHash(); return hw.GetHash();
} }
std::set<CService> CLLMQUtils::GetQuorumConnections(Consensus::LLMQType llmqType, const uint256& blockHash, const uint256& forMember)
{
auto& params = Params().GetConsensus().llmqs.at(llmqType);
auto mns = GetAllQuorumMembers(llmqType, blockHash);
std::set<CService> result;
for (size_t i = 0; i < mns.size(); i++) {
auto& dmn = mns[i];
if (dmn->proTxHash == forMember) {
for (int n = 0; n < params.neighborConnections; n++) {
size_t idx = (i + 1 + n) % mns.size();
auto& otherDmn = mns[idx];
if (otherDmn == dmn) {
continue;
}
result.emplace(otherDmn->pdmnState->addr);
}
size_t startIdx = i + mns.size() / 2;
startIdx -= (params.diagonalConnections / 2) * params.neighborConnections;
startIdx %= mns.size();
for (int n = 0; n < params.diagonalConnections; n++) {
size_t idx = startIdx + n * params.neighborConnections;
idx %= mns.size();
auto& otherDmn = mns[idx];
if (otherDmn == dmn) {
continue;
}
result.emplace(otherDmn->pdmnState->addr);
}
}
}
return result;
}
std::set<size_t> CLLMQUtils::CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, const uint256& blockHash, size_t memberCount, size_t connectionCount)
{
static uint256 qwatchConnectionSeed;
static std::atomic<bool> qwatchConnectionSeedGenerated{false};
static CCriticalSection qwatchConnectionSeedCs;
if (!qwatchConnectionSeedGenerated) {
LOCK(qwatchConnectionSeedCs);
if (!qwatchConnectionSeedGenerated) {
qwatchConnectionSeed = GetRandHash();
qwatchConnectionSeedGenerated = true;
}
}
std::set<size_t> result;
uint256 rnd = qwatchConnectionSeed;
for (size_t i = 0; i < connectionCount; i++) {
rnd = ::SerializeHash(std::make_pair(rnd, std::make_pair((uint8_t)llmqType, blockHash)));
result.emplace(rnd.GetUint64(0) % memberCount);
}
return result;
}
} }

View File

@ -21,6 +21,9 @@ public:
static std::vector<CDeterministicMNCPtr> GetAllQuorumMembers(Consensus::LLMQType llmqType, const uint256& blockHash); static std::vector<CDeterministicMNCPtr> GetAllQuorumMembers(Consensus::LLMQType llmqType, const uint256& blockHash);
static uint256 BuildCommitmentHash(uint8_t llmqType, const uint256& blockHash, const std::vector<bool>& validMembers, const CBLSPublicKey& pubKey, const uint256& vvecHash); static uint256 BuildCommitmentHash(uint8_t llmqType, const uint256& blockHash, const std::vector<bool>& validMembers, const CBLSPublicKey& pubKey, const uint256& vvecHash);
static std::set<CService> GetQuorumConnections(Consensus::LLMQType llmqType, const uint256& blockHash, const uint256& forMember);
static std::set<size_t> CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, const uint256& blockHash, size_t memberCount, size_t connectionCount);
}; };
} }

View File

@ -44,8 +44,10 @@
#include "evo/deterministicmns.h" #include "evo/deterministicmns.h"
#include "evo/simplifiedmns.h" #include "evo/simplifiedmns.h"
#include "llmq/quorums_commitment.h"
#include "llmq/quorums_blockprocessor.h" #include "llmq/quorums_blockprocessor.h"
#include "llmq/quorums_commitment.h"
#include "llmq/quorums_dkgsessionmgr.h"
#include "llmq/quorums_init.h"
#include <boost/thread.hpp> #include <boost/thread.hpp>
@ -949,6 +951,11 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
case MSG_QUORUM_FINAL_COMMITMENT: case MSG_QUORUM_FINAL_COMMITMENT:
return llmq::quorumBlockProcessor->HasMinableCommitment(inv.hash); return llmq::quorumBlockProcessor->HasMinableCommitment(inv.hash);
case MSG_QUORUM_CONTRIB:
case MSG_QUORUM_COMPLAINT:
case MSG_QUORUM_JUSTIFICATION:
case MSG_QUORUM_PREMATURE_COMMITMENT:
return llmq::quorumDKGSessionManager->AlreadyHave(inv);
} }
// Don't know what it is, just say we already got one // Don't know what it is, just say we already got one
@ -1218,6 +1225,35 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
} }
} }
if (!push && (inv.type == MSG_QUORUM_CONTRIB)) {
llmq::CDKGContribution o;
if (llmq::quorumDKGSessionManager->GetContribution(inv.hash, o)) {
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QCONTRIB, o));
push = true;
}
}
if (!push && (inv.type == MSG_QUORUM_COMPLAINT)) {
llmq::CDKGComplaint o;
if (llmq::quorumDKGSessionManager->GetComplaint(inv.hash, o)) {
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QCOMPLAINT, o));
push = true;
}
}
if (!push && (inv.type == MSG_QUORUM_JUSTIFICATION)) {
llmq::CDKGJustification o;
if (llmq::quorumDKGSessionManager->GetJustification(inv.hash, o)) {
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QJUSTIFICATION, o));
push = true;
}
}
if (!push && (inv.type == MSG_QUORUM_PREMATURE_COMMITMENT)) {
llmq::CDKGPrematureCommitment o;
if (llmq::quorumDKGSessionManager->GetPrematureCommitment(inv.hash, o)) {
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QPCOMMITMENT, o));
push = true;
}
}
if (!push) if (!push)
vNotFound.push_back(inv); vNotFound.push_back(inv);
} }
@ -1556,6 +1592,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
} }
if (GetBoolArg("-watchquorums", llmq::DEFAULT_WATCH_QUORUMS)) {
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QWATCH));
}
pfrom->fSuccessfullyConnected = true; pfrom->fSuccessfullyConnected = true;
} }
@ -2870,6 +2910,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
masternodeSync.ProcessMessage(pfrom, strCommand, vRecv); masternodeSync.ProcessMessage(pfrom, strCommand, vRecv);
governance.ProcessMessage(pfrom, strCommand, vRecv, connman); governance.ProcessMessage(pfrom, strCommand, vRecv, connman);
llmq::quorumBlockProcessor->ProcessMessage(pfrom, strCommand, vRecv, connman); llmq::quorumBlockProcessor->ProcessMessage(pfrom, strCommand, vRecv, connman);
llmq::quorumDKGSessionManager->ProcessMessage(pfrom, strCommand, vRecv, connman);
} }
else else
{ {

View File

@ -60,6 +60,10 @@ const char *GETMNLISTDIFF="getmnlistd";
const char *MNLISTDIFF="mnlistdiff"; const char *MNLISTDIFF="mnlistdiff";
const char *QFCOMMITMENT="qfcommit"; const char *QFCOMMITMENT="qfcommit";
const char *QCONTRIB="qcontrib"; const char *QCONTRIB="qcontrib";
const char *QCOMPLAINT="qcomplaint";
const char *QJUSTIFICATION="qjustify";
const char *QPCOMMITMENT="qpcommit";
const char *QWATCH="qwatch";
}; };
static const char* ppszTypeName[] = static const char* ppszTypeName[] =
@ -90,6 +94,9 @@ static const char* ppszTypeName[] =
NetMsgType::QFCOMMITMENT, NetMsgType::QFCOMMITMENT,
"qdcommit", // was only shortly used on testnet "qdcommit", // was only shortly used on testnet
NetMsgType::QCONTRIB, NetMsgType::QCONTRIB,
NetMsgType::QCOMPLAINT,
NetMsgType::QJUSTIFICATION,
NetMsgType::QPCOMMITMENT,
}; };
/** All known message types. Keep this in the same order as the list of /** All known message types. Keep this in the same order as the list of
@ -144,6 +151,10 @@ const static std::string allNetMessageTypes[] = {
NetMsgType::MNLISTDIFF, NetMsgType::MNLISTDIFF,
NetMsgType::QFCOMMITMENT, NetMsgType::QFCOMMITMENT,
NetMsgType::QCONTRIB, NetMsgType::QCONTRIB,
NetMsgType::QCOMPLAINT,
NetMsgType::QJUSTIFICATION,
NetMsgType::QPCOMMITMENT,
NetMsgType::QWATCH,
}; };
const static std::vector<std::string> allNetMessageTypesVec(allNetMessageTypes, allNetMessageTypes+ARRAYLEN(allNetMessageTypes)); const static std::vector<std::string> allNetMessageTypesVec(allNetMessageTypes, allNetMessageTypes+ARRAYLEN(allNetMessageTypes));

View File

@ -266,6 +266,10 @@ extern const char *GETMNLISTDIFF;
extern const char *MNLISTDIFF; extern const char *MNLISTDIFF;
extern const char *QFCOMMITMENT; extern const char *QFCOMMITMENT;
extern const char *QCONTRIB; extern const char *QCONTRIB;
extern const char *QCOMPLAINT;
extern const char *QJUSTIFICATION;
extern const char *QPCOMMITMENT;
extern const char *QWATCH;
}; };
/* Get a vector of all valid message types (see above) */ /* Get a vector of all valid message types (see above) */
@ -361,7 +365,10 @@ enum GetDataMsg {
MSG_CMPCT_BLOCK = 20, //!< Defined in BIP152 MSG_CMPCT_BLOCK = 20, //!< Defined in BIP152
MSG_QUORUM_FINAL_COMMITMENT = 21, MSG_QUORUM_FINAL_COMMITMENT = 21,
/* MSG_QUORUM_DUMMY_COMMITMENT = 22, */ // was shortly used on testnet/devnet/regtest /* MSG_QUORUM_DUMMY_COMMITMENT = 22, */ // was shortly used on testnet/devnet/regtest
/* MSG_QUORUM_DUMMY_CONTRIBUTION = 23, */ // not a valid contribution and only allowed on testnet/devnet/regtest. Will later be replaced with the real contribution MSG_QUORUM_CONTRIB = 23,
MSG_QUORUM_COMPLAINT = 24,
MSG_QUORUM_JUSTIFICATION = 25,
MSG_QUORUM_PREMATURE_COMMITMENT = 26,
}; };
/** inv message data */ /** inv message data */