partial bitcoin#26036: add NetEventsInterface::g_msgproc_mutex

This backport excludes annotations for members introduced in
bitcoin#25717 as it hasn't been backported yet.
This commit is contained in:
Kittywhiskers Van Gogh 2022-09-07 13:57:18 +10:00
parent f6c943922f
commit 2ecba6ba5f
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
11 changed files with 77 additions and 86 deletions

View File

@ -3142,8 +3142,12 @@ void CConnman::OpenMasternodeConnection(const CAddress &addrConnect, MasternodeP
OpenNetworkConnection(addrConnect, false, nullptr, nullptr, ConnectionType::OUTBOUND_FULL_RELAY, MasternodeConn::IsConnection, probe);
}
Mutex NetEventsInterface::g_msgproc_mutex;
void CConnman::ThreadMessageHandler()
{
LOCK(NetEventsInterface::g_msgproc_mutex);
int64_t nLastSendMessagesTimeMasternodes = 0;
FastRandomContext rng;
@ -3173,7 +3177,6 @@ void CConnman::ThreadMessageHandler()
return;
// Send messages
if (!fSkipSendMessagesForMasternodes || !pnode->m_masternode_connection) {
LOCK(pnode->cs_sendProcessing);
m_msgproc->SendMessages(pnode);
}

View File

@ -481,8 +481,6 @@ public:
std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
RecursiveMutex cs_sendProcessing;
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
std::atomic<std::chrono::seconds> m_last_send{0s};
@ -816,6 +814,9 @@ private:
class NetEventsInterface
{
public:
/** Mutex for anything that is only accessed via the msg processing thread */
static Mutex g_msgproc_mutex;
/** Initialize a peer (setup state, queue any initial messages) */
virtual void InitializeNode(CNode& node, ServiceFlags our_services) = 0;
@ -829,7 +830,7 @@ public:
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
/**
* Send queued protocol messages to a given node.
@ -837,7 +838,7 @@ public:
* @param[in] pnode The node which we are sending messages to.
* @return True if there is more work to be done
*/
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0;
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
protected:

View File

@ -286,7 +286,7 @@ struct Peer {
bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false};
// Last time a "MEMPOOL" request was serviced.
std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
std::chrono::microseconds m_next_inv_send_time{0};
std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
};
// in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer
@ -295,7 +295,7 @@ struct Peer {
std::unique_ptr<TxRelay> m_tx_relay{std::make_unique<TxRelay>()};
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send;
std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
/** Probabilistic filter to track recent addr messages relayed with this
* peer. Used to avoid relaying redundant addresses to this peer.
*
@ -305,7 +305,7 @@ struct Peer {
*
* Presence of this filter must correlate with m_addr_relay_enabled.
**/
std::unique_ptr<CRollingBloomFilter> m_addr_known;
std::unique_ptr<CRollingBloomFilter> m_addr_known GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
/** Whether we are participating in address relay with this connection.
*
* We set this bool to true for outbound peers (other than
@ -324,7 +324,7 @@ struct Peer {
/** Whether a Peer can only be relayed blocks */
const bool m_block_relay_only{false};
/** Whether a getaddr request to this peer is outstanding. */
bool m_getaddr_sent{false};
bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Guards address sending timers. */
mutable Mutex m_addr_send_times_mutex;
/** Time point to send the next ADDR message to this peer. */
@ -335,12 +335,12 @@ struct Peer {
* messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */
std::atomic_bool m_wants_addrv2{false};
/** Whether this peer has already sent us a getaddr message. */
bool m_getaddr_recvd{false};
bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Number of addresses that can be processed from this peer. Start at 1 to
* permit self-announcement. */
double m_addr_token_bucket{1.0};
double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()};
std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime<std::chrono::microseconds>()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate-limited ones). */
@ -350,7 +350,7 @@ struct Peer {
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
/** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */
bool m_inv_triggered_getheaders_before_sync{false};
bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
/** Protects m_getdata_requests **/
Mutex m_getdata_requests_mutex;
@ -358,7 +358,7 @@ struct Peer {
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
/** Time of the last getheaders message to this peer */
std::atomic<std::chrono::seconds> m_last_getheaders_timestamp{0s};
std::atomic<std::chrono::seconds> m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0s};
explicit Peer(NodeId id, ServiceFlags our_services, bool block_relay_only)
: m_id(id)
@ -549,9 +549,9 @@ public:
void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
bool SendMessages(CNode* pto) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
/** Implement PeerManager */
void StartScheduledTasks(CScheduler& scheduler) override;
@ -570,7 +570,7 @@ public:
void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, g_msgproc_mutex);
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex);
void EraseObjectRequest(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
@ -584,7 +584,7 @@ private:
void ProcessPeerMsgRet(const PeerMsgRet& ret, CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex);
/** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */
void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@ -638,20 +638,21 @@ private:
void ProcessHeadersMessage(CNode& pfrom, Peer& peer,
const std::vector<CBlockHeader>& headers,
bool via_compact_block)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
/** Various helpers for headers processing, invoked by ProcessHeadersMessage() */
/** Deal with state tracking and headers sync for peers that send the
* occasional non-connecting header (this can happen due to BIP 130 headers
* announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */
void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
/** Return true if the headers connect to each other, false otherwise */
bool CheckHeadersAreContinuous(const std::vector<CBlockHeader>& headers) const;
/** Request further headers from this peer with a given locator.
* We don't issue a getheaders message if we have a recent one outstanding.
* This returns true if a getheaders is actually sent, and false otherwise.
*/
bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer);
bool MaybeSendGetHeaders(CNode& pfrom, const std::string& msg_type, const CBlockLocator& locator, Peer& peer)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Potentially fetch blocks from this peer upon receipt of a new headers tip */
void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast);
/** Update peer state based on received headers message */
@ -670,7 +671,8 @@ private:
void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now);
/** Send `addr` messages on a regular schedule. */
void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time);
void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Relay (gossip) an address to a few randomly chosen nodes.
*
@ -679,7 +681,8 @@ private:
* @param[in] fReachable Whether the address' network is reachable. We relay unreachable
* addresses less.
*/
void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
const CChainParams& m_chainparams;
CConnman& m_connman;
@ -807,13 +810,16 @@ private:
* @return True if address relay is enabled with peer
* False if address relay is disallowed
*/
bool SetupAddressRelay(const CNode& node, Peer& peer);
bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Number of nodes with fSyncStarted. */
int nSyncStarted GUARDED_BY(cs_main) = 0;
/** Hash of the last block we received via INV */
uint256 m_last_block_inv_triggering_headers_sync{};
uint256 m_last_block_inv_triggering_headers_sync GUARDED_BY(g_msgproc_mutex){};
/**
* Sources of received blocks, saved to be able punish them when processing
@ -889,7 +895,7 @@ private:
uint256 m_most_recent_block_hash GUARDED_BY(m_most_recent_block_mutex);
/** Height of the highest block announced using BIP 152 high-bandwidth mode. */
int m_highest_fast_announce{0};
int m_highest_fast_announce GUARDED_BY(::cs_main){0};
/* Returns a bool indicating whether we requested this block.
* Also used if a block was /not/ received and timed out or started with another peer
@ -981,13 +987,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr)
return peer.m_wants_addrv2 || addr.IsAddrV1Compatible();
}
static void AddAddressKnown(Peer& peer, const CAddress& addr)
void PeerManagerImpl::AddAddressKnown(Peer& peer, const CAddress& addr)
{
assert(peer.m_addr_known);
peer.m_addr_known->insert(addr.GetKey());
}
static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand)
{
// Known checking here is only to save space from duplicates.
// Before sending, we'll filter it again for known addresses that were
@ -3250,6 +3256,8 @@ void PeerManagerImpl::ProcessMessage(
const std::chrono::microseconds time_received,
const std::atomic<bool>& interruptMsgProc)
{
AssertLockHeld(g_msgproc_mutex);
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId());
statsClient.inc("message.received." + SanitizeString(msg_type), 1.0f);
@ -4971,6 +4979,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer)
bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
{
AssertLockHeld(g_msgproc_mutex);
bool fMoreWork = false;
PeerRef peer = GetPeerRef(pfrom->GetId());
@ -5313,7 +5323,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros
// Remove addr records that the peer already knows about, and add new
// addrs to the m_addr_known filter on the same pass.
auto addr_already_known = [&peer](const CAddress& addr) {
auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) {
bool ret = peer.m_addr_known->contains(addr.GetKey());
if (!ret) peer.m_addr_known->insert(addr.GetKey());
return ret;
@ -5379,6 +5389,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
bool PeerManagerImpl::SendMessages(CNode* pto)
{
AssertLockHeld(g_msgproc_mutex);
assert(m_llmq_ctx);
const bool is_masternode = m_mn_activeman != nullptr;

View File

@ -125,7 +125,7 @@ public:
/** Process a single message from a peer. Public for fuzz testing */
virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv,
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) = 0;
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
/** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */
virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0;

View File

@ -46,6 +46,8 @@ BOOST_FIXTURE_TEST_SUITE(denialofservice_tests, TestingSetup)
// work.
BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
LOCK(NetEventsInterface::g_msgproc_mutex);
ConnmanTestMsg& connman = static_cast<ConnmanTestMsg&>(*m_node.connman);
// Disable inactivity checks for this test to avoid interference
connman.SetPeerConnectTimeout(99999s);
@ -82,10 +84,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
}
// Test starts here
{
LOCK(dummyNode1.cs_sendProcessing);
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
}
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
@ -96,20 +96,14 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
int64_t nStartTime = GetTime();
// Wait 21 minutes
SetMockTime(nStartTime+21*60);
{
LOCK(dummyNode1.cs_sendProcessing);
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
}
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
}
// Wait 3 more minutes
SetMockTime(nStartTime+24*60);
{
LOCK(dummyNode1.cs_sendProcessing);
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in disconnect
}
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in disconnect
BOOST_CHECK(dummyNode1.fDisconnect == true);
peerman.FinalizeNode(dummyNode1);
@ -283,6 +277,8 @@ BOOST_AUTO_TEST_CASE(block_relay_only_eviction)
BOOST_AUTO_TEST_CASE(peer_discouragement)
{
LOCK(NetEventsInterface::g_msgproc_mutex);
const CChainParams& chainparams = Params();
auto banman = std::make_unique<BanMan>(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = std::make_unique<ConnmanTestMsg>(0x1337, 0x1337, *m_node.addrman);
@ -320,10 +316,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
nodes[0]->fSuccessfullyConnected = true;
connman->AddTestNode(*nodes[0]);
peerLogic->Misbehaving(nodes[0]->GetId(), DISCOURAGEMENT_THRESHOLD); // Should be discouraged
{
LOCK(nodes[0]->cs_sendProcessing);
BOOST_CHECK(peerLogic->SendMessages(nodes[0]));
}
BOOST_CHECK(peerLogic->SendMessages(nodes[0]));
BOOST_CHECK(banman->IsDiscouraged(addr[0]));
BOOST_CHECK(nodes[0]->fDisconnect);
BOOST_CHECK(!banman->IsDiscouraged(other_addr)); // Different address, not discouraged
@ -342,10 +335,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
nodes[1]->fSuccessfullyConnected = true;
connman->AddTestNode(*nodes[1]);
peerLogic->Misbehaving(nodes[1]->GetId(), DISCOURAGEMENT_THRESHOLD - 1);
{
LOCK(nodes[1]->cs_sendProcessing);
BOOST_CHECK(peerLogic->SendMessages(nodes[1]));
}
BOOST_CHECK(peerLogic->SendMessages(nodes[1]));
// [0] is still discouraged/disconnected.
BOOST_CHECK(banman->IsDiscouraged(addr[0]));
BOOST_CHECK(nodes[0]->fDisconnect);
@ -353,10 +343,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
BOOST_CHECK(!banman->IsDiscouraged(addr[1]));
BOOST_CHECK(!nodes[1]->fDisconnect);
peerLogic->Misbehaving(nodes[1]->GetId(), 1); // [1] reaches discouragement threshold
{
LOCK(nodes[1]->cs_sendProcessing);
BOOST_CHECK(peerLogic->SendMessages(nodes[1]));
}
BOOST_CHECK(peerLogic->SendMessages(nodes[1]));
// Expect both [0] and [1] to be discouraged/disconnected now.
BOOST_CHECK(banman->IsDiscouraged(addr[0]));
BOOST_CHECK(nodes[0]->fDisconnect);
@ -379,10 +366,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
nodes[2]->fSuccessfullyConnected = true;
connman->AddTestNode(*nodes[2]);
peerLogic->Misbehaving(nodes[2]->GetId(), DISCOURAGEMENT_THRESHOLD, /* message */ "");
{
LOCK(nodes[2]->cs_sendProcessing);
BOOST_CHECK(peerLogic->SendMessages(nodes[2]));
}
BOOST_CHECK(peerLogic->SendMessages(nodes[2]));
BOOST_CHECK(banman->IsDiscouraged(addr[0]));
BOOST_CHECK(banman->IsDiscouraged(addr[1]));
BOOST_CHECK(banman->IsDiscouraged(addr[2]));
@ -398,6 +382,8 @@ BOOST_AUTO_TEST_CASE(peer_discouragement)
BOOST_AUTO_TEST_CASE(DoS_bantime)
{
LOCK(NetEventsInterface::g_msgproc_mutex);
const CChainParams& chainparams = Params();
auto banman = std::make_unique<BanMan>(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = std::make_unique<CConnman>(0x1337, 0x1337, *m_node.addrman);
@ -426,10 +412,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
dummyNode.fSuccessfullyConnected = true;
peerLogic->Misbehaving(dummyNode.GetId(), DISCOURAGEMENT_THRESHOLD);
{
LOCK(dummyNode.cs_sendProcessing);
BOOST_CHECK(peerLogic->SendMessages(&dummyNode));
}
BOOST_CHECK(peerLogic->SendMessages(&dummyNode));
BOOST_CHECK(banman->IsDiscouraged(addr));
peerLogic->FinalizeNode(dummyNode);

View File

@ -81,6 +81,8 @@ void fuzz_target(FuzzBufferType buffer, const std::string& LIMIT_TO_MESSAGE_TYPE
SetMockTime(1610000000); // any time to successfully reset ibd
chainstate.ResetIbd();
LOCK(NetEventsInterface::g_msgproc_mutex);
const std::string random_message_type{fuzzed_data_provider.ConsumeBytesAsString(CMessageHeader::COMMAND_SIZE).c_str()};
if (!LIMIT_TO_MESSAGE_TYPE.empty() && random_message_type != LIMIT_TO_MESSAGE_TYPE) {
return;
@ -99,10 +101,7 @@ void fuzz_target(FuzzBufferType buffer, const std::string& LIMIT_TO_MESSAGE_TYPE
g_setup->m_node.peerman->ProcessMessage(p2p_node, random_message_type, random_bytes_data_stream, GetTime<std::chrono::microseconds>(), std::atomic<bool>{false});
} catch (const std::ios_base::failure& e) {
}
{
LOCK(p2p_node.cs_sendProcessing);
g_setup->m_node.peerman->SendMessages(&p2p_node);
}
g_setup->m_node.peerman->SendMessages(&p2p_node);
SyncWithValidationInterfaceQueue();
LOCK2(::cs_main, g_cs_orphans); // See init.cpp for rationale for implicit locking order requirement
g_setup->m_node.connman->StopNodes();

View File

@ -40,6 +40,8 @@ FUZZ_TARGET_INIT(process_messages, initialize_process_messages)
SetMockTime(1610000000); // any time to successfully reset ibd
chainstate.ResetIbd();
LOCK(NetEventsInterface::g_msgproc_mutex);
std::vector<CNode*> peers;
const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3);
for (int i = 0; i < num_peers_to_add; ++i) {
@ -70,10 +72,7 @@ FUZZ_TARGET_INIT(process_messages, initialize_process_messages)
connman.ProcessMessagesOnce(random_node);
} catch (const std::ios_base::failure&) {
}
{
LOCK(random_node.cs_sendProcessing);
g_setup->m_node.peerman->SendMessages(&random_node);
}
g_setup->m_node.peerman->SendMessages(&random_node);
}
SyncWithValidationInterfaceQueue();
LOCK2(::cs_main, g_cs_orphans); // See init.cpp for rationale for implicit locking order requirement

View File

@ -390,7 +390,7 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional<N
}
inline std::unique_ptr<CNode> ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional<NodeId>& node_id_in = std::nullopt) { return ConsumeNode<true>(fdp, node_id_in); }
void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept;
void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);
class FuzzedFileProvider
{

View File

@ -809,6 +809,8 @@ BOOST_AUTO_TEST_CASE(LocalAddress_BasicLifecycle)
BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message)
{
LOCK(NetEventsInterface::g_msgproc_mutex);
// Tests the following scenario:
// * -bind=3.4.5.6:20001 is specified
// * we make an outbound connection to a peer
@ -893,10 +895,7 @@ BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message)
}
};
{
LOCK(peer.cs_sendProcessing);
m_node.peerman->SendMessages(&peer);
}
m_node.peerman->SendMessages(&peer);
BOOST_CHECK(sent);

View File

@ -44,10 +44,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
(void)connman.ReceiveMsgFrom(node, msg_version);
node.fPauseSend = false;
connman.ProcessMessagesOnce(node);
{
LOCK(node.cs_sendProcessing);
peerman.SendMessages(&node);
}
peerman.SendMessages(&node);
if (node.fDisconnect) return;
assert(node.nVersion == version);
assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
@ -61,10 +58,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
(void)connman.ReceiveMsgFrom(node, msg_verack);
node.fPauseSend = false;
connman.ProcessMessagesOnce(node);
{
LOCK(node.cs_sendProcessing);
peerman.SendMessages(&node);
}
peerman.SendMessages(&node);
assert(node.fSuccessfullyConnected == true);
}
}

View File

@ -44,9 +44,10 @@ struct ConnmanTestMsg : public CConnman {
ServiceFlags local_services,
NetPermissionFlags permission_flags,
int32_t version,
bool relay_txs);
bool relay_txs)
EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);
void ProcessMessagesOnce(CNode& node) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); }
void ProcessMessagesOnce(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); }
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;