#!/usr/bin/env python3 """ Interactive dashd P2P network traffic monitor utilizing USDT and the net:inbound_message and net:outbound_message tracepoints. """ # This script demonstrates what USDT for Dash Core can enable. It uses BCC # (https://github.com/iovisor/bcc) to load a sandboxed eBPF program into the # Linux kernel (root privileges are required). The eBPF program attaches to two # statically defined tracepoints. The tracepoint 'net:inbound_message' is called # when a new P2P message is received, and 'net:outbound_message' is called on # outbound P2P messages. The eBPF program submits the P2P messages to # this script via a BPF ring buffer. import sys import curses from curses import wrapper, panel from bcc import BPF, USDT # BCC: The C program to be compiled to an eBPF program (by BCC) and loaded into # a sandboxed Linux kernel VM. program = """ #include // Tor v3 addresses are 62 chars + 6 chars for the port (':12345'). // I2P addresses are 60 chars + 6 chars for the port (':12345'). #define MAX_PEER_ADDR_LENGTH 62 + 6 #define MAX_PEER_CONN_TYPE_LENGTH 20 #define MAX_MSG_TYPE_LENGTH 20 struct p2p_message { u64 peer_id; char peer_addr[MAX_PEER_ADDR_LENGTH]; char peer_conn_type[MAX_PEER_CONN_TYPE_LENGTH]; char msg_type[MAX_MSG_TYPE_LENGTH]; u64 msg_size; }; // Two BPF perf buffers for pushing data (here P2P messages) to user space. BPF_PERF_OUTPUT(inbound_messages); BPF_PERF_OUTPUT(outbound_messages); int trace_inbound_message(struct pt_regs *ctx) { struct p2p_message msg = {}; bpf_usdt_readarg(1, ctx, &msg.peer_id); bpf_usdt_readarg_p(2, ctx, &msg.peer_addr, MAX_PEER_ADDR_LENGTH); bpf_usdt_readarg_p(3, ctx, &msg.peer_conn_type, MAX_PEER_CONN_TYPE_LENGTH); bpf_usdt_readarg_p(4, ctx, &msg.msg_type, MAX_MSG_TYPE_LENGTH); bpf_usdt_readarg(5, ctx, &msg.msg_size); inbound_messages.perf_submit(ctx, &msg, sizeof(msg)); return 0; }; int trace_outbound_message(struct pt_regs *ctx) { struct p2p_message msg = {}; bpf_usdt_readarg(1, ctx, &msg.peer_id); bpf_usdt_readarg_p(2, ctx, &msg.peer_addr, MAX_PEER_ADDR_LENGTH); bpf_usdt_readarg_p(3, ctx, &msg.peer_conn_type, MAX_PEER_CONN_TYPE_LENGTH); bpf_usdt_readarg_p(4, ctx, &msg.msg_type, MAX_MSG_TYPE_LENGTH); bpf_usdt_readarg(5, ctx, &msg.msg_size); outbound_messages.perf_submit(ctx, &msg, sizeof(msg)); return 0; }; """ class Message: """ A P2P network message. """ msg_type = "" size = 0 data = bytes() inbound = False def __init__(self, msg_type, size, inbound): self.msg_type = msg_type self.size = size self.inbound = inbound class Peer: """ A P2P network peer. """ id = 0 address = "" connection_type = "" last_messages = list() total_inbound_msgs = 0 total_inbound_bytes = 0 total_outbound_msgs = 0 total_outbound_bytes = 0 def __init__(self, id, address, connection_type): self.id = id self.address = address self.connection_type = connection_type self.last_messages = list() def add_message(self, message): self.last_messages.append(message) if len(self.last_messages) > 25: self.last_messages.pop(0) if message.inbound: self.total_inbound_bytes += message.size self.total_inbound_msgs += 1 else: self.total_outbound_bytes += message.size self.total_outbound_msgs += 1 def main(bitcoind_path): peers = dict() bitcoind_with_usdts = USDT(path=str(bitcoind_path)) # attaching the trace functions defined in the BPF program to the tracepoints bitcoind_with_usdts.enable_probe( probe="inbound_message", fn_name="trace_inbound_message") bitcoind_with_usdts.enable_probe( probe="outbound_message", fn_name="trace_outbound_message") bpf = BPF(text=program, usdt_contexts=[bitcoind_with_usdts]) # BCC: perf buffer handle function for inbound_messages def handle_inbound(_, data, size): """ Inbound message handler. Called each time a message is submitted to the inbound_messages BPF table.""" event = bpf["inbound_messages"].event(data) if event.peer_id not in peers: peer = Peer(event.peer_id, event.peer_addr.decode( "utf-8"), event.peer_conn_type.decode("utf-8")) peers[peer.id] = peer peers[event.peer_id].add_message( Message(event.msg_type.decode("utf-8"), event.msg_size, True)) # BCC: perf buffer handle function for outbound_messages def handle_outbound(_, data, size): """ Outbound message handler. Called each time a message is submitted to the outbound_messages BPF table.""" event = bpf["outbound_messages"].event(data) if event.peer_id not in peers: peer = Peer(event.peer_id, event.peer_addr.decode( "utf-8"), event.peer_conn_type.decode("utf-8")) peers[peer.id] = peer peers[event.peer_id].add_message( Message(event.msg_type.decode("utf-8"), event.msg_size, False)) # BCC: add handlers to the inbound and outbound perf buffers bpf["inbound_messages"].open_perf_buffer(handle_inbound) bpf["outbound_messages"].open_perf_buffer(handle_outbound) wrapper(loop, bpf, peers) def loop(screen, bpf, peers): screen.nodelay(1) cur_list_pos = 0 win = curses.newwin(30, 70, 2, 7) win.erase() win.border(ord("|"), ord("|"), ord("-"), ord("-"), ord("-"), ord("-"), ord("-"), ord("-")) info_panel = panel.new_panel(win) info_panel.hide() ROWS_AVALIABLE_FOR_LIST = curses.LINES - 5 scroll = 0 while True: try: # BCC: poll the perf buffers for new events or timeout after 50ms bpf.perf_buffer_poll(timeout=50) ch = screen.getch() if (ch == curses.KEY_DOWN or ch == ord("j")) and cur_list_pos < len( peers.keys()) -1 and info_panel.hidden(): cur_list_pos += 1 if cur_list_pos >= ROWS_AVALIABLE_FOR_LIST: scroll += 1 if (ch == curses.KEY_UP or ch == ord("k")) and cur_list_pos > 0 and info_panel.hidden(): cur_list_pos -= 1 if scroll > 0: scroll -= 1 if ch == ord('\n') or ch == ord(' '): if info_panel.hidden(): info_panel.show() else: info_panel.hide() screen.erase() render(screen, peers, cur_list_pos, scroll, ROWS_AVALIABLE_FOR_LIST, info_panel) curses.panel.update_panels() screen.refresh() except KeyboardInterrupt: exit() def render(screen, peers, cur_list_pos, scroll, ROWS_AVALIABLE_FOR_LIST, info_panel): """ renders the list of peers and details panel This code is unrelated to USDT, BCC and BPF. """ header_format = "%6s %-20s %-20s %-22s %-67s" row_format = "%6s %-5d %9d byte %-5d %9d byte %-22s %-67s" screen.addstr(0, 1, (" P2P Message Monitor "), curses.A_REVERSE) screen.addstr( 1, 0, (" Navigate with UP/DOWN or J/K and select a peer with ENTER or SPACE to see individual P2P messages"), curses.A_NORMAL) screen.addstr(3, 0, header_format % ("PEER", "OUTBOUND", "INBOUND", "TYPE", "ADDR"), curses.A_BOLD | curses.A_UNDERLINE) peer_list = sorted(peers.keys())[scroll:ROWS_AVALIABLE_FOR_LIST+scroll] for i, peer_id in enumerate(peer_list): peer = peers[peer_id] screen.addstr(i + 4, 0, row_format % (peer.id, peer.total_outbound_msgs, peer.total_outbound_bytes, peer.total_inbound_msgs, peer.total_inbound_bytes, peer.connection_type, peer.address), curses.A_REVERSE if i + scroll == cur_list_pos else curses.A_NORMAL) if i + scroll == cur_list_pos: info_window = info_panel.window() info_window.erase() info_window.border( ord("|"), ord("|"), ord("-"), ord("-"), ord("-"), ord("-"), ord("-"), ord("-")) info_window.addstr( 1, 1, f"PEER {peer.id} ({peer.address})".center(68), curses.A_REVERSE | curses.A_BOLD) info_window.addstr( 2, 1, f" OUR NODE{peer.connection_type:^54}PEER ", curses.A_BOLD) for i, msg in enumerate(peer.last_messages): if msg.inbound: info_window.addstr( i + 3, 1, "%68s" % (f"<--- {msg.msg_type} ({msg.size} bytes) "), curses.A_NORMAL) else: info_window.addstr( i + 3, 1, " %s (%d byte) --->" % (msg.msg_type, msg.size), curses.A_NORMAL) if __name__ == "__main__": if len(sys.argv) < 2: print("USAGE:", sys.argv[0], "path/to/dashd") exit() path = sys.argv[1] main(path)