Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 222 additions & 46 deletions tests/infra/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -1002,31 +1002,154 @@ def ignore_errors_on_shutdown(self):
def ignore_error_pattern_on_shutdown(self, pattern):
self.ignore_error_patterns.append(pattern)

def check_ledger_files_identical(self, read_recovery_ledger_files=False):
def snapshot_files_invariants(self, nodes=None):
# Note: Should be called on stopped service
# Verify that all ledger files on stopped nodes exist on most up-to-date node
# and are identical
# 1. Every snapshot corresponds to a chunk boundary

def list_files_in_dirs_with_checksums(dirs):
if nodes is None:
nodes = self.nodes

def list_snapshot_files(snapshot_paths):
return sorted(
[
(f, infra.path.compute_file_checksum(os.path.join(d, f)))
for d in dirs
os.path.join(d, f)
for d in snapshot_paths
for f in os.listdir(d)
if f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX)
or (
read_recovery_ledger_files
and f.endswith(ccf.ledger.RECOVERY_FILE_SUFFIX)
and ccf.ledger.COMMITTED_FILE_SUFFIX in f
)
if f.startswith("snapshot_")
and not f.endswith(ccf.ledger.IGNORED_FILE_SUFFIX)
],
key=lambda x: ccf.ledger.range_from_filename(x[0])[0],
key=lambda f: ccf.ledger.snapshot_index_from_filename(f)[0],
)

longest_ledger_files = None
longest_ledger_node = None
longest_ledger_seqno = 0
for node in self.nodes:
def snapshot_paths(node):
paths = [
os.path.join(node.remote.remote.root, node.remote.snapshots_dir_name)
]
if node.remote.read_only_snapshots_dir_name is not None:
paths.append(
os.path.join(
node.remote.remote.root,
node.remote.read_only_snapshots_dir_name,
)
)
return [path for path in paths if os.path.isdir(path)]

def ledger_chunk_boundaries(ledger_paths):
boundaries = set()
for ledger_path in ledger_paths:
for f in os.listdir(ledger_path):
if f.endswith(ccf.ledger.IGNORED_FILE_SUFFIX):
continue
if not f.startswith("ledger_"):
continue

range_start, range_end = ccf.ledger.range_from_filename(f)
boundaries.add(range_start - 1)
if range_end is not None:
boundaries.add(range_end)
return boundaries

for node in nodes:
if node.network_state != infra.node.NodeNetworkState.stopped:
raise RuntimeError(
f"Node {node.node_id} should be stopped before verifying snapshot consistency"
)

if node.remote is None:
continue

boundaries = ledger_chunk_boundaries(node.remote.ledger_paths())
for snapshot_file in list_snapshot_files(snapshot_paths(node)):
snapshot_seqno, _ = ccf.ledger.snapshot_index_from_filename(
snapshot_file
)
assert snapshot_seqno in boundaries, (
f"Snapshot {snapshot_file} on node {node.local_node_id} does "
"not correspond to a ledger chunk boundary"
)

def ledger_files_invariant(self, nodes=None, allow_recovery=False):
# Note: Should be called on stopped service
# 1. A node's ledger history will be contiguous from its startup seqno onwards
# 2. If two committed chunks start at the same point in the ledger they are identical
# 3. Across the network, there is a single contiguous history of committed chunks

if nodes is None:
nodes = self.nodes

def get_startup_seqno(node):
out_path, _ = node.get_logs()
if out_path is None or not os.path.isfile(out_path):
return 0

startup_seqno = 0
local_snapshot_path = None
resumed_snapshot_re = re.compile(
r"Joiner successfully resumed from snapshot at seqno (\d+) and view \d+"
)
local_snapshot_re = re.compile(
r"Found latest local snapshot file: (.*snapshot_(\d+)_\d+\.committed) "
r"\(size: \d+\)"
)
local_snapshot_error_re = re.compile(r"Error while verifying (.*):")

with open(out_path, "r", encoding="utf-8", errors="replace") as lines:
for line in lines:
resumed_snapshot = resumed_snapshot_re.search(line)
if resumed_snapshot is not None:
startup_seqno = int(resumed_snapshot.group(1))
local_snapshot_path = None
continue

local_snapshot = local_snapshot_re.search(line)
if local_snapshot is not None:
local_snapshot_path = local_snapshot.group(1)
startup_seqno = int(local_snapshot.group(2))
continue

local_snapshot_error = local_snapshot_error_re.search(line)
if (
local_snapshot_error is not None
and local_snapshot_error.group(1) == local_snapshot_path
):
local_snapshot_path = None
startup_seqno = 0

return startup_seqno

# List ledger chunks, plus checksum, in order of starting seqno
def node_ledger_files(node, allow_uncommitted):
def pred(f, allow_uncommitted, allow_recovery):
if f.endswith(ccf.ledger.IGNORED_FILE_SUFFIX):
return False
is_committed = f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX) or (
f.endswith(ccf.ledger.RECOVERY_FILE_SUFFIX)
and ccf.ledger.COMMITTED_FILE_SUFFIX in f
)
valid_committed = is_committed or (
not is_committed and allow_uncommitted
)
is_recovery = f.endswith(ccf.ledger.RECOVERY_FILE_SUFFIX)
valid_recovery = (not is_recovery) or (is_recovery and allow_recovery)
return valid_committed and valid_recovery

if node.remote is None:
return []

return [
os.path.join(d, f)
for d, allow_uncommitted, allow_recovery in [
# We potentially want all ledger files in the current dir
(node.remote.current_ledger_path(), allow_uncommitted, allow_recovery),
# We only want committed files in the read-only
*[(d, False, False) for d in node.remote.read_only_ledger_paths()],
]
for f in os.listdir(d)
if pred(f, allow_uncommitted, allow_recovery)
]

# 1. A node's ledger history is contiguous from its startup seqno onwards
for node in nodes:
if node.network_state != infra.node.NodeNetworkState.stopped:
raise RuntimeError(
f"Node {node.node_id} should be stopped before verifying ledger consistency"
Expand All @@ -1036,35 +1159,85 @@ def list_files_in_dirs_with_checksums(dirs):
continue

ledger_paths = node.remote.ledger_paths()
if not ledger_paths:
continue

# Check that at least the main ledger directory, created by
# the node on startup, exists
if not os.path.isdir(ledger_paths[0]):
return
startup = get_startup_seqno(node)
files = sorted(
node_ledger_files(node, True),
key=lambda x: ccf.ledger.range_from_filename(x)[0],
)

ledger_files = list_files_in_dirs_with_checksums(ledger_paths)
if not ledger_files:
continue
# Trace contiguous chunks after the startup snapshot. Chunks wholly
# before the snapshot may have been copied from another node, and
# are covered by the network-wide committed-history check below.
prev_range = (0, startup)
for curr in files:
curr_range = ccf.ledger.range_from_filename(curr)

last_ledger_seqno = ccf.ledger.range_from_filename(ledger_files[-1][0])[1]
ledger_files = set(ledger_files)
# Snapshots force chunk boundaries => expect a chunk starting after the startup snapshot
if curr_range[0] <= startup:
continue

if last_ledger_seqno > longest_ledger_seqno:
assert longest_ledger_files is None or longest_ledger_files.issubset(
ledger_files
), f"Ledger files on node {longest_ledger_node.local_node_id} do not match files on node {node.local_node_id}: {longest_ledger_files}, expected subset of {ledger_files}, diff: (Only on {node.local_node_id}: {ledger_files - longest_ledger_files}, Only on {longest_ledger_node.local_node_id}: {longest_ledger_files - ledger_files})"
longest_ledger_files = ledger_files
longest_ledger_node = node
longest_ledger_seqno = last_ledger_seqno
else:
assert ledger_files.issubset(
longest_ledger_files
), f"Ledger files on node {node.local_node_id} do not match files on node {longest_ledger_node.local_node_id}: {ledger_files}, expected subset of {longest_ledger_files}, diff: (Only on {longest_ledger_node.local_node_id}: {longest_ledger_files - ledger_files}, Only on {node.local_node_id}: {ledger_files - longest_ledger_files})"
# ignore duplicated files on disk
if curr_range == prev_range:
continue

if longest_ledger_files:
LOG.info(
f"Verified {len(longest_ledger_files)} ledger files consistency on all {len(self.nodes)} stopped nodes"
)
assert (
prev_range[1] + 1 == curr_range[0]
), f"Ledger is non-contiguous after startup: missing entries between {prev_range} and {curr}"

if curr_range[1] is None:
remaining_files = [
f
for f in files
if curr_range[0] < ccf.ledger.range_from_filename(f)[0]
]
assert (
len(remaining_files) == 0
), f"{remaining_files} have start indices after first incomplete file post startup"
break
prev_range = curr_range

all_committed_chunks = sorted(
[
(f, infra.path.compute_file_checksum(f))
for n in nodes
if n.remote is not None
for f in node_ledger_files(n, False)
],
key=lambda x: ccf.ledger.range_from_filename(x[0])[0],
)
if len(all_committed_chunks) > 0:

# 2. If two committed chunks start at the same point in the ledger they are identical
prev = all_committed_chunks[0]
for curr in all_committed_chunks[1:]:
range_prev = ccf.ledger.range_from_filename(prev[0])
range_curr = ccf.ledger.range_from_filename(curr[0])
if range_prev[0] == range_curr[0]:
assert (
range_prev == range_curr
), f"Mismatched ranges in committed ledger chunk with same start index at {prev} and {curr}"
assert (
prev[1] == curr[1]
), f"Mismatched contents in committed ledger chunk with same start index at {prev} and {curr}"
prev = curr

# 3. Across the network there is a single contiguous history of committed chunks
prev = all_committed_chunks[0][0]
for curr, _ in all_committed_chunks[1:]:
prev_range = ccf.ledger.range_from_filename(prev)
curr_range = ccf.ledger.range_from_filename(curr)
if curr_range == prev_range:
continue
assert (
prev_range[1] is not None
), f"Committed ledger chunk {prev} is incomplete"
assert (
prev_range[1] + 1 == curr_range[0]
), f"Ledger is non-contiguous: {curr} does not follow {prev}"
prev = curr

def check_ledger_files_chunk_flags(self):
for node in self.nodes:
Expand Down Expand Up @@ -1121,7 +1294,7 @@ def stop_all_nodes(
self,
skip_verification=False,
verbose_verification=False,
accept_ledger_diff=False,
check_file_invariants=True,
skip_verify_chunking=None,
**kwargs,
):
Expand Down Expand Up @@ -1166,8 +1339,9 @@ def stop_all_nodes(
fatal_error_found = True

LOG.info("All nodes stopped")
if not accept_ledger_diff:
self.check_ledger_files_identical(**kwargs)
if check_file_invariants:
self.ledger_files_invariant(**kwargs)
self.snapshot_files_invariants()

if not skip_verify_chunking:
LOG.info("Verifying ledger chunk flags before shutdown")
Expand Down Expand Up @@ -2169,7 +2343,9 @@ def close_on_error(net, pdb=False):

LOG.info("Stopping network")
net.stop_all_nodes(
skip_verification=True, accept_ledger_diff=True, skip_verify_chunking=True
skip_verification=True,
check_file_invariants=False,
skip_verify_chunking=True,
)

raise
Expand Down Expand Up @@ -2225,7 +2401,7 @@ def network(
LOG.info("Stopping network")
net.stop_all_nodes(
skip_verification=True,
accept_ledger_diff=True,
check_file_invariants=True,
)
if init_partitioner:
net.partitioner.cleanup()
13 changes: 11 additions & 2 deletions tests/infra/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,12 +754,21 @@ def get_committed_snapshots(self, pre_condition_func=lambda src_dir, _: True):
def log_path(self):
return self.remote.out

def ledger_paths(self):
paths = [os.path.join(self.remote.root, self.ledger_dir_name)]
def read_only_ledger_paths(self):
paths = []
for read_only_ledger_dir_name in self.read_only_ledger_dirs_names:
paths += [os.path.join(self.remote.root, read_only_ledger_dir_name)]
return [path for path in paths if os.path.exists(path)]

def current_ledger_path(self):
return os.path.join(self.remote.root, self.ledger_dir_name)

def ledger_paths(self):
return [
path for path in [self.current_ledger_path(), *self.read_only_ledger_paths()]
if os.path.exists(path)
]

def get_logs(self):
return self.remote.get_logs()

Expand Down
32 changes: 32 additions & 0 deletions tests/infra/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from infra.node import strip_version
from packaging.version import Version # type: ignore
import os
import ccf
import ccf.split_ledger

from loguru import logger as LOG


def get_measurement(enclave_platform, package, library_dir="."):
Expand All @@ -32,3 +36,31 @@ def get_host_data_and_security_policy(
return hash.hexdigest(), None
else:
raise ValueError(f"Cannot get security policy on {enclave_platform}")


def write_ledger_chunk(outdir, entries, end_seqno, complete):
os.makedirs(outdir, exist_ok=True)
selected_entries = [(s, raw) for s, raw in entries if s <= end_seqno]
assert selected_entries, f"No entries selected up to {end_seqno}"

ledger_file = ccf.split_ledger.create_new_ledger_file(outdir)
entry_positions = []
for _, raw_tx in selected_entries:
entry_positions.append(ledger_file.tell())
ledger_file.write(raw_tx)

start_seqno = selected_entries[0][0]
final_seqno = selected_entries[-1][0]
final_file_name = ccf.split_ledger.make_final_ledger_file_name(
start_seqno,
final_seqno,
is_complete=complete,
is_committed=False,
)
ccf.split_ledger.close_ledger_file(
ledger_file, entry_positions, final_file_name, complete_file=complete
)
LOG.info(
f"Created recovery ledger variant {outdir}: {final_file_name} "
f"complete={complete}"
)
Loading
Loading