Merge #6148: feat: broadcast dsq messages using the inventory system

57fb0874ef feat: broadcast dsq messages using the inventory system (pasta)

Pull request description:

  ## Issue being fixed or feature implemented
  DSQ messages are 142 bytes.

  Previously, assuming a relatively highly connected masternode hosting 100 connection, each round of coinjoin will result in 14.2KB (100*142) of inbound and outbound traffic each.

  ## What was done?
  Now, using the inventory system, a message will first use 36 bytes per peer (sending and receiving), plus the size of a `getdata` message and the actual message itself. As a result, bandwidth usage for 1 round of mixing would be closer to 36 * 100 + 142 (dsq) + 36 (getdata) = ~3.8KB, a reduction of around ~73%

  ## How Has This Been Tested?
  Has not been; @UdjinM6 especially please review well :)

  ## Breaking Changes
  Does introduce a new protocol version, but in a backwards compatible way. I don't think this would need to be delayed to v22 for any reason.

  ## Checklist:
    _Go over all the following points, and put an `x` in all the boxes that apply._
  - [x] I have performed a self-review of my own code
  - [ ] I have commented my code, particularly in hard-to-understand areas
  - [ ] I have added or updated relevant unit/integration/functional/e2e tests
  - [ ] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

Top commit has no ACKs.

Tree-SHA512: 3dc39a339cba29d8cf207cec76ecace5ad0e11d1892ca0f65f9253a2b1d90313da21c6c178c2476756c5566ece0fab777006cd609b7984df906a9206c25d921d
This commit is contained in:
pasta 2024-09-10 10:12:27 -05:00
commit 1464e69058
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
14 changed files with 102 additions and 27 deletions

View File

@ -0,0 +1,6 @@
P2P and Network Changes
-----------------------
The DSQ message, starting in protocol version 70234, is broadcast using the inventory system, and not simply
relaying to all connected peers. This should reduce the bandwidth needs for all nodes, however, this affect will
be most noticeable on highly connected masternodes. (#6148)

View File

@ -12,6 +12,7 @@
#include <masternode/meta.h>
#include <masternode/sync.h>
#include <net.h>
#include <net_processing.h>
#include <netmessagemaker.h>
#include <shutdown.h>
#include <util/check.h>
@ -22,9 +23,9 @@
#include <util/translation.h>
#include <validation.h>
#include <version.h>
#include <walletinitinterface.h>
#include <wallet/coincontrol.h>
#include <wallet/fees.h>
#include <walletinitinterface.h>
#include <memory>
#include <univalue.h>
@ -47,6 +48,11 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
CCoinJoinQueue dsq;
vRecv >> dsq;
{
LOCK(cs_main);
Assert(peerman)->EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash()));
}
if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
return tl::unexpected{100};
}
@ -126,7 +132,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS
WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq));
}
} // cs_ProcessDSQueue
dsq.Relay(connman);
dsq.Relay(connman, *peerman);
return {};
}

View File

@ -220,6 +220,7 @@ class CCoinJoinClientQueueManager : public CCoinJoinBaseManager
{
private:
CConnman& connman;
std::unique_ptr<PeerManager>& peerman;
CoinJoinWalletManager& m_walletman;
CDeterministicMNManager& m_dmnman;
CMasternodeMetaMan& m_mn_metaman;
@ -229,9 +230,17 @@ private:
const bool m_is_masternode;
public:
explicit CCoinJoinClientQueueManager(CConnman& _connman, CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, const CMasternodeSync& mn_sync, bool is_masternode) :
connman(_connman), m_walletman(walletman), m_dmnman(dmnman), m_mn_metaman(mn_metaman), m_mn_sync(mn_sync), m_is_masternode{is_masternode} {};
explicit CCoinJoinClientQueueManager(CConnman& _connman, std::unique_ptr<PeerManager>& _peerman,
CoinJoinWalletManager& walletman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, const CMasternodeSync& mn_sync,
bool is_masternode) :
connman(_connman),
peerman(_peerman),
m_walletman(walletman),
m_dmnman(dmnman),
m_mn_metaman(mn_metaman),
m_mn_sync(mn_sync),
m_is_masternode{is_masternode} {};
PeerMsgRet ProcessMessage(const CNode& peer, std::string_view msg_type, CDataStream& vRecv) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue);
PeerMsgRet ProcessDSQueue(const CNode& peer, CDataStream& vRecv);

View File

@ -14,6 +14,7 @@
#include <masternode/node.h>
#include <masternode/sync.h>
#include <messagesigner.h>
#include <net_processing.h>
#include <netmessagemaker.h>
#include <txmempool.h>
#include <util/moneystr.h>
@ -46,6 +47,7 @@ uint256 CCoinJoinQueue::GetSignatureHash() const
{
return SerializeHash(*this, SER_GETHASH, PROTOCOL_VERSION);
}
uint256 CCoinJoinQueue::GetHash() const { return SerializeHash(*this, SER_NETWORK, PROTOCOL_VERSION); }
bool CCoinJoinQueue::Sign(const CActiveMasternodeManager& mn_activeman)
{
@ -69,11 +71,13 @@ bool CCoinJoinQueue::CheckSignature(const CBLSPublicKey& blsPubKey) const
return true;
}
bool CCoinJoinQueue::Relay(CConnman& connman)
bool CCoinJoinQueue::Relay(CConnman& connman, PeerManager& peerman)
{
CInv inv(MSG_DSQ, GetHash());
peerman.RelayInv(inv, DSQ_INV_VERSION);
connman.ForEachNode([&connman, this](CNode* pnode) {
CNetMsgMaker msgMaker(pnode->GetCommonVersion());
if (pnode->fSendDSQueue) {
if (pnode->fSendDSQueue && pnode->nVersion < DSQ_INV_VERSION) {
connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSQUEUE, (*this)));
}
});

View File

@ -30,6 +30,7 @@ class CBlockIndex;
class CMasternodeSync;
class CTxMemPool;
class TxValidationState;
class PeerManager;
namespace llmq {
class CChainLocksHandler;
@ -206,6 +207,7 @@ public:
}
}
[[nodiscard]] uint256 GetHash() const;
[[nodiscard]] uint256 GetSignatureHash() const;
/** Sign this mixing transaction
* return true if all conditions are met:
@ -218,7 +220,7 @@ public:
/// Check if we have a valid Masternode address
[[nodiscard]] bool CheckSignature(const CBLSPublicKey& blsPubKey) const;
bool Relay(CConnman& connman);
bool Relay(CConnman& connman, PeerManager& peerman);
/// Check if a queue is too old or too far into the future
[[nodiscard]] bool IsTimeOutOfBounds(int64_t current_time = GetAdjustedTime()) const;
@ -340,6 +342,18 @@ public:
int GetQueueSize() const EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue) { LOCK(cs_vecqueue); return vecCoinJoinQueue.size(); }
bool GetQueueItemAndTry(CCoinJoinQueue& dsqRet) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue);
bool HasQueue(const uint256& queueHash) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue)
{
LOCK(cs_vecqueue);
return std::any_of(vecCoinJoinQueue.begin(), vecCoinJoinQueue.end(),
[&queueHash](auto q) { return q.GetHash() == queueHash; });
}
std::optional<CCoinJoinQueue> GetQueueFromHash(const uint256& queueHash) EXCLUSIVE_LOCKS_REQUIRED(!cs_vecqueue)
{
LOCK(cs_vecqueue);
return ranges::find_if_opt(vecCoinJoinQueue, [&queueHash](const auto& q) { return q.GetHash() == queueHash; });
}
};
// Various helpers and dstx manager implementation

View File

@ -9,15 +9,21 @@
#endif // ENABLE_WALLET
#include <coinjoin/server.h>
CJContext::CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman,
CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool relay_txes) :
CJContext::CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
std::unique_ptr<PeerManager>& peerman, bool relay_txes) :
dstxman{std::make_unique<CDSTXManager>()},
#ifdef ENABLE_WALLET
walletman{std::make_unique<CoinJoinWalletManager>(chainstate, connman, dmnman, mn_metaman, mempool, mn_sync, queueman, /* is_masternode = */ mn_activeman != nullptr)},
queueman {relay_txes ? std::make_unique<CCoinJoinClientQueueManager>(connman, *walletman, dmnman, mn_metaman, mn_sync, /* is_masternode = */ mn_activeman != nullptr) : nullptr},
walletman{std::make_unique<CoinJoinWalletManager>(chainstate, connman, dmnman, mn_metaman, mempool, mn_sync,
queueman, /* is_masternode = */ mn_activeman != nullptr)},
queueman{relay_txes
? std::make_unique<CCoinJoinClientQueueManager>(connman, peerman, *walletman, dmnman, mn_metaman,
mn_sync, /* is_masternode = */ mn_activeman != nullptr)
: nullptr},
#endif // ENABLE_WALLET
server{std::make_unique<CCoinJoinServer>(chainstate, connman, dmnman, *dstxman, mn_metaman, mempool, mn_activeman, mn_sync, peerman)}
server{std::make_unique<CCoinJoinServer>(chainstate, connman, dmnman, *dstxman, mn_metaman, mempool, mn_activeman,
mn_sync, peerman)}
{}
CJContext::~CJContext() {}

View File

@ -31,9 +31,9 @@ class CoinJoinWalletManager;
struct CJContext {
CJContext() = delete;
CJContext(const CJContext&) = delete;
CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman,
CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool relay_txes);
CJContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman,
CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman,
const CMasternodeSync& mn_sync, std::unique_ptr<PeerManager>& peerman, bool relay_txes);
~CJContext();
const std::unique_ptr<CDSTXManager> dstxman;

View File

@ -117,6 +117,11 @@ PeerMsgRet CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv
CCoinJoinQueue dsq;
vRecv >> dsq;
{
LOCK(cs_main);
Assert(m_peerman)->EraseObjectRequest(peer.GetId(), CInv(MSG_DSQ, dsq.GetHash()));
}
if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
return tl::unexpected{100};
}
@ -178,7 +183,7 @@ PeerMsgRet CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv
TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return {};
vecCoinJoinQueue.push_back(dsq);
dsq.Relay(connman);
dsq.Relay(connman, *m_peerman);
}
return {};
}
@ -511,7 +516,7 @@ void CCoinJoinServer::CheckForCompleteQueue()
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */
"with %d participants\n", dsq.ToString(), vecSessionCollaterals.size());
dsq.Sign(*m_mn_activeman);
dsq.Relay(connman);
dsq.Relay(connman, *m_peerman);
}
}
@ -724,7 +729,7 @@ bool CCoinJoinServer::CreateNewSession(const CCoinJoinAccept& dsa, PoolMessage&
GetAdjustedTime(), false);
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString());
dsq.Sign(*m_mn_activeman);
dsq.Relay(connman);
dsq.Relay(connman, *m_peerman);
LOCK(cs_vecqueue);
vecCoinJoinQueue.push_back(dsq);
}

View File

@ -35,7 +35,7 @@ private:
CTxMemPool& mempool;
const CActiveMasternodeManager* const m_mn_activeman;
const CMasternodeSync& m_mn_sync;
const std::unique_ptr<PeerManager>& m_peerman;
std::unique_ptr<PeerManager>& m_peerman;
// Mixing uses collateral transactions to trust parties entering the pool
// to behave honestly. If they don't it takes their money.
@ -90,9 +90,10 @@ private:
void SetNull() override EXCLUSIVE_LOCKS_REQUIRED(cs_coinjoin);
public:
explicit CCoinJoinServer(CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDSTXManager& dstxman,
CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman,
const CMasternodeSync& mn_sync, const std::unique_ptr<PeerManager>& peerman) :
explicit CCoinJoinServer(CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
std::unique_ptr<PeerManager>& peerman) :
m_chainstate(chainstate),
connman(_connman),
m_dmnman(dmnman),

View File

@ -2128,8 +2128,15 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv)
return m_llmq_ctx->clhandler->AlreadyHave(inv);
case MSG_ISDLOCK:
return m_llmq_ctx->isman->AlreadyHave(inv);
case MSG_DSQ:
#ifdef ENABLE_WALLET
return m_cj_ctx->server->HasQueue(inv.hash) || m_cj_ctx->queueman->HasQueue(inv.hash);
#else
return m_cj_ctx->server->HasQueue(inv.hash);
#endif
}
// Don't know what it is, just say we already got one
return true;
}
@ -2636,6 +2643,18 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
push = true;
}
}
if (!push && inv.type == MSG_DSQ) {
auto opt_dsq = m_cj_ctx->server->GetQueueFromHash(inv.hash);
#ifdef ENABLE_WALLET
if (!opt_dsq.has_value()) {
opt_dsq = m_cj_ctx->queueman->GetQueueFromHash(inv.hash);
}
#endif
if (opt_dsq.has_value()) {
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::DSQUEUE, *opt_dsq));
push = true;
}
}
if (!push) {
vNotFound.push_back(inv);

View File

@ -288,6 +288,7 @@ const char* CInv::GetCommandInternal() const
case MSG_QUORUM_RECOVERED_SIG: return NetMsgType::QSIGREC;
case MSG_CLSIG: return NetMsgType::CLSIG;
case MSG_ISDLOCK: return NetMsgType::ISDLOCK;
case MSG_DSQ: return NetMsgType::DSQUEUE;
default:
return nullptr;
}

View File

@ -518,6 +518,7 @@ enum GetDataMsg : uint32_t {
MSG_CLSIG = 29,
/* MSG_ISLOCK = 30, */ // Non-deterministic InstantSend and not used anymore
MSG_ISDLOCK = 31,
MSG_DSQ = 32,
};
/** inv message data */

View File

@ -11,7 +11,7 @@
*/
static const int PROTOCOL_VERSION = 70233;
static const int PROTOCOL_VERSION = 70234;
//! initial proto version, to be increased after version/verack negotiation
static const int INIT_PROTO_VERSION = 209;
@ -55,6 +55,9 @@ static const int MNLISTDIFF_CHAINLOCKS_PROTO_VERSION = 70230;
//! Legacy ISLOCK messages and a corresponding INV were dropped in this version
static const int NO_LEGACY_ISLOCK_PROTO_VERSION = 70231;
//! Inventory type for DSQ messages added
static const int DSQ_INV_VERSION = 70234;
// Make sure that none of the values above collide with `ADDRV2_FORMAT`.
#endif // BITCOIN_VERSION_H

View File

@ -87,10 +87,11 @@ EXPECTED_CIRCULAR_DEPENDENCIES=(
"evo/deterministicmns -> validationinterface -> evo/deterministicmns"
"logging -> util/system -> sync -> logging/timer -> logging"
"coinjoin/client -> net_processing -> coinjoin/client"
"coinjoin/client -> net_processing -> coinjoin/context -> coinjoin/client"
"coinjoin/context -> coinjoin/server -> net_processing -> coinjoin/context"
"coinjoin/server -> net_processing -> coinjoin/server"
"llmq/context -> llmq/ehf_signals -> net_processing -> llmq/context"
"coinjoin/client -> coinjoin/util -> wallet/wallet -> psbt -> node/transaction -> node/context -> coinjoin/context -> coinjoin/client"
"llmq/blockprocessor -> net_processing -> llmq/blockprocessor"
"llmq/chainlocks -> net_processing -> llmq/chainlocks"
"llmq/dkgsession -> net_processing -> llmq/quorums -> llmq/dkgsession"
@ -100,7 +101,6 @@ EXPECTED_CIRCULAR_DEPENDENCIES=(
"llmq/blockprocessor -> net_processing -> llmq/context -> llmq/blockprocessor"
"llmq/blockprocessor -> net_processing -> llmq/quorums -> llmq/blockprocessor"
"llmq/chainlocks -> net_processing -> llmq/context -> llmq/chainlocks"
"coinjoin/client -> coinjoin/coinjoin -> llmq/chainlocks -> net_processing -> coinjoin/client"
"rpc/blockchain -> rpc/server -> rpc/blockchain"
)