Skip to content

Commit 4da5042

Browse files
Add Slurm job state-based phase detection for transfer status
Derive transfer phase from sacct job states instead of relying purely on file existence in the run directory. Correlates prepare and transfer jobs by name convention ({name}-prepare / {name}) and enriches with file-based shard progress when available. Handles edge cases like shards finishing before the Slurm job exits, and completed jobs with partial shard failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d994cd3 commit 4da5042

File tree

2 files changed

+169
-12
lines changed

2 files changed

+169
-12
lines changed

src/xfer/slackbot/claude_agent.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
get_source_stats,
2424
get_transfer_progress,
2525
get_transfer_progress_by_job,
26+
get_transfer_status_by_thread,
2627
list_buckets,
2728
submit_transfer,
2829
)
@@ -84,9 +85,20 @@
8485
"name": "check_status",
8586
"description": """Check the status of transfer jobs in this thread. Use this when the user asks about job status, progress, or wants to know if their transfer is complete.
8687
87-
This tool finds all jobs associated with the current Slack thread and returns their status.
88+
This tool finds all jobs associated with the current Slack thread and returns their status. It correlates prepare and transfer jobs by name and derives a unified phase from Slurm job states, enriched with file-based shard progress when available.
8889
89-
When the phase is "building_manifest", the prepare job is listing files at the source. This can take up to several days for large datasets and is normal. The response may include files_listed and bytes_listed if the JSONL writing phase has started, or prepare_phase/prepare_detail for finer-grained progress. Only flag a concern if the job has been in this phase for more than 48 hours with no observable progress.""",
90+
Phases (derived from Slurm job states):
91+
- "pending" — prepare job is queued
92+
- "preparing" — prepare job is running (manifest build, sharding, etc.)
93+
- "prepare_failed" — prepare job FAILED/CANCELLED/TIMEOUT
94+
- "prepare_complete" — prepare finished but no transfer job found (anomaly)
95+
- "waiting_to_start" — prepare done, transfer job is queued
96+
- "transferring" — transfer job is running
97+
- "complete" — transfer finished successfully
98+
- "complete_with_failures" — transfer job completed but some shards failed
99+
- "failed" — transfer job FAILED/CANCELLED/TIMEOUT
100+
101+
When the phase is "preparing", the prepare job may be listing files at the source. This can take up to several days for large datasets and is normal. The prepare job has a 4-day time limit. Only flag a concern if the prepare job has been running for more than 48 hours with no progress. The progress field may include files_listed, bytes_listed, or prepare_phase/prepare_detail for finer-grained tracking.""",
90102
"input_schema": {
91103
"type": "object",
92104
"properties": {
@@ -446,16 +458,8 @@ def execute_tool(
446458
return json.dumps(job.to_dict())
447459
return json.dumps({"error": f"Job {job_id} not found"})
448460
else:
449-
# Get all jobs for this thread with progress info
450-
jobs = get_jobs_by_thread(channel_id, thread_ts)
451-
results = []
452-
for job in jobs:
453-
if job.work_dir:
454-
progress = get_transfer_progress_by_job(job.job_id)
455-
if progress:
456-
results.append(progress)
457-
continue
458-
results.append(job.to_dict())
461+
# Get grouped status for all transfers in this thread
462+
results = get_transfer_status_by_thread(channel_id, thread_ts)
459463
return json.dumps(results)
460464

461465
elif tool_name == "list_backends":

src/xfer/slackbot/slurm_tools.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,159 @@ def get_jobs_by_thread(channel_id: str, thread_ts: str) -> list[JobInfo]:
632632
return [j for j in all_jobs if comment_pattern in (j.comment or "")]
633633

634634

635+
# Slurm states that indicate a terminal failure.  BOOT_FAIL, DEADLINE and
# PREEMPTED are listed alongside the more common failure codes so that jobs
# ending in those states are not reported as still in flight.
_FAILED_STATES = {
    "BOOT_FAIL",
    "CANCELLED",
    "DEADLINE",
    "FAILED",
    "NODE_FAIL",
    "OUT_OF_MEMORY",
    "PREEMPTED",
    "TIMEOUT",
}


def _normalize_state(state: Optional[str]) -> str:
    """Reduce a raw Slurm state string to its bare upper-case code.

    Slurm accounting output may decorate a state (for example
    ``"CANCELLED by 1000"`` or a truncated ``"CANCELLED+"``); only the
    leading code matters for phase detection.  ``None`` and empty values
    become ``""``.
    """
    tokens = str(state).split() if state else []
    if not tokens:
        return ""
    return tokens[0].rstrip("+").upper()


def _phase_from_transfer_state(transfer_job: JobInfo) -> tuple[str, str]:
    """Map the transfer job's Slurm state to a ``(phase, detail)`` tuple."""
    raw_state = transfer_job.state
    xfer_state = _normalize_state(raw_state)

    if xfer_state == "PENDING":
        return ("waiting_to_start", "Transfer job is queued")
    if xfer_state == "RUNNING":
        return ("transferring", "Transfer job is running")
    if xfer_state == "COMPLETED":
        return ("complete", "Transfer job completed")
    if xfer_state in _FAILED_STATES:
        return ("failed", f"Transfer job {raw_state}")

    # Any other state (e.g. COMPLETING, SUSPENDED) is treated as in flight.
    return ("transferring", f"Transfer job state: {raw_state}")


def _derive_phase(
    prepare_job: Optional[JobInfo],
    transfer_job: Optional[JobInfo],
) -> tuple[str, str]:
    """Derive transfer phase from Slurm job states.

    Args:
        prepare_job: The ``{name}-prepare`` job, or ``None`` if none was found.
        transfer_job: The ``{name}`` transfer job, or ``None`` if none was found.

    Returns:
        A ``(phase, detail)`` tuple.  ``phase`` is one of ``"pending"``,
        ``"preparing"``, ``"prepare_failed"``, ``"prepare_complete"``,
        ``"waiting_to_start"``, ``"transferring"``, ``"complete"``,
        ``"failed"`` or ``"unknown"``; ``detail`` is a short human-readable
        explanation that includes the raw Slurm state where relevant.
    """
    if not prepare_job:
        if not transfer_job:
            return ("unknown", "No prepare job found")
        # A transfer job without a visible prepare job still has a meaningful
        # state of its own; report it rather than hiding it behind "unknown".
        phase, detail = _phase_from_transfer_state(transfer_job)
        return (phase, f"{detail} (no prepare job found)")

    raw_state = prepare_job.state
    prep_state = _normalize_state(raw_state)

    if prep_state == "PENDING":
        return ("pending", "Prepare job is queued")
    if prep_state == "RUNNING":
        return ("preparing", "Prepare job is running")
    if prep_state in _FAILED_STATES:
        return ("prepare_failed", f"Prepare job {raw_state}")

    if prep_state == "COMPLETED":
        if not transfer_job:
            return ("prepare_complete", "Prepare finished but no transfer job found")
        return _phase_from_transfer_state(transfer_job)

    return ("unknown", f"Prepare job state: {raw_state}")
678+
679+
680+
def _group_jobs_by_transfer(
    jobs: list[JobInfo],
) -> list[tuple[Optional[JobInfo], Optional[JobInfo], str]]:
    """Group jobs into (prepare_job, transfer_job, base_name) tuples.

    Matches by naming convention: prepare jobs end with ``-prepare``,
    transfer jobs use the base name directly. When multiple jobs share
    the same base name, the most recently submitted one wins.

    Args:
        jobs: Jobs to group; each must expose ``name`` and ``submit_time``.

    Returns:
        One tuple per distinct base name, ordered by base name.  Either job
        slot is ``None`` when no job of that kind exists for the base name.
    """
    suffix = "-prepare"

    # Bucket jobs by base name
    prepare_by_base: dict[str, list[JobInfo]] = {}
    transfer_by_base: dict[str, list[JobInfo]] = {}
    for job in jobs:
        name = job.name or ""
        if name.endswith(suffix):
            prepare_by_base.setdefault(name[: -len(suffix)], []).append(job)
        else:
            transfer_by_base.setdefault(name, []).append(job)

    def _submit_key(job: JobInfo) -> tuple:
        # The leading flag ranks jobs without a submit time below all others
        # and guarantees a missing value is never compared against a real
        # timestamp, whatever type the timestamps are (str, int, ...).
        stamp = job.submit_time
        return (1, stamp) if stamp else (0, 0)

    def _newest(job_list: list[JobInfo]) -> JobInfo:
        return max(job_list, key=_submit_key)

    groups = []
    for base in sorted(set(prepare_by_base) | set(transfer_by_base)):
        prep = _newest(prepare_by_base[base]) if base in prepare_by_base else None
        xfer = _newest(transfer_by_base[base]) if base in transfer_by_base else None
        groups.append((prep, xfer, base))

    return groups
714+
715+
716+
def _refine_phase_with_files(
    phase: str, detail: str, file_progress: dict
) -> tuple[str, str]:
    """Refine a Slurm-derived (phase, detail) pair using file-based progress.

    The detail text is replaced together with the phase so the two never
    contradict each other.  Returns the inputs unchanged when the file-based
    signals give no reason to override.
    """
    file_phase = file_progress.get("phase", "unknown")

    if phase == "transferring":
        if file_phase == "complete":
            # Shards finished before job exited
            return (
                "complete",
                "All shards finished; Slurm has not reported the transfer job as completed yet",
            )
        # A missing counter defaults to 1 so absent data never reads as
        # "nothing left to run".
        if (
            file_phase == "failed"
            and file_progress.get("pending", 1) == 0
            and file_progress.get("in_progress", 1) == 0
        ):
            return (
                "failed",
                "No shards left to run and some failed; Slurm has not reported the transfer job as finished yet",
            )
    elif phase == "complete":
        # Slurm says done — check for partial failures.  ``or 0`` tolerates a
        # "failed" key that is present but None.
        failed_shards = file_progress.get("failed") or 0
        if failed_shards > 0:
            return (
                "complete_with_failures",
                f"Transfer job completed but {failed_shards} shard(s) failed",
            )

    return (phase, detail)


def get_transfer_status_by_thread(
    channel_id: str, thread_ts: str
) -> list[dict]:
    """Get transfer status for all jobs in a Slack thread.

    Combines Slurm job state (via sacct) with file-based progress for a
    unified view. Returns a list of dicts, one per transfer group.

    Each dict carries ``base_name``, ``phase`` and ``detail``; it also has
    ``prepare_job`` / ``transfer_job`` (job_id, state, submit_time) when the
    respective job exists, and ``progress`` when a work directory exists on
    disk.  An empty list is returned when the thread has no jobs.
    """
    jobs = get_jobs_by_thread(channel_id, thread_ts)
    if not jobs:
        return []

    results = []
    for prep_job, xfer_job, base_name in _group_jobs_by_transfer(jobs):
        phase, detail = _derive_phase(prep_job, xfer_job)

        entry: dict = {
            "base_name": base_name,
            "phase": phase,
            "detail": detail,
        }

        # Attach job metadata
        if prep_job:
            entry["prepare_job"] = {
                "job_id": prep_job.job_id,
                "state": prep_job.state,
                "submit_time": prep_job.submit_time,
            }
        if xfer_job:
            entry["transfer_job"] = {
                "job_id": xfer_job.job_id,
                "state": xfer_job.state,
                "submit_time": xfer_job.submit_time,
            }

        # Enrich with file-based progress when a work_dir is available;
        # the prepare job's directory takes precedence.
        work_dir = None
        if prep_job and prep_job.work_dir:
            work_dir = Path(prep_job.work_dir)
        elif xfer_job and xfer_job.work_dir:
            work_dir = Path(xfer_job.work_dir)

        if work_dir and work_dir.exists():
            file_progress = get_transfer_progress(work_dir)
            entry["progress"] = file_progress

            # Let file-based signals refine the Slurm-derived phase.  Skip
            # the refinement when no progress data came back.
            if file_progress:
                entry["phase"], entry["detail"] = _refine_phase_with_files(
                    phase, detail, file_progress
                )

        results.append(entry)

    return results
786+
787+
635788
def get_job_status(job_id: str) -> Optional[JobInfo]:
636789
"""Get detailed status for a specific job ID."""
637790
cmd = ["sacct", "--json", "-j", job_id, "-X"]

0 commit comments

Comments
 (0)