Skip to content

Commit eeaa0a5

Browse files
authored
[benchmarking] Updates Slack sink to address table size limitation (#1480)
1 parent b83c9ff commit eeaa0a5

File tree

9 files changed

+619
-239
lines changed

9 files changed

+619
-239
lines changed

benchmarking/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ RUN cd /opt/Curator \
3636
pynvml \
3737
pyyaml \
3838
rich \
39+
slack_sdk \
3940
&& uv cache prune
4041

4142
# Add the Curator repo to the safe.directory list to avoid GitPython warnings

benchmarking/README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ sinks:
148148
experiment: my-experiment
149149
- name: slack
150150
enabled: true
151-
webhook_url: ${SLACK_WEBHOOK_URL}
151+
channel_id: ${SLACK_CHANNEL_ID}
152152
default_metrics: ["exec_time_s"] # Metrics to report by default for all entries
153153
- name: gdrive
154154
enabled: false
@@ -223,7 +223,7 @@ Configuration values can reference environment variables using `${VAR_NAME}` syn
223223
results_path: "${HOME}/benchmarks/results"
224224
sinks:
225225
- name: slack
226-
webhook_url: ${SLACK_WEBHOOK_URL}
226+
channel_id: ${SLACK_CHANNEL_ID}
227227
- name: mlflow
228228
tracking_uri: ${MLFLOW_TRACKING_URI}
229229
```
@@ -311,7 +311,7 @@ This command:
311311
- Reads the configuration file and extracts `results_path` and `datasets_path`
312312
- Automatically creates volume mounts to map these paths into the container
313313
- Runs the benchmarking framework with the Curator code built into the Docker image
314-
- Passes environment variables like `SLACK_WEBHOOK_URL` and `MLFLOW_TRACKING_URI` to the container
314+
- Passes environment variables like `SLACK_BOT_TOKEN`, `SLACK_CHANNEL_ID`, and `MLFLOW_TRACKING_URI` to the container
315315

316316
### Using Host Curator Sources
317317

@@ -459,11 +459,13 @@ Posts results to Slack channels:
459459
```yaml
460460
sinks:
461461
- name: slack
462-
webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
462+
channel_id: C1234567890 # Your Slack channel ID
463463
enabled: true
464464
```
465465

466-
Results are formatted as interactive Slack messages with environment info and metrics.
466+
Results are posted as interactive Slack messages with environment info and metrics. Requires:
467+
- `SLACK_BOT_TOKEN` environment variable set to your Slack Bot User OAuth Token
468+
- `channel_id` set in the sink config to the target channel's ID, either literally or via an environment variable reference such as `${SLACK_CHANNEL_ID}`
467469

468470
#### Google Drive Sink
469471

benchmarking/nightly-benchmark.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ sinks:
7373
# experiment: ray-curator-common-crawl
7474
- name: slack
7575
enabled: true
76-
webhook_url: ${SLACK_WEBHOOK_URL}
76+
live_updates: true
77+
channel_id: ${SLACK_CHANNEL_ID}
7778
default_metrics: ["exec_time_s"]
7879
# - name: gdrive
7980
# enabled: false

benchmarking/run.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,15 +255,15 @@ def run_entry(
255255
shutil.rmtree(scratch_path, ignore_errors=True)
256256

257257

258-
def main() -> int: # noqa: C901
258+
def main() -> int: # noqa: C901, PLR0912
259259
parser = argparse.ArgumentParser(description="Runs the benchmarking application")
260260
parser.add_argument(
261261
"--config",
262262
type=Path,
263263
action="append",
264264
required=True,
265265
help=(
266-
"Path to YAML config for benchmark matrix, machine paths, etc. Can be "
266+
"Path to YAML config for the benchmark entries, machine paths, etc. Can be "
267267
"specified multiple times to merge configs."
268268
),
269269
)
@@ -322,7 +322,7 @@ def main() -> int: # noqa: C901
322322
env_dict = dump_env(session_obj=session, output_path=session_path)
323323

324324
for sink in session.sinks:
325-
sink.initialize(session_name=session_name, matrix_config=session, env_dict=env_dict)
325+
sink.initialize(session_name=session_name, session=session, env_dict=env_dict)
326326

327327
# Print a summary of the entries that will be run in the for loop below
328328
# Disabled entries will not be printed
@@ -339,6 +339,10 @@ def main() -> int: # noqa: C901
339339
"success": run_success,
340340
}
341341
logger.info(f"🚀 Running {entry.name} (run ID: {run_id})")
342+
343+
for sink in session.sinks:
344+
sink.register_benchmark_entry_starting(result_dict=result_data, benchmark_entry=entry)
345+
342346
try:
343347
run_success = run_entry(
344348
entry=entry,
@@ -364,7 +368,7 @@ def main() -> int: # noqa: C901
364368
finally:
365369
session_overall_success &= run_success
366370
for sink in session.sinks:
367-
sink.process_result(result_dict=result_data, matrix_entry=entry)
371+
sink.register_benchmark_entry_finished(result_dict=result_data, benchmark_entry=entry)
368372

369373
for sink in session.sinks:
370374
sink.finalize()

benchmarking/runner/sinks/gdrive_sink.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ def __init__(self, sink_config: dict[str, Any]):
3333
super().__init__(sink_config)
3434
self.sink_config = sink_config
3535
self.results: list[dict[str, Any]] = []
36-
self.session_name: str = None
37-
self.matrix_config: Session = None
38-
self.env_dict: dict[str, Any] = None
39-
self.drive_folder_id: str = None
40-
self.service_account_file: str = None
36+
self.session_name: str | None = None
37+
self.session: Session | None = None
38+
self.env_dict: dict[str, Any] | None = None
39+
self.drive_folder_id: str | None = None
40+
self.service_account_file: str | None = None
4141

42-
def initialize(self, session_name: str, matrix_config: Session, env_dict: dict[str, Any]) -> None:
42+
def initialize(self, session_name: str, session: Session, env_dict: dict[str, Any]) -> None:
4343
self.session_name = session_name
44-
self.matrix_config = matrix_config
44+
self.session = session
4545
self.env_dict = env_dict
4646
self.drive_folder_id = self.sink_config.get("drive_folder_id")
4747
if not self.drive_folder_id:
@@ -52,7 +52,10 @@ def initialize(self, session_name: str, matrix_config: Session, env_dict: dict[s
5252
msg = "GdriveSink: No service account file configured"
5353
raise ValueError(msg)
5454

55-
def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> None:
55+
def register_benchmark_entry_starting(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
56+
pass
57+
58+
def register_benchmark_entry_finished(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
5659
pass
5760

5861
def finalize(self) -> None:
@@ -66,8 +69,8 @@ def finalize(self) -> None:
6669
self._delete_tar_file(tar_path)
6770

6871
def _tar_results_and_artifacts(self) -> Path:
69-
results_path = Path(self.matrix_config.results_path)
70-
artifacts_path = Path(self.matrix_config.artifacts_dir)
72+
results_path = Path(self.session.results_path)
73+
artifacts_path = Path(self.session.artifacts_dir)
7174
tar_path = results_path / f"{self.session_name}.tar.gz"
7275
with tarfile.open(tar_path, "w:gz") as tar:
7376
tar.add(results_path, arcname=results_path.name)

benchmarking/runner/sinks/mlflow_sink.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,23 @@ def __init__(self, sink_config: dict[str, Any]):
3434
msg = "MlflowSink: No experiment configured"
3535
raise ValueError(msg)
3636
self.results: list[dict[str, Any]] = []
37-
self.session_name: str = None
38-
self.matrix_config: Session = None
39-
self.env_dict: dict[str, Any] = None
37+
self.session_name: str | None = None
38+
self.session: Session | None = None
39+
self.env_dict: dict[str, Any] | None = None
4040

41-
def initialize(self, session_name: str, matrix_config: Session, env_dict: dict[str, Any]) -> None:
41+
def initialize(self, session_name: str, session: Session, env_dict: dict[str, Any]) -> None:
4242
self.session_name = session_name
43-
self.matrix_config = matrix_config
43+
self.session = session
4444
self.env_dict = env_dict
4545

46-
def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> None:
47-
# Use the matrix_entry to get any entry-specific settings for the Slack report
46+
def register_benchmark_entry_starting(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
47+
pass
48+
49+
def register_benchmark_entry_finished(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
50+
# Use the benchmark_entry to get any entry-specific settings for the MLflow report
4851
# such as additional metrics to include in the report.
49-
if matrix_entry:
50-
additional_metrics = matrix_entry.get_sink_data(self.name).get("additional_metrics", [])
52+
if benchmark_entry:
53+
additional_metrics = benchmark_entry.get_sink_data(self.name).get("additional_metrics", [])
5154
else:
5255
additional_metrics = []
5356
self.results.append((additional_metrics, result_dict))

benchmarking/runner/sinks/sink.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,33 @@ def __init__(self, sink_config: dict[str, Any]):
3333
def initialize(
3434
self,
3535
session_name: str,
36-
matrix_config: Session,
36+
session: Session,
3737
env_dict: dict[str, Any],
3838
) -> None:
3939
"""Initialize the sink for a benchmark session.
4040
4141
Args:
4242
session_name: Name of the benchmark session.
43-
matrix_config: Session configuration for the session.
43+
session: Session object holding the configuration for this benchmark session.
4444
env_dict: Environment dictionary for the session.
4545
"""
4646

4747
@abstractmethod
48-
def process_result(self, result_dict: dict[str, Any], matrix_entry: Entry) -> None:
49-
"""Process an individual benchmark result.
48+
def register_benchmark_entry_starting(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
49+
"""Register that a benchmark entry is starting.
50+
51+
Args:
52+
result_dict: Dictionary containing benchmark entry data.
53+
benchmark_entry: Entry configuration.
54+
"""
55+
56+
@abstractmethod
57+
def register_benchmark_entry_finished(self, result_dict: dict[str, Any], benchmark_entry: Entry) -> None:
58+
"""Register that a benchmark entry has finished.
5059
5160
Args:
5261
result_dict: Dictionary containing benchmark result data.
53-
matrix_entry: Entry configuration.
62+
benchmark_entry: Entry configuration.
5463
"""
5564

5665
@abstractmethod

0 commit comments

Comments
 (0)