mirror of
https://github.com/dashpay/dash.git
synced 2024-12-27 04:52:59 +01:00
instantsend|sigs: Sleep when there is no more work (#3988)
* instantsend|sigs: Sleep when there is no more work Instead of sleeping only when no work has been done. Avoids useless cycles, improves batching. * llmq: Add and use nMaxBatchSize * llmq: Compare to what we got in return, not what we verified at the end It might happen that we get 32 pending but do only verify less than 32 and in this case we would assume there is no more work but it could still be more in the pipeline from my understanding. * llmq: Rename more_work -> fMoreWork * llmq: Be consistent with the other fMoreWork initialization Co-authored-by: xdustinface <xdustinfacex@gmail.com>
This commit is contained in:
parent
317353deb3
commit
899c124c76
@ -731,6 +731,7 @@ bool CInstantSendManager::PreVerifyInstantSendLock(const llmq::CInstantSendLock&
|
|||||||
bool CInstantSendManager::ProcessPendingInstantSendLocks()
|
bool CInstantSendManager::ProcessPendingInstantSendLocks()
|
||||||
{
|
{
|
||||||
decltype(pendingInstantSendLocks) pend;
|
decltype(pendingInstantSendLocks) pend;
|
||||||
|
bool fMoreWork{false};
|
||||||
|
|
||||||
{
|
{
|
||||||
LOCK(cs);
|
LOCK(cs);
|
||||||
@ -745,6 +746,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
|
|||||||
pend.emplace(it->first, std::move(it->second));
|
pend.emplace(it->first, std::move(it->second));
|
||||||
pendingInstantSendLocks.erase(it);
|
pendingInstantSendLocks.erase(it);
|
||||||
}
|
}
|
||||||
|
fMoreWork = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -776,7 +778,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
|
|||||||
ProcessPendingInstantSendLocks(dkgInterval, pend, true);
|
ProcessPendingInstantSendLocks(dkgInterval, pend, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return fMoreWork;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_set<uint256> CInstantSendManager::ProcessPendingInstantSendLocks(int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
|
std::unordered_set<uint256> CInstantSendManager::ProcessPendingInstantSendLocks(int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
|
||||||
@ -1365,7 +1367,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
void CInstantSendManager::ProcessPendingRetryLockTxs()
|
||||||
{
|
{
|
||||||
decltype(pendingRetryTxs) retryTxs;
|
decltype(pendingRetryTxs) retryTxs;
|
||||||
{
|
{
|
||||||
@ -1374,11 +1376,11 @@ bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (retryTxs.empty()) {
|
if (retryTxs.empty()) {
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IsInstantSendEnabled()) {
|
if (!IsInstantSendEnabled()) {
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int retryCount = 0;
|
int retryCount = 0;
|
||||||
@ -1428,8 +1430,6 @@ bool CInstantSendManager::ProcessPendingRetryLockTxs()
|
|||||||
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- retried %d TXs. nonLockedTxs.size=%d\n", __func__,
|
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- retried %d TXs. nonLockedTxs.size=%d\n", __func__,
|
||||||
retryCount, nonLockedTxs.size());
|
retryCount, nonLockedTxs.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
return retryCount != 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CInstantSendManager::AlreadyHave(const CInv& inv)
|
bool CInstantSendManager::AlreadyHave(const CInv& inv)
|
||||||
@ -1521,17 +1521,13 @@ size_t CInstantSendManager::GetInstantSendLockCount()
|
|||||||
void CInstantSendManager::WorkThreadMain()
|
void CInstantSendManager::WorkThreadMain()
|
||||||
{
|
{
|
||||||
while (!workInterrupt) {
|
while (!workInterrupt) {
|
||||||
bool didWork = false;
|
bool fMoreWork = ProcessPendingInstantSendLocks();
|
||||||
|
ProcessPendingRetryLockTxs();
|
||||||
|
|
||||||
didWork |= ProcessPendingInstantSendLocks();
|
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
|
||||||
didWork |= ProcessPendingRetryLockTxs();
|
|
||||||
|
|
||||||
if (!didWork) {
|
|
||||||
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool IsInstantSendEnabled()
|
bool IsInstantSendEnabled()
|
||||||
|
@ -159,7 +159,7 @@ public:
|
|||||||
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock);
|
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock);
|
||||||
void RemoveChainLockConflictingLock(const uint256& islockHash, const CInstantSendLock& islock);
|
void RemoveChainLockConflictingLock(const uint256& islockHash, const CInstantSendLock& islock);
|
||||||
static void AskNodesForLockedTx(const uint256& txid);
|
static void AskNodesForLockedTx(const uint256& txid);
|
||||||
bool ProcessPendingRetryLockTxs();
|
void ProcessPendingRetryLockTxs();
|
||||||
|
|
||||||
bool AlreadyHave(const CInv& inv);
|
bool AlreadyHave(const CInv& inv);
|
||||||
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);
|
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);
|
||||||
|
@ -621,7 +621,8 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
|
|||||||
|
|
||||||
ProcessPendingReconstructedRecoveredSigs();
|
ProcessPendingReconstructedRecoveredSigs();
|
||||||
|
|
||||||
CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
|
const size_t nMaxBatchSize{32};
|
||||||
|
CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums);
|
||||||
if (recSigsByNode.empty()) {
|
if (recSigsByNode.empty()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -675,7 +676,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return recSigsByNode.size() >= nMaxBatchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
// signature must be verified already
|
// signature must be verified already
|
||||||
|
@ -639,7 +639,8 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
|
|||||||
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
|
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
|
||||||
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
|
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
|
||||||
|
|
||||||
CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums);
|
const size_t nMaxBatchSize{32};
|
||||||
|
CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums);
|
||||||
if (sigSharesByNodes.empty()) {
|
if (sigSharesByNodes.empty()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -704,7 +705,7 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
|
|||||||
ProcessPendingSigShares(v, quorums, connman);
|
ProcessPendingSigShares(v, quorums, connman);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return sigSharesByNodes.size() >= nMaxBatchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
// It's ensured that no duplicates are passed to this method
|
// It's ensured that no duplicates are passed to this method
|
||||||
@ -1501,12 +1502,12 @@ void CSigSharesManager::WorkThreadMain()
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool didWork = false;
|
bool fMoreWork{false};
|
||||||
|
|
||||||
RemoveBannedNodeStates();
|
RemoveBannedNodeStates();
|
||||||
didWork |= quorumSigningManager->ProcessPendingRecoveredSigs();
|
fMoreWork |= quorumSigningManager->ProcessPendingRecoveredSigs();
|
||||||
didWork |= ProcessPendingSigShares(*g_connman);
|
fMoreWork |= ProcessPendingSigShares(*g_connman);
|
||||||
didWork |= SignPendingSigShares();
|
SignPendingSigShares();
|
||||||
|
|
||||||
if (GetTimeMillis() - lastSendTime > 100) {
|
if (GetTimeMillis() - lastSendTime > 100) {
|
||||||
SendMessages();
|
SendMessages();
|
||||||
@ -1517,12 +1518,10 @@ void CSigSharesManager::WorkThreadMain()
|
|||||||
quorumSigningManager->Cleanup();
|
quorumSigningManager->Cleanup();
|
||||||
|
|
||||||
// TODO Wakeup when pending signing is needed?
|
// TODO Wakeup when pending signing is needed?
|
||||||
if (!didWork) {
|
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
|
||||||
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
|
void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
|
||||||
@ -1531,7 +1530,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
|
|||||||
pendingSigns.emplace_back(quorum, id, msgHash);
|
pendingSigns.emplace_back(quorum, id, msgHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CSigSharesManager::SignPendingSigShares()
|
void CSigSharesManager::SignPendingSigShares()
|
||||||
{
|
{
|
||||||
std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v;
|
std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v;
|
||||||
{
|
{
|
||||||
@ -1557,8 +1556,6 @@ bool CSigSharesManager::SignPendingSigShares()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return !v.empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CSigShare CSigSharesManager::CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
|
CSigShare CSigSharesManager::CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
|
||||||
|
@ -451,7 +451,7 @@ private:
|
|||||||
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend);
|
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend);
|
||||||
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes);
|
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes);
|
||||||
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce);
|
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce);
|
||||||
bool SignPendingSigShares();
|
void SignPendingSigShares();
|
||||||
void WorkThreadMain();
|
void WorkThreadMain();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user