Skip to content

Commit daf8dfb

Browse files
authored
Merge branch 'main' into weijia/video_pipeline_tests
2 parents 7185c86 + 7fc9de4 commit daf8dfb

File tree

59 files changed

+739
-486
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

59 files changed

+739
-486
lines changed

.github/workflows/cicd-main.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ jobs:
9595
- name: Run tests ${{ matrix.folder }} (CPU)
9696
timeout-minutes: 40
9797
run: |
98+
uv venv --seed
9899
uv sync --link-mode copy --locked --extra audio_cpu --extra sdg_cpu --extra text_cpu --extra video_cpu --group test
99100
source .venv/bin/activate
100101
FOLDER="${{ matrix.folder }}"

benchmarking/run.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ def run_entry(
193193
)
194194

195195
# Execute command with timeout
196-
logger.info(f"\tRunning command {' '.join(cmd) if isinstance(cmd, list) else cmd}")
197196
started_exec = time.time()
198197
ray_cluster_data = get_ray_cluster_data()
199198
run_data = run_command_with_timeout(

benchmarking/runner/process.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from pathlib import Path
2626
from typing import Any
2727

28+
from loguru import logger
2829
from rich.live import Live
2930
from rich.panel import Panel
3031
from rich.text import Text
@@ -109,6 +110,9 @@ def display_simple_subprocess(
109110

110111
with open(stdouterr_path, "w") as outfile:
111112
start_time = time.time()
113+
logger.info(
114+
f"\tRunning command (output to stdout/err): {' '.join(cmd_list) if isinstance(cmd_list, list) else cmd_list}"
115+
)
112116
try:
113117
process = subprocess.Popen( # noqa: S603
114118
cmd_list,
@@ -167,6 +171,8 @@ def reader() -> None:
167171
sys.stdout.write(msg)
168172
sys.stdout.flush()
169173

174+
logger.info(f"\tSubprocess completed with return code {return_code} in {time.time() - start_time:.2f}s")
175+
170176
return {"returncode": return_code, "timed_out": timed_out}
171177

172178

@@ -214,6 +220,9 @@ def display_scrolling_subprocess( # noqa: PLR0913,PLR0915
214220
):
215221
start_time = time.time()
216222
final_panel = None
223+
logger.info(
224+
f"\tRunning command in subprocess (output to scrolling window): {' '.join(cmd_list) if isinstance(cmd_list, list) else cmd_list}"
225+
)
217226
try:
218227
process = subprocess.Popen( # noqa: S603
219228
cmd_list,
@@ -313,4 +322,6 @@ def reader() -> None:
313322
outfile.write(f"\n--- {msg} ---\n")
314323
outfile.flush()
315324

325+
logger.info(f"\tSubprocess completed with return code {return_code} in {time.time() - start_time:.2f}s")
326+
316327
return {"returncode": return_code, "timed_out": timed_out}

benchmarking/runner/sinks/slack_sink.py

Lines changed: 105 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14+
import fcntl
1415
import json
1516
import os
1617
import random
18+
import time
1719
from collections.abc import Generator
20+
from pathlib import Path
1821
from typing import Any, ClassVar
1922

2023
from loguru import logger
@@ -25,6 +28,9 @@
2528
from slack_sdk import WebClient
2629
from slack_sdk.errors import SlackApiError
2730

31+
_SLACK_STATE_POLL_INTERVAL_S: float = 0.5
32+
_SLACK_STATE_POLL_TIMEOUT_S: float = 120.0
33+
2834

2935
class SlackMessageBase:
3036
"""Base class for Slack messages."""
@@ -454,15 +460,74 @@ def __init__(self, sink_config: dict[str, Any]):
454460
msg = "SlackSink: SLACK_BOT_TOKEN environment variable is not set"
455461
raise ValueError(msg)
456462

463+
# Parallel-run coordination state
464+
self._state_path: Path | None = None # Set in initialize()
465+
self._is_winner: bool = False
466+
467+
def _get_state_path(self) -> Path:
    """Return the path of the shared Slack state file for this session.

    The file is named ``.slack_state.json`` and sits in the directory
    ``<session.results_path>/<session_name>``.
    """
    session_dir = Path(self.session.results_path) / self.session_name
    return session_dir / ".slack_state.json"
469+
470+
def _wait_for_session_state(self, state_path: Path) -> dict[str, Any]:
    """Poll the shared state file until it contains a message timestamp.

    The file is re-read every ``_SLACK_STATE_POLL_INTERVAL_S`` seconds. A file that
    cannot be opened, or whose content does not parse as JSON, is treated as
    "not ready yet" and polling continues.

    Args:
        state_path: Path of the JSON state file to poll.

    Returns:
        The parsed state, as soon as its ``"ts"`` value is truthy.

    Raises:
        TimeoutError: If no such state shows up within ``_SLACK_STATE_POLL_TIMEOUT_S`` seconds.
    """
    give_up_at = time.monotonic() + _SLACK_STATE_POLL_TIMEOUT_S
    while time.monotonic() < give_up_at:
        try:
            with open(state_path) as state_file:
                snapshot = json.load(state_file)
        except (OSError, json.JSONDecodeError):
            # Missing, unreadable or half-written file: try again after the sleep.
            pass
        else:
            if snapshot.get("ts"):
                return snapshot
        time.sleep(_SLACK_STATE_POLL_INTERVAL_S)
    msg = f"SlackSink follower: timed out waiting for session state at {state_path}"
    raise TimeoutError(msg)
483+
457484
def initialize(self, session_name: str, session: Session, env_dict: dict[str, Any]) -> None:
    """Initialize the sink for a session, coordinating parallel runs through a state file.

    The first process that manages to create the session's state file (the "winner")
    creates and posts the session summary message, then records its timestamp, channel
    and entries in the file. Any other process (a "follower") waits for that state to
    appear and attaches to the already-posted parent message instead of posting its own.

    If the winner fails before the state has been written, the state file is removed
    again so that followers and later runs are not left waiting on an empty file.

    Args:
        session_name: Name of the benchmark session; also names the state file's directory.
        session: Session object; its ``results_path`` locates the state file.
        env_dict: Passed through to the session summary (parent) message.

    Raises:
        TimeoutError: In a follower, if the winner's state does not appear in time.
    """
    self.session_name = session_name
    self.env_dict = env_dict
    self.session = session
    self._child_messages = []
    self._state_path = self._get_state_path()
    self._state_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        # O_CREAT | O_EXCL makes creation atomic: exactly one of several parallel
        # processes succeeds and becomes the winner; all others get FileExistsError.
        fd = os.open(str(self._state_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        fd = None
    self._is_winner = fd is not None

    if fd is None:
        # Follower: reuse the parent message that the winner already posted.
        state = self._wait_for_session_state(self._state_path)
        self._parent_message = SlackParentMessage(session_name=session_name, env_dict=env_dict)
        self._parent_message.set_response({"ts": state["ts"], "channel": state["channel"], "ok": True})
        for entry_name, entry_status in state["entries"].items():
            self._parent_message.entries[entry_name] = entry_status
        return

    try:
        self._parent_message = self._create_session_summary_message(env_dict)
        self._post_message(self._parent_message)
        initial_state = {
            "ts": self._parent_message.get_timestamp(),
            "channel": self._parent_message.get_channel_id(),
            "entries": dict(self._parent_message.entries),
        }
        # NOTE: This is the only time the state file is created.
        # If the benchmark session is re-run using the same session name
        # (resulting in the same state file path), the file will already exist and
        # all benchmarking info will be added to the previous Slack parent message.
        # This is by design. New benchmark runs are assumed to use new session names,
        # and therefore will generate new/unique state file paths.
        os.write(fd, json.dumps(initial_state).encode())
    except BaseException:
        # Do not leave an empty or partial state file behind: it would turn every
        # other process, and every later run with this session name, into a follower
        # that waits out the full poll timeout for a timestamp that never arrives.
        self._state_path.unlink(missing_ok=True)
        raise
    finally:
        os.close(fd)
466531

467532
def register_benchmark_entry_starting(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None: # noqa: ARG002
468533
# Register that a benchmark entry is starting.
@@ -474,8 +539,7 @@ def register_benchmark_entry_starting(self, result_dict: dict[str, Any], benchma
474539
"SlackSink: Warning: Ignoring attempt to post an entry starting message without a session summary message. Was initialize() called?"
475540
)
476541
return
477-
self._parent_message.update_entry(benchmark_entry.name, "▶️ running")
478-
self._post_updates()
542+
self._update_parent_entry(benchmark_entry.name, "▶️ running")
479543

480544
def register_benchmark_entry_finished(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
481545
if self._parent_message is None:
@@ -498,7 +562,7 @@ def register_benchmark_entry_finished(self, result_dict: dict[str, Any], benchma
498562
)
499563
self._child_messages.append(msg)
500564
# Update the session summary message with the new entry status.
501-
self._parent_message.update_entry(benchmark_entry.name, status_text)
565+
self._update_parent_entry(benchmark_entry.name, status_text)
502566

503567
if self.live_updates:
504568
self._post_updates()
@@ -509,10 +573,6 @@ def finalize(self) -> None:
509573
"SlackSink: Warning: Ignoring attempt to finalize without a session summary message. Was initialize() called?"
510574
)
511575
return
512-
# Unconditionally posts all unposted messages.
513-
# This will be a no-op if self.live_mode is True, otherwise this will post all
514-
# unposted messages from the entire benchmark run at once.
515-
self._finalize_session_summary_message()
516576
self._post_updates()
517577

518578
def _create_session_summary_message(self, env_dict: dict[str, Any]) -> SlackParentMessage:
@@ -545,22 +605,45 @@ def _create_benchmark_entry_message(
545605
metrics, result_dict = data
546606
return SlackMessage(entry_name=benchmark_entry.name, result_dict=result_dict, metrics=metrics, pings=pings)
547607

548-
def _finalize_session_summary_message(self) -> None:
549-
"""Finalize the session summary message with overall status."""
550-
# Check if any entries are still in "running" or "waiting to start" status and mark them as errored
551-
for entry_name, status in self._parent_message.entries.items():
552-
if "⏳" in status or "▶️" in status:
553-
self._parent_message.update_entry(entry_name, "❌ ERROR")
608+
def _update_parent_entry(self, entry_name: str, status: str) -> None:
    """Record one entry's status in the shared state file and push it to Slack.

    An exclusive file lock is held across the whole read-modify-write cycle and the
    Slack update, so that concurrent processes do not overwrite each other's changes.

    Args:
        entry_name: Name of the benchmark entry to update.
        status: New status string for the entry.
    """
    if self._state_path is None:
        logger.error("SlackSink: Cannot update parent entry — state path not set. Was initialize() called?")
        return

    try:
        state_file = open(self._state_path, "r+")  # noqa: SIM115
    except OSError:
        logger.error(f"SlackSink: Cannot open state file {self._state_path} for update")
        return

    try:
        fcntl.flock(state_file.fileno(), fcntl.LOCK_EX)
        state = json.load(state_file)
        entries = state["entries"]
        entries[entry_name] = status
        # Mirror every entry from the shared state into the local parent message, so
        # the Slack update also carries statuses written by other processes.
        for name, entry_status in entries.items():
            self._parent_message.update_entry(name, entry_status)
        try:
            self._update_message(self._parent_message)
        finally:
            # Persist the state whether or not the Slack update succeeded
            # (e.g. when _update_message raises SlackApiError).
            state_file.seek(0)
            json.dump(state, state_file)
            state_file.truncate()
    finally:
        fcntl.flock(state_file.fileno(), fcntl.LOCK_UN)
        state_file.close()
554642

555643
def _post_updates(self) -> None:
    """Post every child message that has not been posted yet, in list order."""
    for child in self._child_messages:
        if child.was_posted():
            continue
        self._post_message(child)
563-
# Future enhancement: support updating child messages
564647

565648
def _post_message(self, message: SlackMessageBase) -> None:
566649
"""Post a message to Slack.
@@ -635,9 +718,6 @@ def _update_message(self, message: SlackMessageBase) -> None:
635718
# Run SlackSink from the command line to post a report for existing results.
636719
if __name__ == "__main__":
637720
import argparse
638-
import os
639-
import time
640-
from pathlib import Path
641721

642722
parser = argparse.ArgumentParser(description="Post benchmark results to Slack.")
643723
parser.add_argument(

benchmarking/scripts/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import pyarrow.compute as pc
2626
import pyarrow.parquet as pq
2727

28-
from nemo_curator.backends.experimental.ray_actor_pool.executor import RayActorPoolExecutor
28+
from nemo_curator.backends.ray_actor_pool import RayActorPoolExecutor
2929
from nemo_curator.backends.ray_data import RayDataExecutor
3030
from nemo_curator.backends.xenna import XennaExecutor
3131
from nemo_curator.utils.file_utils import get_all_file_paths_and_size_under

benchmarking/tools/run.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ if [ "${GPUS}" != "none" ]; then
5656
GPUS_FLAG="--gpus=\"${GPUS}\""
5757
fi
5858

59+
# --net=host allows the container to use the host's network stack, which Ray requires to
60+
# communicate between the container and the host. When running multiple benchmarks in parallel,
61+
# remove this flag so each container uses its own network namespace — this ensures each Ray
62+
# cluster is confined to its own container and can use the same default ports without
63+
# conflicting with other containers.
5964
docker run \
6065
--rm \
6166
--net=host \

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ ENV UV_PROJECT_ENVIRONMENT=/opt/venv
5656
ENV UV_CACHE_DIR=/opt/uv_cache
5757
ENV PATH="$UV_PROJECT_ENVIRONMENT/bin:$PATH"
5858
ENV UV_LINK_MODE=copy
59-
RUN uv venv ${UV_PROJECT_ENVIRONMENT} --system-site-packages
59+
RUN uv venv ${UV_PROJECT_ENVIRONMENT} --system-site-packages --seed
6060

6161
FROM build AS nemo_curator_dep
6262

0 commit comments

Comments
 (0)