Optimize sleeping behavior in CSigSharesManager::WorkThreadMain (#2707)

* Don't sleep in WorkThreadMain when CPU intensive work was done

When the current iteration resulted in CPU intensive work, it's likely that
the next iteration will result in work as well. Do not sleep in that case,
as we're otherwise wasting (unused) CPU resources.

* No matter how fast we process sig shares, always force 100ms between sending

* Apply review suggestions
This commit is contained in:
Alexander Block 2019-02-16 17:59:48 +01:00 committed by UdjinM6
parent feb4e0ac74
commit 7a192e2e4c
4 changed files with 35 additions and 16 deletions

View File

@ -421,14 +421,14 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
} }
} }
void CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
{ {
std::map<NodeId, std::list<CRecoveredSig>> recSigsByNode; std::map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums; std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;
CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums); CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
if (recSigsByNode.empty()) { if (recSigsByNode.empty()) {
return; return false;
} }
// It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not // It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
@ -474,6 +474,8 @@ void CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
ProcessRecoveredSig(nodeId, recSig, quorum, connman); ProcessRecoveredSig(nodeId, recSig, quorum, connman);
} }
} }
return true;
} }
// signature must be verified already // signature must be verified already

View File

@ -157,7 +157,7 @@ private:
bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan); bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan);
void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums); void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman); void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman);
void Cleanup(); // called from the worker thread of CSigSharesManager void Cleanup(); // called from the worker thread of CSigSharesManager

View File

@ -454,14 +454,14 @@ void CSigSharesManager::CollectPendingSigSharesToVerify(
} }
} }
void CSigSharesManager::ProcessPendingSigShares(CConnman& connman) bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
{ {
std::map<NodeId, std::vector<CSigShare>> sigSharesByNodes; std::map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums; std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;
CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums); CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums);
if (sigSharesByNodes.empty()) { if (sigSharesByNodes.empty()) {
return; return false;
} }
// It's ok to perform insecure batched verification here as we verify against the quorum public key shares, // It's ok to perform insecure batched verification here as we verify against the quorum public key shares,
@ -521,6 +521,8 @@ void CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
ProcessPendingSigSharesFromNode(nodeId, v, quorums, connman); ProcessPendingSigSharesFromNode(nodeId, v, quorums, connman);
} }
return true;
} }
// It's ensured that no duplicates are passed to this method // It's ensured that no duplicates are passed to this method
@ -881,7 +883,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::map<NodeId, std::map<uin
this->sigSharesToAnnounce.clear(); this->sigSharesToAnnounce.clear();
} }
void CSigSharesManager::SendMessages() bool CSigSharesManager::SendMessages()
{ {
std::multimap<CService, NodeId> nodesByAddress; std::multimap<CService, NodeId> nodesByAddress;
g_connman->ForEachNode([&nodesByAddress](CNode* pnode) { g_connman->ForEachNode([&nodesByAddress](CNode* pnode) {
@ -943,6 +945,8 @@ void CSigSharesManager::SendMessages()
if (didSend) { if (didSend) {
g_connman->WakeSelect(); g_connman->WakeSelect();
} }
return didSend;
} }
void CSigSharesManager::Cleanup() void CSigSharesManager::Cleanup()
@ -1110,6 +1114,8 @@ void CSigSharesManager::WorkThreadMain()
{ {
workInterrupt.reset(); workInterrupt.reset();
int64_t lastSendTime = 0;
while (!workInterrupt) { while (!workInterrupt) {
if (!quorumSigningManager || !g_connman || !quorumSigningManager) { if (!quorumSigningManager || !g_connman || !quorumSigningManager) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) { if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
@ -1118,19 +1124,28 @@ void CSigSharesManager::WorkThreadMain()
continue; continue;
} }
bool didWork = false;
RemoveBannedNodeStates(); RemoveBannedNodeStates();
quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman); didWork |= quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman);
ProcessPendingSigShares(*g_connman); didWork |= ProcessPendingSigShares(*g_connman);
SignPendingSigShares(); didWork |= SignPendingSigShares();
if (GetTimeMillis() - lastSendTime > 100) {
SendMessages(); SendMessages();
lastSendTime = GetTimeMillis();
}
Cleanup(); Cleanup();
quorumSigningManager->Cleanup(); quorumSigningManager->Cleanup();
// TODO Wakeup when pending signing is needed? // TODO Wakeup when pending signing is needed?
if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) { if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return; return;
} }
} }
}
} }
void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
@ -1139,7 +1154,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
pendingSigns.emplace_back(quorum, id, msgHash); pendingSigns.emplace_back(quorum, id, msgHash);
} }
void CSigSharesManager::SignPendingSigShares() bool CSigSharesManager::SignPendingSigShares()
{ {
std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v; std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v;
{ {
@ -1150,6 +1165,8 @@ void CSigSharesManager::SignPendingSigShares()
for (auto& t : v) { for (auto& t : v) {
Sign(std::get<0>(t), std::get<1>(t), std::get<2>(t)); Sign(std::get<0>(t), std::get<1>(t), std::get<2>(t));
} }
return !v.empty();
} }
void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)

View File

@ -212,7 +212,7 @@ private:
bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan); bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan);
void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::map<NodeId, std::vector<CSigShare>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums); void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::map<NodeId, std::vector<CSigShare>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void ProcessPendingSigShares(CConnman& connman); bool ProcessPendingSigShares(CConnman& connman);
void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector<CSigShare>& sigShares, const std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& quorums, CConnman& connman); void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector<CSigShare>& sigShares, const std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& quorums, CConnman& connman);
@ -226,11 +226,11 @@ private:
void BanNode(NodeId nodeId); void BanNode(NodeId nodeId);
void SendMessages(); bool SendMessages();
void CollectSigSharesToRequest(std::map<NodeId, std::map<uint256, CSigSharesInv>>& sigSharesToRequest); void CollectSigSharesToRequest(std::map<NodeId, std::map<uint256, CSigSharesInv>>& sigSharesToRequest);
void CollectSigSharesToSend(std::map<NodeId, std::map<uint256, CBatchedSigShares>>& sigSharesToSend); void CollectSigSharesToSend(std::map<NodeId, std::map<uint256, CBatchedSigShares>>& sigSharesToSend);
void CollectSigSharesToAnnounce(std::map<NodeId, std::map<uint256, CSigSharesInv>>& sigSharesToAnnounce); void CollectSigSharesToAnnounce(std::map<NodeId, std::map<uint256, CSigSharesInv>>& sigSharesToAnnounce);
void SignPendingSigShares(); bool SignPendingSigShares();
void WorkThreadMain(); void WorkThreadMain();
}; };