instantsend: Do not consider islocks with unknown txes as complete (#4147)

* instantsend: Avoid writing IS locks for unknown txes

* instantsend: Allow a competing tx into mempool if there is an islock waiting for it

* use try_emplace

* Hold cs_main while calling ResetBlockFailureFlags
This commit is contained in:
UdjinM6 2021-11-30 14:14:08 +03:00 committed by GitHub
parent 9718bb394b
commit 3b8b3e254a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 44 deletions

View File

@ -830,7 +830,7 @@ void CInstantSendManager::ProcessMessageInstantSendLock(const CNode* pfrom, cons
} }
LOCK(cs); LOCK(cs);
if (pendingInstantSendLocks.count(hash) || db.KnownInstantSendLock(hash)) { if (pendingInstantSendLocks.count(hash) || pendingNoTxInstantSendLocks.count(hash) || db.KnownInstantSendLock(hash)) {
return; return;
} }
@ -1079,10 +1079,17 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
} }
} }
if (tx == nullptr) {
// put it in a separate pending map and try again later
LOCK(cs);
pendingNoTxInstantSendLocks.try_emplace(hash, std::make_pair(from, islock));
} else {
db.WriteNewInstantSendLock(hash, *islock); db.WriteNewInstantSendLock(hash, *islock);
if (pindexMined) { if (pindexMined) {
db.WriteInstantSendLockMined(hash, pindexMined->nHeight); db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
} }
}
{ {
LOCK(cs); LOCK(cs);
@ -1104,13 +1111,16 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
} }
ResolveBlockConflicts(hash, *islock); ResolveBlockConflicts(hash, *islock);
RemoveMempoolConflictsForLock(hash, *islock);
if (tx != nullptr) { if (tx != nullptr) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about an in-time lock for tx %s\n", __func__, tx->GetHash().ToString()); RemoveMempoolConflictsForLock(hash, *islock);
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__,
hash.ToString(), tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock); GetMainSignals().NotifyTransactionLock(tx, islock);
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate // bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
mempool.AddTransactionsUpdated(1); mempool.AddTransactionsUpdated(1);
} else {
AskNodesForLockedTx(islock->txid);
} }
} }
@ -1120,26 +1130,30 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
return; return;
} }
CInstantSendLockPtr islock = db.GetInstantSendLockByTxid(tx->GetHash()); CInstantSendLockPtr islock{nullptr};
{
LOCK(cs);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
// we received an islock ealier
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
islock = it->second.second;
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
}
++it;
}
}
if (islock == nullptr) { if (islock == nullptr) {
ProcessTx(*tx, false, Params().GetConsensus()); ProcessTx(*tx, false, Params().GetConsensus());
// TX is not locked, so make sure it is tracked // TX is not locked, so make sure it is tracked
AddNonLockedTx(tx, nullptr); AddNonLockedTx(tx, nullptr);
} else { } else {
{ RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock);
// TX is locked, so make sure we don't track it anymore
LOCK(cs);
RemoveNonLockedTx(tx->GetHash(), true);
}
// In case the islock was received before the TX, filtered announcement might have missed this islock because
// we were unable to check for filter matches deep inside the TX. Now we have the TX, so we should retry.
CInv inv(islock->IsDeterministic() ? MSG_ISDLOCK : MSG_ISLOCK, ::SerializeHash(*islock));
g_connman->RelayInvFiltered(inv, *tx, islock->IsDeterministic() ? ISDLOCK_PROTO_VERSION : LLMQS_PROTO_VERSION);
// If the islock was received before the TX, we know we were not able to send
// the notification at that time, we need to do it now.
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about an earlier received lock for tx %s\n", __func__, tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock);
} }
} }
@ -1214,6 +1228,19 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock
} }
} }
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
// we received an islock ealier, let's put it back into pending and verify/lock
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
}
++it;
}
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__, LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__,
tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : ""); tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : "");
} }
@ -1436,6 +1463,8 @@ void CInstantSendManager::ResolveBlockConflicts(const uint256& islockHash, const
return; return;
} }
bool isLockedTxKnown = WITH_LOCK(cs, return pendingNoTxInstantSendLocks.find(islockHash) == pendingNoTxInstantSendLocks.end());
bool activateBestChain = false; bool activateBestChain = false;
for (const auto& p : conflicts) { for (const auto& p : conflicts) {
auto pindex = p.first; auto pindex = p.first;
@ -1457,7 +1486,13 @@ void CInstantSendManager::ResolveBlockConflicts(const uint256& islockHash, const
// This should not have happened and we are in a state were it's not safe to continue anymore // This should not have happened and we are in a state were it's not safe to continue anymore
assert(false); assert(false);
} }
if (isLockedTxKnown) {
activateBestChain = true; activateBestChain = true;
} else {
LogPrintf("CInstantSendManager::%s -- resetting block %s\n", __func__, pindex2->GetBlockHash().ToString());
LOCK(cs_main);
ResetBlockFailureFlags(pindex2);
}
} }
if (activateBestChain) { if (activateBestChain) {
@ -1578,7 +1613,7 @@ bool CInstantSendManager::AlreadyHave(const CInv& inv) const
} }
LOCK(cs); LOCK(cs);
return pendingInstantSendLocks.count(inv.hash) != 0 || db.KnownInstantSendLock(inv.hash); return pendingInstantSendLocks.count(inv.hash) != 0 || pendingNoTxInstantSendLocks.count(inv.hash) != 0 || db.KnownInstantSendLock(inv.hash);
} }
bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CInstantSendLock& ret) const bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CInstantSendLock& ret) const
@ -1589,8 +1624,19 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CI
auto islock = db.GetInstantSendLockByHash(hash); auto islock = db.GetInstantSendLockByHash(hash);
if (!islock) { if (!islock) {
LOCK(cs);
auto it = pendingInstantSendLocks.find(hash);
if (it != pendingInstantSendLocks.end()) {
islock = it->second.second;
} else {
auto itNoTx = pendingNoTxInstantSendLocks.find(hash);
if (itNoTx != pendingNoTxInstantSendLocks.end()) {
islock = itNoTx->second.second;
} else {
return false; return false;
} }
}
}
ret = *islock; ret = *islock;
return true; return true;
} }
@ -1628,6 +1674,25 @@ bool CInstantSendManager::IsConflicted(const CTransaction& tx) const
return GetConflictingLock(tx) != nullptr; return GetConflictingLock(tx) != nullptr;
} }
bool CInstantSendManager::IsWaitingForTx(const uint256& txHash) const
{
if (!IsInstantSendEnabled()) {
return false;
}
LOCK(cs);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == txHash) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
txHash.ToString(), it->first.ToString());
return true;
}
++it;
}
return false;
}
CInstantSendLockPtr CInstantSendManager::GetConflictingLock(const CTransaction& tx) const CInstantSendLockPtr CInstantSendManager::GetConflictingLock(const CTransaction& tx) const
{ {
if (!IsInstantSendEnabled()) { if (!IsInstantSendEnabled()) {

View File

@ -196,6 +196,8 @@ private:
// Incoming and not verified yet // Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingInstantSendLocks GUARDED_BY(cs); std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingInstantSendLocks GUARDED_BY(cs);
// Tried to veryfy but there is no tx yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingNoTxInstantSendLocks GUARDED_BY(cs);
// TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS locking // TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS locking
// of child TXs // of child TXs
@ -251,6 +253,7 @@ private:
public: public:
bool IsLocked(const uint256& txHash) const; bool IsLocked(const uint256& txHash) const;
bool IsWaitingForTx(const uint256& txHash) const;
CInstantSendLockPtr GetConflictingLock(const CTransaction& tx) const; CInstantSendLockPtr GetConflictingLock(const CTransaction& tx) const;
void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override; void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override;

View File

@ -1398,7 +1398,9 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
// masternode would not be able to exploit this to spam the network with specially // masternode would not be able to exploit this to spam the network with specially
// crafted invalid DSTX-es and potentially cause high load cheaply, because // crafted invalid DSTX-es and potentially cause high load cheaply, because
// corresponding checks in ProcessMessage won't let it to send DSTX-es too often. // corresponding checks in ProcessMessage won't let it to send DSTX-es too often.
bool fIgnoreRecentRejects = llmq::quorumInstantSendManager->IsLocked(inv.hash) || inv.type == MSG_DSTX; bool fIgnoreRecentRejects = inv.type == MSG_DSTX ||
llmq::quorumInstantSendManager->IsWaitingForTx(inv.hash) ||
llmq::quorumInstantSendManager->IsLocked(inv.hash);
return (!fIgnoreRecentRejects && recentRejects->contains(inv.hash)) || return (!fIgnoreRecentRejects && recentRejects->contains(inv.hash)) ||
(inv.type == MSG_DSTX && static_cast<bool>(CCoinJoin::GetDSTX(inv.hash))) || (inv.type == MSG_DSTX && static_cast<bool>(CCoinJoin::GetDSTX(inv.hash))) ||

View File

@ -588,6 +588,10 @@ static bool AcceptToMemoryPoolWorker(const CChainParams& chainparams, CTxMemPool
REJECT_INVALID, "tx-txlock-conflict"); REJECT_INVALID, "tx-txlock-conflict");
} }
if (llmq::quorumInstantSendManager->IsWaitingForTx(hash)) {
pool.removeConflicts(tx);
pool.removeProTxConflicts(tx);
} else {
// Check for conflicts with in-memory transactions // Check for conflicts with in-memory transactions
for (const CTxIn &txin : tx.vin) for (const CTxIn &txin : tx.vin)
{ {
@ -598,6 +602,7 @@ static bool AcceptToMemoryPoolWorker(const CChainParams& chainparams, CTxMemPool
return state.Invalid(false, REJECT_DUPLICATE, "txn-mempool-conflict"); return state.Invalid(false, REJECT_DUPLICATE, "txn-mempool-conflict");
} }
} }
}
{ {
CCoinsView dummy; CCoinsView dummy;

View File

@ -232,14 +232,24 @@ class LLMQ_IS_CL_Conflicts(DashTestFramework):
self.sync_all() self.sync_all()
assert self.nodes[0].getbestblockhash() == cl_block.hash assert self.nodes[0].getbestblockhash() == cl_block.hash
# Send the ISLOCK, which should result in the last 2 blocks to be invalidated, even though the nodes don't know # Send the ISLOCK, which should result in the last 2 blocks to be disconnected,
# the locked transaction yet # even though the nodes don't know the locked transaction yet
self.test_node.send_islock(islock, deterministic) self.test_node.send_islock(islock, deterministic)
for node in self.nodes: for node in self.nodes:
wait_until(lambda: node.getbestblockhash() == good_tip, timeout=10, sleep=0.5) wait_until(lambda: node.getbestblockhash() == good_tip, timeout=10, sleep=0.5)
# islock for tx2 is incomplete, tx1 should return in mempool now that blocks are disconnected
assert rawtx1_txid in set(node.getrawmempool())
# Send the actual transaction and mine it # Should drop tx1 and accept tx2 because there is an islock waiting for it
self.nodes[0].sendrawtransaction(rawtx2) self.nodes[0].sendrawtransaction(rawtx2)
# bump mocktime to force tx relay
self.bump_mocktime(60)
for node in self.nodes:
self.wait_for_instantlock(rawtx2_txid, node)
# Should not allow competing txes now
assert_raises_rpc_error(-26, "tx-txlock-conflict", self.nodes[0].sendrawtransaction, rawtx1)
islock_tip = self.nodes[0].generate(1)[0] islock_tip = self.nodes[0].generate(1)[0]
self.sync_all() self.sync_all()

View File

@ -4,7 +4,7 @@
# file COPYING or http://www.opensource.org/licenses/mit-license.php. # file COPYING or http://www.opensource.org/licenses/mit-license.php.
from test_framework.test_framework import DashTestFramework from test_framework.test_framework import DashTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error, hash256, hex_str_to_bytes, isolate_node, reconnect_isolated_node from test_framework.util import assert_equal, assert_raises_rpc_error, isolate_node, reconnect_isolated_node
''' '''
p2p_instantsend.py p2p_instantsend.py
@ -85,6 +85,8 @@ class InstantSendTest(DashTestFramework):
sender = self.nodes[self.sender_idx] sender = self.nodes[self.sender_idx]
receiver = self.nodes[self.receiver_idx] receiver = self.nodes[self.receiver_idx]
isolated = self.nodes[self.isolated_idx] isolated = self.nodes[self.isolated_idx]
connected_nodes = self.nodes.copy()
del connected_nodes[self.isolated_idx]
# feed the sender with some balance # feed the sender with some balance
sender_addr = sender.getnewaddress() sender_addr = sender.getnewaddress()
@ -95,28 +97,31 @@ class InstantSendTest(DashTestFramework):
# create doublespending transaction, but don't relay it # create doublespending transaction, but don't relay it
dblspnd_tx = self.create_raw_tx(sender, isolated, 0.5, 1, 100) dblspnd_tx = self.create_raw_tx(sender, isolated, 0.5, 1, 100)
dblspnd_txid = hash256(hex_str_to_bytes(dblspnd_tx['hex']))[::-1].hex()
# isolate one node from network # isolate one node from network
isolate_node(isolated) isolate_node(isolated)
# send doublespend transaction to isolated node # send doublespend transaction to isolated node
isolated.sendrawtransaction(dblspnd_tx['hex']) dblspnd_txid = isolated.sendrawtransaction(dblspnd_tx['hex'])
assert dblspnd_txid in set(isolated.getrawmempool())
# let isolated node rejoin the network # let isolated node rejoin the network
# The previously isolated node should NOT relay the doublespending TX # The previously isolated node should NOT relay the doublespending TX
reconnect_isolated_node(isolated, 0) reconnect_isolated_node(isolated, 0)
for node in self.nodes: for node in connected_nodes:
if node is not isolated:
assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", node.getrawtransaction, dblspnd_txid) assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", node.getrawtransaction, dblspnd_txid)
# instantsend to receiver. The previously isolated node should prune the doublespend TX and request the correct # Instantsend to receiver. The previously isolated node won't accept the tx but it should
# TX from other nodes. # request the correct TX from other nodes once the corresponding lock is received.
# And this time the doublespend TX should be pruned once the correct tx is received.
receiver_addr = receiver.getnewaddress() receiver_addr = receiver.getnewaddress()
is_id = sender.sendtoaddress(receiver_addr, 0.9) is_id = sender.sendtoaddress(receiver_addr, 0.9)
# wait for the transaction to propagate # wait for the transaction to propagate
self.sync_mempools() self.sync_mempools()
for node in self.nodes: for node in self.nodes:
self.wait_for_instantlock(is_id, node) self.wait_for_instantlock(is_id, node)
assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", isolated.getrawtransaction, dblspnd_txid) assert dblspnd_txid not in set(isolated.getrawmempool())
# send coins back to the controller node without waiting for confirmations # send coins back to the controller node without waiting for confirmations
receiver.sendtoaddress(self.nodes[0].getnewaddress(), 0.9, "", "", True) sentback_id = receiver.sendtoaddress(self.nodes[0].getnewaddress(), 0.9, "", "", True)
self.sync_mempools()
for node in self.nodes:
self.wait_for_instantlock(sentback_id, node)
assert_equal(receiver.getwalletinfo()["balance"], 0) assert_equal(receiver.getwalletinfo()["balance"], 0)
# mine more blocks # mine more blocks
self.bump_mocktime(1) self.bump_mocktime(1)