// Copyright (c) 2018 The Dash Core developers // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef DASH_QUORUMS_DKGSESSIONHANDLER_H #define DASH_QUORUMS_DKGSESSIONHANDLER_H #include "llmq/quorums_dkgsession.h" #include "validation.h" #include "ctpl.h" namespace llmq { enum QuorumPhase { QuorumPhase_Idle, QuorumPhase_Initialized, QuorumPhase_Contribute, QuorumPhase_Complain, QuorumPhase_Justify, QuorumPhase_Commit, QuorumPhase_Finalize, QuorumPhase_None=-1, }; /** * Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages * is too slow to be processed in the main message handler thread. So, instead of processing them directly from the * main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase * handler thread. * * Each message type has it's own instance of this class. */ class CDKGPendingMessages { public: typedef std::pair> BinaryMessage; private: mutable CCriticalSection cs; size_t maxMessagesPerNode; std::list pendingMessages; std::map messagesPerNode; std::set seenMessages; public: CDKGPendingMessages(size_t _maxMessagesPerNode); void PushPendingMessage(NodeId from, CDataStream& vRecv); std::list PopPendingMessages(size_t maxCount); bool HasSeen(const uint256& hash) const; void Clear(); // Might return nullptr messages, which indicates that deserialization failed for some reason template std::vector>> PopAndDeserializeMessages(size_t maxCount) { auto binaryMessages = PopPendingMessages(maxCount); if (binaryMessages.empty()) { return {}; } std::vector>> ret; ret.reserve(binaryMessages.size()); for (auto& bm : binaryMessages) { auto msg = std::make_shared(); try { *bm.second >> *msg; } catch (...) { msg = nullptr; } ret.emplace_back(std::make_pair(bm.first, std::move(msg))); } return std::move(ret); } }; /** * Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type. * * It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a * time and waiting for the next phase if necessary. */ class CDKGSessionHandler { private: friend class CDKGSessionManager; private: mutable CCriticalSection cs; std::atomic stopRequested{false}; const Consensus::LLMQParams& params; CEvoDB& evoDb; ctpl::thread_pool& messageHandlerPool; CBLSWorker& blsWorker; CDKGSessionManager& dkgManager; QuorumPhase phase{QuorumPhase_Idle}; int quorumHeight{-1}; uint256 quorumHash; std::shared_ptr curSession; std::thread phaseHandlerThread; CDKGPendingMessages pendingContributions; CDKGPendingMessages pendingComplaints; CDKGPendingMessages pendingJustifications; CDKGPendingMessages pendingPrematureCommitments; public: CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager); ~CDKGSessionHandler(); void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); private: bool InitNewQuorum(int height, const uint256& quorumHash); std::pair GetPhaseAndQuorumHash(); typedef std::function StartPhaseFunc; typedef std::function WhileWaitFunc; void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, const WhileWaitFunc& runWhileWaiting); void WaitForNewQuorum(const uint256& oldQuorumHash); void SleepBeforePhase(QuorumPhase curPhase, uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting); void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting); void HandleDKGRound(); void PhaseHandlerThread(); }; } #endif //DASH_QUORUMS_DKGSESSIONHANDLER_H