Skip to content

Commit 235fa2e

Browse files
[benchmark] Add FastText filter benchmarking script (#1411) (#1452)
* [benchmark] Add FastText filter benchmarking script (#1411) - Add fasttext_filter_benchmark.py script following the pattern from score_filter_benchmark.py - Add fasttext_filter_raydata and fasttext_filter_xenna entries to nightly-benchmark.yaml - Supports FastText language ID and quality filters with model setup requirements Fixes #1411 Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> * [benchmark] Wire FastText model paths explicitly and update nightly config (#1411) - Add separate dataset entries for FastText langid and quality models - Pass FastText model paths as explicit CLI arguments to benchmarks - Remove hardcoded model paths from Hydra overrides - Update FastText filter benchmarks to use model_weights_path - Align arxiv E2E benchmark arg naming with FastText langid usage Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> * Updated fasttext_filter_raydata benchmark timeout in benchmarking/nightly-benchmark.yaml based on Sarah Yurick's test run (#1411) Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> * Updated fasttext_filter_xenna benchmark timeout in benchmarking/nightly-benchmark.yaml based on Sarah Yurick's test run (#1411) Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> * Updated fasttext_quality_model dataset entry's model file name to model.bin in benchmarking/nightly-benchmark.yaml (#1411) Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> * Adding ftz file option for fasttext_langid_model dataset entry in benchmarking/nightly-benchmark.yaml (#1411) Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> * Moving fasttext_filter_raydata and fasttext_filter_xenna to run right after ScoreFilter benchmarks in benchmarking/nightly-benchmark.yaml (#1411) Signed-off-by: Kunal Sachdev 
<kunalmgsachdev@gmail.com> --------- Signed-off-by: Kunal Sachdev <kunalmgsachdev@gmail.com> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
1 parent eeaa0a5 commit 235fa2e

File tree

3 files changed

+253
-9
lines changed

3 files changed

+253
-9
lines changed

benchmarking/nightly-benchmark.yaml

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,16 @@ datasets:
5555
formats:
5656
- type: "tar"
5757
path: "{datasets_path}/arxiv_downloads"
58-
- name: "fasttext_model"
58+
- name: "fasttext_langid_model"
5959
formats:
6060
- type: "bin"
6161
path: "{model_weights_path}/fasttext/lid.176.bin"
62+
- type: "ftz"
63+
path: "{model_weights_path}/fasttext/lid.176.ftz"
64+
- name: "fasttext_quality_model"
65+
formats:
66+
- type: "bin"
67+
path: "{model_weights_path}/fasttext/model.bin"
6268
- name: "gretel_symptoms"
6369
formats:
6470
- type: "jsonl"
@@ -412,6 +418,52 @@ entries:
412418
- metric: throughput_docs_per_sec
413419
min_value: 8500
414420

421+
- name: fasttext_filter_raydata
422+
enabled: true
423+
script: fasttext_filter_benchmark.py
424+
args: >-
425+
--benchmark-results-path={session_entry_dir}
426+
--output-path={session_entry_dir}/scratch/output
427+
--executor=ray_data
428+
--input-path={dataset:tinystories,parquet}
429+
--yaml-config={curator_repo_dir}/nemo_curator/config/text/fasttext_filter_pipeline.yaml
430+
--fasttext-langid-model-path={dataset:fasttext_langid_model,bin}
431+
--fasttext-quality-model-path={dataset:fasttext_quality_model,bin}
432+
--overrides="stages.0._target_=nemo_curator.stages.text.io.reader.ParquetReader"
433+
timeout_s: 200
434+
sink_data:
435+
- name: slack
436+
additional_metrics:
437+
- num_kept_documents
438+
- throughput_docs_per_sec
439+
ray:
440+
num_cpus: 64
441+
num_gpus: 0
442+
enable_object_spilling: false
443+
444+
- name: fasttext_filter_xenna
445+
enabled: true
446+
script: fasttext_filter_benchmark.py
447+
args: >-
448+
--benchmark-results-path={session_entry_dir}
449+
--output-path={session_entry_dir}/scratch/output
450+
--executor=xenna
451+
--input-path={dataset:tinystories,parquet}
452+
--yaml-config={curator_repo_dir}/nemo_curator/config/text/fasttext_filter_pipeline.yaml
453+
--fasttext-langid-model-path={dataset:fasttext_langid_model,bin}
454+
--fasttext-quality-model-path={dataset:fasttext_quality_model,bin}
455+
--overrides="stages.0._target_=nemo_curator.stages.text.io.reader.ParquetReader"
456+
timeout_s: 100
457+
sink_data:
458+
- name: slack
459+
additional_metrics:
460+
- num_kept_documents
461+
- throughput_docs_per_sec
462+
ray:
463+
num_cpus: 64
464+
num_gpus: 0
465+
enable_object_spilling: false
466+
415467
- name: modifier_raydata
416468
enabled: true
417469
script: modifier_benchmark.py
@@ -494,7 +546,7 @@ entries:
494546
--benchmark-results-path={session_entry_dir}
495547
--tar-input-path={dataset:arxiv_downloads,tar}
496548
--output-path={session_entry_dir}/scratch/output
497-
--fasttext-model-path={dataset:fasttext_model,bin}
549+
--fasttext-langid-model-path={dataset:fasttext_langid_model,bin}
498550
--executor=ray_data
499551
timeout_s: 3600
500552
sink_data:
@@ -523,7 +575,7 @@ entries:
523575
--benchmark-results-path={session_entry_dir}
524576
--tar-input-path={dataset:arxiv_downloads,tar}
525577
--output-path={session_entry_dir}/scratch/output
526-
--fasttext-model-path={dataset:fasttext_model,bin}
578+
--fasttext-langid-model-path={dataset:fasttext_langid_model,bin}
527579
--executor=xenna
528580
timeout_s: 3600
529581
sink_data:

benchmarking/scripts/arxiv_e2e_pipeline_benchmark.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def create_e2e_pipeline( # noqa: PLR0913
143143
url_limit: int | None,
144144
record_limit: int | None,
145145
log_frequency: int,
146-
fasttext_model_path: str | None,
146+
fasttext_langid_model_path: str | None,
147147
# Output options
148148
output_dir: Path,
149149
output_format: Literal["parquet", "jsonl"],
@@ -176,7 +176,7 @@ def create_e2e_pipeline( # noqa: PLR0913
176176
max_repeated_lines_ratio: Maximum ratio of repeated lines.
177177
max_repeating_ngram_ratio: Maximum ratio of repeating top n-grams.
178178
max_punctuation_ratio: Maximum ratio of sentences without punctuation.
179-
fasttext_model_path: Path to FastText language ID model (lid.176.bin).
179+
fasttext_langid_model_path: Path to FastText language ID model (lid.176.bin).
180180
min_langid_score: Minimum language ID confidence score.
181181
classifier_batch_size: Batch size for model inference in classifiers.
182182
@@ -250,7 +250,7 @@ def create_e2e_pipeline( # noqa: PLR0913
250250
# ========== LANGUAGE ID FILTER ==========
251251
pipeline.add_stage(
252252
ScoreFilter(
253-
filter_obj=FastTextLangId(model_path=fasttext_model_path, min_langid_score=min_langid_score),
253+
filter_obj=FastTextLangId(model_path=fasttext_langid_model_path, min_langid_score=min_langid_score),
254254
text_field="text",
255255
score_field="langid_score",
256256
)
@@ -315,7 +315,7 @@ def run_benchmark(args: argparse.Namespace) -> dict:
315315
max_repeated_lines_ratio=args.max_repeated_lines_ratio,
316316
max_repeating_ngram_ratio=args.max_repeating_ngram_ratio,
317317
max_punctuation_ratio=args.max_punctuation_ratio,
318-
fasttext_model_path=args.fasttext_model_path,
318+
fasttext_langid_model_path=args.fasttext_langid_model_path,
319319
min_langid_score=args.min_langid_score,
320320
classifier_batch_size=args.classifier_batch_size,
321321
)
@@ -370,7 +370,7 @@ def run_benchmark(args: argparse.Namespace) -> dict:
370370
"max_repeated_lines_ratio": args.max_repeated_lines_ratio,
371371
"max_repeating_ngram_ratio": args.max_repeating_ngram_ratio,
372372
"max_punctuation_ratio": args.max_punctuation_ratio,
373-
"fasttext_model_path": args.fasttext_model_path,
373+
"fasttext_langid_model_path": args.fasttext_langid_model_path,
374374
"min_langid_score": args.min_langid_score,
375375
"classifier_batch_size": args.classifier_batch_size,
376376
"executor": args.executor,
@@ -439,7 +439,7 @@ def main() -> int:
439439
# ========== LANGUAGE ID OPTIONS ==========
440440
langid_group = p.add_argument_group("Language ID Options")
441441
langid_group.add_argument(
442-
"--fasttext-model-path",
442+
"--fasttext-langid-model-path",
443443
type=str,
444444
help="Path to FastText language ID model (lid.176.bin)",
445445
)
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""FastText Filter benchmarking script.
16+
17+
This script benchmarks FastText-based document filters (language ID and quality)
18+
using a Hydra-configured pipeline and various executors.
19+
"""
20+
21+
import argparse
22+
import time
23+
import traceback
24+
from pathlib import Path
25+
from typing import Any
26+
27+
import hydra
28+
from hydra import compose, initialize_config_dir
29+
from loguru import logger
30+
from omegaconf import DictConfig
31+
from utils import setup_executor, write_benchmark_results
32+
33+
from nemo_curator.pipeline import Pipeline
34+
35+
36+
def load_hydra_yaml(config_path: Path, overrides: list[str] | None = None) -> DictConfig:
    """Compose a Hydra config from a standalone YAML file.

    Args:
        config_path: Path to the YAML config file.
        overrides: Optional list of Hydra override strings applied at compose time.

    Returns:
        The composed ``DictConfig``.
    """
    resolved = config_path.resolve()
    # Hydra requires an absolute config directory; the file stem is the config name.
    context = initialize_config_dir(
        config_dir=str(resolved.parent),
        job_name="fasttext_filter_benchmark",
        version_base=None,
    )
    with context:
        return compose(config_name=resolved.stem, overrides=overrides)
45+
46+
def create_pipeline_from_yaml(cfg: DictConfig) -> Pipeline:
    """Build a Curator ``Pipeline`` by instantiating every configured stage.

    Args:
        cfg: Composed Hydra config with a ``stages`` list of instantiable nodes.

    Returns:
        A ``Pipeline`` containing the stages in config order.
    """
    result = Pipeline(name="fasttext_filter_pipeline")
    for node in cfg.stages:
        # hydra.utils.instantiate resolves each node's _target_ into a stage object.
        result.add_stage(hydra.utils.instantiate(node))
    return result
52+
53+
54+
def run_fasttext_filter_benchmark(  # noqa: PLR0913
    input_path: Path,
    output_path: Path,
    executor_name: str,
    benchmark_results_path: Path,
    yaml_config: Path,
    fasttext_langid_model_path: Path,
    fasttext_quality_model_path: Path,
    overrides: str | None = None,
) -> dict[str, Any]:
    """Run the FastText filter pipeline once and collect benchmark metrics.

    Args:
        input_path: Directory containing the input dataset.
        output_path: Directory for filtered output; created if missing.
        executor_name: Executor backend name (e.g. ``ray_data`` or ``xenna``).
        benchmark_results_path: Results directory; only recorded in the returned
            params here — the caller is responsible for writing results to it.
        yaml_config: Hydra YAML file describing the pipeline stages.
        fasttext_langid_model_path: Path to the FastText language-ID model.
        fasttext_quality_model_path: Path to the FastText quality model.
        overrides: Optional comma-separated extra Hydra override strings.

    Returns:
        Dict with ``params`` (inputs as strings), ``metrics`` (success flag,
        wall time, document counts, throughput), and ``tasks`` (raw output
        tasks from the pipeline run; empty on failure).
    """
    executor = setup_executor(executor_name)

    # Paths are interpolated into Hydra overrides below, so make them absolute first.
    input_path = input_path.absolute()
    output_path = output_path.absolute()
    output_path.mkdir(parents=True, exist_ok=True)

    logger.info(f"Input path: {input_path}")
    logger.info(f"Output path: {output_path}")
    logger.info(f"Executor: {executor_name}")
    logger.info(f"FastText pipeline config: {yaml_config}")
    logger.info(f"FastText language ID model: {fasttext_langid_model_path}")
    logger.info(f"FastText quality model: {fasttext_quality_model_path}")

    # Base overrides wire the CLI paths into the YAML config; user-supplied
    # overrides (comma-separated) are appended so they can shadow the base ones.
    overrides_list = [
        f"input_path={input_path}",
        f"output_path={output_path}",
        f"fasttext_langid_model_path={fasttext_langid_model_path}",
        f"fasttext_quality_model_path={fasttext_quality_model_path}",
    ]
    if overrides:
        overrides_list.extend(overrides.split(","))

    cfg = load_hydra_yaml(yaml_config, overrides_list)
    pipeline = create_pipeline_from_yaml(cfg)

    run_start_time = time.perf_counter()

    try:
        logger.info("Running FastText filter pipeline...")
        output_tasks = pipeline.run(executor)
        run_time_taken = time.perf_counter() - run_start_time

        # Stage-index assumptions (depend on the YAML config — not verified here):
        # 0 = partitioning (if any)
        # 1 = reader (so its count is the total input documents)
        # -1 = writer (num_items_processed equals documents kept after all filters)
        # NOTE(review): _stage_perf is a private task attribute; confirm the
        # index-1 reader assumption holds if the pipeline stage list changes.
        num_documents_processed = sum(task._stage_perf[1].num_items_processed for task in output_tasks)
        num_kept_documents = sum(task._stage_perf[-1].num_items_processed for task in output_tasks)

        logger.success(f"Benchmark completed in {run_time_taken:.2f}s")
        logger.success(f"Processed {num_documents_processed} documents")
        logger.success(f"Kept {num_kept_documents} documents")

        success = True

    except Exception as e:  # noqa: BLE001
        # Failures are reported as metrics (is_success=False) rather than
        # propagated, so the nightly harness always gets a results payload.
        logger.error(f"Benchmark failed: {e}")
        logger.debug(traceback.format_exc())
        output_tasks = []
        run_time_taken = time.perf_counter() - run_start_time
        num_documents_processed = 0
        num_kept_documents = 0
        success = False

    return {
        "params": {
            "executor": executor_name,
            "input_path": str(input_path),
            "output_path": str(output_path),
            "benchmark_results_path": str(benchmark_results_path),
            "yaml_config": str(yaml_config),
            "fasttext_langid_model_path": str(fasttext_langid_model_path),
            "fasttext_quality_model_path": str(fasttext_quality_model_path),
        },
        "metrics": {
            "is_success": success,
            "time_taken_s": run_time_taken,
            "num_documents_processed": num_documents_processed,
            "num_kept_documents": num_kept_documents,
            "num_output_tasks": len(output_tasks),
            # Throughput is based on documents read (not kept), matching the
            # reader-stage count above.
            "throughput_docs_per_sec": (num_documents_processed / run_time_taken if run_time_taken > 0 else 0),
        },
        "tasks": output_tasks,
    }
138+
139+
140+
def main() -> int:
    """CLI entry point for the FastText filter benchmark.

    Parses arguments, runs the benchmark, and always writes a results file
    (even on failure) so the nightly harness can report the outcome.

    Returns:
        0 if the benchmark reported success, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="FastText filter benchmark")
    parser.add_argument("--benchmark-results-path", type=Path, required=True)
    parser.add_argument("--input-path", type=Path, required=True)
    parser.add_argument(
        "--output-path",
        type=Path,
        default=Path("./fasttext_filter_output"),
    )
    parser.add_argument(
        "--executor",
        default="ray_data",
        choices=["ray_data", "xenna"],
    )
    parser.add_argument("--yaml-config", type=Path, required=True)
    parser.add_argument(
        "--fasttext-langid-model-path", type=Path, required=True, help="Path to FastText language ID model"
    )
    parser.add_argument(
        "--fasttext-quality-model-path", type=Path, required=True, help="Path to FastText quality model"
    )
    parser.add_argument("--overrides", type=str)

    args = parser.parse_args()

    logger.info("=== FastText Filter Benchmark Starting ===")
    logger.info(f"Arguments: {vars(args)}")

    # Pre-initialize so the finally-block can never hit an unbound `results`
    # (e.g. if a BaseException such as KeyboardInterrupt escapes the try).
    results: dict[str, Any] = {
        "params": vars(args),
        "metrics": {"is_success": False},
        "tasks": [],
    }
    try:
        results = run_fasttext_filter_benchmark(
            input_path=args.input_path,
            output_path=args.output_path,
            executor_name=args.executor,
            benchmark_results_path=args.benchmark_results_path,
            yaml_config=args.yaml_config,
            fasttext_langid_model_path=args.fasttext_langid_model_path,
            fasttext_quality_model_path=args.fasttext_quality_model_path,
            overrides=args.overrides,
        )
    except Exception as e:  # noqa: BLE001
        # Previously this swallowed the exception silently; log it so the
        # nightly logs show why the benchmark crashed before producing results.
        logger.error(f"Benchmark failed: {e}")
        logger.debug(traceback.format_exc())
    finally:
        write_benchmark_results(results, args.benchmark_results_path)

    return 0 if results["metrics"]["is_success"] else 1
189+
190+
191+
if __name__ == "__main__":
    # Process exit status mirrors the benchmark outcome: 0 = success, 1 = failure.
    raise SystemExit(main())

0 commit comments

Comments
 (0)