perf: actually only process each dsq once (#5484)

A 5-minute profiling run shows usage around 7% before this change and around 2% after.

## Issue being fixed or feature implemented
Because we rapidly receive multiple duplicates of DSQueue messages, we could start
processing a dsq before it was added to the vector of already-processed ones. At
some point we probably tried to minimize the time spent holding the lock, but
that's not productive here.
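
To make the race concrete, here is a minimal standalone sketch of the old pattern, using `std::mutex` in place of the codebase's `TRY_LOCK`/`LOCK` macros (names here are illustrative, not the real ones): the duplicate check and the later insert each take the lock separately, so two handlers carrying the same dsq can both pass the check before either records it.

```cpp
#include <mutex>
#include <string>
#include <vector>

std::mutex g_queue_mutex;        // stands in for cs_vecqueue
std::vector<std::string> g_seen; // stands in for vecCoinJoinQueue

// Racy pattern: the check and the insert are separate critical sections.
void ProcessDSQueueRacy(const std::string& dsq)
{
    {
        std::lock_guard<std::mutex> lock(g_queue_mutex);
        for (const auto& q : g_seen) {
            if (q == dsq) return; // looks like a duplicate -- skip
        }
    } // lock released here

    // ... expensive validation runs unlocked; a second thread carrying
    // the same dsq can pass the check above in the meantime ...

    std::lock_guard<std::mutex> lock(g_queue_mutex);
    g_seen.push_back(dsq); // both threads may reach this point
}
```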

## What was done?
Expanded the locked scope (via a new dedicated mutex, `cs_ProcessDSQueue`) to ensure we don't double-process.
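
In sketch form, the fix is the classic "widen the critical section" move: one mutex (`cs_ProcessDSQueue` in the actual diff below) is held across the check, the processing, and the insert, so check-then-insert becomes atomic with respect to other dsq handlers. Again a simplified `std::mutex` illustration rather than the real code:

```cpp
#include <mutex>
#include <string>
#include <vector>

std::mutex g_process_mutex;      // stands in for cs_ProcessDSQueue
std::mutex g_queue_mutex;        // stands in for cs_vecqueue
std::vector<std::string> g_seen; // stands in for vecCoinJoinQueue

// Fixed pattern: one lock held across the whole check/process/insert
// sequence, so a concurrent duplicate blocks and then sees the entry.
void ProcessDSQueueOnce(const std::string& dsq)
{
    std::lock_guard<std::mutex> process_lock(g_process_mutex);

    {
        std::lock_guard<std::mutex> lock(g_queue_mutex);
        for (const auto& q : g_seen) {
            if (q == dsq) return; // definitely already processed
        }
    }

    // ... validation/processing, now serialized by g_process_mutex ...

    std::lock_guard<std::mutex> lock(g_queue_mutex);
    g_seen.push_back(dsq);
}
```

The trade-off is that dsq processing is serialized on the new mutex; as the issue section notes, minimizing lock hold time wasn't buying anything useful here.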

## How Has This Been Tested?
Ran a full node for 5-10 minutes.

## Breaking Changes
Should be none

## Checklist:
_Go over all the following points, and put an `x` in all the boxes that
apply._
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have added or updated relevant unit/integration/functional/e2e
tests
- [ ] I have made corresponding changes to the documentation
- [x] I have assigned this pull request to a milestone _(for repository
code-owners and collaborators only)_

---------

Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
PastaPastaPasta 2023-07-16 12:58:08 -05:00 committed by GitHub
parent 494b5c744c
commit 0cf9410d47
2 changed files with 60 additions and 50 deletions


```diff
@@ -68,9 +68,10 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
     }
     {
-        TRY_LOCK(cs_vecqueue, lockRecv);
-        if (!lockRecv) return;
+        LOCK(cs_ProcessDSQueue);
+        {
+            LOCK(cs_vecqueue);
         // process every dsq only once
         for (const auto &q: vecCoinJoinQueue) {
             if (q == dsq) {
@@ -78,7 +79,9 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
             }
             if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
                 // no way the same mn can send another dsq with the same readiness this soon
-                LogPrint(BCLog::COINJOIN, "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n", peer.GetLogString(), dsq.masternodeOutpoint.ToStringShort());
+                LogPrint(BCLog::COINJOIN, /* Continued */
+                         "DSQUEUE -- Peer %s is sending WAY too many dsq messages for a masternode with collateral %s\n",
+                         peer.GetLogString(), dsq.masternodeOutpoint.ToStringShort());
                 return;
             }
         }
@@ -103,31 +106,37 @@ void CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, PeerManager&
         // if the queue is ready, submit if we can
         if (dsq.fReady && ranges::any_of(coinJoinClientManagers,
-                                         [this, &dmn](const auto& pair){ return pair.second->TrySubmitDenominate(dmn->pdmnState->addr, this->connman); })) {
-            LogPrint(BCLog::COINJOIN, "DSQUEUE -- CoinJoin queue (%s) is ready on masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString());
+                                         [this, &dmn](const auto &pair) {
+                                             return pair.second->TrySubmitDenominate(dmn->pdmnState->addr,
+                                                                                     this->connman);
+                                         })) {
+            LogPrint(BCLog::COINJOIN, "DSQUEUE -- CoinJoin queue (%s) is ready on masternode %s\n", dsq.ToString(),
+                     dmn->pdmnState->addr.ToString());
             return;
         } else {
             int64_t nLastDsq = mmetaman.GetMetaInfo(dmn->proTxHash)->GetLastDsq();
             int64_t nDsqThreshold = mmetaman.GetDsqThreshold(dmn->proTxHash, mnList.GetValidMNsCount());
-            LogPrint(BCLog::COINJOIN, "DSQUEUE -- nLastDsq: %d nDsqThreshold: %d nDsqCount: %d\n", nLastDsq, nDsqThreshold, mmetaman.GetDsqCount());
+            LogPrint(BCLog::COINJOIN, "DSQUEUE -- nLastDsq: %d nDsqThreshold: %d nDsqCount: %d\n", nLastDsq,
+                     nDsqThreshold, mmetaman.GetDsqCount());
             // don't allow a few nodes to dominate the queuing process
             if (nLastDsq != 0 && nDsqThreshold > mmetaman.GetDsqCount()) {
-                LogPrint(BCLog::COINJOIN, "DSQUEUE -- Masternode %s is sending too many dsq messages\n", dmn->proTxHash.ToString());
+                LogPrint(BCLog::COINJOIN, "DSQUEUE -- Masternode %s is sending too many dsq messages\n",
+                         dmn->proTxHash.ToString());
                 return;
             }
             mmetaman.AllowMixing(dmn->proTxHash);
-            LogPrint(BCLog::COINJOIN, "DSQUEUE -- new CoinJoin queue (%s) from masternode %s\n", dsq.ToString(), dmn->pdmnState->addr.ToString());
+            LogPrint(BCLog::COINJOIN, "DSQUEUE -- new CoinJoin queue (%s) from masternode %s\n", dsq.ToString(),
+                     dmn->pdmnState->addr.ToString());
             ranges::any_of(coinJoinClientManagers,
                            [&dsq](const auto &pair) { return pair.second->MarkAlreadyJoinedQueueAsTried(dsq); });
-            {TRY_LOCK(cs_vecqueue, lockRecv);
-            if (!lockRecv) return;
-            vecCoinJoinQueue.push_back(dsq);}
-            dsq.Relay(connman);
+            WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq));
         }
-    }
+    } // cs_ProcessDSQueue
+    dsq.Relay(connman);
 }
 
 void CCoinJoinClientManager::ProcessMessage(CNode& peer, PeerManager& peerman, CConnman& connman, const CTxMemPool& mempool, std::string_view msg_type, CDataStream& vRecv)
```
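
Two details in the last hunk above are easy to miss. First, the insert into `vecCoinJoinQueue` now uses an unconditional `WITH_LOCK` instead of `TRY_LOCK`, so recording the dsq can no longer be silently skipped under lock contention. Second, `dsq.Relay(connman)` moves outside the new critical section, so relaying no longer holds the lock. `WITH_LOCK(cs, code)` is the Bitcoin-Core-style helper from `sync.h`; a simplified stand-in (omitting the real macro's thread-safety annotations) behaves roughly like this:

```cpp
#include <mutex>
#include <vector>

// Simplified sketch of the WITH_LOCK idea: evaluate `code` inside a
// scope that holds `cs`, releasing the lock when the lambda returns.
#define WITH_LOCK_SKETCH(cs, code) \
    ([&]() -> decltype(auto) { std::lock_guard<decltype(cs)> lock_(cs); code; }())

std::mutex m;
std::vector<int> v;

int main()
{
    // Equivalent to: { take m; v.push_back(1); release m; }
    WITH_LOCK_SKETCH(m, v.push_back(1));
}
```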


```diff
@@ -159,6 +159,7 @@ class CCoinJoinClientQueueManager : public CCoinJoinBaseManager
 private:
     CConnman& connman;
     const CMasternodeSync& m_mn_sync;
+    mutable Mutex cs_ProcessDSQueue;
 
 public:
     explicit CCoinJoinClientQueueManager(CConnman& _connman, const CMasternodeSync& mn_sync) :
```
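
The new member is declared `mutable Mutex` so it can still be locked from `const` member functions: taking the lock mutates the mutex object but not the manager's logical state. A generic illustration of that C++ pattern (hypothetical class, `std::mutex` instead of the codebase's `Mutex`):

```cpp
#include <mutex>

class ThreadSafeCounter
{
private:
    mutable std::mutex m_mutex; // mutable: lockable even from const methods
    int m_value{0};

public:
    // Logically read-only, hence const -- but taking the lock modifies
    // the mutex, which is exactly what `mutable` permits.
    int Get() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_value;
    }

    void Increment()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_value;
    }
};
```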