139 lines
4.6 KiB
C++
139 lines
4.6 KiB
C++
// Copyright (c) 2018 The Dash Core developers
|
|
// Distributed under the MIT/X11 software license, see the accompanying
|
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
#ifndef DASH_QUORUMS_DKGSESSIONHANDLER_H
|
|
#define DASH_QUORUMS_DKGSESSIONHANDLER_H
|
|
|
|
#include "llmq/quorums_dkgsession.h"
|
|
|
|
#include "validation.h"
|
|
|
|
#include "ctpl.h"
|
|
|
|
namespace llmq
|
|
{
|
|
|
|
enum QuorumPhase {
|
|
QuorumPhase_Idle,
|
|
QuorumPhase_Initialized,
|
|
QuorumPhase_Contribute,
|
|
QuorumPhase_Complain,
|
|
QuorumPhase_Justify,
|
|
QuorumPhase_Commit,
|
|
QuorumPhase_Finalize,
|
|
|
|
QuorumPhase_None=-1,
|
|
};
|
|
|
|
/**
|
|
* Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages
|
|
* is too slow to be processed in the main message handler thread. So, instead of processing them directly from the
|
|
* main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase
|
|
* handler thread.
|
|
*
|
|
* Each message type has it's own instance of this class.
|
|
*/
|
|
class CDKGPendingMessages
|
|
{
|
|
public:
|
|
typedef std::pair<NodeId, std::shared_ptr<CDataStream>> BinaryMessage;
|
|
|
|
private:
|
|
mutable CCriticalSection cs;
|
|
size_t maxMessagesPerNode;
|
|
std::list<BinaryMessage> pendingMessages;
|
|
std::map<NodeId, size_t> messagesPerNode;
|
|
std::set<uint256> seenMessages;
|
|
|
|
public:
|
|
CDKGPendingMessages(size_t _maxMessagesPerNode);
|
|
|
|
void PushPendingMessage(NodeId from, CDataStream& vRecv);
|
|
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
|
|
bool HasSeen(const uint256& hash) const;
|
|
void Clear();
|
|
|
|
// Might return nullptr messages, which indicates that deserialization failed for some reason
|
|
template<typename Message>
|
|
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> PopAndDeserializeMessages(size_t maxCount)
|
|
{
|
|
auto binaryMessages = PopPendingMessages(maxCount);
|
|
if (binaryMessages.empty()) {
|
|
return {};
|
|
}
|
|
|
|
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> ret;
|
|
ret.reserve(binaryMessages.size());
|
|
for (auto& bm : binaryMessages) {
|
|
auto msg = std::make_shared<Message>();
|
|
try {
|
|
*bm.second >> *msg;
|
|
} catch (...) {
|
|
msg = nullptr;
|
|
}
|
|
ret.emplace_back(std::make_pair(bm.first, std::move(msg)));
|
|
}
|
|
|
|
return std::move(ret);
|
|
}
|
|
};
|
|
|
|
/**
|
|
* Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type.
|
|
*
|
|
* It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a
|
|
* time and waiting for the next phase if necessary.
|
|
*/
|
|
class CDKGSessionHandler
|
|
{
|
|
private:
|
|
friend class CDKGSessionManager;
|
|
|
|
private:
|
|
mutable CCriticalSection cs;
|
|
std::atomic<bool> stopRequested{false};
|
|
|
|
const Consensus::LLMQParams& params;
|
|
CEvoDB& evoDb;
|
|
ctpl::thread_pool& messageHandlerPool;
|
|
CBLSWorker& blsWorker;
|
|
CDKGSessionManager& dkgManager;
|
|
|
|
QuorumPhase phase{QuorumPhase_Idle};
|
|
int quorumHeight{-1};
|
|
uint256 quorumHash;
|
|
std::shared_ptr<CDKGSession> curSession;
|
|
std::thread phaseHandlerThread;
|
|
|
|
CDKGPendingMessages pendingContributions;
|
|
CDKGPendingMessages pendingComplaints;
|
|
CDKGPendingMessages pendingJustifications;
|
|
CDKGPendingMessages pendingPrematureCommitments;
|
|
|
|
public:
|
|
CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
|
|
~CDKGSessionHandler();
|
|
|
|
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
|
|
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
|
|
|
|
private:
|
|
bool InitNewQuorum(int height, const uint256& quorumHash);
|
|
|
|
std::pair<QuorumPhase, uint256> GetPhaseAndQuorumHash();
|
|
|
|
typedef std::function<void()> StartPhaseFunc;
|
|
typedef std::function<bool()> WhileWaitFunc;
|
|
void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, const WhileWaitFunc& runWhileWaiting);
|
|
void WaitForNewQuorum(const uint256& oldQuorumHash);
|
|
void RandomSleep(QuorumPhase curPhase, uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting);
|
|
void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting);
|
|
void HandleDKGRound();
|
|
void PhaseHandlerThread();
|
|
};
|
|
|
|
}
|
|
|
|
#endif //DASH_QUORUMS_DKGSESSIONHANDLER_H
|