Implement CInstantSendManager and related P2P messages

This commit is contained in:
Alexander Block 2018-09-21 16:01:04 +02:00
parent 5bbc122749
commit 83dbcc483f
11 changed files with 1075 additions and 0 deletions

View File

@ -173,6 +173,7 @@ BITCOIN_CORE_H = \
llmq/quorums_dkgsessionmgr.h \
llmq/quorums_dkgsession.h \
llmq/quorums_init.h \
llmq/quorums_instantsend.h \
llmq/quorums_signing.h \
llmq/quorums_signing_shares.h \
llmq/quorums_utils.h \
@ -291,6 +292,7 @@ libdash_server_a_SOURCES = \
llmq/quorums_dkgsessionmgr.cpp \
llmq/quorums_dkgsession.cpp \
llmq/quorums_init.cpp \
llmq/quorums_instantsend.cpp \
llmq/quorums_signing.cpp \
llmq/quorums_signing_shares.cpp \
llmq/quorums_utils.cpp \

View File

@ -623,6 +623,7 @@ public:
consensus.llmqs[Consensus::LLMQ_400_60] = llmq400_60;
consensus.llmqs[Consensus::LLMQ_400_85] = llmq400_85;
consensus.llmqChainLocks = Consensus::LLMQ_50_60;
consensus.llmqForInstantSend = Consensus::LLMQ_50_60;
fMiningRequiresPeers = true;
fDefaultConsistencyChecks = false;
@ -787,6 +788,7 @@ public:
consensus.llmqs[Consensus::LLMQ_10_60] = llmq10_60;
consensus.llmqs[Consensus::LLMQ_50_60] = llmq50_60;
consensus.llmqChainLocks = Consensus::LLMQ_10_60;
consensus.llmqForInstantSend = Consensus::LLMQ_10_60;
}
void UpdateBIP9Parameters(Consensus::DeploymentPos d, int64_t nStartTime, int64_t nTimeout, int64_t nWindowSize, int64_t nThreshold)

View File

@ -174,6 +174,7 @@ struct Params {
std::map<LLMQType, LLMQParams> llmqs;
LLMQType llmqChainLocks;
LLMQType llmqForInstantSend{LLMQ_NONE};
};
} // namespace Consensus

View File

@ -18,6 +18,7 @@
#include "llmq/quorums.h"
#include "llmq/quorums_chainlocks.h"
#include "llmq/quorums_instantsend.h"
#include "llmq/quorums_dkgsessionmgr.h"
void CDSNotificationInterface::InitializeCurrentBlockTip()
@ -72,6 +73,7 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
void CDSNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock)
{
llmq::quorumInstantSendManager->SyncTransaction(tx, pindex, posInBlock);
instantsend.SyncTransaction(tx, pindex, posInBlock);
CPrivateSend::SyncTransaction(tx, pindex, posInBlock);
}
@ -82,3 +84,8 @@ void CDSNotificationInterface::NotifyMasternodeListChanged(const CDeterministicM
governance.CheckMasternodeOrphanVotes(connman);
governance.UpdateCachesAndClean();
}
void CDSNotificationInterface::NotifyChainLock(const CBlockIndex* pindex)
{
llmq::quorumInstantSendManager->NotifyChainLock(pindex);
}

View File

@ -23,6 +23,7 @@ protected:
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock) override;
void NotifyMasternodeListChanged(const CDeterministicMNList& newList) override;
void NotifyChainLock(const CBlockIndex* pindex);
private:
CConnman& connman;

View File

@ -10,6 +10,7 @@
#include "quorums_chainlocks.h"
#include "quorums_debug.h"
#include "quorums_dkgsessionmgr.h"
#include "quorums_instantsend.h"
#include "quorums_signing.h"
#include "quorums_signing_shares.h"
@ -29,10 +30,13 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler);
quorumInstantSendManager = new CInstantSendManager(scheduler);
}
void DestroyLLMQSystem()
{
delete quorumInstantSendManager;
quorumInstantSendManager = nullptr;
delete chainLocksHandler;
chainLocksHandler = nullptr;
delete quorumSigningManager;
@ -64,10 +68,16 @@ void StartLLMQSystem()
if (chainLocksHandler) {
chainLocksHandler->RegisterAsRecoveredSigsListener();
}
if (quorumInstantSendManager) {
quorumInstantSendManager->RegisterAsRecoveredSigsListener();
}
}
void StopLLMQSystem()
{
if (quorumInstantSendManager) {
quorumInstantSendManager->UnregisterAsRecoveredSigsListener();
}
if (chainLocksHandler) {
chainLocksHandler->UnregisterAsRecoveredSigsListener();
}

View File

@ -0,0 +1,883 @@
// Copyright (c) 2019 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "quorums_chainlocks.h"
#include "quorums_instantsend.h"
#include "quorums_utils.h"
#include "bls/bls_batchverifier.h"
#include "chainparams.h"
#include "coins.h"
#include "txmempool.h"
#include "masternode-sync.h"
#include "net_processing.h"
#include "scheduler.h"
#include "spork.h"
#include "validation.h"
#include "wallet/wallet.h"
// needed for nCompleteTXLocks
#include "instantx.h"
#include <boost/algorithm/string/replace.hpp>
namespace llmq
{
static const std::string INPUTLOCK_REQUESTID_PREFIX = "inlock";
static const std::string IXLOCK_REQUESTID_PREFIX = "ixlock";
CInstantSendManager* quorumInstantSendManager;
uint256 CInstantXLock::GetRequestId() const
{
CHashWriter hw(SER_GETHASH, 0);
hw << IXLOCK_REQUESTID_PREFIX;
hw << inputs;
return hw.GetHash();
}
CInstantSendManager::CInstantSendManager(CScheduler* _scheduler) :
scheduler(_scheduler)
{
}
CInstantSendManager::~CInstantSendManager()
{
}
void CInstantSendManager::RegisterAsRecoveredSigsListener()
{
quorumSigningManager->RegisterRecoveredSigsListener(this);
}
void CInstantSendManager::UnregisterAsRecoveredSigsListener()
{
quorumSigningManager->UnregisterRecoveredSigsListener(this);
}
bool CInstantSendManager::ProcessTx(CNode* pfrom, const CTransaction& tx, CConnman& connman, const Consensus::Params& params)
{
if (!IsNewInstantSendEnabled()) {
return true;
}
auto llmqType = params.llmqForInstantSend;
if (llmqType == Consensus::LLMQ_NONE) {
return true;
}
if (!fMasternodeMode) {
return true;
}
// Ignore any InstantSend messages until blockchain is synced
if (!masternodeSync.IsBlockchainSynced()) {
return true;
}
if (IsConflicted(tx)) {
return false;
}
if (!CheckCanLock(tx, true, params)) {
return false;
}
std::vector<uint256> ids;
ids.reserve(tx.vin.size());
for (const auto& in : tx.vin) {
auto id = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in.prevout));
ids.emplace_back(id);
}
{
LOCK(cs);
size_t alreadyVotedCount = 0;
for (size_t i = 0; i < ids.size(); i++) {
auto it = inputVotes.find(ids[i]);
if (it != inputVotes.end()) {
if (it->second != tx.GetHash()) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: input %s is conflicting with ixlock %s\n", __func__,
tx.GetHash().ToString(), tx.vin[i].prevout.ToStringShort(), it->second.ToString());
return false;
}
alreadyVotedCount++;
}
}
if (alreadyVotedCount == ids.size()) {
return true;
}
for (auto& id : ids) {
inputVotes.emplace(id, tx.GetHash());
}
}
// don't even try the actual signing if any input is conflicting
for (auto& id : ids) {
if (quorumSigningManager->IsConflicting(llmqType, id, tx.GetHash())) {
return false;
}
}
for (auto& id : ids) {
quorumSigningManager->AsyncSignIfMember(llmqType, id, tx.GetHash());
}
// We might have received all input locks before we got the corresponding TX. In this case, we have to sign the
// ixlock now instead of waiting for the input locks.
TrySignInstantXLock(tx);
return true;
}
bool CInstantSendManager::CheckCanLock(const CTransaction& tx, bool printDebug, const Consensus::Params& params)
{
int nInstantSendConfirmationsRequired = params.nInstantSendConfirmationsRequired;
uint256 txHash = tx.GetHash();
CAmount nValueIn = 0;
for (const auto& in : tx.vin) {
CAmount v = 0;
if (!CheckCanLock(in.prevout, printDebug, &txHash, &v, params)) {
return false;
}
nValueIn += v;
}
// TODO decide if we should limit max input values. This was ok to do in the old system, but in the new system
// where we want to have all TXs locked at some point, this is counterproductive (especially when ChainLocks later
// depend on all TXs being locked first)
// CAmount maxValueIn = sporkManager.GetSporkValue(SPORK_5_INSTANTSEND_MAX_VALUE);
// if (nValueIn > maxValueIn * COIN) {
// if (printDebug) {
// LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: TX input value too high. nValueIn=%f, maxValueIn=%d", __func__,
// tx.GetHash().ToString(), nValueIn / (double)COIN, maxValueIn);
// }
// return false;
// }
return true;
}
bool CInstantSendManager::CheckCanLock(const COutPoint& outpoint, bool printDebug, const uint256* _txHash, CAmount* retValue, const Consensus::Params& params)
{
int nInstantSendConfirmationsRequired = params.nInstantSendConfirmationsRequired;
if (IsLocked(outpoint.hash)) {
// if prevout was ix locked, allow locking of descendants (no matter if prevout is in mempool or already mined)
return true;
}
static uint256 txHashNull;
const uint256* txHash = &txHashNull;
if (_txHash) {
txHash = _txHash;
}
auto mempoolTx = mempool.get(outpoint.hash);
if (mempoolTx) {
if (printDebug) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: parent mempool TX %s is not locked\n", __func__,
txHash->ToString(), outpoint.hash.ToString());
}
return false;
}
Coin coin;
const CBlockIndex* pindexMined = nullptr;
{
LOCK(cs_main);
if (!pcoinsTip->GetCoin(outpoint, coin) || coin.IsSpent()) {
if (printDebug) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: failed to find UTXO %s\n", __func__,
txHash->ToString(), outpoint.ToStringShort());
}
return false;
}
pindexMined = chainActive[coin.nHeight];
}
int nTxAge = chainActive.Height() - coin.nHeight + 1;
// 1 less than the "send IX" gui requires, in case of a block propagating the network at the time
int nConfirmationsRequired = nInstantSendConfirmationsRequired - 1;
if (nTxAge < nConfirmationsRequired) {
if (!llmq::chainLocksHandler->HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) {
if (printDebug) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: outpoint %s too new and not ChainLocked. nTxAge=%d, nConfirmationsRequired=%d\n", __func__,
txHash->ToString(), outpoint.ToStringShort(), nTxAge, nConfirmationsRequired);
}
return false;
}
}
if (retValue) {
*retValue = coin.out.nValue;
}
return true;
}
void CInstantSendManager::HandleNewRecoveredSig(const CRecoveredSig& recoveredSig)
{
if (!IsNewInstantSendEnabled()) {
return;
}
auto llmqType = Params().GetConsensus().llmqForInstantSend;
if (llmqType == Consensus::LLMQ_NONE) {
return;
}
auto& params = Params().GetConsensus().llmqs.at(llmqType);
uint256 txid;
bool isInstantXLock = false;
{
LOCK(cs);
auto it = inputVotes.find(recoveredSig.id);
if (it != inputVotes.end()) {
txid = it->second;
}
if (creatingInstantXLocks.count(recoveredSig.id)) {
isInstantXLock = true;
}
}
if (!txid.IsNull()) {
HandleNewInputLockRecoveredSig(recoveredSig, txid);
} else if (isInstantXLock) {
HandleNewInstantXLockRecoveredSig(recoveredSig);
}
}
void CInstantSendManager::HandleNewInputLockRecoveredSig(const CRecoveredSig& recoveredSig, const uint256& txid)
{
auto llmqType = Params().GetConsensus().llmqForInstantSend;
CTransactionRef tx;
uint256 hashBlock;
if (!GetTransaction(txid, tx, Params().GetConsensus(), hashBlock, true)) {
return;
}
if (LogAcceptCategory("instantsend")) {
for (auto& in : tx->vin) {
auto id = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in.prevout));
if (id == recoveredSig.id) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: got recovered sig for input %s\n", __func__,
txid.ToString(), in.prevout.ToStringShort());
break;
}
}
}
TrySignInstantXLock(*tx);
}
void CInstantSendManager::TrySignInstantXLock(const CTransaction& tx)
{
auto llmqType = Params().GetConsensus().llmqForInstantSend;
for (auto& in : tx.vin) {
auto id = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in.prevout));
if (!quorumSigningManager->HasRecoveredSig(llmqType, id, tx.GetHash())) {
return;
}
}
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: got all recovered sigs, creating CInstantXLock\n", __func__,
tx.GetHash().ToString());
CInstantXLockInfo ixlockInfo;
ixlockInfo.time = GetTimeMillis();
ixlockInfo.ixlock.txid = tx.GetHash();
for (auto& in : tx.vin) {
ixlockInfo.ixlock.inputs.emplace_back(in.prevout);
}
auto id = ixlockInfo.ixlock.GetRequestId();
if (quorumSigningManager->HasRecoveredSigForId(llmqType, id)) {
return;
}
{
LOCK(cs);
auto e = creatingInstantXLocks.emplace(id, ixlockInfo);
if (!e.second) {
return;
}
txToCreatingInstantXLocks.emplace(tx.GetHash(), &e.first->second);
}
quorumSigningManager->AsyncSignIfMember(llmqType, id, tx.GetHash());
}
void CInstantSendManager::HandleNewInstantXLockRecoveredSig(const llmq::CRecoveredSig& recoveredSig)
{
CInstantXLockInfo ixlockInfo;
{
LOCK(cs);
auto it = creatingInstantXLocks.find(recoveredSig.id);
if (it == creatingInstantXLocks.end()) {
return;
}
ixlockInfo = std::move(it->second);
creatingInstantXLocks.erase(it);
txToCreatingInstantXLocks.erase(ixlockInfo.ixlock.txid);
}
if (ixlockInfo.ixlock.txid != recoveredSig.msgHash) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: ixlock conflicts with %s, dropping own version", __func__,
ixlockInfo.ixlock.txid.ToString(), recoveredSig.msgHash.ToString());
return;
}
ixlockInfo.ixlock.sig = recoveredSig.sig;
ProcessInstantXLock(-1, ::SerializeHash(ixlockInfo.ixlock), ixlockInfo.ixlock);
}
void CInstantSendManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{
if (!IsNewInstantSendEnabled()) {
return;
}
if (strCommand == NetMsgType::IXLOCK) {
CInstantXLock ixlock;
vRecv >> ixlock;
ProcessMessageInstantXLock(pfrom, ixlock, connman);
}
}
void CInstantSendManager::ProcessMessageInstantXLock(CNode* pfrom, const llmq::CInstantXLock& ixlock, CConnman& connman)
{
bool ban = false;
if (!PreVerifyInstantXLock(pfrom->id, ixlock, ban)) {
if (ban) {
LOCK(cs_main);
Misbehaving(pfrom->id, 100);
}
return;
}
auto hash = ::SerializeHash(ixlock);
LOCK(cs);
if (pendingInstantXLocks.count(hash) || finalInstantXLocks.count(hash)) {
return;
}
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: received ixlock, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), pfrom->id);
pendingInstantXLocks.emplace(hash, std::make_pair(pfrom->id, std::move(ixlock)));
if (!hasScheduledProcessPending) {
hasScheduledProcessPending = true;
scheduler->schedule([&] {
ProcessPendingInstantXLocks();
}, boost::chrono::system_clock::now() + boost::chrono::milliseconds(100));
}
}
bool CInstantSendManager::PreVerifyInstantXLock(NodeId nodeId, const llmq::CInstantXLock& ixlock, bool& retBan)
{
retBan = false;
if (ixlock.txid.IsNull() || !ixlock.sig.IsValid() || ixlock.inputs.empty()) {
retBan = true;
return false;
}
std::set<COutPoint> dups;
for (auto& o : ixlock.inputs) {
if (!dups.emplace(o).second) {
retBan = true;
return false;
}
}
return true;
}
void CInstantSendManager::ProcessPendingInstantXLocks()
{
auto llmqType = Params().GetConsensus().llmqForInstantSend;
decltype(pendingInstantXLocks) pend;
{
LOCK(cs);
hasScheduledProcessPending = false;
pend = std::move(pendingInstantXLocks);
}
if (!IsNewInstantSendEnabled()) {
return;
}
int tipHeight;
{
LOCK(cs_main);
tipHeight = chainActive.Height();
}
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
std::unordered_map<uint256, std::pair<CQuorumCPtr, CRecoveredSig>> recSigs;
for (const auto& p : pend) {
auto& hash = p.first;
auto nodeId = p.second.first;
auto& ixlock = p.second.second;
auto id = ixlock.GetRequestId();
// no need to verify an IXLOCK if we already have verified the recovered sig that belongs to it
if (quorumSigningManager->HasRecoveredSig(llmqType, id, ixlock.txid)) {
continue;
}
auto quorum = quorumSigningManager->SelectQuorumForSigning(llmqType, tipHeight, id);
if (!quorum) {
// should not happen, but if one fails to select, all others will also fail to select
return;
}
uint256 signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->quorumHash, id, ixlock.txid);
batchVerifier.PushMessage(nodeId, hash, signHash, ixlock.sig, quorum->quorumPublicKey);
// We can reconstruct the CRecoveredSig objects from the ixlock and pass it to the signing manager, which
// avoids unnecessary double-verification of the signature. We however only do this when verification here
// turns out to be good (which is checked further down)
if (!quorumSigningManager->HasRecoveredSigForId(llmqType, id)) {
CRecoveredSig recSig;
recSig.llmqType = llmqType;
recSig.quorumHash = quorum->quorumHash;
recSig.id = id;
recSig.msgHash = ixlock.txid;
recSig.sig = ixlock.sig;
recSigs.emplace(std::piecewise_construct,
std::forward_as_tuple(hash),
std::forward_as_tuple(std::move(quorum), std::move(recSig)));
}
}
batchVerifier.Verify();
if (!batchVerifier.badSources.empty()) {
LOCK(cs_main);
for (auto& nodeId : batchVerifier.badSources) {
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
// does not validate anymore due to changed quorums
Misbehaving(nodeId, 20);
}
}
for (const auto& p : pend) {
auto& hash = p.first;
auto nodeId = p.second.first;
auto& ixlock = p.second.second;
if (batchVerifier.badMessages.count(hash)) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: invalid sig in ixlock, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), nodeId);
continue;
}
ProcessInstantXLock(nodeId, hash, ixlock);
// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
// double-verification of the sig.
auto it = recSigs.find(hash);
if (it != recSigs.end()) {
auto& quorum = it->second.first;
auto& recSig = it->second.second;
if (!quorumSigningManager->HasRecoveredSigForId(llmqType, recSig.id)) {
recSig.UpdateHash();
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: passing reconstructed recSig to signing mgr, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), nodeId);
quorumSigningManager->PushReconstructedRecoveredSig(recSig, quorum);
}
}
}
}
void CInstantSendManager::ProcessInstantXLock(NodeId from, const uint256& hash, const CInstantXLock& ixlock)
{
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash);
}
CInstantXLockInfo ixlockInfo;
ixlockInfo.time = GetTimeMillis();
ixlockInfo.ixlock = ixlock;
ixlockInfo.ixlockHash = hash;
uint256 hashBlock;
// we ignore failure here as we must be able to propagate the lock even if we don't have the TX locally
if (GetTransaction(ixlock.txid, ixlockInfo.tx, Params().GetConsensus(), hashBlock)) {
if (!hashBlock.IsNull()) {
{
LOCK(cs_main);
ixlockInfo.pindexMined = mapBlockIndex.at(hashBlock);
}
// Let's see if the TX that was locked by this ixlock is already mined in a ChainLocked block. If yes,
// we can simply ignore the ixlock, as the ChainLock implies locking of all TXs in that chain
if (llmq::chainLocksHandler->HasChainLock(ixlockInfo.pindexMined->nHeight, ixlockInfo.pindexMined->GetBlockHash())) {
LogPrint("instantsend", "CInstantSendManager::%s -- txlock=%s, ixlock=%s: dropping ixlock as it already got a ChainLock in block %s, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), hashBlock.ToString(), from);
return;
}
}
}
{
LOCK(cs);
auto e = finalInstantXLocks.emplace(hash, ixlockInfo);
if (!e.second) {
return;
}
auto ixlockInfoPtr = &e.first->second;
creatingInstantXLocks.erase(ixlockInfoPtr->ixlock.GetRequestId());
txToCreatingInstantXLocks.erase(ixlockInfoPtr->ixlock.txid);
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: processsing ixlock, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), from);
if (!txToInstantXLock.emplace(ixlock.txid, ixlockInfoPtr).second) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: duplicate ixlock, other ixlock=%s, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), txToInstantXLock[ixlock.txid]->ixlockHash.ToString(), from);
txToInstantXLock.erase(hash);
return;
}
for (size_t i = 0; i < ixlock.inputs.size(); i++) {
auto& in = ixlock.inputs[i];
if (!inputToInstantXLock.emplace(in, ixlockInfoPtr).second) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: conflicting input in ixlock. input=%s, other ixlock=%s, peer=%d\n", __func__,
ixlock.txid.ToString(), hash.ToString(), in.ToStringShort(), inputToInstantXLock[in]->ixlockHash.ToString(), from);
txToInstantXLock.erase(hash);
for (size_t j = 0; j < i; j++) {
inputToInstantXLock.erase(ixlock.inputs[j]);
}
return;
}
}
}
CInv inv(MSG_IXLOCK, hash);
g_connman->RelayInv(inv);
RemoveMempoolConflictsForLock(hash, ixlock);
RetryLockMempoolTxs(ixlock.txid);
UpdateWalletTransaction(ixlock.txid);
}
void CInstantSendManager::UpdateWalletTransaction(const uint256& txid)
{
#ifdef ENABLE_WALLET
if (!pwalletMain) {
return;
}
if (pwalletMain->UpdatedTransaction(txid)) {
// bumping this to update UI
nCompleteTXLocks++;
// notify an external script once threshold is reached
std::string strCmd = GetArg("-instantsendnotify", "");
if (!strCmd.empty()) {
boost::replace_all(strCmd, "%s", txid.GetHex());
boost::thread t(runCommand, strCmd); // thread runs free
}
}
#endif
LOCK(cs);
auto it = txToInstantXLock.find(txid);
if (it == txToInstantXLock.end()) {
return;
}
if (it->second->tx == nullptr) {
return;
}
GetMainSignals().NotifyTransactionLock(*it->second->tx);
}
void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
{
if (!IsNewInstantSendEnabled()) {
return;
}
{
LOCK(cs);
auto it = txToInstantXLock.find(tx.GetHash());
if (it == txToInstantXLock.end()) {
return;
}
auto ixlockInfo = it->second;
if (ixlockInfo->tx == nullptr) {
ixlockInfo->tx = MakeTransactionRef(tx);
}
if (posInBlock == CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK) {
UpdateIxLockMinedBlock(ixlockInfo, nullptr);
return;
}
UpdateIxLockMinedBlock(ixlockInfo, pindex);
}
if (IsLocked(tx.GetHash())) {
RetryLockMempoolTxs(tx.GetHash());
}
}
void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindex)
{
{
LOCK(cs);
// Let's find all ixlocks that correspond to TXs which are part of the freshly ChainLocked chain and then delete
// the ixlocks. We do this because the ChainLocks imply locking and thus it's not needed to further track
// or propagate the ixlocks
std::unordered_set<uint256> toDelete;
while (pindex && pindex != pindexLastChainLock) {
auto its = blockToInstantXLocks.equal_range(pindex->GetBlockHash());
while (its.first != its.second) {
auto ixlockInfo = its.first->second;
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: removing ixlock as it got ChainLocked in block %s\n", __func__,
ixlockInfo->ixlock.txid.ToString(), ixlockInfo->ixlockHash.ToString(), pindex->GetBlockHash().ToString());
toDelete.emplace(its.first->second->ixlockHash);
++its.first;
}
pindex = pindex->pprev;
}
pindexLastChainLock = pindex;
for (auto& ixlockHash : toDelete) {
RemoveFinalIxLock(ixlockHash);
}
}
RetryLockMempoolTxs(uint256());
}
void CInstantSendManager::UpdateIxLockMinedBlock(llmq::CInstantXLockInfo* ixlockInfo, const CBlockIndex* pindex)
{
AssertLockHeld(cs);
if (ixlockInfo->pindexMined == pindex) {
return;
}
if (ixlockInfo->pindexMined) {
auto its = blockToInstantXLocks.equal_range(ixlockInfo->pindexMined->GetBlockHash());
while (its.first != its.second) {
if (its.first->second == ixlockInfo) {
its.first = blockToInstantXLocks.erase(its.first);
} else {
++its.first;
}
}
}
if (pindex) {
blockToInstantXLocks.emplace(pindex->GetBlockHash(), ixlockInfo);
}
ixlockInfo->pindexMined = pindex;
}
void CInstantSendManager::RemoveFinalIxLock(const uint256& hash)
{
AssertLockHeld(cs);
auto it = finalInstantXLocks.find(hash);
if (it == finalInstantXLocks.end()) {
return;
}
auto& ixlockInfo = it->second;
txToInstantXLock.erase(ixlockInfo.ixlock.txid);
for (auto& in : ixlockInfo.ixlock.inputs) {
auto inputRequestId = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in));
inputVotes.erase(inputRequestId);
inputToInstantXLock.erase(in);
}
UpdateIxLockMinedBlock(&ixlockInfo, nullptr);
}
void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantXLock& ixlock)
{
LOCK(mempool.cs);
std::unordered_map<uint256, CTransactionRef> toDelete;
for (auto& in : ixlock.inputs) {
auto it = mempool.mapNextTx.find(in);
if (it == mempool.mapNextTx.end()) {
continue;
}
if (it->second->GetHash() != ixlock.txid) {
toDelete.emplace(it->second->GetHash(), mempool.get(it->second->GetHash()));
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, ixlock=%s: mempool TX %s with input %s conflicts with ixlock\n", __func__,
ixlock.txid.ToString(), hash.ToString(), it->second->GetHash().ToString(), in.ToStringShort());
}
}
for (auto& p : toDelete) {
mempool.removeRecursive(*p.second, MemPoolRemovalReason::CONFLICT);
}
}
void CInstantSendManager::RetryLockMempoolTxs(const uint256& lockedParentTx)
{
// Let's retry all mempool TXs which don't have an ixlock yet and where the parents got ChainLocked now
std::unordered_map<uint256, CTransactionRef> txs;
{
LOCK(mempool.cs);
if (lockedParentTx.IsNull()) {
txs.reserve(mempool.mapTx.size());
for (auto it = mempool.mapTx.begin(); it != mempool.mapTx.end(); ++it) {
txs.emplace(it->GetTx().GetHash(), it->GetSharedTx());
}
} else {
auto it = mempool.mapNextTx.lower_bound(COutPoint(lockedParentTx, 0));
while (it != mempool.mapNextTx.end() && it->first->hash == lockedParentTx) {
txs.emplace(it->second->GetHash(), mempool.get(it->second->GetHash()));
++it;
}
}
}
for (auto& p : txs) {
auto& tx = p.second;
{
LOCK(cs);
if (txToCreatingInstantXLocks.count(tx->GetHash())) {
// we're already in the middle of locking this one
continue;
}
if (IsLocked(tx->GetHash())) {
continue;
}
if (IsConflicted(*tx)) {
// should not really happen as we have already filtered these out
continue;
}
}
// CheckCanLock is already called by ProcessTx, so we should avoid calling it twice. But we also shouldn't spam
// the logs when retrying TXs that are not ready yet.
if (LogAcceptCategory("instantsend")) {
if (!CheckCanLock(*tx, false, Params().GetConsensus())) {
continue;
}
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: retrying to lock\n", __func__,
tx->GetHash().ToString());
}
ProcessTx(nullptr, *tx, *g_connman, Params().GetConsensus());
}
}
bool CInstantSendManager::AlreadyHave(const CInv& inv)
{
if (!IsNewInstantSendEnabled()) {
return true;
}
LOCK(cs);
return finalInstantXLocks.count(inv.hash) != 0 || pendingInstantXLocks.count(inv.hash) != 0;
}
bool CInstantSendManager::GetInstantXLockByHash(const uint256& hash, llmq::CInstantXLock& ret)
{
if (!IsNewInstantSendEnabled()) {
return false;
}
LOCK(cs);
auto it = finalInstantXLocks.find(hash);
if (it == finalInstantXLocks.end()) {
return false;
}
ret = it->second.ixlock;
return true;
}
bool CInstantSendManager::IsLocked(const uint256& txHash)
{
if (!IsNewInstantSendEnabled()) {
return false;
}
LOCK(cs);
return txToInstantXLock.count(txHash) != 0;
}
bool CInstantSendManager::IsConflicted(const CTransaction& tx)
{
LOCK(cs);
uint256 dummy;
return GetConflictingTx(tx, dummy);
}
bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retConflictTxHash)
{
if (!IsNewInstantSendEnabled()) {
return false;
}
LOCK(cs);
for (const auto& in : tx.vin) {
auto it = inputToInstantXLock.find(in.prevout);
if (it == inputToInstantXLock.end()) {
continue;
}
if (it->second->ixlock.txid != tx.GetHash()) {
retConflictTxHash = it->second->ixlock.txid;
return true;
}
}
return false;
}
bool IsOldInstantSendEnabled()
{
int spork2Value = sporkManager.GetSporkValue(SPORK_2_INSTANTSEND_ENABLED);
if (spork2Value == 0) {
return true;
}
return false;
}
bool IsNewInstantSendEnabled()
{
int spork2Value = sporkManager.GetSporkValue(SPORK_2_INSTANTSEND_ENABLED);
if (spork2Value == 1) {
return true;
}
return false;
}
bool IsInstantSendEnabled()
{
int spork2Value = sporkManager.GetSporkValue(SPORK_2_INSTANTSEND_ENABLED);
return spork2Value == 0 || spork2Value == 1;
}
}

View File

@ -0,0 +1,149 @@
// Copyright (c) 2019 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef DASH_QUORUMS_INSTANTX_H
#define DASH_QUORUMS_INSTANTX_H
#include "quorums_signing.h"
#include "coins.h"
#include "primitives/transaction.h"
#include <unordered_map>
#include <unordered_set>
class CScheduler;
namespace llmq
{
class CInstantXLock
{
public:
std::vector<COutPoint> inputs;
uint256 txid;
CBLSSignature sig;
public:
ADD_SERIALIZE_METHODS
template<typename Stream, typename Operation>
inline void SerializationOp(Stream& s, Operation ser_action)
{
READWRITE(inputs);
READWRITE(txid);
READWRITE(sig);
}
uint256 GetRequestId() const;
};
class CInstantXLockInfo
{
public:
// might be nullptr when ixlock is received before the TX itself
CTransactionRef tx;
CInstantXLock ixlock;
// only valid when recovered sig was received
uint256 ixlockHash;
// time when it was created/received
int64_t time;
// might be null initially (when TX was not mined yet) and will later be filled by SyncTransaction
const CBlockIndex* pindexMined{nullptr};
};
class CInstantSendManager : public CRecoveredSigsListener
{
private:
CCriticalSection cs;
CScheduler* scheduler;
/**
* These are the votes/signatures we performed locally. It's indexed by the LLMQ requestId, which is
* hash(TXLOCK_REQUESTID_PREFIX, prevout). The map values are the txids we voted for. This map is used to
* avoid voting for the same input twice.
*/
std::unordered_map<uint256, uint256, StaticSaltedHasher> inputVotes;
/**
* These are the ixlocks that are currently in the middle of being created. Entries are created when we observed
* recovered signatures for all inputs of a TX. At the same time, we initiate signing of our sigshare for the ixlock.
* When the recovered sig for the ixlock later arrives, we can finish the ixlock and propagate it.
*/
std::unordered_map<uint256, CInstantXLockInfo, StaticSaltedHasher> creatingInstantXLocks;
// maps from txid to the in-progress ixlock
std::unordered_map<uint256, CInstantXLockInfo*, StaticSaltedHasher> txToCreatingInstantXLocks;
/**
* These are the final ixlocks, indexed by their own hash. The other maps are used to get from TXs, inputs and blocks
* to ixlocks.
*/
std::unordered_map<uint256, CInstantXLockInfo, StaticSaltedHasher> finalInstantXLocks;
std::unordered_map<uint256, CInstantXLockInfo*, StaticSaltedHasher> txToInstantXLock;
std::unordered_map<COutPoint, CInstantXLockInfo*, SaltedOutpointHasher> inputToInstantXLock;
std::unordered_multimap<uint256, CInstantXLockInfo*, StaticSaltedHasher> blockToInstantXLocks;
const CBlockIndex* pindexLastChainLock{nullptr};
// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantXLock>> pendingInstantXLocks;
bool hasScheduledProcessPending{false};
public:
CInstantSendManager(CScheduler* _scheduler);
~CInstantSendManager();
void RegisterAsRecoveredSigsListener();
void UnregisterAsRecoveredSigsListener();
public:
bool ProcessTx(CNode* pfrom, const CTransaction& tx, CConnman& connman, const Consensus::Params& params);
bool CheckCanLock(const CTransaction& tx, bool printDebug, const Consensus::Params& params);
bool CheckCanLock(const COutPoint& outpoint, bool printDebug, const uint256* _txHash, CAmount* retValue, const Consensus::Params& params);
bool IsLocked(const uint256& txHash);
bool IsConflicted(const CTransaction& tx);
bool GetConflictingTx(const CTransaction& tx, uint256& retConflictTxHash);
virtual void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig);
void HandleNewInputLockRecoveredSig(const CRecoveredSig& recoveredSig, const uint256& txid);
void HandleNewInstantXLockRecoveredSig(const CRecoveredSig& recoveredSig);
void TrySignInstantXLock(const CTransaction& tx);
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
void ProcessMessageInstantXLock(CNode* pfrom, const CInstantXLock& ixlock, CConnman& connman);
bool PreVerifyInstantXLock(NodeId nodeId, const CInstantXLock& ixlock, bool& retBan);
void ProcessPendingInstantXLocks();
void ProcessInstantXLock(NodeId from, const uint256& hash, const CInstantXLock& ixlock);
void UpdateWalletTransaction(const uint256& txid);
void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock);
void NotifyChainLock(const CBlockIndex* pindex);
void UpdateIxLockMinedBlock(CInstantXLockInfo* ixlockInfo, const CBlockIndex* pindex);
void RemoveFinalIxLock(const uint256& hash);
void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantXLock& ixlock);
void RetryLockMempoolTxs(const uint256& lockedParentTx);
bool AlreadyHave(const CInv& inv);
bool GetInstantXLockByHash(const uint256& hash, CInstantXLock& ret);
};
extern CInstantSendManager* quorumInstantSendManager;
// The meaning of spork 2 has changed in v0.14. Before that, spork 2 was simply time based and either enabled or not
// After 0.14, spork 2 can have 3 states.
// 0 = old system is active (0 is compatible with the value set on mainnet at time of deployment)
// 1 = new system is active (old nodes will interpret this as the old system being enabled, but then won't get enough IX lock votes)
// everything else = disabled
// TODO When the new system is fully deployed and enabled, we can remove this special handling of the spork in a future version
// and revert to the old behaviour.
bool IsOldInstantSendEnabled();
bool IsNewInstantSendEnabled();
bool IsInstantSendEnabled();
}
#endif//DASH_QUORUMS_INSTANTX_H

View File

@ -50,6 +50,7 @@
#include "llmq/quorums_debug.h"
#include "llmq/quorums_dkgsessionmgr.h"
#include "llmq/quorums_init.h"
#include "llmq/quorums_instantsend.h"
#include "llmq/quorums_signing.h"
#include "llmq/quorums_signing_shares.h"
@ -976,6 +977,8 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
return llmq::quorumSigningManager->AlreadyHave(inv);
case MSG_CLSIG:
return llmq::chainLocksHandler->AlreadyHave(inv);
case MSG_IXLOCK:
return llmq::quorumInstantSendManager->AlreadyHave(inv);
}
// Don't know what it is, just say we already got one
@ -1296,6 +1299,14 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
if (!push && (inv.type == MSG_IXLOCK)) {
llmq::CInstantXLock o;
if (llmq::quorumInstantSendManager->GetInstantXLockByHash(inv.hash, o)) {
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::IXLOCK, o));
push = true;
}
}
if (!push)
vNotFound.push_back(inv);
}
@ -1777,6 +1788,9 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
case MSG_CLSIG:
doubleRequestDelay = 5 * 1000000;
break;
case MSG_IXLOCK:
doubleRequestDelay = 5 * 1000000;
break;
}
pfrom->AskFor(inv, doubleRequestDelay);
}
@ -2943,6 +2957,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
llmq::quorumSigSharesManager->ProcessMessage(pfrom, strCommand, vRecv, connman);
llmq::quorumSigningManager->ProcessMessage(pfrom, strCommand, vRecv, connman);
llmq::chainLocksHandler->ProcessMessage(pfrom, strCommand, vRecv, connman);
llmq::quorumInstantSendManager->ProcessMessage(pfrom, strCommand, vRecv, connman);
}
else
{

View File

@ -71,6 +71,7 @@ const char *QGETSIGSHARES="qgetsigs";
const char *QBSIGSHARES="qbsigs";
const char *QSIGREC="qsigrec";
const char *CLSIG="clsig";
const char *IXLOCK="ixlock";
};
static const char* ppszTypeName[] =
@ -107,6 +108,7 @@ static const char* ppszTypeName[] =
NetMsgType::QDEBUGSTATUS,
NetMsgType::QSIGREC,
NetMsgType::CLSIG,
NetMsgType::IXLOCK,
};
/** All known message types. Keep this in the same order as the list of
@ -172,6 +174,7 @@ const static std::string allNetMessageTypes[] = {
NetMsgType::QBSIGSHARES,
NetMsgType::QSIGREC,
NetMsgType::CLSIG,
NetMsgType::IXLOCK,
};
const static std::vector<std::string> allNetMessageTypesVec(allNetMessageTypes, allNetMessageTypes+ARRAYLEN(allNetMessageTypes));

View File

@ -277,6 +277,7 @@ extern const char *QGETSIGSHARES;
extern const char *QBSIGSHARES;
extern const char *QSIGREC;
extern const char *CLSIG;
extern const char *IXLOCK;
};
/* Get a vector of all valid message types (see above) */
@ -379,6 +380,7 @@ enum GetDataMsg {
MSG_QUORUM_DEBUG_STATUS = 27,
MSG_QUORUM_RECOVERED_SIG = 28,
MSG_CLSIG = 29,
MSG_IXLOCK = 30,
};
/** inv message data */