Track which TXs are not locked yet and use this info in ProcessPendingRetryLockTxs (#2869)
* Track which TXs are not locked yet and use this info in ProcessPendingRetryLockTxs Instead of relying on ReadBlockFromDisk. This should be less disk+CPU intensive but require more RAM. It also fixes a bug in ProcessPendingRetryLockTxs which caused ChainLocked parents to not be considered for retrying of its children. * Handle review commments
This commit is contained in:
parent
c4549aca23
commit
cd94cbe6f0
@ -768,7 +768,8 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
|
|||||||
db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
|
db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
|
||||||
}
|
}
|
||||||
|
|
||||||
pendingRetryTxs.emplace(islock.txid);
|
// This will also add children TXs to pendingRetryTxs
|
||||||
|
RemoveNonLockedTx(islock.txid);
|
||||||
}
|
}
|
||||||
|
|
||||||
CInv inv(MSG_ISLOCK, hash);
|
CInv inv(MSG_ISLOCK, hash);
|
||||||
@ -837,12 +838,63 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool chainlocked = pindex && chainLocksHandler->HasChainLock(pindex->nHeight, pindex->GetBlockHash());
|
bool chainlocked = pindex && chainLocksHandler->HasChainLock(pindex->nHeight, pindex->GetBlockHash());
|
||||||
if (!islockHash.IsNull() || chainlocked) {
|
if (islockHash.IsNull() && !chainlocked) {
|
||||||
LOCK(cs);
|
|
||||||
pendingRetryTxs.emplace(tx.GetHash());
|
|
||||||
} else {
|
|
||||||
ProcessTx(tx, Params().GetConsensus());
|
ProcessTx(tx, Params().GetConsensus());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOCK(cs);
|
||||||
|
if (!chainlocked && islockHash.IsNull()) {
|
||||||
|
// TX is not locked, so make sure it is tracked
|
||||||
|
AddNonLockedTx(MakeTransactionRef(tx));
|
||||||
|
nonLockedTxs.at(tx.GetHash()).pindexMined = posInBlock == CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK ? pindex : nullptr;
|
||||||
|
} else {
|
||||||
|
// TX is locked, so make sure we don't track it anymore
|
||||||
|
RemoveNonLockedTx(tx.GetHash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx)
|
||||||
|
{
|
||||||
|
AssertLockHeld(cs);
|
||||||
|
auto it = nonLockedTxs.emplace(tx->GetHash(), NonLockedTxInfo()).first;
|
||||||
|
auto& info = it->second;
|
||||||
|
|
||||||
|
if (!info.tx) {
|
||||||
|
info.tx = tx;
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
nonLockedTxs[in.prevout.hash].children.emplace(tx->GetHash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void CInstantSendManager::RemoveNonLockedTx(const uint256& txid)
|
||||||
|
{
|
||||||
|
AssertLockHeld(cs);
|
||||||
|
|
||||||
|
auto it = nonLockedTxs.find(txid);
|
||||||
|
if (it == nonLockedTxs.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto& info = it->second;
|
||||||
|
|
||||||
|
// TX got locked, so we can retry locking children
|
||||||
|
for (auto& childTxid : info.children) {
|
||||||
|
pendingRetryTxs.emplace(childTxid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (info.tx) {
|
||||||
|
for (const auto& in : info.tx->vin) {
|
||||||
|
auto jt = nonLockedTxs.find(in.prevout.hash);
|
||||||
|
if (jt != nonLockedTxs.end()) {
|
||||||
|
jt->second.children.erase(txid);
|
||||||
|
if (!jt->second.tx && jt->second.children.empty()) {
|
||||||
|
nonLockedTxs.erase(jt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nonLockedTxs.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindexChainLock)
|
void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindexChainLock)
|
||||||
@ -887,8 +939,20 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retry all not yet locked mempool TXs and TX which where mined after the fully confirmed block
|
// Find all previously unlocked TXs that got locked by this fully confirmed (ChainLock) block and remove them
|
||||||
pendingRetryAllTxs = true;
|
// from the nonLockedTxs map. Also collect all children of these TXs and mark them for retrying of IS locking.
|
||||||
|
std::vector<uint256> toRemove;
|
||||||
|
for (auto& p : nonLockedTxs) {
|
||||||
|
auto pindexMined = p.second.pindexMined;
|
||||||
|
|
||||||
|
if (pindexMined && pindex->GetAncestor(pindexMined->nHeight) == pindexMined) {
|
||||||
|
toRemove.emplace_back(p.first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (auto& txid : toRemove) {
|
||||||
|
// This will also add children to pendingRetryTxs
|
||||||
|
RemoveNonLockedTx(txid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto& p : removeISLocks) {
|
for (auto& p : removeISLocks) {
|
||||||
@ -922,16 +986,13 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
|
|||||||
|
|
||||||
bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
||||||
{
|
{
|
||||||
bool retryAllTxs;
|
decltype(pendingRetryTxs) retryTxs;
|
||||||
decltype(pendingRetryTxs) parentTxs;
|
|
||||||
{
|
{
|
||||||
LOCK(cs);
|
LOCK(cs);
|
||||||
retryAllTxs = pendingRetryAllTxs;
|
retryTxs = std::move(pendingRetryTxs);
|
||||||
parentTxs = std::move(pendingRetryTxs);
|
|
||||||
pendingRetryAllTxs = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!retryAllTxs && parentTxs.empty()) {
|
if (retryTxs.empty()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -939,79 +1000,21 @@ bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let's retry all unlocked TXs from mempool and and recently connected blocks
|
int retryCount = 0;
|
||||||
|
for (const auto& txid : retryTxs) {
|
||||||
std::unordered_map<uint256, CTransactionRef> txs;
|
CTransactionRef tx;
|
||||||
|
|
||||||
{
|
|
||||||
LOCK(mempool.cs);
|
|
||||||
|
|
||||||
if (retryAllTxs) {
|
|
||||||
txs.reserve(mempool.mapTx.size());
|
|
||||||
for (auto it = mempool.mapTx.begin(); it != mempool.mapTx.end(); ++it) {
|
|
||||||
txs.emplace(it->GetTx().GetHash(), it->GetSharedTx());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (const auto& parentTx : parentTxs) {
|
|
||||||
auto it = mempool.mapNextTx.lower_bound(COutPoint(parentTx, 0));
|
|
||||||
while (it != mempool.mapNextTx.end() && it->first->hash == parentTx) {
|
|
||||||
txs.emplace(it->second->GetHash(), mempool.get(it->second->GetHash()));
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const CBlockIndex* pindexWalk = nullptr;
|
|
||||||
{
|
|
||||||
LOCK(cs_main);
|
|
||||||
pindexWalk = chainActive.Tip();
|
|
||||||
}
|
|
||||||
|
|
||||||
// scan blocks until we hit the last chainlocked block we know of. Also stop scanning after a depth of 6 to avoid
|
|
||||||
// signing thousands of TXs at once. Also, after a depth of 6, blocks get eligible for ChainLocking even if unsafe
|
|
||||||
// TXs are included, so there is no need to retroactively sign these.
|
|
||||||
int depth = 0;
|
|
||||||
while (pindexWalk && depth < 6) {
|
|
||||||
if (chainLocksHandler->HasChainLock(pindexWalk->nHeight, pindexWalk->GetBlockHash())) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
CBlock block;
|
|
||||||
{
|
|
||||||
LOCK(cs_main);
|
|
||||||
if (!ReadBlockFromDisk(block, pindexWalk, Params().GetConsensus())) {
|
|
||||||
pindexWalk = pindexWalk->pprev;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const auto& tx : block.vtx) {
|
|
||||||
if (retryAllTxs) {
|
|
||||||
txs.emplace(tx->GetHash(), tx);
|
|
||||||
} else {
|
|
||||||
bool isChild = false;
|
|
||||||
for (auto& in : tx->vin) {
|
|
||||||
if (parentTxs.count(in.prevout.hash)) {
|
|
||||||
isChild = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (isChild) {
|
|
||||||
txs.emplace(tx->GetHash(), tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pindexWalk = pindexWalk->pprev;
|
|
||||||
depth++;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool didWork = false;
|
|
||||||
for (auto& p : txs) {
|
|
||||||
auto& tx = p.second;
|
|
||||||
{
|
{
|
||||||
LOCK(cs);
|
LOCK(cs);
|
||||||
|
auto it = nonLockedTxs.find(txid);
|
||||||
|
if (it == nonLockedTxs.end()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
tx = it->second.tx;
|
||||||
|
|
||||||
|
if (!tx) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (txToCreatingInstantSendLocks.count(tx->GetHash())) {
|
if (txToCreatingInstantSendLocks.count(tx->GetHash())) {
|
||||||
// we're already in the middle of locking this one
|
// we're already in the middle of locking this one
|
||||||
continue;
|
continue;
|
||||||
@ -1036,10 +1039,16 @@ bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
|||||||
}
|
}
|
||||||
|
|
||||||
ProcessTx(*tx, Params().GetConsensus());
|
ProcessTx(*tx, Params().GetConsensus());
|
||||||
didWork = true;
|
retryCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return didWork;
|
if (retryCount != 0) {
|
||||||
|
LOCK(cs);
|
||||||
|
LogPrint("instantsend", "CInstantSendManager::%s -- retried %d TXs. nonLockedTxs.size=%d\n", __func__,
|
||||||
|
retryCount, nonLockedTxs.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
return retryCount != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CInstantSendManager::AlreadyHave(const CInv& inv)
|
bool CInstantSendManager::AlreadyHave(const CInv& inv)
|
||||||
|
@ -93,9 +93,16 @@ private:
|
|||||||
// Incoming and not verified yet
|
// Incoming and not verified yet
|
||||||
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
|
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
|
||||||
|
|
||||||
// a set of recently IS locked TXs for which we can retry locking of children
|
// TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS locking
|
||||||
|
// of child TXs
|
||||||
|
struct NonLockedTxInfo {
|
||||||
|
const CBlockIndex* pindexMined{nullptr};
|
||||||
|
CTransactionRef tx;
|
||||||
|
std::unordered_set<uint256, StaticSaltedHasher> children;
|
||||||
|
};
|
||||||
|
std::unordered_map<uint256, NonLockedTxInfo, StaticSaltedHasher> nonLockedTxs;
|
||||||
|
|
||||||
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs;
|
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs;
|
||||||
bool pendingRetryAllTxs{false};
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
CInstantSendManager(CDBWrapper& _llmqDb);
|
CInstantSendManager(CDBWrapper& _llmqDb);
|
||||||
@ -127,6 +134,9 @@ public:
|
|||||||
void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx);
|
void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx);
|
||||||
|
|
||||||
void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock);
|
void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock);
|
||||||
|
void AddNonLockedTx(const CTransactionRef& tx);
|
||||||
|
void RemoveNonLockedTx(const uint256& txid);
|
||||||
|
|
||||||
void NotifyChainLock(const CBlockIndex* pindexChainLock);
|
void NotifyChainLock(const CBlockIndex* pindexChainLock);
|
||||||
void UpdatedBlockTip(const CBlockIndex* pindexNew);
|
void UpdatedBlockTip(const CBlockIndex* pindexNew);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user