diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 8c3e96a22c..80fc983824 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -230,6 +230,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "getnodeaddresses", 0, "count"}, { "addpeeraddress", 1, "port"}, { "addpeeraddress", 2, "tried"}, + { "sendmsgtopeer", 0, "peer_id" }, { "stop", 0, "wait" }, { "verifychainlock", 2, "blockHeight" }, { "verifyislock", 3, "maxHeight" }, diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index da1534ecad..7d5418c829 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -1021,6 +1021,53 @@ static RPCHelpMan addpeeraddress() }; } +static RPCHelpMan sendmsgtopeer() +{ + return RPCHelpMan{ + "sendmsgtopeer", + "Send a p2p message to a peer specified by id.\n" + "The message type and body must be provided, the message header will be generated.\n" + "This RPC is for testing only.", + { + {"peer_id", RPCArg::Type::NUM, RPCArg::Optional::NO, "The peer to send the message to."}, + {"msg_type", RPCArg::Type::STR, RPCArg::Optional::NO, strprintf("The message type (maximum length %i)", CMessageHeader::COMMAND_SIZE)}, + {"msg", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The serialized message body to send, in hex, without a message header"}, + }, + RPCResult{RPCResult::Type::NONE, "", ""}, + RPCExamples{ + HelpExampleCli("sendmsgtopeer", "0 \"addr\" \"ffffff\"") + HelpExampleRpc("sendmsgtopeer", "0 \"addr\" \"ffffff\"")}, + [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { + const NodeId peer_id{request.params[0].get_int()}; + const std::string& msg_type{request.params[1].get_str()}; + if (msg_type.size() > CMessageHeader::COMMAND_SIZE) { + throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("Error: msg_type too long, max length is %i", CMessageHeader::COMMAND_SIZE)); + } + const std::string& msg{request.params[2].get_str()}; + if (!msg.empty() && !IsHex(msg)) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Error parsing input for msg"); + } + + NodeContext& node = EnsureAnyNodeContext(request.context); + CConnman& connman = EnsureConnman(node); + + CSerializedNetMsg msg_ser; + msg_ser.data = ParseHex(msg); + msg_ser.m_type = msg_type; + + bool success = connman.ForNode(peer_id, [&](CNode* node) { + connman.PushMessage(node, std::move(msg_ser)); + return true; + }); + + if (!success) { + throw JSONRPCError(RPC_MISC_ERROR, "Error: Could not send message to peer"); + } + + return NullUniValue; + }, + }; +} + static RPCHelpMan setmnthreadactive() { return RPCHelpMan{"setmnthreadactive", @@ -1070,6 +1117,7 @@ static const CRPCCommand commands[] = { "hidden", &addconnection, }, { "hidden", &addpeeraddress, }, + { "hidden", &sendmsgtopeer }, { "hidden", &setmnthreadactive }, }; // clang-format on diff --git a/src/test/fuzz/rpc.cpp b/src/test/fuzz/rpc.cpp index 7a2e052649..435ebc1b91 100644 --- a/src/test/fuzz/rpc.cpp +++ b/src/test/fuzz/rpc.cpp @@ -149,6 +149,7 @@ const std::vector RPC_COMMANDS_SAFE_FOR_FUZZING{ "pruneblockchain", "reconsiderblock", "scantxoutset", + "sendmsgtopeer", // when no peers are connected, no p2p message is sent "sendrawtransaction", "setmnthreadactive", "setmocktime", diff --git a/test/functional/p2p_net_deadlock.py b/test/functional/p2p_net_deadlock.py new file mode 100755 index 0000000000..cf62d13310 --- /dev/null +++ b/test/functional/p2p_net_deadlock.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import threading +from test_framework.messages import MAX_PROTOCOL_MESSAGE_LENGTH +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import random_bytes + +class NetDeadlockTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 2 + + def run_test(self): + node0 = self.nodes[0] + node1 = self.nodes[1] + + self.log.info("Simultaneously send a large message on both sides") + rand_msg = random_bytes(MAX_PROTOCOL_MESSAGE_LENGTH).hex() + + thread0 = threading.Thread(target=node0.sendmsgtopeer, args=(0, "unknown", rand_msg)) + thread1 = threading.Thread(target=node1.sendmsgtopeer, args=(0, "unknown", rand_msg)) + + thread0.start() + thread1.start() + thread0.join() + thread1.join() + + self.log.info("Check whether a deadlock happened") + self.nodes[0].generate(1) + self.sync_blocks() + + +if __name__ == '__main__': + NetDeadlockTest().main() diff --git a/test/functional/rpc_net.py b/test/functional/rpc_net.py index 1b7c6311bf..30b626ea23 100755 --- a/test/functional/rpc_net.py +++ b/test/functional/rpc_net.py @@ -10,6 +10,7 @@ Tests correspond to code in rpc/net.cpp. from test_framework.p2p import P2PInterface import test_framework.messages from test_framework.messages import ( + MAX_PROTOCOL_MESSAGE_LENGTH, NODE_NETWORK, ) @@ -66,6 +67,7 @@ class NetTest(DashTestFramework): self.test_service_flags() self.test_getnodeaddresses() self.test_addpeeraddress() + self.test_sendmsgtopeer() def test_connection_count(self): self.log.info("Test getconnectioncount") @@ -341,6 +343,37 @@ class NetTest(DashTestFramework): addrs = node.getnodeaddresses(count=0) # getnodeaddresses re-runs the addrman checks assert_equal(len(addrs), 2) + def test_sendmsgtopeer(self): + node = self.nodes[0] + + self.restart_node(0) + self.connect_nodes(0, 1) + + self.log.info("Test sendmsgtopeer") + self.log.debug("Send a valid message") + with self.nodes[1].assert_debug_log(expected_msgs=["received: addr"]): + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="FFFFFF") + + self.log.debug("Test error for sending to non-existing peer") + assert_raises_rpc_error(-1, "Error: Could not send message to peer", node.sendmsgtopeer, peer_id=100, msg_type="addr", msg="FF") + + self.log.debug("Test that zero-length msg_type is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="") + + self.log.debug("Test error for msg_type that is too long") + assert_raises_rpc_error(-8, "Error: msg_type too long, max length is 12", node.sendmsgtopeer, peer_id=0, msg_type="long_msg_type", msg="FF") + + self.log.debug("Test that unknown msg_type is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="unknown", msg="FF") + + self.log.debug("Test that empty msg is allowed") + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg="FF") + + self.log.debug("Test that oversized messages are allowed, but get us disconnected") + zero_byte_string = b'\x00' * int(MAX_PROTOCOL_MESSAGE_LENGTH + 1) + node.sendmsgtopeer(peer_id=0, msg_type="addr", msg=zero_byte_string.hex()) + self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0, timeout=10) + if __name__ == '__main__': NetTest().main() diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index 3577e98a20..d014fd2dda 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -13,6 +13,7 @@ import inspect import json import logging import os +import random import shutil import re import time @@ -274,6 +275,11 @@ def sha256sum_file(filename): d = f.read(4096) return h.digest() +# TODO: Remove and use random.randbytes(n) instead, available in Python 3.9 +def random_bytes(n): + """Return a random bytes object of length n.""" + return bytes(random.getrandbits(8) for i in range(n)) + # RPC/P2P connection constants and functions ############################################ diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index d973f61e22..1ad6cc54eb 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -259,6 +259,7 @@ BASE_SCRIPTS = [ 'p2p_leak_tx.py', 'p2p_eviction.py', 'p2p_ibd_stalling.py', + 'p2p_net_deadlock.py', 'rpc_signmessage.py', 'rpc_generateblock.py', 'rpc_generate.py',