Retry locking of child TXs in batches instead of per locked parent (#2858)

This especially avoids many calls to ReadBlockFromDisk
This commit is contained in:
Alexander Block 2019-04-12 13:36:52 +02:00 committed by UdjinM6
parent fbe44761c0
commit 0c54e41f22
2 changed files with 41 additions and 16 deletions

View File

@ -767,6 +767,8 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
if (pindexMined) { if (pindexMined) {
db.WriteInstantSendLockMined(hash, pindexMined->nHeight); db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
} }
pendingRetryTxs.emplace(islock.txid);
} }
CInv inv(MSG_ISLOCK, hash); CInv inv(MSG_ISLOCK, hash);
@ -779,8 +781,6 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
} }
RemoveMempoolConflictsForLock(hash, islock); RemoveMempoolConflictsForLock(hash, islock);
RetryLockTxs(islock.txid);
UpdateWalletTransaction(islock.txid, tx); UpdateWalletTransaction(islock.txid, tx);
} }
@ -836,7 +836,8 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn
bool chainlocked = pindex && chainLocksHandler->HasChainLock(pindex->nHeight, pindex->GetBlockHash()); bool chainlocked = pindex && chainLocksHandler->HasChainLock(pindex->nHeight, pindex->GetBlockHash());
if (!islockHash.IsNull() || chainlocked) { if (!islockHash.IsNull() || chainlocked) {
RetryLockTxs(tx.GetHash()); LOCK(cs);
pendingRetryTxs.emplace(tx.GetHash());
} else { } else {
ProcessTx(tx, Params().GetConsensus()); ProcessTx(tx, Params().GetConsensus());
} }
@ -883,14 +884,14 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
inputRequestIds.erase(inputRequestId); inputRequestIds.erase(inputRequestId);
} }
} }
// Retry all not yet locked mempool TXs and TX which where mined after the fully confirmed block
pendingRetryAllTxs = true;
} }
for (auto& p : removeISLocks) { for (auto& p : removeISLocks) {
UpdateWalletTransaction(p.second->txid, nullptr); UpdateWalletTransaction(p.second->txid, nullptr);
} }
// Retry all not yet locked mempool TXs and TX which where mined after the fully confirmed block
RetryLockTxs(uint256());
} }
void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock) void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
@ -917,10 +918,23 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
} }
} }
void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx) bool CInstantSendManager::ProcessPendingRetryLockTxs()
{ {
bool retryAllTxs;
decltype(pendingRetryTxs) parentTxs;
{
LOCK(cs);
retryAllTxs = pendingRetryAllTxs;
parentTxs = std::move(pendingRetryTxs);
pendingRetryAllTxs = false;
}
if (!retryAllTxs && parentTxs.empty()) {
return false;
}
if (!IsNewInstantSendEnabled()) { if (!IsNewInstantSendEnabled()) {
return; return false;
} }
// Let's retry all unlocked TXs from mempool and and recently connected blocks // Let's retry all unlocked TXs from mempool and and recently connected blocks
@ -930,16 +944,18 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
{ {
LOCK(mempool.cs); LOCK(mempool.cs);
if (lockedParentTx.IsNull()) { if (retryAllTxs) {
txs.reserve(mempool.mapTx.size()); txs.reserve(mempool.mapTx.size());
for (auto it = mempool.mapTx.begin(); it != mempool.mapTx.end(); ++it) { for (auto it = mempool.mapTx.begin(); it != mempool.mapTx.end(); ++it) {
txs.emplace(it->GetTx().GetHash(), it->GetSharedTx()); txs.emplace(it->GetTx().GetHash(), it->GetSharedTx());
} }
} else { } else {
auto it = mempool.mapNextTx.lower_bound(COutPoint(lockedParentTx, 0)); for (const auto& parentTx : parentTxs) {
while (it != mempool.mapNextTx.end() && it->first->hash == lockedParentTx) { auto it = mempool.mapNextTx.lower_bound(COutPoint(parentTx, 0));
txs.emplace(it->second->GetHash(), mempool.get(it->second->GetHash())); while (it != mempool.mapNextTx.end() && it->first->hash == parentTx) {
++it; txs.emplace(it->second->GetHash(), mempool.get(it->second->GetHash()));
++it;
}
} }
} }
} }
@ -969,12 +985,12 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
} }
for (const auto& tx : block.vtx) { for (const auto& tx : block.vtx) {
if (lockedParentTx.IsNull()) { if (retryAllTxs) {
txs.emplace(tx->GetHash(), tx); txs.emplace(tx->GetHash(), tx);
} else { } else {
bool isChild = false; bool isChild = false;
for (auto& in : tx->vin) { for (auto& in : tx->vin) {
if (in.prevout.hash == lockedParentTx) { if (parentTxs.count(in.prevout.hash)) {
isChild = true; isChild = true;
break; break;
} }
@ -989,6 +1005,7 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
depth++; depth++;
} }
bool didWork = false;
for (auto& p : txs) { for (auto& p : txs) {
auto& tx = p.second; auto& tx = p.second;
{ {
@ -1017,7 +1034,10 @@ void CInstantSendManager::RetryLockTxs(const uint256& lockedParentTx)
} }
ProcessTx(*tx, Params().GetConsensus()); ProcessTx(*tx, Params().GetConsensus());
didWork = true;
} }
return didWork;
} }
bool CInstantSendManager::AlreadyHave(const CInv& inv) bool CInstantSendManager::AlreadyHave(const CInv& inv)
@ -1089,6 +1109,7 @@ void CInstantSendManager::WorkThreadMain()
bool didWork = false; bool didWork = false;
didWork |= ProcessPendingInstantSendLocks(); didWork |= ProcessPendingInstantSendLocks();
didWork |= ProcessPendingRetryLockTxs();
if (!didWork) { if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) { if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {

View File

@ -93,6 +93,10 @@ private:
// Incoming and not verified yet // Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks; std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
// a set of recently IS locked TXs for which we can retry locking of children
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs;
bool pendingRetryAllTxs{false};
public: public:
CInstantSendManager(CDBWrapper& _llmqDb); CInstantSendManager(CDBWrapper& _llmqDb);
~CInstantSendManager(); ~CInstantSendManager();
@ -129,7 +133,7 @@ public:
void HandleFullyConfirmedBlock(const CBlockIndex* pindex); void HandleFullyConfirmedBlock(const CBlockIndex* pindex);
void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock); void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock);
void RetryLockTxs(const uint256& lockedParentTx); bool ProcessPendingRetryLockTxs();
bool AlreadyHave(const CInv& inv); bool AlreadyHave(const CInv& inv);
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret); bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);