mirror of
https://github.com/dashpay/dash.git
synced 2024-12-24 19:42:46 +01:00
Merge #19509: Per-Peer Message Capture
bff7c66e67aa2f18ef70139338643656a54444fe Add documentation to contrib folder (Troy Giorshev) 381f77be858d7417209b6de0b7cd23cb7eb99261 Add Message Capture Test (Troy Giorshev) e4f378a505922c0f544b4cfbfdb169e884e02be9 Add capture parser (Troy Giorshev) 4d1a582549bc982d55e24585b0ba06f92f21e9da Call CaptureMessage at appropriate locations (Troy Giorshev) f2a77ff97bec09dd5fcc043d8659d8ec5dfb87c2 Add CaptureMessage (Troy Giorshev) dbf779d5deb04f55c6e8493ce4e12ed4628638f3 Clean PushMessage and ProcessMessages (Troy Giorshev) Pull request description: This PR introduces per-peer message capture into Bitcoin Core. 📓 ## Purpose The purpose and scope of this feature is intentionally limited. It answers a question anyone new to Bitcoin's P2P protocol has had: "Can I see what messages my node is sending and receiving?". ## Functionality When a new debug-only command line argument `capturemessages` is set, any message that the node receives or sends is captured. The capture occurs in the MessageHandler thread. When receiving a message, it is captured as soon as the MessageHandler thread takes the message off of the vProcessMsg queue. When sending, the message is captured just before the message is pushed onto the vSendMsg queue. The message capture is as minimal as possible to reduce the performance impact on the node. Messages are captured to a new `message_capture` folder in the datadir. Each node has their own subfolder named with their IP address and port. Inside, received and sent messages are captured into two binary files, msgs_recv.dat and msgs_sent.dat, like so: ``` message_capture/203.0.113.7:56072/msgs_recv.dat message_capture/203.0.113.7:56072/msgs_sent.dat ``` Because the messages are raw binary dumps, included in this PR is a Python parsing tool to convert the binary files into human-readable JSON. This script has been placed on its own and out of the way in the new `contrib/message-capture` folder. Its usage is simple and easily discovered by the autogenerated `-h` option. ## Future Maintenance I sympathize greatly with anyone who says "the best code is no code". The future maintenance of this feature will be minimal. The logic to deserialize the payload of the p2p messages exists in our testing framework. As long as our testing framework works, so will this tool. Additionally, I hope that the simplicity of this tool will mean that it gets used frequently, so that problems will be discovered and solved when they are small. ## FAQ "Why not just use Wireshark" Yes, Wireshark has the ability to filter and decode Bitcoin messages. However, the purpose of the message capture added in this PR is to assist with debugging, primarily for new developers looking to improve their knowledge of the Bitcoin Protocol. This drives the design in a different direction than Wireshark, in two different ways. First, this tool must be convenient and simple to use. Using an external tool, like Wireshark, requires setup and interpretation of the results. To a new user who doesn't necessarily know what to expect, this is unnecessary difficulty. This tool, on the other hand, "just works". Turn on the command line flag, run your node, run the script, read the JSON. Second, because this tool is being used for debugging, we want it to be as close to the true behavior of the node as possible. A lot can happen in the SocketHandler thread that would be missed by Wireshark. Additionally, if we are to use Wireshark, we are at the mercy of whoever it maintaining the protocol in Wireshark, both as to it being accurate and recent. As can be seen by the **many** previous attempts to include Bitcoin in Wireshark (google "bitcoin dissector") this is easier said than done. Lastly, I truly believe that this tool will be used significantly more by being included in the codebase. It's just that much more discoverable. ACKs for top commit: MarcoFalke: re-ACK bff7c66e67aa2f18ef70139338643656a54444fe only some minor changes: 👚 jnewbery: utACK bff7c66e67aa2f18ef70139338643656a54444fe theStack: re-ACK bff7c66e67aa2f18ef70139338643656a54444fe Tree-SHA512: e59e3160422269221f70f98720b47842775781c247c064071d546c24fa7a35a0e5534e8baa4b4591a750d7eb16de6b4ecf54cbee6d193b261f4f104e28c15f47
This commit is contained in:
parent
a26d4b2cd2
commit
9a92452a5c
25
contrib/message-capture/message-capture-docs.md
Normal file
25
contrib/message-capture/message-capture-docs.md
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
# Per-Peer Message Capture
|
||||||
|
|
||||||
|
## Purpose
|
||||||
|
|
||||||
|
This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?"
|
||||||
|
|
||||||
|
## Usage and Functionality
|
||||||
|
|
||||||
|
* Run `dashd` with the `-capturemessages` option.
|
||||||
|
* Look in the `message_capture` folder in your datadir.
|
||||||
|
* Typically this will be `~/.dashcore/message_capture`.
|
||||||
|
* See that there are many folders inside, one for each peer names with its IP address and port.
|
||||||
|
* Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`).
|
||||||
|
* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
|
||||||
|
* See the `-h` option for help.
|
||||||
|
* To see all messages, both sent and received, for all peers use:
|
||||||
|
```
|
||||||
|
./contrib/message-capture/message-capture-parser.py -o out.json \
|
||||||
|
~/.dashcore/message_capture/**/*.dat
|
||||||
|
```
|
||||||
|
* Note: The messages in the given `.dat` files will be interleaved in chronological order. So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order.
|
||||||
|
* If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`.
|
||||||
|
* View the resulting output.
|
||||||
|
* The output file is `JSON` formatted.
|
||||||
|
* Suggestion: use `jq` to view the output, with `jq . out.json`
|
214
contrib/message-capture/message-capture-parser.py
Executable file
214
contrib/message-capture/message-capture-parser.py
Executable file
@ -0,0 +1,214 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# Copyright (c) 2020 The Bitcoin Core developers
|
||||||
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
"""Parse message capture binary files. To be used in conjunction with -capturemessages."""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
from io import BytesIO
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, List, Optional
|
||||||
|
|
||||||
|
sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
|
||||||
|
|
||||||
|
from test_framework.messages import ser_uint256 # noqa: E402
|
||||||
|
from test_framework.p2p import MESSAGEMAP # noqa: E402
|
||||||
|
|
||||||
|
TIME_SIZE = 8
|
||||||
|
LENGTH_SIZE = 4
|
||||||
|
MSGTYPE_SIZE = 12
|
||||||
|
|
||||||
|
# The test framework classes stores hashes as large ints in many cases.
|
||||||
|
# These are variables of type uint256 in core.
|
||||||
|
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
|
||||||
|
# As such, they are itemized here.
|
||||||
|
# Any variables with these names that are of type int are actually uint256 variables.
|
||||||
|
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
|
||||||
|
HASH_INTS = [
|
||||||
|
"blockhash",
|
||||||
|
"block_hash",
|
||||||
|
"hash",
|
||||||
|
"hashMerkleRoot",
|
||||||
|
"hashPrevBlock",
|
||||||
|
"hashstop",
|
||||||
|
"prev_header",
|
||||||
|
"sha256",
|
||||||
|
"stop_hash",
|
||||||
|
]
|
||||||
|
|
||||||
|
HASH_INT_VECTORS = [
|
||||||
|
"hashes",
|
||||||
|
"headers",
|
||||||
|
"vHave",
|
||||||
|
"vHash",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class ProgressBar:
|
||||||
|
def __init__(self, total: float):
|
||||||
|
self.total = total
|
||||||
|
self.running = 0
|
||||||
|
|
||||||
|
def set_progress(self, progress: float):
|
||||||
|
cols = shutil.get_terminal_size()[0]
|
||||||
|
if cols <= 12:
|
||||||
|
return
|
||||||
|
max_blocks = cols - 9
|
||||||
|
num_blocks = int(max_blocks * progress)
|
||||||
|
print('\r[ {}{} ] {:3.0f}%'
|
||||||
|
.format('#' * num_blocks,
|
||||||
|
' ' * (max_blocks - num_blocks),
|
||||||
|
progress * 100),
|
||||||
|
end ='')
|
||||||
|
|
||||||
|
def update(self, more: float):
|
||||||
|
self.running += more
|
||||||
|
self.set_progress(self.running / self.total)
|
||||||
|
|
||||||
|
|
||||||
|
def to_jsonable(obj: Any) -> Any:
|
||||||
|
if hasattr(obj, "__dict__"):
|
||||||
|
return obj.__dict__
|
||||||
|
elif hasattr(obj, "__slots__"):
|
||||||
|
ret = {} # type: Any
|
||||||
|
for slot in obj.__slots__:
|
||||||
|
val = getattr(obj, slot, None)
|
||||||
|
if slot in HASH_INTS and isinstance(val, int):
|
||||||
|
ret[slot] = ser_uint256(val).hex()
|
||||||
|
elif slot in HASH_INT_VECTORS and isinstance(val[0], int):
|
||||||
|
ret[slot] = [ser_uint256(a).hex() for a in val]
|
||||||
|
else:
|
||||||
|
ret[slot] = to_jsonable(val)
|
||||||
|
return ret
|
||||||
|
elif isinstance(obj, list):
|
||||||
|
return [to_jsonable(a) for a in obj]
|
||||||
|
elif isinstance(obj, bytes):
|
||||||
|
return obj.hex()
|
||||||
|
else:
|
||||||
|
return obj
|
||||||
|
|
||||||
|
|
||||||
|
def process_file(path: str, messages: List[Any], recv: bool, progress_bar: Optional[ProgressBar]) -> None:
|
||||||
|
with open(path, 'rb') as f_in:
|
||||||
|
if progress_bar:
|
||||||
|
bytes_read = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if progress_bar:
|
||||||
|
# Update progress bar
|
||||||
|
diff = f_in.tell() - bytes_read - 1
|
||||||
|
progress_bar.update(diff)
|
||||||
|
bytes_read = f_in.tell() - 1
|
||||||
|
|
||||||
|
# Read the Header
|
||||||
|
tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
|
||||||
|
if not tmp_header_raw:
|
||||||
|
break
|
||||||
|
tmp_header = BytesIO(tmp_header_raw)
|
||||||
|
time = int.from_bytes(tmp_header.read(TIME_SIZE), "little") # type: int
|
||||||
|
msgtype = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0] # type: bytes
|
||||||
|
length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") # type: int
|
||||||
|
|
||||||
|
# Start converting the message to a dictionary
|
||||||
|
msg_dict = {}
|
||||||
|
msg_dict["direction"] = "recv" if recv else "sent"
|
||||||
|
msg_dict["time"] = time
|
||||||
|
msg_dict["size"] = length # "size" is less readable here, but more readable in the output
|
||||||
|
|
||||||
|
msg_ser = BytesIO(f_in.read(length))
|
||||||
|
|
||||||
|
# Determine message type
|
||||||
|
if msgtype not in MESSAGEMAP:
|
||||||
|
# Unrecognized message type
|
||||||
|
try:
|
||||||
|
msgtype_tmp = msgtype.decode()
|
||||||
|
if not msgtype_tmp.isprintable():
|
||||||
|
raise UnicodeDecodeError
|
||||||
|
msg_dict["msgtype"] = msgtype_tmp
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
msg_dict["msgtype"] = "UNREADABLE"
|
||||||
|
msg_dict["body"] = msg_ser.read().hex()
|
||||||
|
msg_dict["error"] = "Unrecognized message type."
|
||||||
|
messages.append(msg_dict)
|
||||||
|
print(f"WARNING - Unrecognized message type {msgtype} in {path}", file=sys.stderr)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Deserialize the message
|
||||||
|
msg = MESSAGEMAP[msgtype]()
|
||||||
|
msg_dict["msgtype"] = msgtype.decode()
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg.deserialize(msg_ser)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
# Unable to deserialize message body
|
||||||
|
msg_ser.seek(0, os.SEEK_SET)
|
||||||
|
msg_dict["body"] = msg_ser.read().hex()
|
||||||
|
msg_dict["error"] = "Unable to deserialize message."
|
||||||
|
messages.append(msg_dict)
|
||||||
|
print(f"WARNING - Unable to deserialize message in {path}", file=sys.stderr)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Convert body of message into a jsonable object
|
||||||
|
if length:
|
||||||
|
msg_dict["body"] = to_jsonable(msg)
|
||||||
|
messages.append(msg_dict)
|
||||||
|
|
||||||
|
if progress_bar:
|
||||||
|
# Update the progress bar to the end of the current file
|
||||||
|
# in case we exited the loop early
|
||||||
|
f_in.seek(0, os.SEEK_END) # Go to end of file
|
||||||
|
diff = f_in.tell() - bytes_read - 1
|
||||||
|
progress_bar.update(diff)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description=__doc__,
|
||||||
|
epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(sys.argv[0]),
|
||||||
|
formatter_class=argparse.RawTextHelpFormatter)
|
||||||
|
parser.add_argument(
|
||||||
|
"capturepaths",
|
||||||
|
nargs='+',
|
||||||
|
help="binary message capture files to parse.")
|
||||||
|
parser.add_argument(
|
||||||
|
"-o", "--output",
|
||||||
|
help="output file. If unset print to stdout")
|
||||||
|
parser.add_argument(
|
||||||
|
"-n", "--no-progress-bar",
|
||||||
|
action='store_true',
|
||||||
|
help="disable the progress bar. Automatically set if the output is not a terminal")
|
||||||
|
args = parser.parse_args()
|
||||||
|
capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths]
|
||||||
|
output = Path.cwd() / Path(args.output) if args.output else False
|
||||||
|
use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()
|
||||||
|
|
||||||
|
messages = [] # type: List[Any]
|
||||||
|
if use_progress_bar:
|
||||||
|
total_size = sum(capture.stat().st_size for capture in capturepaths)
|
||||||
|
progress_bar = ProgressBar(total_size)
|
||||||
|
else:
|
||||||
|
progress_bar = None
|
||||||
|
|
||||||
|
for capture in capturepaths:
|
||||||
|
process_file(str(capture), messages, "recv" in capture.stem, progress_bar)
|
||||||
|
|
||||||
|
messages.sort(key=lambda msg: msg['time'])
|
||||||
|
|
||||||
|
if use_progress_bar:
|
||||||
|
progress_bar.set_progress(1)
|
||||||
|
|
||||||
|
jsonrep = json.dumps(messages)
|
||||||
|
if output:
|
||||||
|
with open(str(output), 'w+', encoding="utf8") as f_out:
|
||||||
|
f_out.write(jsonrep)
|
||||||
|
else:
|
||||||
|
print(jsonrep)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
@ -715,7 +715,7 @@ void SetupServerArgs(NodeContext& node)
|
|||||||
argsman.AddArg("-stopatheight", strprintf("Stop running after reaching the given height in the main chain (default: %u)", DEFAULT_STOPATHEIGHT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
argsman.AddArg("-stopatheight", strprintf("Stop running after reaching the given height in the main chain (default: %u)", DEFAULT_STOPATHEIGHT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
||||||
argsman.AddArg("-watchquorums=<n>", strprintf("Watch and validate quorum communication (default: %u)", llmq::DEFAULT_WATCH_QUORUMS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
argsman.AddArg("-watchquorums=<n>", strprintf("Watch and validate quorum communication (default: %u)", llmq::DEFAULT_WATCH_QUORUMS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
||||||
argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
||||||
|
argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
|
||||||
argsman.AddArg("-debug=<category>", "Output debugging information (default: -nodebug, supplying <category> is optional). "
|
argsman.AddArg("-debug=<category>", "Output debugging information (default: -nodebug, supplying <category> is optional). "
|
||||||
"If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ". This option can be specified multiple times to output multiple categories.", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
|
"If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ". This option can be specified multiple times to output multiple categories.", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
|
||||||
argsman.AddArg("-debugexclude=<category>", strprintf("Exclude debugging information for a category. Can be used in conjunction with -debug=1 to output debug logs for all categories except the specified category. This option can be specified multiple times to exclude multiple categories."), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
|
argsman.AddArg("-debugexclude=<category>", strprintf("Exclude debugging information for a category. Can be used in conjunction with -debug=1 to output debug logs for all categories except the specified category. This option can be specified multiple times to exclude multiple categories."), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
|
||||||
@ -1263,16 +1263,17 @@ bool AppInitParameterInteraction(const ArgsManager& args)
|
|||||||
|
|
||||||
// Trim requested connection counts, to fit into system limitations
|
// Trim requested connection counts, to fit into system limitations
|
||||||
// <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
|
// <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
|
||||||
nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind);
|
nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind + NUM_FDS_MESSAGE_CAPTURE);
|
||||||
|
|
||||||
#ifdef USE_POLL
|
#ifdef USE_POLL
|
||||||
int fd_max = nFD;
|
int fd_max = nFD;
|
||||||
#else
|
#else
|
||||||
int fd_max = FD_SETSIZE;
|
int fd_max = FD_SETSIZE;
|
||||||
#endif
|
#endif
|
||||||
nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0);
|
nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), 0);
|
||||||
if (nFD < MIN_CORE_FILEDESCRIPTORS)
|
if (nFD < MIN_CORE_FILEDESCRIPTORS)
|
||||||
return InitError(_("Not enough file descriptors available."));
|
return InitError(_("Not enough file descriptors available."));
|
||||||
nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections);
|
nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE, nMaxConnections);
|
||||||
|
|
||||||
if (nMaxConnections < nUserMaxConnections)
|
if (nMaxConnections < nUserMaxConnections)
|
||||||
InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections));
|
InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections));
|
||||||
|
36
src/net.cpp
36
src/net.cpp
@ -4020,6 +4020,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
|||||||
{
|
{
|
||||||
size_t nMessageSize = msg.data.size();
|
size_t nMessageSize = msg.data.size();
|
||||||
LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command), nMessageSize, pnode->GetId());
|
LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command), nMessageSize, pnode->GetId());
|
||||||
|
if (gArgs.GetBoolArg("-capturemessages", false)) {
|
||||||
|
CaptureMessage(pnode->addr, msg.command, msg.data, /* incoming */ false);
|
||||||
|
}
|
||||||
|
|
||||||
// make sure we use the appropriate network transport format
|
// make sure we use the appropriate network transport format
|
||||||
std::vector<unsigned char> serializedHeader;
|
std::vector<unsigned char> serializedHeader;
|
||||||
@ -4037,11 +4040,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
|||||||
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
|
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
|
||||||
pnode->nSendSize += nTotalSize;
|
pnode->nSendSize += nTotalSize;
|
||||||
|
|
||||||
if (pnode->nSendSize > nSendBufferMaxSize)
|
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
|
||||||
pnode->fPauseSend = true;
|
|
||||||
pnode->vSendMsg.push_back(std::move(serializedHeader));
|
pnode->vSendMsg.push_back(std::move(serializedHeader));
|
||||||
if (nMessageSize)
|
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
|
||||||
pnode->vSendMsg.push_back(std::move(msg.data));
|
|
||||||
pnode->nSendMsgSize = pnode->vSendMsg.size();
|
pnode->nSendMsgSize = pnode->vSendMsg.size();
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -4229,3 +4230,30 @@ void CConnman::UnregisterEvents(CNode *pnode)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming)
|
||||||
|
{
|
||||||
|
// Note: This function captures the message at the time of processing,
|
||||||
|
// not at socket receive/send time.
|
||||||
|
// This ensures that the messages are always in order from an application
|
||||||
|
// layer (processing) perspective.
|
||||||
|
auto now = GetTime<std::chrono::microseconds>();
|
||||||
|
|
||||||
|
// Windows folder names can not include a colon
|
||||||
|
std::string clean_addr = addr.ToString();
|
||||||
|
std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
|
||||||
|
|
||||||
|
fs::path base_path = GetDataDir() / "message_capture" / clean_addr;
|
||||||
|
fs::create_directories(base_path);
|
||||||
|
|
||||||
|
fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
|
||||||
|
CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION);
|
||||||
|
|
||||||
|
ser_writedata64(f, now.count());
|
||||||
|
f.write(msg_type.data(), msg_type.length());
|
||||||
|
for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
|
||||||
|
f << '\0';
|
||||||
|
}
|
||||||
|
uint32_t size = data.size();
|
||||||
|
ser_writedata32(f, size);
|
||||||
|
f.write((const char*)data.data(), data.size());
|
||||||
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
#include <protocol.h>
|
#include <protocol.h>
|
||||||
#include <random.h>
|
#include <random.h>
|
||||||
#include <saltedhasher.h>
|
#include <saltedhasher.h>
|
||||||
|
#include <span.h>
|
||||||
#include <streams.h>
|
#include <streams.h>
|
||||||
#include <sync.h>
|
#include <sync.h>
|
||||||
#include <threadinterrupt.h>
|
#include <threadinterrupt.h>
|
||||||
@ -91,6 +92,8 @@ static constexpr uint64_t DEFAULT_MAX_UPLOAD_TARGET = 0;
|
|||||||
static const bool DEFAULT_BLOCKSONLY = false;
|
static const bool DEFAULT_BLOCKSONLY = false;
|
||||||
/** -peertimeout default */
|
/** -peertimeout default */
|
||||||
static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
|
static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
|
||||||
|
/** Number of file descriptors required for message capture **/
|
||||||
|
static const int NUM_FDS_MESSAGE_CAPTURE = 1;
|
||||||
|
|
||||||
static const bool DEFAULT_FORCEDNSSEED = false;
|
static const bool DEFAULT_FORCEDNSSEED = false;
|
||||||
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
|
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
|
||||||
@ -1577,4 +1580,7 @@ void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED
|
|||||||
void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce=false) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce=false) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||||
size_t GetRequestedObjectCount(NodeId nodeId) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
size_t GetRequestedObjectCount(NodeId nodeId) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
|
||||||
|
|
||||||
|
/** Dump binary message to file, with timestamp */
|
||||||
|
void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming);
|
||||||
|
|
||||||
#endif // BITCOIN_NET_H
|
#endif // BITCOIN_NET_H
|
||||||
|
@ -4507,14 +4507,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
if (pfrom->fPauseSend)
|
if (pfrom->fPauseSend) return false;
|
||||||
return false;
|
|
||||||
|
|
||||||
std::list<CNetMessage> msgs;
|
std::list<CNetMessage> msgs;
|
||||||
{
|
{
|
||||||
LOCK(pfrom->cs_vProcessMsg);
|
LOCK(pfrom->cs_vProcessMsg);
|
||||||
if (pfrom->vProcessMsg.empty())
|
if (pfrom->vProcessMsg.empty()) return false;
|
||||||
return false;
|
|
||||||
// Just take one message
|
// Just take one message
|
||||||
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
||||||
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
|
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
|
||||||
@ -4523,6 +4521,10 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
|
|||||||
}
|
}
|
||||||
CNetMessage& msg(msgs.front());
|
CNetMessage& msg(msgs.front());
|
||||||
|
|
||||||
|
if (gArgs.GetBoolArg("-capturemessages", false)) {
|
||||||
|
CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv), /* incoming */ true);
|
||||||
|
}
|
||||||
|
|
||||||
msg.SetVersion(pfrom->GetCommonVersion());
|
msg.SetVersion(pfrom->GetCommonVersion());
|
||||||
const std::string& msg_type = msg.m_command;
|
const std::string& msg_type = msg.m_command;
|
||||||
|
|
||||||
|
76
test/functional/p2p_message_capture.py
Executable file
76
test/functional/p2p_message_capture.py
Executable file
@ -0,0 +1,76 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# Copyright (c) 2020 The Bitcoin Core developers
|
||||||
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
"""Test per-peer message capture capability.
|
||||||
|
|
||||||
|
Additionally, the output of contrib/message-capture/message-capture-parser.py should be verified manually.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import glob
|
||||||
|
from io import BytesIO
|
||||||
|
import os
|
||||||
|
|
||||||
|
from test_framework.p2p import P2PDataStore, MESSAGEMAP
|
||||||
|
from test_framework.test_framework import BitcoinTestFramework
|
||||||
|
from test_framework.util import assert_equal
|
||||||
|
|
||||||
|
TIME_SIZE = 8
|
||||||
|
LENGTH_SIZE = 4
|
||||||
|
MSGTYPE_SIZE = 12
|
||||||
|
|
||||||
|
def mini_parser(dat_file):
|
||||||
|
"""Parse a data file created by CaptureMessage.
|
||||||
|
|
||||||
|
From the data file we'll only check the structure.
|
||||||
|
|
||||||
|
We won't care about things like:
|
||||||
|
- Deserializing the payload of the message
|
||||||
|
- This is managed by the deserialize methods in test_framework.messages
|
||||||
|
- The order of the messages
|
||||||
|
- There's no reason why we can't, say, change the order of the messages in the handshake
|
||||||
|
- Message Type
|
||||||
|
- We can add new message types
|
||||||
|
|
||||||
|
We're ignoring these because they're simply too brittle to test here.
|
||||||
|
"""
|
||||||
|
with open(dat_file, 'rb') as f_in:
|
||||||
|
# This should have at least one message in it
|
||||||
|
assert(os.fstat(f_in.fileno()).st_size >= TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
|
||||||
|
while True:
|
||||||
|
tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
|
||||||
|
if not tmp_header_raw:
|
||||||
|
break
|
||||||
|
tmp_header = BytesIO(tmp_header_raw)
|
||||||
|
int.from_bytes(tmp_header.read(TIME_SIZE), "little") # type: int
|
||||||
|
raw_msgtype = tmp_header.read(MSGTYPE_SIZE)
|
||||||
|
msgtype = raw_msgtype.split(b'\x00', 1)[0] # type: bytes
|
||||||
|
remainder = raw_msgtype.split(b'\x00', 1)[1]
|
||||||
|
assert(len(msgtype) > 0)
|
||||||
|
assert(msgtype in MESSAGEMAP)
|
||||||
|
assert(len(remainder) == 0 or not remainder.decode().isprintable())
|
||||||
|
length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") # type: int
|
||||||
|
data = f_in.read(length)
|
||||||
|
assert_equal(len(data), length)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class MessageCaptureTest(BitcoinTestFramework):
|
||||||
|
def set_test_params(self):
|
||||||
|
self.num_nodes = 1
|
||||||
|
self.extra_args = [["-capturemessages"]]
|
||||||
|
self.setup_clean_chain = True
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
capturedir = os.path.join(self.nodes[0].datadir, "regtest/message_capture")
|
||||||
|
# Connect a node so that the handshake occurs
|
||||||
|
self.nodes[0].add_p2p_connection(P2PDataStore())
|
||||||
|
self.nodes[0].disconnect_p2ps()
|
||||||
|
recv_file = glob.glob(os.path.join(capturedir, "*/msgs_recv.dat"))[0]
|
||||||
|
mini_parser(recv_file)
|
||||||
|
sent_file = glob.glob(os.path.join(capturedir, "*/msgs_sent.dat"))[0]
|
||||||
|
mini_parser(sent_file)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
MessageCaptureTest().main()
|
@ -1732,7 +1732,7 @@ class msg_block:
|
|||||||
# for cases where a user needs tighter control over what is sent over the wire
|
# for cases where a user needs tighter control over what is sent over the wire
|
||||||
# note that the user must supply the name of the msgtype, and the data
|
# note that the user must supply the name of the msgtype, and the data
|
||||||
class msg_generic:
|
class msg_generic:
|
||||||
__slots__ = ("msgtype", "data")
|
__slots__ = ("data")
|
||||||
|
|
||||||
def __init__(self, msgtype, data=None):
|
def __init__(self, msgtype, data=None):
|
||||||
self.msgtype = msgtype
|
self.msgtype = msgtype
|
||||||
|
@ -261,6 +261,7 @@ BASE_SCRIPTS = [
|
|||||||
'feature_filelock.py',
|
'feature_filelock.py',
|
||||||
'feature_loadblock.py',
|
'feature_loadblock.py',
|
||||||
'p2p_blockfilters.py',
|
'p2p_blockfilters.py',
|
||||||
|
'p2p_message_capture.py',
|
||||||
'feature_asmap.py',
|
'feature_asmap.py',
|
||||||
'feature_includeconf.py',
|
'feature_includeconf.py',
|
||||||
'mempool_unbroadcast.py',
|
'mempool_unbroadcast.py',
|
||||||
|
Loading…
Reference in New Issue
Block a user