Skip to content

Commit 6c06132

Browse files
Merge pull request dashpay#5766 from vijaydasmp/bp22_25
backport: Merge bitcoin#21029, 21060, 19509, 21107, 25117
2 parents 6991337 + 639705e commit 6c06132

File tree

12 files changed

+369
-20
lines changed

12 files changed

+369
-20
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Per-Peer Message Capture
2+
3+
## Purpose
4+
5+
This feature allows for message capture on a per-peer basis. It answers the simple question: "Can I see what messages my node is sending and receiving?"
6+
7+
## Usage and Functionality
8+
9+
* Run `dashd` with the `-capturemessages` option.
10+
* Look in the `message_capture` folder in your datadir.
11+
* Typically this will be `~/.dashcore/message_capture`.
12+
* See that there are many folders inside, one for each peer names with its IP address and port.
13+
* Inside each peer's folder there are two `.dat` files: one is for received messages (`msgs_recv.dat`) and the other is for sent messages (`msgs_sent.dat`).
14+
* Run `contrib/message-capture/message-capture-parser.py` with the proper arguments.
15+
* See the `-h` option for help.
16+
* To see all messages, both sent and received, for all peers use:
17+
```
18+
./contrib/message-capture/message-capture-parser.py -o out.json \
19+
~/.dashcore/message_capture/**/*.dat
20+
```
21+
* Note: The messages in the given `.dat` files will be interleaved in chronological order. So, giving both received and sent `.dat` files (as above with `*.dat`) will result in all messages being interleaved in chronological order.
22+
* If an output file is not provided (i.e. the `-o` option is not used), then the output prints to `stdout`.
23+
* View the resulting output.
24+
* The output file is `JSON` formatted.
25+
* Suggestion: use `jq` to view the output, with `jq . out.json`
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) 2020 The Bitcoin Core developers
3+
# Distributed under the MIT software license, see the accompanying
4+
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5+
"""Parse message capture binary files. To be used in conjunction with -capturemessages."""
6+
7+
import argparse
8+
import os
9+
import shutil
10+
import sys
11+
from io import BytesIO
12+
import json
13+
from pathlib import Path
14+
from typing import Any, List, Optional
15+
16+
sys.path.append(os.path.join(os.path.dirname(__file__), '../../test/functional'))
17+
18+
from test_framework.messages import ser_uint256 # noqa: E402
19+
from test_framework.p2p import MESSAGEMAP # noqa: E402
20+
21+
TIME_SIZE = 8
22+
LENGTH_SIZE = 4
23+
MSGTYPE_SIZE = 12
24+
25+
# The test framework classes stores hashes as large ints in many cases.
26+
# These are variables of type uint256 in core.
27+
# There isn't a way to distinguish between a large int and a large int that is actually a blob of bytes.
28+
# As such, they are itemized here.
29+
# Any variables with these names that are of type int are actually uint256 variables.
30+
# (These can be easily found by looking for calls to deser_uint256, deser_uint256_vector, and uint256_from_str in messages.py)
31+
HASH_INTS = [
32+
"blockhash",
33+
"block_hash",
34+
"hash",
35+
"hashMerkleRoot",
36+
"hashPrevBlock",
37+
"hashstop",
38+
"prev_header",
39+
"sha256",
40+
"stop_hash",
41+
]
42+
43+
HASH_INT_VECTORS = [
44+
"hashes",
45+
"headers",
46+
"vHave",
47+
"vHash",
48+
]
49+
50+
51+
class ProgressBar:
52+
def __init__(self, total: float):
53+
self.total = total
54+
self.running = 0
55+
56+
def set_progress(self, progress: float):
57+
cols = shutil.get_terminal_size()[0]
58+
if cols <= 12:
59+
return
60+
max_blocks = cols - 9
61+
num_blocks = int(max_blocks * progress)
62+
print('\r[ {}{} ] {:3.0f}%'
63+
.format('#' * num_blocks,
64+
' ' * (max_blocks - num_blocks),
65+
progress * 100),
66+
end ='')
67+
68+
def update(self, more: float):
69+
self.running += more
70+
self.set_progress(self.running / self.total)
71+
72+
73+
def to_jsonable(obj: Any) -> Any:
74+
if hasattr(obj, "__dict__"):
75+
return obj.__dict__
76+
elif hasattr(obj, "__slots__"):
77+
ret = {} # type: Any
78+
for slot in obj.__slots__:
79+
val = getattr(obj, slot, None)
80+
if slot in HASH_INTS and isinstance(val, int):
81+
ret[slot] = ser_uint256(val).hex()
82+
elif slot in HASH_INT_VECTORS and isinstance(val[0], int):
83+
ret[slot] = [ser_uint256(a).hex() for a in val]
84+
else:
85+
ret[slot] = to_jsonable(val)
86+
return ret
87+
elif isinstance(obj, list):
88+
return [to_jsonable(a) for a in obj]
89+
elif isinstance(obj, bytes):
90+
return obj.hex()
91+
else:
92+
return obj
93+
94+
95+
def process_file(path: str, messages: List[Any], recv: bool, progress_bar: Optional[ProgressBar]) -> None:
96+
with open(path, 'rb') as f_in:
97+
if progress_bar:
98+
bytes_read = 0
99+
100+
while True:
101+
if progress_bar:
102+
# Update progress bar
103+
diff = f_in.tell() - bytes_read - 1
104+
progress_bar.update(diff)
105+
bytes_read = f_in.tell() - 1
106+
107+
# Read the Header
108+
tmp_header_raw = f_in.read(TIME_SIZE + LENGTH_SIZE + MSGTYPE_SIZE)
109+
if not tmp_header_raw:
110+
break
111+
tmp_header = BytesIO(tmp_header_raw)
112+
time = int.from_bytes(tmp_header.read(TIME_SIZE), "little") # type: int
113+
msgtype = tmp_header.read(MSGTYPE_SIZE).split(b'\x00', 1)[0] # type: bytes
114+
length = int.from_bytes(tmp_header.read(LENGTH_SIZE), "little") # type: int
115+
116+
# Start converting the message to a dictionary
117+
msg_dict = {}
118+
msg_dict["direction"] = "recv" if recv else "sent"
119+
msg_dict["time"] = time
120+
msg_dict["size"] = length # "size" is less readable here, but more readable in the output
121+
122+
msg_ser = BytesIO(f_in.read(length))
123+
124+
# Determine message type
125+
if msgtype not in MESSAGEMAP:
126+
# Unrecognized message type
127+
try:
128+
msgtype_tmp = msgtype.decode()
129+
if not msgtype_tmp.isprintable():
130+
raise UnicodeDecodeError
131+
msg_dict["msgtype"] = msgtype_tmp
132+
except UnicodeDecodeError:
133+
msg_dict["msgtype"] = "UNREADABLE"
134+
msg_dict["body"] = msg_ser.read().hex()
135+
msg_dict["error"] = "Unrecognized message type."
136+
messages.append(msg_dict)
137+
print(f"WARNING - Unrecognized message type {msgtype} in {path}", file=sys.stderr)
138+
continue
139+
140+
# Deserialize the message
141+
msg = MESSAGEMAP[msgtype]()
142+
msg_dict["msgtype"] = msgtype.decode()
143+
144+
try:
145+
msg.deserialize(msg_ser)
146+
except KeyboardInterrupt:
147+
raise
148+
except Exception:
149+
# Unable to deserialize message body
150+
msg_ser.seek(0, os.SEEK_SET)
151+
msg_dict["body"] = msg_ser.read().hex()
152+
msg_dict["error"] = "Unable to deserialize message."
153+
messages.append(msg_dict)
154+
print(f"WARNING - Unable to deserialize message in {path}", file=sys.stderr)
155+
continue
156+
157+
# Convert body of message into a jsonable object
158+
if length:
159+
msg_dict["body"] = to_jsonable(msg)
160+
messages.append(msg_dict)
161+
162+
if progress_bar:
163+
# Update the progress bar to the end of the current file
164+
# in case we exited the loop early
165+
f_in.seek(0, os.SEEK_END) # Go to end of file
166+
diff = f_in.tell() - bytes_read - 1
167+
progress_bar.update(diff)
168+
169+
170+
def main():
171+
parser = argparse.ArgumentParser(
172+
description=__doc__,
173+
epilog="EXAMPLE \n\t{0} -o out.json <data-dir>/message_capture/**/*.dat".format(sys.argv[0]),
174+
formatter_class=argparse.RawTextHelpFormatter)
175+
parser.add_argument(
176+
"capturepaths",
177+
nargs='+',
178+
help="binary message capture files to parse.")
179+
parser.add_argument(
180+
"-o", "--output",
181+
help="output file. If unset print to stdout")
182+
parser.add_argument(
183+
"-n", "--no-progress-bar",
184+
action='store_true',
185+
help="disable the progress bar. Automatically set if the output is not a terminal")
186+
args = parser.parse_args()
187+
capturepaths = [Path.cwd() / Path(capturepath) for capturepath in args.capturepaths]
188+
output = Path.cwd() / Path(args.output) if args.output else False
189+
use_progress_bar = (not args.no_progress_bar) and sys.stdout.isatty()
190+
191+
messages = [] # type: List[Any]
192+
if use_progress_bar:
193+
total_size = sum(capture.stat().st_size for capture in capturepaths)
194+
progress_bar = ProgressBar(total_size)
195+
else:
196+
progress_bar = None
197+
198+
for capture in capturepaths:
199+
process_file(str(capture), messages, "recv" in capture.stem, progress_bar)
200+
201+
messages.sort(key=lambda msg: msg['time'])
202+
203+
if use_progress_bar:
204+
progress_bar.set_progress(1)
205+
206+
jsonrep = json.dumps(messages)
207+
if output:
208+
with open(str(output), 'w+', encoding="utf8") as f_out:
209+
f_out.write(jsonrep)
210+
else:
211+
print(jsonrep)
212+
213+
if __name__ == "__main__":
214+
main()

src/bitcoin-cli.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static void SetupCliArgs(ArgsManager& argsman)
6060
argsman.AddArg("-version", "Print version and exit", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
6161
argsman.AddArg("-conf=<file>", strprintf("Specify configuration file. Relative paths will be prefixed by datadir location. (default: %s)", BITCOIN_CONF_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
6262
argsman.AddArg("-datadir=<dir>", "Specify data directory", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
63-
argsman.AddArg("-generate", strprintf("Generate blocks immediately, equivalent to RPC generatenewaddress followed by RPC generatetoaddress. Optional positional integer arguments are number of blocks to generate (default: %s) and maximum iterations to try (default: %s), equivalent to RPC generatetoaddress nblocks and maxtries arguments. Example: dash-cli -generate 4 1000", DEFAULT_NBLOCKS, DEFAULT_MAX_TRIES), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
63+
argsman.AddArg("-generate", strprintf("Generate blocks immediately, equivalent to RPC getnewaddress followed by RPC generatetoaddress. Optional positional integer arguments are number of blocks to generate (default: %s) and maximum iterations to try (default: %s), equivalent to RPC generatetoaddress nblocks and maxtries arguments. Example: dash-cli -generate 4 1000", DEFAULT_NBLOCKS, DEFAULT_MAX_TRIES), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
6464
argsman.AddArg("-getinfo", "Get general information from the remote server. Note that unlike server-side RPC calls, the results of -getinfo is the result of multiple non-atomic requests. Some entries in the result may represent results from different states (e.g. wallet balance may be as of a different block from the chain state reported)", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
6565
argsman.AddArg("-netinfo", "Get network peer connection information from the remote server. An optional integer argument from 0 to 4 can be passed for different peers listings (default: 0). Pass \"help\" for detailed help documentation.", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
6666
argsman.AddArg("-named", strprintf("Pass named instead of positional arguments (default: %s)", DEFAULT_NAMED), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);

src/init.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -715,10 +715,10 @@ void SetupServerArgs(NodeContext& node)
715715
argsman.AddArg("-stopatheight", strprintf("Stop running after reaching the given height in the main chain (default: %u)", DEFAULT_STOPATHEIGHT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
716716
argsman.AddArg("-watchquorums=<n>", strprintf("Watch and validate quorum communication (default: %u)", llmq::DEFAULT_WATCH_QUORUMS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
717717
argsman.AddArg("-addrmantest", "Allows to test address relay on localhost", ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
718-
718+
argsman.AddArg("-capturemessages", "Capture all P2P messages to disk", ArgsManager::ALLOW_BOOL | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
719719
argsman.AddArg("-debug=<category>", "Output debugging information (default: -nodebug, supplying <category> is optional). "
720-
"If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ".", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
721-
argsman.AddArg("-debugexclude=<category>", strprintf("Exclude debugging information for a category. Can be used in conjunction with -debug=1 to output debug logs for all categories except one or more specified categories."), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
720+
"If <category> is not supplied or if <category> = 1, output all debugging information. <category> can be: " + LogInstance().LogCategoriesString() + ". This option can be specified multiple times to output multiple categories.", ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
721+
argsman.AddArg("-debugexclude=<category>", strprintf("Exclude debugging information for a category. Can be used in conjunction with -debug=1 to output debug logs for all categories except the specified category. This option can be specified multiple times to exclude multiple categories."), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
722722
argsman.AddArg("-disablegovernance", strprintf("Disable governance validation (0-1, default: %u)", 0), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
723723
argsman.AddArg("-logips", strprintf("Include IP addresses in debug output (default: %u)", DEFAULT_LOGIPS), ArgsManager::ALLOW_ANY, OptionsCategory::DEBUG_TEST);
724724
argsman.AddArg("-logtimemicros", strprintf("Add microsecond precision to debug timestamps (default: %u)", DEFAULT_LOGTIMEMICROS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
@@ -1263,16 +1263,17 @@ bool AppInitParameterInteraction(const ArgsManager& args)
12631263

12641264
// Trim requested connection counts, to fit into system limitations
12651265
// <int> in std::min<int>(...) to work around FreeBSD compilation issue described in #2695
1266-
nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind);
1266+
nFD = RaiseFileDescriptorLimit(nMaxConnections + MIN_CORE_FILEDESCRIPTORS + MAX_ADDNODE_CONNECTIONS + nBind + NUM_FDS_MESSAGE_CAPTURE);
1267+
12671268
#ifdef USE_POLL
12681269
int fd_max = nFD;
12691270
#else
12701271
int fd_max = FD_SETSIZE;
12711272
#endif
1272-
nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS), 0);
1273+
nMaxConnections = std::max(std::min<int>(nMaxConnections, fd_max - nBind - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE), 0);
12731274
if (nFD < MIN_CORE_FILEDESCRIPTORS)
12741275
return InitError(_("Not enough file descriptors available."));
1275-
nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS, nMaxConnections);
1276+
nMaxConnections = std::min(nFD - MIN_CORE_FILEDESCRIPTORS - MAX_ADDNODE_CONNECTIONS - NUM_FDS_MESSAGE_CAPTURE, nMaxConnections);
12761277

12771278
if (nMaxConnections < nUserMaxConnections)
12781279
InitWarning(strprintf(_("Reducing -maxconnections from %d to %d, because of system limitations."), nUserMaxConnections, nMaxConnections));

src/net.cpp

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4020,6 +4020,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
40204020
{
40214021
size_t nMessageSize = msg.data.size();
40224022
LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command), nMessageSize, pnode->GetId());
4023+
if (gArgs.GetBoolArg("-capturemessages", false)) {
4024+
CaptureMessage(pnode->addr, msg.command, msg.data, /* incoming */ false);
4025+
}
40234026

40244027
// make sure we use the appropriate network transport format
40254028
std::vector<unsigned char> serializedHeader;
@@ -4037,11 +4040,9 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
40374040
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
40384041
pnode->nSendSize += nTotalSize;
40394042

4040-
if (pnode->nSendSize > nSendBufferMaxSize)
4041-
pnode->fPauseSend = true;
4043+
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
40424044
pnode->vSendMsg.push_back(std::move(serializedHeader));
4043-
if (nMessageSize)
4044-
pnode->vSendMsg.push_back(std::move(msg.data));
4045+
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
40454046
pnode->nSendMsgSize = pnode->vSendMsg.size();
40464047

40474048
{
@@ -4229,3 +4230,30 @@ void CConnman::UnregisterEvents(CNode *pnode)
42294230
}
42304231
#endif
42314232
}
4233+
void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming)
4234+
{
4235+
// Note: This function captures the message at the time of processing,
4236+
// not at socket receive/send time.
4237+
// This ensures that the messages are always in order from an application
4238+
// layer (processing) perspective.
4239+
auto now = GetTime<std::chrono::microseconds>();
4240+
4241+
// Windows folder names can not include a colon
4242+
std::string clean_addr = addr.ToString();
4243+
std::replace(clean_addr.begin(), clean_addr.end(), ':', '_');
4244+
4245+
fs::path base_path = GetDataDir() / "message_capture" / clean_addr;
4246+
fs::create_directories(base_path);
4247+
4248+
fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat");
4249+
CAutoFile f(fsbridge::fopen(path, "ab"), SER_DISK, CLIENT_VERSION);
4250+
4251+
ser_writedata64(f, now.count());
4252+
f.write(msg_type.data(), msg_type.length());
4253+
for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
4254+
f << '\0';
4255+
}
4256+
uint32_t size = data.size();
4257+
ser_writedata32(f, size);
4258+
f.write((const char*)data.data(), data.size());
4259+
}

src/net.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <protocol.h>
2323
#include <random.h>
2424
#include <saltedhasher.h>
25+
#include <span.h>
2526
#include <streams.h>
2627
#include <sync.h>
2728
#include <threadinterrupt.h>
@@ -91,6 +92,8 @@ static constexpr uint64_t DEFAULT_MAX_UPLOAD_TARGET = 0;
9192
static const bool DEFAULT_BLOCKSONLY = false;
9293
/** -peertimeout default */
9394
static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60;
95+
/** Number of file descriptors required for message capture **/
96+
static const int NUM_FDS_MESSAGE_CAPTURE = 1;
9497

9598
static const bool DEFAULT_FORCEDNSSEED = false;
9699
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
@@ -1577,4 +1580,7 @@ void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED
15771580
void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce=false) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
15781581
size_t GetRequestedObjectCount(NodeId nodeId) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
15791582

1583+
/** Dump binary message to file, with timestamp */
1584+
void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span<const unsigned char>& data, bool is_incoming);
1585+
15801586
#endif // BITCOIN_NET_H

src/net_processing.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4507,14 +4507,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
45074507
}
45084508

45094509
// Don't bother if send buffer is too full to respond anyway
4510-
if (pfrom->fPauseSend)
4511-
return false;
4510+
if (pfrom->fPauseSend) return false;
45124511

45134512
std::list<CNetMessage> msgs;
45144513
{
45154514
LOCK(pfrom->cs_vProcessMsg);
4516-
if (pfrom->vProcessMsg.empty())
4517-
return false;
4515+
if (pfrom->vProcessMsg.empty()) return false;
45184516
// Just take one message
45194517
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
45204518
pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size;
@@ -4523,6 +4521,10 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
45234521
}
45244522
CNetMessage& msg(msgs.front());
45254523

4524+
if (gArgs.GetBoolArg("-capturemessages", false)) {
4525+
CaptureMessage(pfrom->addr, msg.m_command, MakeUCharSpan(msg.m_recv), /* incoming */ true);
4526+
}
4527+
45264528
msg.SetVersion(pfrom->GetCommonVersion());
45274529
const std::string& msg_type = msg.m_command;
45284530

0 commit comments

Comments
 (0)