refactor(instantsend): make cs_db a Mutex, replace cs with multiple smaller mutexes (#4784)

* refactor(instantsend): make cs_db a Mutex

This introduces GetInstantSendLockByHashInternal and GetInstantSendLockHashByTxidInternal as they are called in Locked and Unlocked contexts. The Internal functions do not lock cs_db, the non-internal functions simply lock cs_db then call the internal function. This ensures saftety (all public functions lock cs_db immediately & all private functions have cs_db already locked) enforced via clang thread safety, while allowing us to use a Mutex here

* refactor(instantsend): remove CInstantSendManager::cs, replace it with cs_inputReqests, cs_creating, cs_pendingLocks, cs_nonLocked, cs_pendingRetry

LOCKS_EXCLUDED are used everywhere where the associated mutex is locked ensuring that deadlocks are impossible
This commit is contained in:
PastaPastaPasta 2022-04-20 13:54:20 -05:00 committed by GitHub
parent 8096e286e1
commit 087e62588e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 166 additions and 167 deletions

View File

@ -113,7 +113,7 @@ void CInstantSendDb::RemoveInstantSendLock(CDBBatch& batch, const uint256& hash,
{
AssertLockHeld(cs_db);
if (!islock) {
islock = GetInstantSendLockByHash(hash, false);
islock = GetInstantSendLockByHashInternal(hash, false);
if (!islock) {
return;
}
@ -195,7 +195,7 @@ std::unordered_map<uint256, CInstantSendLockPtr, StaticSaltedHasher> CInstantSen
}
auto& islockHash = std::get<2>(curKey);
auto islock = GetInstantSendLockByHash(islockHash, false);
auto islock = GetInstantSendLockByHashInternal(islockHash, false);
if (islock) {
RemoveInstantSendLock(batch, islockHash, islock);
ret.emplace(islockHash, islock);
@ -257,7 +257,7 @@ void CInstantSendDb::WriteBlockInstantSendLocks(const std::shared_ptr<const CBlo
// coinbase and TXs with no inputs can't be locked
continue;
}
uint256 islockHash = GetInstantSendLockHashByTxid(tx->GetHash());
uint256 islockHash = GetInstantSendLockHashByTxidInternal(tx->GetHash());
// update DB about when an IS lock was mined
if (!islockHash.IsNull()) {
WriteInstantSendLockMined(batch, islockHash, pindexConnected->nHeight);
@ -275,7 +275,7 @@ void CInstantSendDb::RemoveBlockInstantSendLocks(const std::shared_ptr<const CBl
// coinbase and TXs with no inputs can't be locked
continue;
}
uint256 islockHash = GetInstantSendLockHashByTxid(tx->GetHash());
uint256 islockHash = GetInstantSendLockHashByTxidInternal(tx->GetHash());
if (!islockHash.IsNull()) {
RemoveInstantSendLockMined(batch, islockHash, pindexDisconnected->nHeight);
}
@ -286,7 +286,7 @@ void CInstantSendDb::RemoveBlockInstantSendLocks(const std::shared_ptr<const CBl
bool CInstantSendDb::KnownInstantSendLock(const uint256& islockHash) const
{
LOCK(cs_db);
return GetInstantSendLockByHash(islockHash) != nullptr || db->Exists(std::make_tuple(DB_ARCHIVED_BY_HASH, islockHash));
return GetInstantSendLockByHashInternal(islockHash) != nullptr || db->Exists(std::make_tuple(DB_ARCHIVED_BY_HASH, islockHash));
}
size_t CInstantSendDb::GetInstantSendLockCount() const
@ -312,9 +312,9 @@ size_t CInstantSendDb::GetInstantSendLockCount() const
return cnt;
}
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByHash(const uint256& hash, bool use_cache) const
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByHashInternal(const uint256& hash, bool use_cache) const
{
LOCK(cs_db);
AssertLockHeld(cs_db);
if (hash.IsNull()) {
return nullptr;
}
@ -337,9 +337,9 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByHash(const uint256& hash
return ret;
}
uint256 CInstantSendDb::GetInstantSendLockHashByTxid(const uint256& txid) const
uint256 CInstantSendDb::GetInstantSendLockHashByTxidInternal(const uint256& txid) const
{
LOCK(cs_db);
AssertLockHeld(cs_db);
uint256 islockHash;
if (!txidCache.get(txid, islockHash)) {
if (!db->Read(std::make_tuple(DB_HASH_BY_TXID, txid), islockHash)) {
@ -353,7 +353,7 @@ uint256 CInstantSendDb::GetInstantSendLockHashByTxid(const uint256& txid) const
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByTxid(const uint256& txid) const
{
LOCK(cs_db);
return GetInstantSendLockByHash(GetInstantSendLockHashByTxid(txid));
return GetInstantSendLockByHashInternal(GetInstantSendLockHashByTxidInternal(txid));
}
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& outpoint) const
@ -366,7 +366,7 @@ CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& o
}
outpointCache.insert(outpoint, islockHash);
}
return GetInstantSendLockByHash(islockHash);
return GetInstantSendLockByHashInternal(islockHash);
}
std::vector<uint256> CInstantSendDb::GetInstantSendLocksByParent(const uint256& parent) const
@ -414,7 +414,7 @@ std::vector<uint256> CInstantSendDb::RemoveChainedInstantSendLocks(const uint256
stack.pop_back();
for (auto& childIslockHash : children) {
auto childIsLock = GetInstantSendLockByHash(childIslockHash, false);
auto childIsLock = GetInstantSendLockByHashInternal(childIslockHash, false);
if (!childIsLock) {
continue;
}
@ -479,8 +479,6 @@ void CInstantSendManager::Stop()
void CInstantSendManager::ProcessTx(const CTransaction& tx, bool fRetroactive, const Consensus::Params& params)
{
AssertLockNotHeld(cs);
if (!fMasternodeMode || !IsInstantSendEnabled() || !masternodeSync.IsBlockchainSynced()) {
return;
}
@ -563,7 +561,7 @@ bool CInstantSendManager::TrySignInputLocks(const CTransaction& tx, bool fRetroa
for (size_t i = 0; i < tx.vin.size(); i++) {
auto& in = tx.vin[i];
auto& id = ids[i];
WITH_LOCK(cs, inputRequestIds.emplace(id));
WITH_LOCK(cs_inputReqests, inputRequestIds.emplace(id));
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s: trying to vote on input %s with id %s. fRetroactive=%d\n", __func__,
tx.GetHash().ToString(), in.prevout.ToStringShort(), id.ToString(), fRetroactive);
if (quorumSigningManager->AsyncSignIfMember(llmqType, id, tx.GetHash(), {}, fRetroactive)) {
@ -588,8 +586,6 @@ bool CInstantSendManager::CheckCanLock(const CTransaction& tx, bool printDebug,
bool CInstantSendManager::CheckCanLock(const COutPoint& outpoint, bool printDebug, const uint256& txHash, const Consensus::Params& params) const
{
AssertLockNotHeld(cs);
int nInstantSendConfirmationsRequired = params.nInstantSendConfirmationsRequired;
if (IsLocked(outpoint.hash)) {
@ -648,19 +644,12 @@ void CInstantSendManager::HandleNewRecoveredSig(const CRecoveredSig& recoveredSi
}
uint256 txid;
bool isInstantSendLock = false;
{
LOCK(cs);
if (inputRequestIds.count(recoveredSig.getId())) {
txid = recoveredSig.getMsgHash();
}
if (creatingInstantSendLocks.count(recoveredSig.getId())) {
isInstantSendLock = true;
}
if (LOCK(cs_inputReqests); inputRequestIds.count(recoveredSig.getId())) {
txid = recoveredSig.getMsgHash();
}
if (!txid.IsNull()) {
HandleNewInputLockRecoveredSig(recoveredSig, txid);
} else if (isInstantSendLock) {
} else if (/*isInstantSendLock=*/ WITH_LOCK(cs_creating, return creatingInstantSendLocks.count(recoveredSig.getId()))) {
HandleNewInstantSendLockRecoveredSig(recoveredSig);
}
}
@ -726,7 +715,7 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx)
}
{
LOCK(cs);
LOCK(cs_creating);
auto e = creatingInstantSendLocks.emplace(id, std::move(islock));
if (!e.second) {
return;
@ -742,7 +731,7 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco
CInstantSendLockPtr islock;
{
LOCK(cs);
LOCK(cs_creating);
auto it = creatingInstantSendLocks.find(recoveredSig.getId());
if (it == creatingInstantSendLocks.end()) {
return;
@ -762,10 +751,10 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco
islock->sig = recoveredSig.sig;
auto hash = ::SerializeHash(*islock);
LOCK(cs);
if (pendingInstantSendLocks.count(hash) || db.KnownInstantSendLock(hash)) {
if (WITH_LOCK(cs_pendingLocks, return pendingInstantSendLocks.count(hash)) || db.KnownInstantSendLock(hash)) {
return;
}
LOCK(cs_pendingLocks);
pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock));
}
@ -785,8 +774,6 @@ void CInstantSendManager::ProcessMessage(CNode* pfrom, const std::string& msg_ty
void CInstantSendManager::ProcessMessageInstantSendLock(const CNode* pfrom, const llmq::CInstantSendLockPtr& islock)
{
AssertLockNotHeld(cs);
auto hash = ::SerializeHash(*islock);
bool fDIP0024IsActive = false;
@ -825,14 +812,15 @@ void CInstantSendManager::ProcessMessageInstantSendLock(const CNode* pfrom, cons
// return;
// }
LOCK(cs);
if (pendingInstantSendLocks.count(hash) || pendingNoTxInstantSendLocks.count(hash) || db.KnownInstantSendLock(hash)) {
if (WITH_LOCK(cs_pendingLocks, return pendingInstantSendLocks.count(hash) || pendingNoTxInstantSendLocks.count(hash))
|| db.KnownInstantSendLock(hash)) {
return;
}
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: received islock, peer=%d\n", __func__,
islock->txid.ToString(), hash.ToString(), pfrom->GetId());
LOCK(cs_pendingLocks);
pendingInstantSendLocks.emplace(hash, std::make_pair(pfrom->GetId(), islock));
}
@ -879,7 +867,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks(bool deterministic)
}
{
LOCK(cs);
LOCK(cs_pendingLocks);
// only process a max 32 locks at a time to avoid duplicate verification of recovered signatures which have been
// verified by CSigningManager in parallel
const size_t maxCount = 32;
@ -1049,7 +1037,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", __func__,
islock->txid.ToString(), hash.ToString(), from);
{
LOCK(cs);
LOCK(cs_creating);
creatingInstantSendLocks.erase(islock->GetRequestId());
txToCreatingInstantSendLocks.erase(islock->txid);
}
@ -1099,7 +1087,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
if (tx == nullptr) {
// put it in a separate pending map and try again later
LOCK(cs);
LOCK(cs_pendingLocks);
pendingNoTxInstantSendLocks.try_emplace(hash, std::make_pair(from, islock));
} else {
db.WriteNewInstantSendLock(hash, *islock);
@ -1108,15 +1096,11 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
}
}
{
LOCK(cs);
// This will also add children TXs to pendingRetryTxs
RemoveNonLockedTx(islock->txid, true);
// We don't need the recovered sigs for the inputs anymore. This prevents unnecessary propagation of these sigs.
// We only need the ISLOCK from now on to detect conflicts
TruncateRecoveredSigsForInputs(*islock);
}
// This will also add children TXs to pendingRetryTxs
RemoveNonLockedTx(islock->txid, true);
// We don't need the recovered sigs for the inputs anymore. This prevents unnecessary propagation of these sigs.
// We only need the ISLOCK from now on to detect conflicts
TruncateRecoveredSigsForInputs(*islock);
const auto is_det = islock->IsDeterministic();
CInv inv(is_det ? MSG_ISDLOCK : MSG_ISLOCK, hash);
@ -1150,7 +1134,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
CInstantSendLockPtr islock{nullptr};
{
LOCK(cs);
LOCK(cs_pendingLocks);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
@ -1198,7 +1182,6 @@ void CInstantSendManager::BlockConnected(const std::shared_ptr<const CBlock>& pb
}
if (!vtxConflicted.empty()) {
LOCK(cs);
for (const auto& tx : vtxConflicted) {
RemoveConflictedTx(*tx);
}
@ -1217,7 +1200,6 @@ void CInstantSendManager::BlockConnected(const std::shared_ptr<const CBlock>& pb
AddNonLockedTx(tx, pindex);
} else {
// TX is locked, so make sure we don't track it anymore
LOCK(cs);
RemoveNonLockedTx(tx->GetHash(), true);
}
}
@ -1233,39 +1215,42 @@ void CInstantSendManager::BlockDisconnected(const std::shared_ptr<const CBlock>&
void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
{
LOCK(cs);
auto res = nonLockedTxs.emplace(tx->GetHash(), NonLockedTxInfo());
auto& info = res.first->second;
info.pindexMined = pindexMined;
{
LOCK(cs_nonLocked);
auto [it, did_insert] = nonLockedTxs.emplace(tx->GetHash(), NonLockedTxInfo());
auto& nonLockedTxInfo = it->second;
nonLockedTxInfo.pindexMined = pindexMined;
if (res.second) {
info.tx = tx;
for (const auto& in : tx->vin) {
nonLockedTxs[in.prevout.hash].children.emplace(tx->GetHash());
nonLockedTxsByOutpoints.emplace(in.prevout, tx->GetHash());
if (did_insert) {
nonLockedTxInfo.tx = tx;
for (const auto &in: tx->vin) {
nonLockedTxs[in.prevout.hash].children.emplace(tx->GetHash());
nonLockedTxsByOutpoints.emplace(in.prevout, tx->GetHash());
}
}
}
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
// we received an islock earlier, let's put it back into pending and verify/lock
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
{
LOCK(cs_pendingLocks);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
// we received an islock earlier, let's put it back into pending and verify/lock
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
}
++it;
}
++it;
}
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__,
tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : "");
}
void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChildren)
{
AssertLockHeld(cs);
LOCK(cs_nonLocked);
auto it = nonLockedTxs.find(txid);
if (it == nonLockedTxs.end()) {
@ -1276,6 +1261,7 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild
size_t retryChildrenCount = 0;
if (retryChildren) {
// TX got locked, so we can retry locking children
LOCK(cs_pendingRetry);
for (auto& childTxid : info.children) {
pendingRetryTxs.emplace(childTxid);
retryChildrenCount++;
@ -1303,9 +1289,9 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild
void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx)
{
AssertLockHeld(cs);
RemoveNonLockedTx(tx.GetHash(), false);
LOCK(cs_inputReqests);
for (const auto& in : tx.vin) {
auto inputRequestId = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in));
inputRequestIds.erase(inputRequestId);
@ -1314,10 +1300,9 @@ void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx)
void CInstantSendManager::TruncateRecoveredSigsForInputs(const llmq::CInstantSendLock& islock)
{
AssertLockHeld(cs);
for (auto& in : islock.inputs) {
auto inputRequestId = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in));
inputRequestIds.erase(inputRequestId);
WITH_LOCK(cs_inputReqests, inputRequestIds.erase(inputRequestId));
quorumSigningManager->TruncateRecoveredSig(CLLMQUtils::GetInstantSendLLMQType(islock.IsDeterministic()), inputRequestId);
}
}
@ -1359,7 +1344,6 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
auto removeISLocks = db.RemoveConfirmedInstantSendLocks(pindex->nHeight);
LOCK(cs);
for (const auto& p : removeISLocks) {
auto& islockHash = p.first;
auto& islock = p.second;
@ -1380,11 +1364,14 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
// Find all previously unlocked TXs that got locked by this fully confirmed (ChainLock) block and remove them
// from the nonLockedTxs map. Also collect all children of these TXs and mark them for retrying of IS locking.
std::vector<uint256> toRemove;
for (const auto& p : nonLockedTxs) {
auto pindexMined = p.second.pindexMined;
{
LOCK(cs_nonLocked);
for (const auto& p: nonLockedTxs) {
auto pindexMined = p.second.pindexMined;
if (pindexMined && pindex->GetAncestor(pindexMined->nHeight) == pindexMined) {
toRemove.emplace_back(p.first);
if (pindexMined && pindex->GetAncestor(pindexMined->nHeight) == pindexMined) {
toRemove.emplace_back(p.first);
}
}
}
for (const auto& txid : toRemove) {
@ -1419,11 +1406,8 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
}
if (!toDelete.empty()) {
{
LOCK(cs);
for (const auto& p : toDelete) {
RemoveConflictedTx(*p.second);
}
for (const auto& p : toDelete) {
RemoveConflictedTx(*p.second);
}
AskNodesForLockedTx(islock.txid, connman);
}
@ -1434,7 +1418,7 @@ void CInstantSendManager::ResolveBlockConflicts(const uint256& islockHash, const
// Lets first collect all non-locked TXs which conflict with the given ISLOCK
std::unordered_map<const CBlockIndex*, std::unordered_map<uint256, CTransactionRef, StaticSaltedHasher>> conflicts;
{
LOCK(cs);
LOCK(cs_nonLocked);
for (auto& in : islock.inputs) {
auto it = nonLockedTxsByOutpoints.find(in);
if (it != nonLockedTxsByOutpoints.end()) {
@ -1478,17 +1462,14 @@ void CInstantSendManager::ResolveBlockConflicts(const uint256& islockHash, const
return;
}
bool isLockedTxKnown = WITH_LOCK(cs, return pendingNoTxInstantSendLocks.find(islockHash) == pendingNoTxInstantSendLocks.end());
bool isLockedTxKnown = WITH_LOCK(cs_pendingLocks, return pendingNoTxInstantSendLocks.find(islockHash) == pendingNoTxInstantSendLocks.end());
bool activateBestChain = false;
for (const auto& p : conflicts) {
auto pindex = p.first;
{
LOCK(cs);
for (auto& p2 : p.second) {
const auto& tx = *p2.second;
RemoveConflictedTx(tx);
}
for (auto& p2 : p.second) {
const auto& tx = *p2.second;
RemoveConflictedTx(tx);
}
LogPrintf("CInstantSendManager::%s -- invalidating block %s\n", __func__, pindex->GetBlockHash().ToString());
@ -1522,8 +1503,6 @@ void CInstantSendManager::ResolveBlockConflicts(const uint256& islockHash, const
void CInstantSendManager::RemoveConflictingLock(const uint256& islockHash, const llmq::CInstantSendLock& islock)
{
AssertLockNotHeld(cs);
LogPrintf("CInstantSendManager::%s -- txid=%s, islock=%s: Removing ISLOCK and its chained children\n", __func__,
islock.txid.ToString(), islockHash.ToString());
int tipHeight = WITH_LOCK(cs_main, return ::ChainActive().Height());
@ -1576,7 +1555,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnma
void CInstantSendManager::ProcessPendingRetryLockTxs()
{
decltype(pendingRetryTxs) retryTxs = WITH_LOCK(cs, return std::move(pendingRetryTxs));
decltype(pendingRetryTxs) retryTxs = WITH_LOCK(cs_pendingRetry, return std::move(pendingRetryTxs));
if (retryTxs.empty()) {
return;
@ -1590,18 +1569,19 @@ void CInstantSendManager::ProcessPendingRetryLockTxs()
for (const auto& txid : retryTxs) {
CTransactionRef tx;
{
LOCK(cs);
auto it = nonLockedTxs.find(txid);
if (it == nonLockedTxs.end()) {
continue;
{
LOCK(cs_nonLocked);
auto it = nonLockedTxs.find(txid);
if (it == nonLockedTxs.end()) {
continue;
}
tx = it->second.tx;
}
tx = it->second.tx;
if (!tx) {
continue;
}
if (txToCreatingInstantSendLocks.count(tx->GetHash())) {
if (LOCK(cs_creating); txToCreatingInstantSendLocks.count(tx->GetHash())) {
// we're already in the middle of locking this one
continue;
}
@ -1629,9 +1609,8 @@ void CInstantSendManager::ProcessPendingRetryLockTxs()
}
if (retryCount != 0) {
LOCK(cs);
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- retried %d TXs. nonLockedTxs.size=%d\n", __func__,
retryCount, nonLockedTxs.size());
retryCount, WITH_LOCK(cs_nonLocked, return nonLockedTxs.size()));
}
}
@ -1641,8 +1620,8 @@ bool CInstantSendManager::AlreadyHave(const CInv& inv) const
return true;
}
LOCK(cs);
return pendingInstantSendLocks.count(inv.hash) != 0 || pendingNoTxInstantSendLocks.count(inv.hash) != 0 || db.KnownInstantSendLock(inv.hash);
return WITH_LOCK(cs_pendingLocks, return pendingInstantSendLocks.count(inv.hash) != 0 || pendingNoTxInstantSendLocks.count(inv.hash) != 0)
|| db.KnownInstantSendLock(inv.hash);
}
bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CInstantSendLock& ret) const
@ -1653,7 +1632,7 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CI
auto islock = db.GetInstantSendLockByHash(hash);
if (!islock) {
LOCK(cs);
LOCK(cs_pendingLocks);
auto it = pendingInstantSendLocks.find(hash);
if (it != pendingInstantSendLocks.end()) {
islock = it->second.second;
@ -1679,16 +1658,6 @@ CInstantSendLockPtr CInstantSendManager::GetInstantSendLockByTxid(const uint256&
return db.GetInstantSendLockByTxid(txid);
}
bool CInstantSendManager::GetInstantSendLockHashByTxid(const uint256& txid, uint256& ret) const
{
if (!IsInstantSendEnabled()) {
return false;
}
ret = db.GetInstantSendLockHashByTxid(txid);
return !ret.IsNull();
}
bool CInstantSendManager::IsLocked(const uint256& txHash) const
{
if (!IsInstantSendEnabled()) {
@ -1704,7 +1673,7 @@ bool CInstantSendManager::IsWaitingForTx(const uint256& txHash) const
return false;
}
LOCK(cs);
LOCK(cs_pendingLocks);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == txHash) {

View File

@ -58,7 +58,7 @@ using CInstantSendLockPtr = std::shared_ptr<CInstantSendLock>;
class CInstantSendDb
{
private:
mutable CCriticalSection cs_db;
mutable Mutex cs_db;
static constexpr int CURRENT_VERSION{1};
@ -95,69 +95,88 @@ private:
*/
std::vector<uint256> GetInstantSendLocksByParent(const uint256& parent) const EXCLUSIVE_LOCKS_REQUIRED(cs_db);
/**
* See GetInstantSendLockByHash
*/
CInstantSendLockPtr GetInstantSendLockByHashInternal(const uint256& hash, bool use_cache = true) const EXCLUSIVE_LOCKS_REQUIRED(cs_db);
/**
* See GetInstantSendLockHashByTxid
*/
uint256 GetInstantSendLockHashByTxidInternal(const uint256& txid) const EXCLUSIVE_LOCKS_REQUIRED(cs_db);
public:
explicit CInstantSendDb(bool unitTests, bool fWipe) :
db(std::make_unique<CDBWrapper>(unitTests ? "" : (GetDataDir() / "llmq/isdb"), 32 << 20, unitTests, fWipe))
{}
void Upgrade();
void Upgrade() LOCKS_EXCLUDED(cs_db);
/**
* This method is called when an InstantSend Lock is processed and adds the lock to the database
* @param hash The hash of the InstantSend Lock
* @param islock The InstantSend Lock object itself
*/
void WriteNewInstantSendLock(const uint256& hash, const CInstantSendLock& islock);
void WriteNewInstantSendLock(const uint256& hash, const CInstantSendLock& islock) LOCKS_EXCLUDED(cs_db);
/**
* This method updates a DB entry for an InstantSend Lock from being not included in a block to being included in a block
* @param hash The hash of the InstantSend Lock
* @param nHeight The height that the transaction was included at
*/
void WriteInstantSendLockMined(const uint256& hash, int nHeight);
void WriteInstantSendLockMined(const uint256& hash, int nHeight) LOCKS_EXCLUDED(cs_db);
/**
* Archives and deletes all IS Locks which were mined into a block before nUntilHeight
* @param nUntilHeight Removes all IS Locks confirmed up until nUntilHeight
* @return returns an unordered_map of the hash of the IS Locks and a pointer object to the IS Locks for all IS Locks which were removed
*/
std::unordered_map<uint256, CInstantSendLockPtr, StaticSaltedHasher> RemoveConfirmedInstantSendLocks(int nUntilHeight);
std::unordered_map<uint256, CInstantSendLockPtr, StaticSaltedHasher> RemoveConfirmedInstantSendLocks(int nUntilHeight) LOCKS_EXCLUDED(cs_db);
/**
* Removes IS Locks from the archive if the tx was confirmed 100 blocks before nUntilHeight
* @param nUntilHeight the height from which to base the remove of archive IS Locks
*/
void RemoveArchivedInstantSendLocks(int nUntilHeight);
void WriteBlockInstantSendLocks(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected);
void RemoveBlockInstantSendLocks(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected);
bool KnownInstantSendLock(const uint256& islockHash) const;
void RemoveArchivedInstantSendLocks(int nUntilHeight) LOCKS_EXCLUDED(cs_db);
void WriteBlockInstantSendLocks(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) LOCKS_EXCLUDED(cs_db);
void RemoveBlockInstantSendLocks(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) LOCKS_EXCLUDED(cs_db);
bool KnownInstantSendLock(const uint256& islockHash) const LOCKS_EXCLUDED(cs_db);
/**
* Gets the number of IS Locks which have not been confirmed by a block
* @return size_t value of the number of IS Locks not confirmed by a block
*/
size_t GetInstantSendLockCount() const;
size_t GetInstantSendLockCount() const LOCKS_EXCLUDED(cs_db);
/**
* Gets a pointer to the IS Lock based on the hash
* @param hash The hash of the IS Lock
* @param use_cache Should we try using the cache first or not
* @return A Pointer object to the IS Lock, returns nullptr if it doesn't exist
*/
CInstantSendLockPtr GetInstantSendLockByHash(const uint256& hash, bool use_cache = true) const;
CInstantSendLockPtr GetInstantSendLockByHash(const uint256& hash, bool use_cache = true) const LOCKS_EXCLUDED(cs_db)
{
LOCK(cs_db);
return GetInstantSendLockByHashInternal(hash, use_cache);
};
/**
* Gets an IS Lock hash based on the txid the IS Lock is for
* @param txid The txid which is being searched for
* @return Returns the hash the IS Lock of the specified txid, returns uint256() if it doesn't exist
*/
uint256 GetInstantSendLockHashByTxid(const uint256& txid) const;
uint256 GetInstantSendLockHashByTxid(const uint256& txid) const LOCKS_EXCLUDED(cs_db)
{
LOCK(cs_db);
return GetInstantSendLockHashByTxidInternal(txid);
};
/**
* Gets an IS Lock pointer from the txid given
* @param txid The txid for which the IS Lock Pointer is being returned
* @return Returns the IS Lock Pointer associated with the txid, returns nullptr if it doesn't exist
*/
CInstantSendLockPtr GetInstantSendLockByTxid(const uint256& txid) const;
CInstantSendLockPtr GetInstantSendLockByTxid(const uint256& txid) const LOCKS_EXCLUDED(cs_db);
/**
* Gets an IS Lock pointer from an input given
* @param outpoint Since all inputs are really just outpoints that are being spent
* @return IS Lock Pointer associated with that input.
*/
CInstantSendLockPtr GetInstantSendLockByInput(const COutPoint& outpoint) const;
CInstantSendLockPtr GetInstantSendLockByInput(const COutPoint& outpoint) const LOCKS_EXCLUDED(cs_db);
/**
* Called when a ChainLock invalidated a IS Lock, removes any chained/children IS Locks and the invalidated IS Lock
* @param islockHash IS Lock hash which has been invalidated
@ -165,15 +184,14 @@ public:
* @param nHeight height of the block which received a chainlock and invalidated the IS Lock
* @return A vector of IS Lock hashes of all IS Locks removed
*/
std::vector<uint256> RemoveChainedInstantSendLocks(const uint256& islockHash, const uint256& txid, int nHeight);
std::vector<uint256> RemoveChainedInstantSendLocks(const uint256& islockHash, const uint256& txid, int nHeight) LOCKS_EXCLUDED(cs_db);
void RemoveAndArchiveInstantSendLock(const CInstantSendLockPtr& islock, int nHeight);
void RemoveAndArchiveInstantSendLock(const CInstantSendLockPtr& islock, int nHeight) LOCKS_EXCLUDED(cs_db);
};
class CInstantSendManager : public CRecoveredSigsListener
{
private:
mutable CCriticalSection cs;
CInstantSendDb db;
CConnman& connman;
@ -182,25 +200,30 @@ private:
std::thread workThread;
CThreadInterrupt workInterrupt;
mutable Mutex cs_inputReqests;
/**
* Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an
* in-progress input lock.
*/
std::unordered_set<uint256, StaticSaltedHasher> inputRequestIds GUARDED_BY(cs);
std::unordered_set<uint256, StaticSaltedHasher> inputRequestIds GUARDED_BY(cs_inputReqests);
mutable Mutex cs_creating;
/**
* These are the islocks that are currently in the middle of being created. Entries are created when we observed
* recovered signatures for all inputs of a TX. At the same time, we initiate signing of our sigshare for the islock.
* When the recovered sig for the islock later arrives, we can finish the islock and propagate it.
*/
std::unordered_map<uint256, CInstantSendLock, StaticSaltedHasher> creatingInstantSendLocks GUARDED_BY(cs);
std::unordered_map<uint256, CInstantSendLock, StaticSaltedHasher> creatingInstantSendLocks GUARDED_BY(cs_creating);
// maps from txid to the in-progress islock
std::unordered_map<uint256, CInstantSendLock*, StaticSaltedHasher> txToCreatingInstantSendLocks GUARDED_BY(cs);
std::unordered_map<uint256, CInstantSendLock*, StaticSaltedHasher> txToCreatingInstantSendLocks GUARDED_BY(cs_creating);
mutable Mutex cs_pendingLocks;
// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingInstantSendLocks GUARDED_BY(cs);
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingInstantSendLocks GUARDED_BY(cs_pendingLocks);
// Tried to verify but there is no tx yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingNoTxInstantSendLocks GUARDED_BY(cs);
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingNoTxInstantSendLocks GUARDED_BY(cs_pendingLocks);
// TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS locking
// of child TXs
@ -209,10 +232,13 @@ private:
CTransactionRef tx;
std::unordered_set<uint256, StaticSaltedHasher> children;
};
std::unordered_map<uint256, NonLockedTxInfo, StaticSaltedHasher> nonLockedTxs GUARDED_BY(cs);
std::unordered_map<COutPoint, uint256, SaltedOutpointHasher> nonLockedTxsByOutpoints GUARDED_BY(cs);
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs GUARDED_BY(cs);
mutable Mutex cs_nonLocked;
std::unordered_map<uint256, NonLockedTxInfo, StaticSaltedHasher> nonLockedTxs GUARDED_BY(cs_nonLocked);
std::unordered_map<COutPoint, uint256, SaltedOutpointHasher> nonLockedTxsByOutpoints GUARDED_BY(cs_nonLocked);
mutable Mutex cs_pendingRetry;
std::unordered_set<uint256, StaticSaltedHasher> pendingRetryTxs GUARDED_BY(cs_pendingRetry);
public:
explicit CInstantSendManager(CConnman& _connman, bool unitTests, bool fWipe) : db(unitTests, fWipe), connman(_connman) { workInterrupt.reset(); }
@ -223,62 +249,66 @@ public:
void InterruptWorkerThread() { workInterrupt(); };
private:
void ProcessTx(const CTransaction& tx, bool fRetroactive, const Consensus::Params& params) LOCKS_EXCLUDED(cs);
void ProcessTx(const CTransaction& tx, bool fRetroactive, const Consensus::Params& params);
bool CheckCanLock(const CTransaction& tx, bool printDebug, const Consensus::Params& params) const;
bool CheckCanLock(const COutPoint& outpoint, bool printDebug, const uint256& txHash, const Consensus::Params& params) const LOCKS_EXCLUDED(cs);
bool CheckCanLock(const COutPoint& outpoint, bool printDebug, const uint256& txHash, const Consensus::Params& params) const;
bool IsConflicted(const CTransaction& tx) const { return GetConflictingLock(tx) != nullptr; };
void HandleNewInputLockRecoveredSig(const CRecoveredSig& recoveredSig, const uint256& txid);
void HandleNewInstantSendLockRecoveredSig(const CRecoveredSig& recoveredSig);
void HandleNewInstantSendLockRecoveredSig(const CRecoveredSig& recoveredSig) LOCKS_EXCLUDED(cs_creating, cs_pendingLocks);
bool TrySignInputLocks(const CTransaction& tx, bool allowResigning, Consensus::LLMQType llmqType, const Consensus::Params& params);
void TrySignInstantSendLock(const CTransaction& tx);
bool TrySignInputLocks(const CTransaction& tx, bool allowResigning, Consensus::LLMQType llmqType, const Consensus::Params& params) LOCKS_EXCLUDED(cs_inputReqests);
void TrySignInstantSendLock(const CTransaction& tx) LOCKS_EXCLUDED(cs_creating);
void ProcessMessageInstantSendLock(const CNode* pfrom, const CInstantSendLockPtr& islock) LOCKS_EXCLUDED(cs);
void ProcessMessageInstantSendLock(const CNode* pfrom, const CInstantSendLockPtr& islock);
static bool PreVerifyInstantSendLock(const CInstantSendLock& islock);
bool ProcessPendingInstantSendLocks();
bool ProcessPendingInstantSendLocks(bool deterministic);
bool ProcessPendingInstantSendLocks(bool deterministic) LOCKS_EXCLUDED(cs_pendingLocks);
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(const Consensus::LLMQType llmqType, int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban);
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock);
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(const Consensus::LLMQType llmqType,
int signOffset,
const std::unordered_map<uint256,
std::pair<NodeId, CInstantSendLockPtr>,
StaticSaltedHasher>& pend,
bool ban) LOCKS_EXCLUDED(cs_pendingLocks);
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock) LOCKS_EXCLUDED(cs_creating, cs_pendingLocks);
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined);
void RemoveNonLockedTx(const uint256& txid, bool retryChildren) EXCLUSIVE_LOCKS_REQUIRED(cs);
void RemoveConflictedTx(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(cs);
void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock) EXCLUSIVE_LOCKS_REQUIRED(cs);
void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined) LOCKS_EXCLUDED(cs_pendingLocks, cs_nonLocked);
void RemoveNonLockedTx(const uint256& txid, bool retryChildren) LOCKS_EXCLUDED(cs_nonLocked, cs_pendingRetry);
void RemoveConflictedTx(const CTransaction& tx) LOCKS_EXCLUDED(cs_inputReqests);
void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock) LOCKS_EXCLUDED(cs_inputReqests);
void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock);
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock);
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock) LOCKS_EXCLUDED(cs_pendingLocks, cs_nonLocked);
static void AskNodesForLockedTx(const uint256& txid, const CConnman& connman);
void ProcessPendingRetryLockTxs();
void ProcessPendingRetryLockTxs() LOCKS_EXCLUDED(cs_creating, cs_nonLocked, cs_pendingRetry);
void WorkThreadMain();
void HandleFullyConfirmedBlock(const CBlockIndex* pindex);
void HandleFullyConfirmedBlock(const CBlockIndex* pindex) LOCKS_EXCLUDED(cs_nonLocked);
public:
bool IsLocked(const uint256& txHash) const;
bool IsWaitingForTx(const uint256& txHash) const;
bool IsWaitingForTx(const uint256& txHash) const LOCKS_EXCLUDED(cs_pendingLocks);
CInstantSendLockPtr GetConflictingLock(const CTransaction& tx) const;
void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override;
void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override LOCKS_EXCLUDED(cs_inputReqests, cs_creating);
void ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRecv);
void TransactionAddedToMempool(const CTransactionRef& tx);
void TransactionAddedToMempool(const CTransactionRef& tx) LOCKS_EXCLUDED(cs_pendingLocks);
void TransactionRemovedFromMempool(const CTransactionRef& tx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex, const std::vector<CTransactionRef>& vtxConflicted);
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected);
bool AlreadyHave(const CInv& inv) const;
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret) const;
bool AlreadyHave(const CInv& inv) const LOCKS_EXCLUDED(cs_pendingLocks);
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret) const LOCKS_EXCLUDED(cs_pendingLocks);
CInstantSendLockPtr GetInstantSendLockByTxid(const uint256& txid) const;
bool GetInstantSendLockHashByTxid(const uint256& txid, uint256& ret) const;
void NotifyChainLock(const CBlockIndex* pindexChainLock);
void UpdatedBlockTip(const CBlockIndex* pindexNew);
void RemoveConflictingLock(const uint256& islockHash, const CInstantSendLock& islock) LOCKS_EXCLUDED(cs);
void RemoveConflictingLock(const uint256& islockHash, const CInstantSendLock& islock);
size_t GetInstantSendLockCount() const;
};