diff --git a/.github/workflows/netsim.yml b/.github/workflows/netsim.yml
index 8843341..35113b9 100644
--- a/.github/workflows/netsim.yml
+++ b/.github/workflows/netsim.yml
@@ -49,7 +49,7 @@ jobs:
       - name: Fetch and build iroh
         run: |
-          git clone https://github.com/n0-computer/iroh.git
+          git clone --depth 1 https://github.com/n0-computer/iroh.git
           cd iroh
           cargo build --release --all-features -p iroh-relay -p iroh-dns-server
           cargo build --release --all-features -p iroh --examples
 
@@ -84,7 +84,7 @@ jobs:
           c='${{ steps.detect_comment_config.outputs.NETSIM_CONFIG }}'
           if [ -z "${c}" ]; then
-            sudo python3 main.py sims/iroh
+            sudo python3 main.py --max-workers=4 sims/iroh
           else
            echo $c >> custom_sim.json
            sudo python3 main.py custom_sim.json
diff --git a/.github/workflows/netsim_integration.yml b/.github/workflows/netsim_integration.yml
index fab8260..bdf6eeb 100644
--- a/.github/workflows/netsim_integration.yml
+++ b/.github/workflows/netsim_integration.yml
@@ -43,7 +43,7 @@ jobs:
       - name: Fetch and build iroh
         run: |
-          git clone https://github.com/n0-computer/iroh.git
+          git clone --depth 1 https://github.com/n0-computer/iroh.git
           cd iroh
           cargo build --release --all-features -p iroh-relay -p iroh-dns-server
           cargo build --release --all-features -p iroh --examples
 
@@ -60,7 +60,7 @@ jobs:
           cd netsim
           sudo kill -9 $(pgrep ovs) || true
           sudo mn --clean
-          sudo python3 main.py --integration sims/integration
+          sudo python3 main.py --integration --max-workers=4 sims/integration
 
       - name: Setup Environment (PR)
         if: ${{ github.event_name == 'pull_request' }}
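
Both workflows now pass --max-workers=4, which main.py reads as args.max_workers and hands to run_parallel (see the main.py hunks below). A minimal sketch of the CLI wiring this implies, assuming an argparse-based entry point; the parser itself is outside this diff, so every definition here is an assumption for illustration:

# Hypothetical sketch of main.py's argument parser, inferred from the
# workflow invocations above and from the args.max_workers / args.integration
# usage in the patch. Names, defaults, and help text are assumptions.
import argparse

parser = argparse.ArgumentParser(description="netsim case runner")
parser.add_argument("config", help="sim directory or JSON file, e.g. sims/iroh")
parser.add_argument("--integration", action="store_true",
                    help="eject (fail the whole run) on process errors")
parser.add_argument("--max-workers", type=int, default=4,
                    help="number of cases run concurrently per chunk")

args = parser.parse_args(["--max-workers=4", "sims/iroh"])
assert args.max_workers == 4 and args.config == "sims/iroh"
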
diff --git a/netsim/main.py b/netsim/main.py
index 3b441c9..cddd9c1 100644
--- a/netsim/main.py
+++ b/netsim/main.py
@@ -15,7 +15,7 @@ from parsing.netsim import process_logs, process_integration_logs
 from sniffer.sniff import Sniffer
 from sniffer.process import run_viz
 
-from util import cleanup_tmp_dirs, eject
+from util import cleanup_tmp_dirs, eject, FAILED_TESTS, write_failure_summary
 
 TIMEOUT = 60 * 5
 
@@ -36,33 +36,34 @@ def setup_env_vars(prefix, node_name, temp_dir, node_env, debug=False):
 
 
 def parse_node_params(node, prefix, node_params, runner_id):
-    """Parse parameters from node logs with validation."""
+    """Parse parameters from node logs with validation using fast polling."""
     parsed_params = {}
-    wait_time = node.get("wait", 1)
+    max_wait = node.get("wait", 1)
     parser_type = node["param_parser"]
-    expected_nodes = []
+    poll_interval = 0.2
 
-    # Wait for parameters to be available
-    for _ in range(wait_time):
-        time.sleep(1)
-        for i in range(int(node["count"])):
-            node_name = f'{node["name"]}_{i}_r{runner_id}'
-            expected_nodes.append(node_name)
-            log_file = f"logs/{prefix}__{node_name}.txt"
+    expected_nodes = [
+        f'{node["name"]}_{i}_r{runner_id}'
+        for i in range(int(node["count"]))
+    ]
+
+    max_iterations = max(1, int(max_wait / poll_interval))
+    for iteration in range(max_iterations):
+        for node_name in expected_nodes:
+            if node_name in parsed_params:
+                continue
+            log_file = f"logs/{prefix}__{node_name}.txt"
             if not os.path.exists(log_file):
-                error(f"Warning: Log file not found: {log_file}")
                 continue
             try:
                 with open(log_file, "r") as f:
                     lines = f.readlines()
                     for idx, line in enumerate(lines):
-                        # Parser 1: Simple ticket (used in lossy/standard sims)
                         if parser_type == "iroh_ticket" and line.startswith("All-in-one ticket"):
                             parsed_params[node_name] = line[len("All-in-one ticket: "):].strip()
                             break
-                        # Parser 2: Endpoint with addresses (used in iroh/integration sims)
                         if parser_type == "iroh_endpoint_with_addrs" and line.startswith("Endpoint id:"):
                             if idx + 1 >= len(lines):
                                 break
@@ -82,7 +83,10 @@ def parse_node_params(node, prefix, node_params, runner_id):
             except Exception as e:
                 error(f"Error parsing parameters from {log_file}: {e}")
 
-    # Validate that all expected parameters were found
+        if all(n in parsed_params for n in expected_nodes):
+            break
+        time.sleep(poll_interval)
+
     missing_params = [n for n in expected_nodes if n not in parsed_params]
     if missing_params:
         error("\n" + "=" * 80 + "\n")
@@ -96,19 +100,17 @@ def parse_node_params(node, prefix, node_params, runner_id):
     return parsed_params
 
 
-def terminate_processes(p_box):
+def terminate_processes(p_box, prefix):
     """Gracefully terminate processes, then forcefully kill if needed."""
-    for p, cmd in p_box:
-        error(f"Terminating process: {p.pid} {cmd[:100]}\n")
+    for node_name, p, cmd in p_box:
+        error(f"Terminating [{prefix}__{node_name}]: {cmd[:80]}\n")
         p.terminate()
 
-    # Wait for processes to terminate gracefully
-    time.sleep(2)
+    time.sleep(0.5)
 
-    # Force kill any remaining processes
-    for p, cmd in p_box:
+    for node_name, p, cmd in p_box:
         if p.poll() is None:
-            error(f"Force killing hung process: {p.pid} {cmd[:100]}\n")
+            error(f"Force killing [{prefix}__{node_name}]: {cmd[:80]}\n")
             p.kill()
 
 
@@ -117,11 +119,12 @@ def monitor_short_processes(p_short_box, prefix):
     process_errors = []
     start_time = time.time()
 
-    # Monitor processes until all complete or timeout
-    for _ in range(TIMEOUT):
-        time.sleep(1)
+    # Monitor processes until all complete or timeout (poll every 200ms)
+    max_polls = TIMEOUT * 5
+    for _ in range(max_polls):
         if not any(p.poll() is None for (_, p, _) in p_short_box):
             break
+        time.sleep(0.2)
 
     elapsed_time = time.time() - start_time
 
@@ -133,7 +136,7 @@
             error(f"\nProcess timed out after {elapsed_time:.1f}s for node {node_name}\n")
             error(f"Command was: {cmd}\n")
             p.terminate()
-            time.sleep(1)
+            time.sleep(0.2)
             if p.poll() is None:
                 error(f"Force killing timed out process for node {node_name}\n")
                 p.kill()
@@ -302,7 +305,7 @@ def run_case(nodes, runner_id, prefix, args, debug=False, visualize=False):
         if "process" in node and node["process"] == "short":
             p_short_box.append((node_name, p, cmd))
         else:
-            p_box.append((p, cmd))
+            p_box.append((node_name, p, cmd))
 
         if "param_parser" in node:
             node_params.update(parse_node_params(node, prefix, node_params, runner_id))
@@ -314,26 +317,38 @@ def run_case(nodes, runner_id, prefix, args, debug=False, visualize=False):
     process_errors = monitor_short_processes(p_short_box, prefix)
     if process_errors:
         error("\n" + "=" * 80 + "\n")
-        error("PROCESS ERRORS DETECTED:\n")
+        error(f"PROCESS ERRORS DETECTED in {prefix}:\n")
         error("=" * 80 + "\n")
         for err_msg in process_errors:
             error(err_msg + "\n")
         error("=" * 80 + "\n")
 
+        failure_entry = {"prefix": prefix, "errors": []}
+        for err_msg in process_errors:
+            if err_msg.startswith("TIMEOUT:"):
+                node = err_msg.split("'")[1]
+                reason = f"timeout after {TIMEOUT}s"
+            elif err_msg.startswith("FAILED:"):
+                node = err_msg.split("'")[1]
+                code_start = err_msg.find("code ") + 5
+                code_end = err_msg.find(".", code_start)
+                reason = f"exit code {err_msg[code_start:code_end]}"
+            else:
+                node = "unknown"
+                reason = err_msg[:50]
+            failure_entry["errors"].append({"node": node, "reason": reason})
+        FAILED_TESTS.append(failure_entry)
         if args.integration:
             eject(nodes, prefix, runner_id, temp_dirs)
         else:
            error("WARNING: Continuing despite errors (not in integration mode)\n")
-    terminate_processes(p_box)
+    terminate_processes(p_box, prefix)
     cleanup_tmp_dirs(temp_dirs)
     return (net, sniffer)
 
 
-def run(case, runner_id, name, skiplist, args):
+def run(case, runner_id, name, args):
     prefix = name + "__" + case["name"]
-    if prefix in skiplist:
-        print("Skipping:", prefix)
-        return
     nodes = case["nodes"]
     viz = False
     if "visualize" in case:
@@ -356,13 +371,25 @@
 
 
 def run_parallel(cases, name, skiplist, args, max_workers=4):
+    # Filter skipped cases before chunking for optimal parallelism
+    filtered = []
+    for case in cases:
+        prefix = name + "__" + case["name"]
+        if prefix in skiplist:
+            print("Skipping:", prefix)
+        else:
+            filtered.append(case)
+
+    if not filtered:
+        return
+
     with concurrent.futures.ThreadPoolExecutor() as executor:
-        chunks = [cases[i : i + max_workers] for i in range(0, len(cases), max_workers)]
+        chunks = [filtered[i : i + max_workers] for i in range(0, len(filtered), max_workers)]
        for chunk in chunks:
            futures = []
            r = []
            for i, case in enumerate(chunk):
-                futures.append(executor.submit(run, case, i, name, skiplist, args))
+                futures.append(executor.submit(run, case, i, name, args))
            for future in concurrent.futures.as_completed(futures):
                try:
                    rx = future.result()
@@ -435,4 +462,5 @@ def run_parallel(cases, name, skiplist, args, max_workers=4):
     print(f"Start testing: %s\n" % path)
     run_parallel(config["cases"], name, skiplist, args, args.max_workers)
 
+    write_failure_summary()
     print("Done")
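
The main.py hunks above swap whole-second sleeps for 200 ms polling in parse_node_params and monitor_short_processes. The budgets are unchanged (node["wait"] seconds for parameter parsing, TIMEOUT seconds for short processes), but each wait now returns as soon as its condition holds instead of rounding up to the next second. A standalone sketch of the pattern; the wait_until helper and its signature are invented for illustration and are not part of the patch:

# Minimal sketch of the poll-until-ready pattern adopted above. The 0.2 s
# interval mirrors the patch; the helper itself is hypothetical.
import time

def wait_until(condition, max_wait, poll_interval=0.2):
    """Poll condition() every poll_interval seconds, for at most max_wait seconds."""
    deadline = time.monotonic() + max_wait
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)

# Roughly what parse_node_params now does inline:
#   wait_until(lambda: all(n in parsed_params for n in expected_nodes),
#              max_wait=node.get("wait", 1))
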
diff --git a/netsim/util.py b/netsim/util.py
index 80324e8..6c9b458 100644
--- a/netsim/util.py
+++ b/netsim/util.py
@@ -1,5 +1,32 @@
 import os
 
+FAILED_TESTS = []
+
+
+def write_failure_summary(output_path="logs/failed_tests.txt"):
+    if not FAILED_TESTS:
+        return
+    with open(output_path, "w") as f:
+        for failure in FAILED_TESTS:
+            prefix = failure["prefix"]
+            f.write(f"FAILED: {prefix}\n")
+            for node_err in failure["errors"]:
+                node_name = node_err["node"]
+                reason = node_err["reason"]
+                log_path = f"logs/{prefix}__{node_name}.txt"
+                f.write(f"  - {node_name}: {reason}\n")
+                f.write(f"    Log: {log_path}\n")
+                if os.path.isfile(log_path):
+                    f.write("    --- Last 10 lines ---\n")
+                    try:
+                        with open(log_path, "r") as lf:
+                            lines = lf.readlines()
+                        for line in lines[-10:]:
+                            f.write(f"      {line.rstrip()}\n")
+                    except Exception:
+                        f.write("      [failed to read log]\n")
+            f.write("\n")
+
 
 def logs_on_error(nodes, prefix, runner_id, code=1, message=None):
     node_counts = {}
@@ -29,7 +56,7 @@ def cleanup_tmp_dirs(temp_dirs):
         temp_dir.cleanup()
 
 
-def eject(nodes, prefix, runner_id, temp_dirs):
-    logs_on_error(nodes, prefix, runner_id)
+def eject(_nodes, prefix, _runner_id, temp_dirs):
+    write_failure_summary()
     cleanup_tmp_dirs(temp_dirs)
     raise Exception("Netsim run failed: %s" % prefix)
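
With the util.py additions in place, the summary writer can be exercised in isolation. The snippet below appends an entry shaped like the ones run_case builds and writes the report to a scratch path; the case prefix, node names, and reasons are invented for illustration:

# Illustrative driver for the new util.py API. All values are hypothetical,
# and output_path is overridden so no logs/ directory is required; the
# "Last 10 lines" tail is emitted only when a per-node log file exists.
import util

util.FAILED_TESTS.append({
    "prefix": "iroh__1_to_1",  # hypothetical case prefix
    "errors": [
        {"node": "iroh_get_0_r0", "reason": "exit code 1"},
        {"node": "iroh_get_1_r0", "reason": "timeout after 300s"},
    ],
})
util.write_failure_summary(output_path="/tmp/failed_tests.txt")

# /tmp/failed_tests.txt then contains one block per failed case:
#
#   FAILED: iroh__1_to_1
#     - iroh_get_0_r0: exit code 1
#       Log: logs/iroh__1_to_1__iroh_get_0_r0.txt
#     - iroh_get_1_r0: timeout after 300s
#       Log: logs/iroh__1_to_1__iroh_get_1_r0.txt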