neobytes/src/llmq/quorums_dkgsessionhandler.h
2019-01-09 12:16:12 +01:00

139 lines
4.6 KiB
C++

// Copyright (c) 2018 The Dash Core developers
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef DASH_QUORUMS_DKGSESSIONHANDLER_H
#define DASH_QUORUMS_DKGSESSIONHANDLER_H
#include "llmq/quorums_dkgsession.h"
#include "validation.h"
#include "ctpl.h"
namespace llmq
{
enum QuorumPhase {
QuorumPhase_Idle,
QuorumPhase_Initialized,
QuorumPhase_Contribute,
QuorumPhase_Complain,
QuorumPhase_Justify,
QuorumPhase_Commit,
QuorumPhase_Finalize,
QuorumPhase_None=-1,
};
/**
* Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages
* is too slow to be processed in the main message handler thread. So, instead of processing them directly from the
* main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase
* handler thread.
*
* Each message type has it's own instance of this class.
*/
class CDKGPendingMessages
{
public:
typedef std::pair<NodeId, std::shared_ptr<CDataStream>> BinaryMessage;
private:
mutable CCriticalSection cs;
size_t maxMessagesPerNode;
std::list<BinaryMessage> pendingMessages;
std::map<NodeId, size_t> messagesPerNode;
std::set<uint256> seenMessages;
public:
CDKGPendingMessages(size_t _maxMessagesPerNode);
void PushPendingMessage(NodeId from, CDataStream& vRecv);
std::list<BinaryMessage> PopPendingMessages(size_t maxCount);
bool HasSeen(const uint256& hash) const;
void Clear();
// Might return nullptr messages, which indicates that deserialization failed for some reason
template<typename Message>
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> PopAndDeserializeMessages(size_t maxCount)
{
auto binaryMessages = PopPendingMessages(maxCount);
if (binaryMessages.empty()) {
return {};
}
std::vector<std::pair<NodeId, std::shared_ptr<Message>>> ret;
ret.reserve(binaryMessages.size());
for (auto& bm : binaryMessages) {
auto msg = std::make_shared<Message>();
try {
*bm.second >> *msg;
} catch (...) {
msg = nullptr;
}
ret.emplace_back(std::make_pair(bm.first, std::move(msg)));
}
return std::move(ret);
}
};
/**
* Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type.
*
* It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a
* time and waiting for the next phase if necessary.
*/
class CDKGSessionHandler
{
private:
friend class CDKGSessionManager;
private:
mutable CCriticalSection cs;
std::atomic<bool> stopRequested{false};
const Consensus::LLMQParams& params;
CEvoDB& evoDb;
ctpl::thread_pool& messageHandlerPool;
CBLSWorker& blsWorker;
CDKGSessionManager& dkgManager;
QuorumPhase phase{QuorumPhase_Idle};
int quorumHeight{-1};
uint256 quorumHash;
std::shared_ptr<CDKGSession> curSession;
std::thread phaseHandlerThread;
CDKGPendingMessages pendingContributions;
CDKGPendingMessages pendingComplaints;
CDKGPendingMessages pendingJustifications;
CDKGPendingMessages pendingPrematureCommitments;
public:
CDKGSessionHandler(const Consensus::LLMQParams& _params, CEvoDB& _evoDb, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
~CDKGSessionHandler();
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
private:
bool InitNewQuorum(int height, const uint256& quorumHash);
std::pair<QuorumPhase, uint256> GetPhaseAndQuorumHash();
typedef std::function<void()> StartPhaseFunc;
typedef std::function<bool()> WhileWaitFunc;
void WaitForNextPhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, const WhileWaitFunc& runWhileWaiting);
void WaitForNewQuorum(const uint256& oldQuorumHash);
void RandomSleep(QuorumPhase curPhase, uint256& expectedQuorumHash, double randomSleepFactor, const WhileWaitFunc& runWhileWaiting);
void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting);
void HandleDKGRound();
void PhaseHandlerThread();
};
}
#endif //DASH_QUORUMS_DKGSESSIONHANDLER_H