feat: broadcast dsq messages using the inventory system

DSQ messages are 142 bytes.

Previously, assuming a relatively highly connected masternode hosting 100 connection, each round of coinjoin will result in 14.2KB (100*142) of inbound and outbound traffic each.

Now, using the inventory system, a message will first use 36 bytes per peer (sending and receiving), plus the size of a `getdata` message and the actual message itself. As a result, bandwidth usage for 1 round of mixing would be closer to 36 * 100 + 142 (dsq) + 36 (getdata) = ~3.8KB, a reduction of around ~73%
This commit is contained in:
pasta 2024-07-23 22:59:05 -05:00
parent b38c4d3c1b
commit 57fb0874ef
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
14 changed files with 102 additions and 27 deletions

View File

@ -0,0 +1,6 @@
P2P and Network Changes
-----------------------
The DSQ message, starting in protocol version 70234, is broadcast using the inventory system, and not simply
relaying to all connected peers. This should reduce the bandwidth needs for all nodes, however, this affect will
be most noticeable on highly connected masternodes. (#6148)

View File

@ -12,6 +12,7 @@
#include <masternode/meta.h> #include <masternode/meta.h>
#include <masternode/sync.h> #include <masternode/sync.h>
#include <net.h> #include <net.h>
#include <net_processing.h>
#include <netmessagemaker.h> #include <netmessagemaker.h>
#include <shutdown.h> #include <shutdown.h>
#include <util/check.h> #include <util/check.h>
@ -22,9 +23,9 @@
#include <util/translation.h> #include <util/translation.h>
#include <validation.h> #include <validation.h>
#include <version.h> #include <version.h>
#include <walletinitinterface.h>
#include <wallet/coincontrol.h> #include <wallet/coincontrol.h>
#include <wallet/fees.h> #include <wallet/fees.h>
#include <walletinitinterface.h>
#include <memory> #include <memory>
#include <univalue.h> #include <univalue.h>
@ -47,6 +48,11 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
CCoinJoinQueue dsq; CCoinJoinQueue dsq;
vRecv >> dsq; vRecv >> dsq;
{
LOCK(cs_main);
Assert(peerman)->EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash()));
}
if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) { if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
return tl::unexpected{100}; return tl::unexpected{100};
} }
@ -126,7 +132,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq)); WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq));
} }
} // cs_ProcessDSQueue } // cs_ProcessDSQueue
dsq.Relay(connman); dsq.Relay(connman, *peerman);
return {}; return {};
} }

View File

@ -220,6 +220,7 @@ class CCoinJoinClientQueueManager : public CCoinJoinBaseManager
{ {
private: private:
CConnman& connman; CConnman& connman;
std::unique_ptr<PeerManager>& peerman;
CoinJoinWalletManager& m_walletman; CoinJoinWalletManager& m_walletman;
CDeterministicMNManager& m_dmnman; CDeterministicMNManager& m_dmnman;
CMasternodeMetaMan& m_mn_metaman; CMasternodeMetaMan& m_mn_metaman;
@ -229,9 +230,17 @@ private:
const bool m_is_masternode; const bool m_is_masternode;
public: public:
explicit CCoinJoinClientQueueManager(CConnman& _connman, CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman, explicit CCoinJoinClientQueueManager(CConnman& _connman, std::unique_ptr<PeerManager>& _peerman,
CMasternodeMetaMan& mn_metaman, const CMasternodeSync& mn_sync, bool is_masternode) : CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman,
connman(_connman), m_walletman(walletman), m_dmnman(dmnman), m_mn_metaman(mn_metaman), m_mn_sync(mn_sync), m_is_masternode{is_masternode} {}; CMasternodeMetaMan& mn_metaman, const CMasternodeSync& mn_sync,
bool is_masternode) :
connman(_connman),
peerman(_peerman),
m_walletman(walletman),
m_dmnman(dmnman),
m_mn_metaman(mn_metaman),
m_mn_sync(mn_sync),
m_is_masternode{is_masternode} {};
PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue);
PeerMsgRet ProcessDSQueue(const CNode& peer, CDataStream& vRecv); PeerMsgRet ProcessDSQueue(const CNode& peer, CDataStream& vRecv);

View File

@ -14,6 +14,7 @@
#include <masternode/node.h> #include <masternode/node.h>
#include <masternode/sync.h> #include <masternode/sync.h>
#include <messagesigner.h> #include <messagesigner.h>
#include <net_processing.h>
#include <netmessagemaker.h> #include <netmessagemaker.h>
#include <txmempool.h> #include <txmempool.h>
#include <util/moneystr.h> #include <util/moneystr.h>
@ -46,6 +47,7 @@ uint256 CCoinJoinQueue::GetSignatureHash() const
{ {
return SerializeHash(*this, SER_GETHASH, PROTOCOL_VERSION); return SerializeHash(*this, SER_GETHASH, PROTOCOL_VERSION);
} }
uint256 CCoinJoinQueue::GetHash() const { return SerializeHash(*this, SER_NETWORK, PROTOCOL_VERSION); }
bool CCoinJoinQueue::Sign(const CActiveMasternodeManager& mn_activeman) bool CCoinJoinQueue::Sign(const CActiveMasternodeManager& mn_activeman)
{ {
@ -69,11 +71,13 @@ bool CCoinJoinQueue::CheckSignature(const CBLSPublicKey& blsPubKey) const
return true; return true;
} }
bool CCoinJoinQueue::Relay(CConnman& connman) bool CCoinJoinQueue::Relay(CConnman& connman, PeerManager& peerman)
{ {
CInv inv(MSG_DSQ, GetHash());
peerman.RelayInv(inv, DSQ_INV_VERSION);
connman.ForEachNode([&connman, this](CNode* pnode) { connman.ForEachNode([&connman, this](CNode* pnode) {
CNetMsgMaker msgMaker(pnode->GetCommonVersion()); CNetMsgMaker msgMaker(pnode->GetCommonVersion());
if (pnode->fSendDSQueue) { if (pnode->fSendDSQueue && pnode->nVersion < DSQ_INV_VERSION) {
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSQUEUE, (*this))); connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSQUEUE, (*this)));
} }
}); });

View File

@ -30,6 +30,7 @@ class CBlockIndex;
class CMasternodeSync; class CMasternodeSync;
class CTxMemPool; class CTxMemPool;
class TxValidationState; class TxValidationState;
class PeerManager;
namespace llmq { namespace llmq {
class CChainLocksHandler; class CChainLocksHandler;
@ -206,6 +207,7 @@ public:
} }
} }
[[nodiscard]] uint256 GetHash() const;
[[nodiscard]] uint256 GetSignatureHash() const; [[nodiscard]] uint256 GetSignatureHash() const;
/** Sign this mixing transaction /** Sign this mixing transaction
* return true if all conditions are met: * return true if all conditions are met:
@ -218,7 +220,7 @@ public:
/// Check if we have a valid Masternode address /// Check if we have a valid Masternode address
[[nodiscard]] bool CheckSignature(const CBLSPublicKey& blsPubKey) const; [[nodiscard]] bool CheckSignature(const CBLSPublicKey& blsPubKey) const;
bool Relay(CConnman& connman); bool Relay(CConnman& connman, PeerManager& peerman);
/// Check if a queue is too old or too far into the future /// Check if a queue is too old or too far into the future
[[nodiscard]] bool IsTimeOutOfBounds(int64_t current_time = GetAdjustedTime()) const; [[nodiscard]] bool IsTimeOutOfBounds(int64_t current_time = GetAdjustedTime()) const;
@ -340,6 +342,18 @@ public:
int GetQueueSize() const EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue) { LOCK(cs_vecqueue); return vecCoinJoinQueue.size(); } int GetQueueSize() const EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue) { LOCK(cs_vecqueue); return vecCoinJoinQueue.size(); }
bool GetQueueItemAndTry(CCoinJoinQueue& dsqRet) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue); bool GetQueueItemAndTry(CCoinJoinQueue& dsqRet) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue);
bool HasQueue(const uint256& queueHash) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue)
{
LOCK(cs_vecqueue);
return std::any_of(vecCoinJoinQueue.begin(), vecCoinJoinQueue.end(),
[&queueHash](auto q) { return q.GetHash() == queueHash; });
}
std::optional<CCoinJoinQueue> GetQueueFromHash(const uint256& queueHash) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue)
{
LOCK(cs_vecqueue);
return ranges::find_if_opt(vecCoinJoinQueue, [&queueHash](const auto& q) { return q.GetHash() == queueHash; });
}
}; };
// Various helpers and dstx manager implementation // Various helpers and dstx manager implementation

View File

@ -9,15 +9,21 @@
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
#include <coinjoin/server.h> #include <coinjoin/server.h>
CJContext::CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, CJContext::CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman,
CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool,
const std::unique_ptr<PeerManager>& peerman, bool relay_txes) : const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
std::unique_ptr<PeerManager>& peerman, bool relay_txes) :
dstxman{std::make_unique<CDSTXManager>()}, dstxman{std::make_unique<CDSTXManager>()},
#ifdef ENABLE_WALLET #ifdef ENABLE_WALLET
walletman{std::make_unique<CoinJoinWalletManager>(chainstate, connman, dmnman, mn_metaman, mempool, mn_sync, queueman, /* is_masternode = */ mn_activeman != nullptr)}, walletman{std::make_unique<CoinJoinWalletManager>(chainstate, connman, dmnman, mn_metaman, mempool, mn_sync,
queueman {relay_txes ? std::make_unique<CCoinJoinClientQueueManager>(connman, *walletman, dmnman, mn_metaman, mn_sync, /* is_masternode = */ mn_activeman != nullptr) : nullptr}, queueman, /* is_masternode = */ mn_activeman != nullptr)},
queueman{relay_txes
? std::make_unique<CCoinJoinClientQueueManager>(connman, peerman, *walletman, dmnman, mn_metaman,
mn_sync, /* is_masternode = */ mn_activeman != nullptr)
: nullptr},
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
server{std::make_unique<CCoinJoinServer>(chainstate, connman, dmnman, *dstxman, mn_metaman, mempool, mn_activeman, mn_sync, peerman)} server{std::make_unique<CCoinJoinServer>(chainstate, connman, dmnman, *dstxman, mn_metaman, mempool, mn_activeman,
mn_sync, peerman)}
{} {}
CJContext::~CJContext() {} CJContext::~CJContext() {}

View File

@ -31,9 +31,9 @@ class CoinJoinWalletManager;
struct CJContext { struct CJContext {
CJContext() = delete; CJContext() = delete;
CJContext(const CJContext&) = delete; CJContext(const CJContext&) = delete;
CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman,
CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman,
const std::unique_ptr<PeerManager>& peerman, bool relay_txes); const CMasternodeSync& mn_sync, std::unique_ptr<PeerManager>& peerman, bool relay_txes);
~CJContext(); ~CJContext();
const std::unique_ptr<CDSTXManager> dstxman; const std::unique_ptr<CDSTXManager> dstxman;

View File

@ -117,6 +117,11 @@ PeerMsgRet CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv
CCoinJoinQueue dsq; CCoinJoinQueue dsq;
vRecv >> dsq; vRecv >> dsq;
{
LOCK(cs_main);
Assert(m_peerman)->EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash()));
}
if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) { if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
return tl::unexpected{100}; return tl::unexpected{100};
} }
@ -178,7 +183,7 @@ PeerMsgRet CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv
TRY_LOCK(cs_vecqueue, lockRecv); TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return {}; if (!lockRecv) return {};
vecCoinJoinQueue.push_back(dsq); vecCoinJoinQueue.push_back(dsq);
dsq.Relay(connman); dsq.Relay(connman, *m_peerman);
} }
return {}; return {};
} }
@ -511,7 +516,7 @@ void CCoinJoinServer::CheckForCompleteQueue()
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */ LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */
"with %d participants\n", dsq.ToString(), vecSessionCollaterals.size()); "with %d participants\n", dsq.ToString(), vecSessionCollaterals.size());
dsq.Sign(*m_mn_activeman); dsq.Sign(*m_mn_activeman);
dsq.Relay(connman); dsq.Relay(connman, *m_peerman);
} }
} }
@ -724,7 +729,7 @@ bool CCoinJoinServer::CreateNewSession(const CCoinJoinAccept& dsa, PoolMessage&
GetAdjustedTime(), false); GetAdjustedTime(), false);
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString()); LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString());
dsq.Sign(*m_mn_activeman); dsq.Sign(*m_mn_activeman);
dsq.Relay(connman); dsq.Relay(connman, *m_peerman);
LOCK(cs_vecqueue); LOCK(cs_vecqueue);
vecCoinJoinQueue.push_back(dsq); vecCoinJoinQueue.push_back(dsq);
} }

View File

@ -35,7 +35,7 @@ private:
CTxMemPool& mempool; CTxMemPool& mempool;
const CActiveMasternodeManager* const m_mn_activeman; const CActiveMasternodeManager* const m_mn_activeman;
const CMasternodeSync& m_mn_sync; const CMasternodeSync& m_mn_sync;
const std::unique_ptr<PeerManager>& m_peerman; std::unique_ptr<PeerManager>& m_peerman;
// Mixing uses collateral transactions to trust parties entering the pool // Mixing uses collateral transactions to trust parties entering the pool
// to behave honestly. If they don't it takes their money. // to behave honestly. If they don't it takes their money.
@ -90,9 +90,10 @@ private:
void SetNull() override EXCLUSIVE_LOCKS_REQUIRED(cs_coinjoin); void SetNull() override EXCLUSIVE_LOCKS_REQUIRED(cs_coinjoin);
public: public:
explicit CCoinJoinServer(CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDSTXManager& dstxman, explicit CCoinJoinServer(CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman, CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool,
const CMasternodeSync& mn_sync, const std::unique_ptr<PeerManager>& peerman) : const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
std::unique_ptr<PeerManager>& peerman) :
m_chainstate(chainstate), m_chainstate(chainstate),
connman(_connman), connman(_connman),
m_dmnman(dmnman), m_dmnman(dmnman),

View File

@ -2127,8 +2127,15 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv)
return m_llmq_ctx->clhandler->AlreadyHave(inv); return m_llmq_ctx->clhandler->AlreadyHave(inv);
case MSG_ISDLOCK: case MSG_ISDLOCK:
return m_llmq_ctx->isman->AlreadyHave(inv); return m_llmq_ctx->isman->AlreadyHave(inv);
case MSG_DSQ:
#ifdef ENABLE_WALLET
return m_cj_ctx->server->HasQueue(inv.hash) || m_cj_ctx->queueman->HasQueue(inv.hash);
#else
return m_cj_ctx->server->HasQueue(inv.hash);
#endif
} }
// Don't know what it is, just say we already got one // Don't know what it is, just say we already got one
return true; return true;
} }
@ -2635,6 +2642,18 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
push = true; push = true;
} }
} }
if (!push && inv.type == MSG_DSQ) {
auto opt_dsq = m_cj_ctx->server->GetQueueFromHash(inv.hash);
#ifdef ENABLE_WALLET
if (!opt_dsq.has_value()) {
opt_dsq = m_cj_ctx->queueman->GetQueueFromHash(inv.hash);
}
#endif
if (opt_dsq.has_value()) {
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::DSQUEUE, *opt_dsq));
push = true;
}
}
if (!push) { if (!push) {
vNotFound.push_back(inv); vNotFound.push_back(inv);

View File

@ -288,6 +288,7 @@ const char* CInv::GetCommandInternal() const
case MSG_QUORUM_RECOVERED_SIG: return NetMsgType::QSIGREC; case MSG_QUORUM_RECOVERED_SIG: return NetMsgType::QSIGREC;
case MSG_CLSIG: return NetMsgType::CLSIG; case MSG_CLSIG: return NetMsgType::CLSIG;
case MSG_ISDLOCK: return NetMsgType::ISDLOCK; case MSG_ISDLOCK: return NetMsgType::ISDLOCK;
case MSG_DSQ: return NetMsgType::DSQUEUE;
default: default:
return nullptr; return nullptr;
} }

View File

@ -518,6 +518,7 @@ enum GetDataMsg : uint32_t {
MSG_CLSIG = 29, MSG_CLSIG = 29,
/* MSG_ISLOCK = 30, */ // Non-deterministic InstantSend and not used anymore /* MSG_ISLOCK = 30, */ // Non-deterministic InstantSend and not used anymore
MSG_ISDLOCK = 31, MSG_ISDLOCK = 31,
MSG_DSQ = 32,
}; };
/** inv message data */ /** inv message data */

View File

@ -11,7 +11,7 @@
*/ */
static const int PROTOCOL_VERSION = 70233; static const int PROTOCOL_VERSION = 70234;
//! initial proto version, to be increased after version/verack negotiation //! initial proto version, to be increased after version/verack negotiation
static const int INIT_PROTO_VERSION = 209; static const int INIT_PROTO_VERSION = 209;
@ -55,6 +55,9 @@ static const int MNLISTDIFF_CHAINLOCKS_PROTO_VERSION = 70230;
//! Legacy ISLOCK messages and a corresponding INV were dropped in this version //! Legacy ISLOCK messages and a corresponding INV were dropped in this version
static const int NO_LEGACY_ISLOCK_PROTO_VERSION = 70231; static const int NO_LEGACY_ISLOCK_PROTO_VERSION = 70231;
//! Inventory type for DSQ messages added
static const int DSQ_INV_VERSION = 70234;
// Make sure that none of the values above collide with `ADDRV2_FORMAT`. // Make sure that none of the values above collide with `ADDRV2_FORMAT`.
#endif // BITCOIN_VERSION_H #endif // BITCOIN_VERSION_H

View File

@ -87,10 +87,11 @@ EXPECTED_CIRCULAR_DEPENDENCIES=(
"evo/deterministicmns -> validationinterface -> evo/deterministicmns" "evo/deterministicmns -> validationinterface -> evo/deterministicmns"
"logging -> util/system -> sync -> logging/timer -> logging" "logging -> util/system -> sync -> logging/timer -> logging"
"coinjoin/client -> net_processing -> coinjoin/client"
"coinjoin/client -> net_processing -> coinjoin/context -> coinjoin/client"
"coinjoin/context -> coinjoin/server -> net_processing -> coinjoin/context" "coinjoin/context -> coinjoin/server -> net_processing -> coinjoin/context"
"coinjoin/server -> net_processing -> coinjoin/server" "coinjoin/server -> net_processing -> coinjoin/server"
"llmq/context -> llmq/ehf_signals -> net_processing -> llmq/context" "llmq/context -> llmq/ehf_signals -> net_processing -> llmq/context"
"coinjoin/client -> coinjoin/util -> wallet/wallet -> psbt -> node/transaction -> node/context -> coinjoin/context -> coinjoin/client"
"llmq/blockprocessor -> net_processing -> llmq/blockprocessor" "llmq/blockprocessor -> net_processing -> llmq/blockprocessor"
"llmq/chainlocks -> net_processing -> llmq/chainlocks" "llmq/chainlocks -> net_processing -> llmq/chainlocks"
"llmq/dkgsession -> net_processing -> llmq/quorums -> llmq/dkgsession" "llmq/dkgsession -> net_processing -> llmq/quorums -> llmq/dkgsession"
@ -100,7 +101,6 @@ EXPECTED_CIRCULAR_DEPENDENCIES=(
"llmq/blockprocessor -> net_processing -> llmq/context -> llmq/blockprocessor" "llmq/blockprocessor -> net_processing -> llmq/context -> llmq/blockprocessor"
"llmq/blockprocessor -> net_processing -> llmq/quorums -> llmq/blockprocessor" "llmq/blockprocessor -> net_processing -> llmq/quorums -> llmq/blockprocessor"
"llmq/chainlocks -> net_processing -> llmq/context -> llmq/chainlocks" "llmq/chainlocks -> net_processing -> llmq/context -> llmq/chainlocks"
"coinjoin/client -> coinjoin/coinjoin -> llmq/chainlocks -> net_processing -> coinjoin/client"
"rpc/blockchain -> rpc/server -> rpc/blockchain" "rpc/blockchain -> rpc/server -> rpc/blockchain"
) )