Skip to content

Commit 66e6c9f

Browse files
committed
refactor upload_analysis_to_jumpbox
1 parent 1dca196 commit 66e6c9f

File tree

2 files changed

+135
-87
lines changed

2 files changed

+135
-87
lines changed

skills/root-cause-analysis/scripts/cli.py

Lines changed: 11 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from scripts.config import Config
1616
from scripts.correlator import build_correlation_timeline, fetch_correlated_logs
1717
from scripts.job_parser import parse_job_log
18+
from scripts.jumpbox_io import upload_to_jumpbox
1819
from scripts.log_fetcher import fetch_job_log
1920
from scripts.setup import print_checks, run_checks
2021
from scripts.step4_fetch_github import GitHubClient, Step4Analyzer
@@ -24,6 +25,7 @@
2425
from .config import Config
2526
from .correlator import build_correlation_timeline, fetch_correlated_logs
2627
from .job_parser import parse_job_log
28+
from .jumpbox_io import upload_to_jumpbox
2729
from .log_fetcher import fetch_job_log
2830
from .setup import print_checks, run_checks
2931
from .step4_fetch_github import GitHubClient, Step4Analyzer
@@ -47,95 +49,17 @@ def save_step(analysis_dir: Path, step: int, data: dict) -> Path:
4749

4850

4951
@trace(name="Upload analysis", span_type=SpanType.CHAIN if SpanType else None)
50-
def upload_analysis_to_jumpbox(args: argparse.Namespace, config: Config, span=None) -> int:
51-
"""Upload analysis directory to Jumpbox in /usr/local/mlflow/{job_id}/ with session.json."""
52-
analysis_dir = config.analysis_dir / args.job_id
53-
54-
if not analysis_dir.exists():
55-
error_message = f"No analysis found for job {args.job_id}"
56-
print(error_message)
57-
if span:
58-
span.set_outputs({"error": error_message})
59-
return 1
60-
61-
print(f"Uploading analysis for job {args.job_id}...")
62-
63-
if not config.jumpbox_uri:
64-
print(" Skipping upload: JUMPBOX_URI not configured")
65-
return 1
66-
67-
# Parse JUMPBOX_URI format: "user@host -p port"
68-
parts = config.jumpbox_uri.split()
69-
if len(parts) < 1:
70-
print(" Error: Invalid JUMPBOX_URI format")
71-
return 1
72-
73-
ssh_target = parts[0] # user@host
74-
ssh_port = None
75-
76-
# Extract port if present
77-
if "-p" in parts:
78-
try:
79-
port_idx = parts.index("-p")
80-
if port_idx + 1 < len(parts):
81-
ssh_port = parts[port_idx + 1]
82-
except (ValueError, IndexError):
83-
pass
84-
85-
session_id = os.environ.get("CLAUDE_SESSION_ID", "unknown")
52+
def cmd_upload(args: argparse.Namespace, config: Config, span=None) -> int:
53+
"""Upload analysis directory to Jumpbox via jumpbox_io.upload_to_jumpbox."""
8654
job_id = args.job_id
87-
remote_base_dir = f"/usr/local/mlflow/{job_id}"
88-
89-
# Create session.json file locally (will be overwritten if exists)
90-
session_file = analysis_dir / "session.json"
91-
try:
92-
with open(session_file, "w") as f:
93-
json.dump({"session_id": session_id, "job_id": job_id}, f, indent=2)
94-
except Exception as e:
95-
print(f" Warning: Could not create session.json: {e}")
96-
97-
# Build SSH command to create remote directory
98-
ssh_cmd = ["ssh"]
99-
if ssh_port:
100-
ssh_cmd.extend(["-p", ssh_port])
101-
ssh_cmd.extend([ssh_target, f"mkdir -p {remote_base_dir}"])
102-
103-
# Create remote directory structure
104-
try:
105-
subprocess.run(
106-
ssh_cmd,
107-
check=True,
108-
capture_output=True,
109-
timeout=30,
110-
)
111-
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
112-
print(f" Error creating remote directory: {e}")
113-
return 1
55+
analysis_dir = config.analysis_dir / job_id
56+
session_id = os.environ.get("CLAUDE_SESSION_ID", "unknown")
11457

115-
# Upload analysis directory contents using rsync
116-
try:
117-
rsync_cmd = ["rsync"]
118-
if ssh_port:
119-
rsync_cmd.extend(["-e", f"ssh -p {ssh_port}"])
120-
# Add trailing slash to source to copy contents, not the directory itself
121-
rsync_cmd.extend(
122-
["-az", "--quiet", f"{str(analysis_dir)}/", f"{ssh_target}:{remote_base_dir}/"]
123-
)
58+
success = upload_to_jumpbox(job_id, analysis_dir, config.jumpbox_uri, session_id)
12459

125-
subprocess.run(
126-
rsync_cmd,
127-
check=True,
128-
timeout=60,
129-
)
130-
print(f" Uploaded to Jumpbox ({ssh_target}): {remote_base_dir}/")
131-
if span:
132-
span.set_outputs({"job_id": job_id, "success": True})
133-
return 0
134-
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
135-
print(f" Error uploading to Jumpbox: {e}")
136-
if span:
137-
span.set_outputs({"job_id": job_id, "success": False, "error": str(e)})
138-
return 1
60+
if span:
61+
span.set_outputs({"job_id": job_id, "success": success})
62+
return 0 if success else 1
13963

14064

14165
def get_step_name(step: int) -> str:
@@ -614,7 +538,7 @@ def main() -> int:
614538
"query": cmd_query,
615539
"setup": cmd_setup,
616540
"status": cmd_status,
617-
"upload": upload_analysis_to_jumpbox,
541+
"upload": cmd_upload,
618542
}
619543

620544
exit_code = commands[args.command](args, config, span)
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""
2+
Jumpbox I/O functions for Root-Cause-Analysis.
3+
4+
Provides functions to parse JUMPBOX_URI and upload analysis directories
5+
to the jumpbox via rsync.
6+
"""
7+
8+
import json
9+
import os
10+
import subprocess
11+
from pathlib import Path
12+
13+
14+
def parse_jumpbox_uri(jumpbox_uri: str) -> tuple[str, str | None]:
15+
"""
16+
Parse JUMPBOX_URI format: "user@host -p port" or "user@host".
17+
18+
Returns:
19+
Tuple of (ssh_target, ssh_port) where ssh_port may be None
20+
21+
Raises:
22+
ValueError: If format is invalid
23+
"""
24+
if not jumpbox_uri:
25+
raise ValueError("JUMPBOX_URI is empty")
26+
27+
parts = jumpbox_uri.split()
28+
if len(parts) < 1:
29+
raise ValueError("Invalid JUMPBOX_URI format")
30+
31+
ssh_target = parts[0]
32+
ssh_port = None
33+
34+
if "-p" in parts:
35+
try:
36+
port_idx = parts.index("-p")
37+
if port_idx + 1 < len(parts):
38+
ssh_port = parts[port_idx + 1]
39+
except (ValueError, IndexError):
40+
pass
41+
42+
return ssh_target, ssh_port
43+
44+
45+
def upload_to_jumpbox(
46+
job_id: str,
47+
analysis_dir: Path,
48+
jumpbox_uri: str | None = None,
49+
session_id: str | None = None,
50+
) -> bool:
51+
"""
52+
Upload analysis directory to jumpbox at /usr/local/mlflow/<job_id>/.
53+
54+
Optionally writes a session.json before uploading. Uses rsync for
55+
efficient directory transfer.
56+
57+
Args:
58+
job_id: Job ID for remote path
59+
analysis_dir: Local analysis directory to upload
60+
jumpbox_uri: JUMPBOX_URI connection string (defaults to env var)
61+
session_id: If provided, writes session.json with this ID before upload
62+
63+
Returns:
64+
True on success, False on failure
65+
"""
66+
if "/" in job_id or "." in job_id:
67+
print(" Error: Invalid job_id format. Cannot contain . or /")
68+
return False
69+
70+
if not analysis_dir.exists():
71+
print(f" Error: Analysis directory not found: {analysis_dir}")
72+
return False
73+
74+
if jumpbox_uri is None:
75+
jumpbox_uri = os.environ.get("JUMPBOX_URI", "")
76+
77+
if not jumpbox_uri:
78+
print(" JUMPBOX_URI not set. Skipping upload.")
79+
return False
80+
81+
try:
82+
ssh_target, ssh_port = parse_jumpbox_uri(jumpbox_uri)
83+
except ValueError as e:
84+
print(f" Error: {e}")
85+
return False
86+
87+
remote_dir = f"/usr/local/mlflow/{job_id}"
88+
89+
if session_id:
90+
session_file = analysis_dir / "session.json"
91+
try:
92+
with open(session_file, "w") as f:
93+
json.dump({"session_id": session_id, "job_id": job_id}, f, indent=2)
94+
except Exception as e:
95+
print(f" Warning: Could not create session.json: {e}")
96+
97+
print(f" Uploading analysis for job {job_id}...")
98+
print(f" Local: {analysis_dir}/")
99+
print(f" Remote: {ssh_target}:{remote_dir}/")
100+
101+
ssh_cmd = ["ssh"]
102+
if ssh_port:
103+
ssh_cmd.extend(["-p", ssh_port])
104+
ssh_cmd.extend([ssh_target, f"mkdir -p {remote_dir}"])
105+
106+
try:
107+
subprocess.run(ssh_cmd, check=True, capture_output=True, timeout=30)
108+
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
109+
print(f" Error creating remote directory: {e}")
110+
return False
111+
112+
rsync_cmd = ["rsync"]
113+
if ssh_port:
114+
rsync_cmd.extend(["-e", f"ssh -p {ssh_port}"])
115+
rsync_cmd.extend(["-az", "--quiet", f"{analysis_dir}/", f"{ssh_target}:{remote_dir}/"])
116+
117+
try:
118+
subprocess.run(rsync_cmd, check=True, timeout=60)
119+
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
120+
print(f" Error uploading to jumpbox: {e}")
121+
return False
122+
123+
print(f" Uploaded to jumpbox: {ssh_target}:{remote_dir}/")
124+
return True

0 commit comments

Comments
 (0)