
Commit 2331b9b

Adds FuzzyDedup identification and Removal benchmarks (#1233)

1 parent a1706fd · commit 2331b9b
16 files changed: +676 −284 lines

benchmarking/Dockerfile
Lines changed: 2 additions & 0 deletions

@@ -28,11 +28,13 @@ RUN apt-get update \
 
 # Add dependencies for benchmarking to the Curator Python environment
 RUN cd /opt/Curator \
+    && uv sync --extra all \
     && uv add \
         GitPython \
         oauth2client \
         pydrive2 \
         pynvml \
+        pyyaml \
         rich \
     && uv cache prune
 
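The new `uv sync --extra all` line installs Curator's own optional extras before the benchmarking-only packages are added, and `pyyaml` is presumably what the runner uses to parse the session config shown next. A minimal sketch of that load; the file path and usage here are illustrative, not taken from the runner:

import yaml  # provided by the pyyaml package added above

# Load the nightly session config into a plain dict; run.py then validates it
# and builds a Session object from it (see benchmarking/run.py below).
with open("benchmarking/nightly-benchmark.yaml") as f:
    config_dict = yaml.safe_load(f)

print(config_dict["default_timeout_s"])  # 7200 in the config below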

benchmarking/nightly-benchmark.yaml
Lines changed: 66 additions & 37 deletions
@@ -6,15 +6,30 @@
 # Paths can be used in other values in this file by using their placeholder
 # (e.g. {datasets_path}/my/test/dataset.parquet) and will be resolved to the
 # appropriate path at runtime.
-results_path: /raid/curator-team/nightly/results
-artifacts_path: /raid/curator-team/nightly/artifacts
-datasets_path: /raid
+results_path: /path/where/results/are/stored
+artifacts_path: /path/where/artifacts/are/stored
+datasets_path: /path/to/datasets
 
 datasets:
-  - name: "tinystories_train"
+  - name: "tinystories"
     formats:
       - type: "parquet"
-        path: "{datasets_path}/prospector-lm/clean/tinystories_train_parquet"
+        path: "{datasets_path}/tinystories/parquet_data"
+      - type: "jsonl"
+        path: "{datasets_path}/tinystories/jsonl_data"
+  - name: "commoncrawl"
+    formats:
+      - type: "jsonl"
+        path: "{datasets_path}/commoncrawl/jsonl_data"
+  - name: "commoncrawl_id_map"
+    formats:
+      - type: "json"
+        path: "{datasets_path}/commoncrawl/id_generator.json"
+  - name: "commoncrawl_ids"
+    formats:
+      - type: "parquet"
+        path: "{datasets_path}/commoncrawl/IDs/parquet_data"
 
 default_timeout_s: 7200
 
@@ -42,7 +57,7 @@ entries:
     script: domain_classification_benchmark.py
     args: >-
       --executor=ray_data
-      --input-path={dataset:tinystories_train,parquet}
+      --input-path={dataset:tinystories,parquet}
      --dataset-size-gb=10
       --model-inference-batch-size=1024
     timeout_s: 20000
@@ -54,17 +69,17 @@ entries:
       num_cpus: 64
       num_gpus: 4
       enable_object_spilling: false
-    # Optional: Requirements for the benchmark to pass. These will result in the benchmark being marked as failed if not met.
+    # Additional requirements for the benchmark to pass. These will result in the benchmark being marked as failed if not met.
     requirements:
       - metric: throughput_docs_per_sec
         min_value: 0.2
 
   - name: domain_classification_xenna
-    enabled: false
+    enabled: true
     script: domain_classification_benchmark.py
     args: >-
       --executor=xenna
-      --input-path={dataset:tinystories_train,parquet}
+      --input-path={dataset:tinystories,parquet}
       --dataset-size-gb=10
       --model-inference-batch-size=1024
     timeout_s: 20000
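The `requirements` block above is enforced by `check_requirements_update_results` in run.py (its signature appears in the diff below, its body does not); a sketch of the kind of `min_value` check it likely performs, with the logic assumed:

from typing import Any

def passes_requirements(result_data: dict[str, Any], requirements: list[dict[str, Any]]) -> bool:
    # Mark the benchmark failed when any metric is missing or below its floor (assumed logic).
    for req in requirements:
        value = result_data.get(req["metric"])
        if value is None or ("min_value" in req and value < req["min_value"]):
            return False
    return True

# passes_requirements({"throughput_docs_per_sec": 0.5},
#                     [{"metric": "throughput_docs_per_sec", "min_value": 0.2}])  # True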
@@ -74,65 +89,79 @@ entries:
     script: embedding_generation_benchmark.py
     args: >-
       --executor=ray_data
-      --input-path={dataset:tinystories_train,parquet}
+      --input-path={dataset:tinystories,parquet}
       --dataset-size-gb=10
       --model-identifier=sentence-transformers/all-MiniLM-L6-v2
       --model-inference-batch-size=1024
     timeout_s: 20000
     sink_data:
       - name: slack
-        # Additional metrics to include in the Slack report. These must be present in the metrics.json file generated by the script.
         additional_metrics: ["num_documents_processed", "throughput_docs_per_sec"]
     ray:
       num_cpus: 64
       num_gpus: 4
       enable_object_spilling: false
 
   - name: embedding_generation_xenna
-    enabled: false
+    enabled: true
     script: embedding_generation_benchmark.py
     args: >-
       --executor=xenna
-      --input-path={dataset:tinystories_train,parquet}
+      --input-path={dataset:tinystories,parquet}
       --dataset-size-gb=10
       --model-identifier=sentence-transformers/all-MiniLM-L6-v2
       --model-inference-batch-size=1024
     timeout_s: 20000
 
-  - name: removal_raydata
-    enabled: false
-    script: removal_benchmark.py
+  - name: fuzzy_dedup_identification
+    enabled: true
+    script: fuzzy_dedup_identification_benchmark.py
     args: >-
-      --executor=ray_data
-      --input-path={dataset:tinystories_train,parquet}
-      --ids-to-remove-path=some_path
-      --id-generator-path=some_path
+      --input-path={dataset:commoncrawl,jsonl}
+      --cache-path={session_entry_dir}/scratch/cache
+      --output-path={session_entry_dir}/output
+      --input-filetype=jsonl
+      --bands-per-iteration=20
+      --text-field=text
+      --input-blocksize=1.5GiB
+    timeout_s: 20000
+    ray:
+      num_cpus: 64
+      num_gpus: 4
+      enable_object_spilling: false
+
+  - name: dedup_removal_raydata
+    enabled: true
+    script: dedup_removal_benchmark.py
+    args: >-
+      --input-path={dataset:commoncrawl,jsonl}
+      --id-generator-path={dataset:commoncrawl_id_map,json}
+      --ids-to-remove-path={dataset:commoncrawl_ids,parquet}
       --output-path={session_entry_dir}/scratch/output
-      --input-filetype=parquet
-      --input-fields=id,text
-      --input-id-field=CURATOR_DEDUP_ID_STR
-      --input-files-per-partition=1
-      --ids-to-remove-fields=id
+      --executor=ray_data
+      --input-filetype=jsonl
       --output-filetype=parquet
+      --id-field=_curator_dedup_id
+      --duplicate-id-field=_curator_dedup_id
+      --blocksize=1.5GiB
     timeout_s: 20000
     ray:
       num_cpus: 64
       num_gpus: 4
       enable_object_spilling: false
 
-  - name: removal_xenna
-    enabled: false
-    script: removal_benchmark.py
+  - name: dedup_removal_xenna
+    enabled: true
+    script: dedup_removal_benchmark.py
     args: >-
-      --executor=xenna
-      --input-path={dataset:tinystories_train,parquet}
-      --ids-to-remove-path=some_path
-      --id-generator-path=some_path
+      --input-path={dataset:commoncrawl,jsonl}
+      --id-generator-path={dataset:commoncrawl_id_map,json}
+      --ids-to-remove-path={dataset:commoncrawl_ids,parquet}
       --output-path={session_entry_dir}/scratch/output
-      --input-filetype=parquet
-      --input-fields=id,text
-      --input-id-field=CURATOR_DEDUP_ID_STR
-      --input-files-per-partition=1
-      --ids-to-remove-fields=id
+      --executor=xenna
+      --input-filetype=jsonl
       --output-filetype=parquet
+      --id-field=_curator_dedup_id
+      --duplicate-id-field=_curator_dedup_id
+      --blocksize=1.5GiB
     timeout_s: 20000
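Each entry's `args` template is turned into a concrete command by `Entry.get_command_to_run` (called from run.py below but not defined in this diff); a sketch of the expansion it implies, with the function shape assumed:

from collections.abc import Callable

def build_command(script: str, args_template: str, session_entry_dir: str,
                  resolve: Callable[[str], str]) -> list[str]:
    # Expand {session_entry_dir}, then dataset placeholders, then split into argv (assumed flow).
    args = args_template.replace("{session_entry_dir}", session_entry_dir)
    args = resolve(args)  # e.g. the dataset-placeholder resolution sketched earlier
    return ["python", script, *args.split()]

# build_command("dedup_removal_benchmark.py", "--executor=ray_data --input-filetype=jsonl",
#               "/results/session/dedup_removal_raydata", resolve=lambda s: s)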

benchmarking/run.py
Lines changed: 25 additions & 16 deletions
@@ -38,14 +38,15 @@
 
 # ruff: noqa: E402
 from runner.datasets import DatasetResolver
+from runner.entry import Entry
 from runner.env_capture import dump_env
-from runner.matrix import MatrixConfig, MatrixEntry
 from runner.path_resolver import PathResolver
 from runner.process import run_command_with_timeout
 from runner.ray_cluster import (
     setup_ray_cluster_and_env,
     teardown_ray_cluster_and_env,
 )
+from runner.session import Session
 from runner.utils import find_result, get_obj_for_json, resolve_env_vars
 
 
@@ -131,7 +132,7 @@ def check_requirements_update_results(result_data: dict[str, Any], requirements:
 
 
 def run_entry(
-    entry: MatrixEntry,
+    entry: Entry,
     path_resolver: PathResolver,
     dataset_resolver: DatasetResolver,
     session_path: Path,
@@ -148,17 +149,19 @@ def run_entry(
     ]
     cmd = entry.get_command_to_run(session_entry_path, benchmark_results_path, path_resolver, dataset_resolver)
     run_id = result_data.get("run_id", f"{entry.name}-{int(time.time())}")
+    ray_client = ray_temp_dir = None
 
     try:
         # Create directories individually
         for directory in [scratch_path, ray_cluster_path, logs_path, benchmark_results_path]:
             create_or_overwrite_dir(directory)
 
-        ray_client, ray_temp_dir, ray_env = setup_ray_cluster_and_env(
+        ray_client, ray_temp_dir = setup_ray_cluster_and_env(
             num_cpus=entry.ray.get("num_cpus", os.cpu_count() or 1),
             num_gpus=entry.ray.get("num_gpus", 0),
             enable_object_spilling=bool(entry.ray.get("enable_object_spilling", False)),
             ray_log_path=logs_path / "ray.log",
+            object_store_size_bytes=entry.object_store_size_bytes,
         )
 
         # Execute command with timeout
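The updated call site implies `setup_ray_cluster_and_env` now returns only `(ray_client, ray_temp_dir)` and accepts an object-store size; the helper itself lives in runner/ray_cluster.py and is not in this diff, so this stub of its new shape is an assumption:

from pathlib import Path
from typing import Any

def setup_ray_cluster_and_env(
    num_cpus: int,
    num_gpus: int,
    enable_object_spilling: bool,
    ray_log_path: Path,
    object_store_size_bytes: int | None = None,
) -> tuple[Any, Path]:
    # Start a local Ray cluster and return (client, temp dir); the env handling
    # previously returned as ray_env is now internal to the helper.
    ...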
@@ -168,7 +171,6 @@ def run_entry(
             command=cmd,
             timeout=entry.timeout_s,
             stdouterr_path=logs_path / "stdouterr.log",
-            env=ray_env,
             run_id=run_id,
             fancy=os.environ.get("CURATOR_BENCHMARKING_DEBUG", "0") == "0",
         )
@@ -252,40 +254,47 @@ def main() -> int:
         config_dict.update(d)
     # Preprocess the config dict prior to creating objects from it
     try:
-        MatrixConfig.assert_valid_config_dict(config_dict)
+        Session.assert_valid_config_dict(config_dict)
         config_dict = resolve_env_vars(config_dict)
     except ValueError as e:
         logger.error(f"Invalid configuration: {e}")
         return 1
 
-    config = MatrixConfig.create_from_dict(config_dict)
+    session = Session.create_from_dict(config_dict)
 
     # Create session folder under results_dir
     session_name = args.session_name or time.strftime("benchmark-run__%Y-%m-%d__%H-%M-%S")
-    session_path = (config.results_path / session_name).absolute()
+    session_path = (session.results_path / session_name).absolute()
     ensure_dir(session_path)
 
     session_overall_success = True
     logger.info(f"Started session {session_name}...")
-    env_dict = dump_env(session_path)
+    env_dict = dump_env(session_obj=session, output_path=session_path)
 
-    for sink in config.sinks:
-        sink.initialize(session_name=session_name, matrix_config=config, env_dict=env_dict)
+    for sink in session.sinks:
+        sink.initialize(session_name=session_name, matrix_config=session, env_dict=env_dict)
 
-    for entry in config.entries:
+    # Print a summary of the entries that will be run in the for loop below
+    # Disabled entries will not be printed
+    # TODO: should entries be created unconditionally and have an "enabled" field instead?
+    logger.info("Benchmark entries to be run in this session:")
+    for idx, entry in enumerate(session.entries, start=1):
+        logger.info(f"\t{idx}. {entry.name}")
+
+    for entry in session.entries:
         run_success = False
         run_id = f"{entry.name}-{int(time.time())}"
         result_data = {
             "name": entry.name,
             "run_id": run_id,
             "success": run_success,
         }
-        logger.info(f"\tRunning {entry.name} (run ID: {run_id})")
+        logger.info(f"🚀 Running {entry.name} (run ID: {run_id})")
         try:
             run_success = run_entry(
                 entry=entry,
-                path_resolver=config.path_resolver,
-                dataset_resolver=config.dataset_resolver,
+                path_resolver=session.path_resolver,
+                dataset_resolver=session.dataset_resolver,
                 session_path=session_path,
                 result_data=result_data,
             )
@@ -305,10 +314,10 @@ def main() -> int:
 
         finally:
             session_overall_success &= run_success
-            for sink in config.sinks:
+            for sink in session.sinks:
                 sink.process_result(result_dict=result_data, matrix_entry=entry)
 
-    for sink in config.sinks:
+    for sink in session.sinks:
         sink.finalize()
     logger.info(f"Session {session_name} completed with overall success: {session_overall_success}")
     return 0 if session_overall_success else 1
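The sink calls in main() trace out a three-phase lifecycle: `initialize` once per session, `process_result` after each entry, `finalize` at the end. A minimal protocol matching those call sites; the concrete sink classes, such as the slack sink configured in the YAML above, are not part of this diff:

from typing import Any, Protocol

class Sink(Protocol):
    # Lifecycle implied by the call sites in run.py above.
    def initialize(self, session_name: str, matrix_config: Any, env_dict: dict[str, Any]) -> None: ...
    def process_result(self, result_dict: dict[str, Any], matrix_entry: Any) -> None: ...
    def finalize(self) -> None: ...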
