Commit 1fc7976

Parallelize manifest build and add progress tracking
Split manifest generation into up to 4 concurrent rclone lsjson workers to reduce listing time for large datasets. Add `manifest combine` CLI command, prepare job phase tracking (progress.json), manifest build progress reporting, and bump prepare resources (250G mem, 4-day limit).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ef742db commit 1fc7976

File tree

5 files changed: +316 -10 lines changed

CHANGELOG.md

Lines changed: 39 additions & 0 deletions
New file (39 additions, 0 deletions):

# Changelog

## Unreleased (feature/slack-claude)

### Parallel Manifest Build

- Parallelize manifest generation into up to 4 concurrent `rclone lsjson` workers, reducing listing time for large datasets with many top-level subdirectories (`slurm_tools.py`)
- Add `manifest combine` CLI command to merge parallel lsjson part files (with `.prefix` sidecars) into a unified `manifest.jsonl` (`cli.py`)
- Bump prepare job memory from 16 GB to 250 GB to accommodate large listings (`slurm_tools.py`)
- Add `--max-backlog=1000000` to `rclone lsjson` calls to prevent the walker from stalling on large buckets (`cli.py`, `slurm_tools.py`)
- Report manifest build progress (files listed, bytes listed) via a `manifest.jsonl.progress` sidecar file (`cli.py`)
- Track prepare job phases (`listing_source`, `combining_manifest`, `analyzing`, `sharding`, `rendering`, `submitting`) in `progress.json` (`slurm_tools.py`)

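The parallel-worker split itself lives in `slurm_tools.py` and is not part of this diff. A minimal sketch of how top-level subdirectories could be fanned out across up to 4 `rclone lsjson` workers, producing the `lsjson-*.json` part files and `.prefix` sidecars that `manifest combine` expects (`partition_dirs`, `list_group`, and `build_parts` are illustrative names, not the real implementation):

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import List

MAX_WORKERS = 4


def partition_dirs(top_dirs: List[str], max_workers: int = MAX_WORKERS) -> List[List[str]]:
    """Round-robin top-level subdirectories into up to `max_workers` groups."""
    n = min(max_workers, len(top_dirs)) or 1
    groups: List[List[str]] = [[] for _ in range(n)]
    for i, d in enumerate(sorted(top_dirs)):
        groups[i % n].append(d)
    return groups


def list_group(source: str, dirs: List[str], out_prefix: str = "lsjson") -> None:
    """List each subdirectory in this group with rclone, writing one
    lsjson-<dir>.json part file plus a .prefix sidecar naming the subdir."""
    for d in dirs:
        with open(f"{out_prefix}-{d}.json", "w") as f:
            subprocess.run(
                ["rclone", "lsjson", "-R", "--files-only", f"{source}/{d}"],
                stdout=f, check=True,
            )
        with open(f"{out_prefix}-{d}.prefix", "w") as f:
            f.write(d + "\n")


def build_parts(source: str, top_dirs: List[str]) -> None:
    """Run the listing groups concurrently, one thread per group."""
    groups = partition_dirs(top_dirs)
    with ThreadPoolExecutor(max_workers=len(groups)) as ex:
        for g in groups:
            ex.submit(list_group, source, g)
```

The round-robin split keeps worker counts balanced when subdirectory sizes are unknown in advance.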
### Claude-Powered Slack Bot

- Add Claude-powered Slack bot for interactive data transfer requests via Slack threads
- Add intelligent rclone flag selection based on file size distribution analysis
- Add `check_path_exists` tool to validate source paths before submitting jobs
- Add `list_buckets` tool to enumerate buckets at remote endpoints
- Add `read_job_logs` tool to access job analysis data, prepare logs, and shard transfer logs
- Add lightweight Haiku triage to filter thread messages and skip unrelated chatter
- Add per-user job ownership so only the submitting user can cancel their jobs
- Restore thread context from the Slack API after bot restarts
- Report manifest listing progress (`files_listed`, `bytes_listed`) in job status during the `building_manifest` phase

### Slurm Robustness

- Increase prepare job time limit to 4 days for very large datasets
- Unset conflicting `SLURM_MEM_*` environment variables in `prepare.sh`
- Add `--export=NONE` to all `sbatch` calls to prevent environment leakage
- Allow users to set a lower array concurrency (max 64)
- Verify the source path exists before creating the run directory and submitting jobs
- Reduce thread history limits to mitigate Slack rate limits

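The two environment-hygiene items above can be sketched in Python for illustration (the real logic is an `unset` loop in `prepare.sh` and the `sbatch` invocation in the submit path; `scrub_slurm_mem_env` and `sbatch_cmd` are hypothetical helpers):

```python
from typing import Dict, List


def scrub_slurm_mem_env(env: Dict[str, str]) -> Dict[str, str]:
    """Drop SLURM_MEM_* variables that would conflict with the prepare
    job's own memory request (mirrors the unset loop in prepare.sh)."""
    return {k: v for k, v in env.items() if not k.startswith("SLURM_MEM_")}


def sbatch_cmd(script: str) -> List[str]:
    """Build an sbatch invocation; --export=NONE keeps the submitting
    shell's environment from leaking into the job."""
    return ["sbatch", "--export=NONE", script]
```

Without `--export=NONE`, sbatch exports the caller's environment by default, so a leaked `SLURM_MEM_PER_NODE` from a parent job can silently override the script's own `#SBATCH --mem` directive.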
### Bug Fixes

- Fix `run_id` format to be filename-safe (no colons)
- Fix markdown rendering in Slack responses
- Improve error logging for manifest build failures (write to `xfer-err/` with full context)
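The `run_id` fix above boils down to avoiding colons, which are awkward or illegal in filenames on some filesystems. A minimal sketch of a filename-safe generator consistent with that fix (the real `now_run_id` helper in the package may use a different exact format):

```python
from datetime import datetime, timezone


def now_run_id() -> str:
    """Filename-safe run identifier: a UTC timestamp where the usual
    colons in the time component are replaced with hyphens,
    e.g. 2024-01-02T03-04-05Z instead of 2024-01-02T03:04:05Z."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%SZ")
```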

src/xfer/cli.py

Lines changed: 133 additions & 1 deletion
```diff
@@ -246,6 +246,7 @@ def manifest_build(
     rclone_cmd += shlex.split(extra_lsjson_flags)
 
     rclone_cmd.append("--files-only")
+    rclone_cmd.append("--max-backlog=1000000")
 
     srun_cmd = ["srun", "-n", "1", "-c", "8", "--no-container-remap-root"]
     srun_cmd += pyxis_container_args(
```
```diff
@@ -301,9 +302,22 @@ def manifest_build(
         eprint(f"SLURM memory env vars: {slurm_mem_vars}")
         raise
 
-    # Build JSONL
+    # Build JSONL with progress reporting
     n = 0
     bytes_total = 0
+    progress_file = out.parent / "manifest.jsonl.progress"
+    last_progress_n = 0
+    PROGRESS_INTERVAL = 10_000  # update progress file every 10k files
+
+    def _write_progress() -> None:
+        """Write current listing progress to a sidecar file."""
+        try:
+            progress_file.write_text(
+                json.dumps({"files_listed": n, "bytes_listed": bytes_total})
+            )
+        except OSError:
+            pass
+
     with out.open("w", encoding="utf-8") as f:
         for item in parse_lsjson_items(cp.stdout):
             # Skip directories
@@ -345,6 +359,13 @@ def manifest_build(
             f.write(json.dumps(rec, separators=(",", ":")) + "\n")
             n += 1
 
+            if n - last_progress_n >= PROGRESS_INTERVAL:
+                _write_progress()
+                eprint(f"  manifest progress: {n:,} files, {bytes_total:,} bytes")
+                last_progress_n = n
+
+    # Final progress update and cleanup
+    _write_progress()
     eprint(f"Wrote {n} items, {bytes_total} bytes -> {out}")
 
 
```
```diff
@@ -515,6 +536,117 @@ def manifest_analyze(
     print(json_output)
 
 
+@manifest_app.command("combine")
+def manifest_combine(
+    source: str = typer.Option(
+        ..., help="rclone source root, e.g. s3src:bucket/prefix"
+    ),
+    dest: str = typer.Option(..., help="rclone dest root, e.g. s3dst:bucket/prefix"),
+    parts_dir: Path = typer.Option(
+        ..., exists=True, help="Directory containing lsjson-*.json part files", resolve_path=True
+    ),
+    out: Path = typer.Option(..., help="Output manifest JSONL path", resolve_path=True),
+    run_id: Optional[str] = typer.Option(
+        None, help="Run identifier; default is generated"
+    ),
+) -> None:
+    """
+    Combine multiple lsjson part files into a unified manifest.jsonl.
+
+    Reads lsjson-*.json files from --parts-dir, adjusts paths using .prefix
+    sidecar files, and writes a single manifest JSONL.
+    """
+    run_id = run_id or now_run_id()
+    mkdirp(out.parent)
+
+    # Glob part files
+    part_files = sorted(parts_dir.glob("lsjson-*.json"))
+    if not part_files:
+        eprint(f"No lsjson-*.json files found in {parts_dir}")
+        raise typer.Exit(code=2)
+
+    n = 0
+    bytes_total = 0
+    last_progress_n = 0
+    PROGRESS_INTERVAL = 10_000
+    progress_file = out.parent / "manifest.jsonl.progress"
+
+    def _write_progress() -> None:
+        try:
+            progress_file.write_text(
+                json.dumps({"files_listed": n, "bytes_listed": bytes_total})
+            )
+        except OSError:
+            pass
+
+    with out.open("w", encoding="utf-8") as f:
+        for part_file in part_files:
+            # Determine prefix from sidecar file
+            prefix_file = part_file.with_suffix(".prefix")
+            prefix = ""
+            if prefix_file.exists():
+                prefix = prefix_file.read_text(encoding="utf-8").strip()
+
+            # Read the JSON array
+            try:
+                items = json.loads(part_file.read_text(encoding="utf-8"))
+                if not isinstance(items, list):
+                    eprint(f"WARNING: {part_file} is not a JSON array, skipping")
+                    continue
+            except (json.JSONDecodeError, OSError) as e:
+                eprint(f"WARNING: Failed to read {part_file}: {e}, skipping")
+                continue
+
+            for item in items:
+                if not isinstance(item, dict):
+                    continue
+                if item.get("IsDir") is True:
+                    continue
+
+                rel_path = item.get("Path")
+                if not rel_path or not isinstance(rel_path, str):
+                    continue
+
+                # Adjust path with prefix
+                if prefix:
+                    rel_path = prefix + "/" + rel_path
+
+                size = int(item.get("Size") or 0)
+                bytes_total += size
+
+                mtime = item.get("ModTime")
+                hashes = item.get("Hashes") if isinstance(item.get("Hashes"), dict) else {}
+                etag = item.get("ETag") or item.get("etag")
+                storage_class = item.get("StorageClass")
+                meta = item.get("Metadata") if isinstance(item.get("Metadata"), dict) else {}
+
+                rec = {
+                    "schema": SCHEMA,
+                    "run_id": run_id,
+                    "source_root": source,
+                    "dest_root": dest,
+                    "source": source.rstrip("/") + "/" + rel_path,
+                    "dest": stable_dest_for_source(source, dest, rel_path),
+                    "path": rel_path,
+                    "size": size,
+                    "mtime": mtime,
+                    "hashes": hashes,
+                    "etag": etag,
+                    "storage_class": storage_class,
+                    "meta": meta,
+                }
+                f.write(json.dumps(rec, separators=(",", ":")) + "\n")
+                n += 1
+
+                if n - last_progress_n >= PROGRESS_INTERVAL:
+                    _write_progress()
+                    eprint(f"  manifest progress: {n:,} files, {bytes_total:,} bytes")
+                    last_progress_n = n
+
+    _write_progress()
+    eprint(f"Combined {len(part_files)} parts -> {n} items, {bytes_total} bytes -> {out}")
+
+
 # -----------------------------
 # Slurm render/submit
 # -----------------------------
```
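The prefix-join behaviour of `manifest combine` can be exercised in isolation. The sketch below reimplements just the path handling from the command above (`combine_parts` is an illustrative helper, not part of the CLI, and it omits the full record construction):

```python
import json
from pathlib import Path
from typing import List


def combine_parts(parts_dir: Path) -> List[str]:
    """Merge lsjson-*.json part files, prepending each part's .prefix
    sidecar (if present) to its relative paths."""
    paths: List[str] = []
    for part in sorted(parts_dir.glob("lsjson-*.json")):
        # .prefix sidecar records which top-level subdir this part covers
        prefix_file = part.with_suffix(".prefix")
        prefix = prefix_file.read_text().strip() if prefix_file.exists() else ""
        for item in json.loads(part.read_text()):
            if item.get("IsDir") is True:
                continue  # directories never enter the manifest
            rel = item.get("Path")
            if not rel:
                continue
            paths.append(prefix + "/" + rel if prefix else rel)
    return paths
```

Because each worker lists only its own subdirectory, the `Path` values in a part file are relative to that subdirectory; the sidecar prefix restores paths relative to the overall source root.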

src/xfer/slackbot/claude_agent.py

Lines changed: 4 additions & 1 deletion
```diff
@@ -84,7 +84,9 @@
         "name": "check_status",
         "description": """Check the status of transfer jobs in this thread. Use this when the user asks about job status, progress, or wants to know if their transfer is complete.
 
-This tool finds all jobs associated with the current Slack thread and returns their status.""",
+This tool finds all jobs associated with the current Slack thread and returns their status.
+
+When the phase is "building_manifest", the prepare job is listing files at the source. This can take up to several days for large datasets and is normal. The response may include files_listed and bytes_listed if the JSONL writing phase has started, or prepare_phase/prepare_detail for finer-grained progress. Only flag a concern if the job has been in this phase for more than 48 hours with no observable progress.""",
         "input_schema": {
             "type": "object",
             "properties": {
@@ -292,6 +294,7 @@
 6. When reporting job status, include relevant details like progress and any errors
 7. If users want custom rclone flags (e.g., bandwidth limits, checksum verification), pass them via the rclone_flags parameter
 8. When a user reports a problem or asks you to investigate an issue with a transfer, ALWAYS use read_job_logs to examine the actual log files. The shard logs contain the real error messages from rclone and the transfer process. Do not guess at causes without reading the logs first.
+9. The preparation phase (manifest building) can take a long time for large datasets — up to several days is normal. The prepare job has a 4-day time limit. Only consider it a potential problem if the prepare job has been running for more than 48 hours with no progress. If the status includes files_listed or bytes_listed, report those numbers to the user so they can see listing is progressing. If those numbers are absent, the rclone listing is still running (it returns results all at once when complete).
 
 Transfer path format:
 - Paths should be in rclone format: "remote:bucket/path"
```
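The 48-hour rule stated in the prompt above could also be enforced in code rather than left entirely to the model. A sketch under assumed inputs (`manifest_looks_stalled` and its parameters are hypothetical; the bot's actual status plumbing is not shown in this diff):

```python
from datetime import datetime, timedelta
from typing import Optional

STALL_AFTER = timedelta(hours=48)


def manifest_looks_stalled(
    phase_started: datetime,
    last_progress_change: Optional[datetime],
    now: datetime,
) -> bool:
    """Flag building_manifest as a concern only after 48 hours with no
    observable progress. If the progress sidecar has never been written,
    fall back to the time the phase began."""
    reference = last_progress_change or phase_started
    return now - reference > STALL_AFTER
```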
