Skip to content

Commit 0b22ceb

Browse files
ayushdgpraateekmahajansarahyurickabhinavg4suiyoubi
authored
Add FuzzyDeduplicationPipeline (#937)
* New API Spec with Ray Backend (#726) * Create package + reorganize (#2) * fc Signed-off-by: Praateek <praateekm@gmail.com> * remove per file ignore Signed-off-by: Praateek <praateekm@gmail.com> * sc Signed-off-by: Praateek <praateekm@gmail.com> * ruff Signed-off-by: Praateek <praateekm@gmail.com> * use curator_id_str Signed-off-by: Praateek <praateekm@gmail.com> --------- Signed-off-by: Praateek <praateekm@gmail.com> * fc Signed-off-by: Praateek <praateekm@gmail.com> * kmeans works Signed-off-by: Praateek <praateekm@gmail.com> * Fuzzy dedup fixes (#11) * high level method for each step Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Fixes/changes after testing Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Updates to existing fuzzy_dedup modules Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add high level fuzzy dedup api and e2e example Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add e2e example Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add config Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> --------- Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * fc Signed-off-by: Praateek <praateekm@gmail.com> * fc Signed-off-by: Praateek <praateekm@gmail.com> * removal works Signed-off-by: Praateek <praateekm@gmail.com> * bug fix Signed-off-by: Praateek <praateekm@gmail.com> * working streaming embedding with id generator Signed-off-by: Praateek <praateekm@gmail.com> * Dump high level skeleton Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update xenna executor Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * More changes Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * working example Signed-off-by: Praateek <praateekm@gmail.com> * Revert "working example" This reverts commit 7b3e65173dd1df92b0de9431fcfebdbc0b93d6c9. * [WIP] Add reader + utf modifier (#31) * Dump high level skeleton Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update xenna executor Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * More changes Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Updates for utfModifier+ high level updates Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Remove old examples and add new modifier and stages Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add modify stage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * More updates Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> --------- Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Revert "[WIP] Add reader + utf modifier (#31)" (#32) This reverts commit ef25e3eff6502cb9bfc4a57ba48f0939284fd49b. * rebase Signed-off-by: Praateek <praateekm@gmail.com> * rebase continue Signed-off-by: Praateek <praateekm@gmail.com> * Remove older file versions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * Final changes as per the meeting * refactor Signed-off-by: Praateek <praateekm@gmail.com> * example works Signed-off-by: Praateek <praateekm@gmail.com> * add base classes Signed-off-by: Praateek <praateekm@gmail.com> * example works Signed-off-by: Praateek <praateekm@gmail.com> * .. Signed-off-by: Praateek <praateekm@gmail.com> * more google style Signed-off-by: Praateek <praateekm@gmail.com> * add init for backends Signed-off-by: Praateek <praateekm@gmail.com> * Update example script * add impl Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add suggestions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add another check Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * Move changes one level deeper in ray-curator, add pyproject toml Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update dependencies to include cosmos-xenna and pyarrow explicitly Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update python upper bound Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add a simple contributing file with instructions Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Remove pyarrow check since it's an explicit dependency Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Remove unusued file utils Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> --------- Signed-off-by: Praateek <praateekm@gmail.com> Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Co-authored-by: Praateek <praateekm@gmail.com> Co-authored-by: Sarah Yurick <sarahyurick@gmail.com> Co-authored-by: Abhinav Garg <abhinavg@stanford.edu> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * [Ray] Allow loguru to be serialized #729 * [Ray] Add Jsonl / Parquet Writer Stage (#730) * Update CI testing workflow for ray branch (#739) * Update ci workflow to build ray-curator package instead Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Split out CPU and GPU modules Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update pytest command Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update crossfit dep to use pinned version (avoiding absl dep issues) Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Explicitly add absl-py dependency to avoid python 3.10 errors Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update paths for codecov Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> --------- Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Initial API desing doc (#737) * Intial APi desing doc Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update ray-curator/api-design.md Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Refine map-style execution description in API design document to clarify task transformation and mapping flexibility. * Remove redundant sections on Tasks, Stages, and Pipelines from the API design document to streamline content and improve clarity. * Add quickstart example and update API design documentation - Introduced a new quickstart example in `ray_curator/examples/quickstart.py` demonstrating a sentiment analysis pipeline with three stages: TaskCreationStage, WordCountStage, and SentimentStage. - Updated `api-design.md` to include a new section for examples, linking to the quickstart for user reference. - Clarified resource requirements in `resources.py` documentation for GPU usage and constraints. * Ruff related changes Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * PR changes Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update DocumentTask to DocumentBatch in API design for improved type flexibility Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add fault tolerance requirements to API design documentation - Introduced a new section outlining the necessity for fault tolerance and retry safety in all stages. - Highlighted critical aspects such as task preemption and handling of partial operations to ensure robustness during execution. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> --------- Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com> * Refactor XennaExecutor by removing the cluster initialization function and deleting the associated ray_cluster_init.py file. This streamlines the execution process by eliminating unnecessary setup code. (#768) Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * [Ray] Add Ray Data as an experimental backend (#740) * [Ray] Add integration test to test backends for a specified pipeline (#770) * Adding with_ for options in ProcessingStage and CompositeStage (#764) * [Ray] `DocumentFilter` and `Filter`/`Score`/`ScoreFilter` (#746) * add documentfilter implementation Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix nits and ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add additional logic for setup, setup_on_node, and process_batch Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add pytests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add dep Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * more dep edits Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * another dep Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add fasttext dep Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add jieba and mecab Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add default None params for setup_on_node and setup functions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add praateek's suggestions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * organize imports Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove process_batch Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add _metadata to result Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add praateek's suggestions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff and post init for _name Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * modify test Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * [Ray] Add Download Extract Base Class + Common Crawl Stage (#738) * [Ray] Use Ray Actors where viable (#792) * Extract And download for WIkipedia (#795) * copy over Signed-off-by: Praateek <praateekm@gmail.com> * copy over Signed-off-by: Praateek <praateekm@gmail.com> * add init to download Signed-off-by: Praateek <praateekm@gmail.com> * move justext Signed-off-by: Praateek <praateekm@gmail.com> * move resiliparse Signed-off-by: Praateek <praateekm@gmail.com> * move trafilatura Signed-off-by: Praateek <praateekm@gmail.com> * move get_stop_list_dict Signed-off-by: Praateek <praateekm@gmail.com> * move download_utils.py to utils/download_utils.py Signed-off-by: Praateek <praateekm@gmail.com> * move out to download.py Signed-off-by: Praateek <praateekm@gmail.com> * move WarcIterator towarc_reader.py Signed-off-by: Praateek <praateekm@gmail.com> * move CommonCrawlWARCExtractor to html_extractor Signed-off-by: Praateek <praateekm@gmail.com> * remove commoncrawl.py Signed-off-by: Praateek <praateekm@gmail.com> * create url_generation.py from download_utils Signed-off-by: Praateek <praateekm@gmail.com> * tests dir Signed-off-by: Praateek <praateekm@gmail.com> * copy over test_download.py as test_common_crawl.py Signed-off-by: Praateek <praateekm@gmail.com> * add html_extractors/__init__ Signed-off-by: Praateek <praateekm@gmail.com> * move html_extractor to ProcessingStage Signed-off-by: Praateek <praateekm@gmail.com> * update WarcReader to use ProecssingStage Signed-off-by: Praateek <praateekm@gmail.com> * move to classes for url generation Signed-off-by: Praateek <praateekm@gmail.com> * typo in name Signed-off-by: Praateek <praateekm@gmail.com> * bug fixes in justext; rename resiliparse func; utils modular Signed-off-by: Praateek <praateekm@gmail.com> * init file in for download/text Signed-off-by: Praateek <praateekm@gmail.com> * justtext minor change Signed-off-by: Praateek <praateekm@gmail.com> * support str in htmlextractor Signed-off-by: Praateek <praateekm@gmail.com> * add a working example Signed-off-by: Praateek <praateekm@gmail.com> * set source_files so that write can be hashed Signed-off-by: Praateek <praateekm@gmail.com> * use pprint in example Signed-off-by: Praateek <praateekm@gmail.com> * update comment Signed-off-by: Praateek <praateekm@gmail.com> * all tests migrated + work Signed-off-by: Praateek <praateekm@gmail.com> * update defaults in example; comments in stage Signed-off-by: Praateek <praateekm@gmail.com> * add tests for url generation + PR review Signed-off-by: Praateek <praateekm@gmail.com> * update download for aws Signed-off-by: Praateek <praateekm@gmail.com> * rename aws to use_aws_to_donwload Signed-off-by: Praateek <praateekm@gmail.com> * update resources Signed-off-by: Praateek <praateekm@gmail.com> * change url generation to have ray-stage-spec Signed-off-by: Praateek <praateekm@gmail.com> * make download fault tolerant Signed-off-by: Praateek <praateekm@gmail.com> * refactor as per pr reviews; with tests Signed-off-by: Praateek <praateekm@gmail.com> * add readme Signed-off-by: Praateek <praateekm@gmail.com> * bug fix; update tests Signed-off-by: Praateek <praateekm@gmail.com> * update record limit to None Signed-off-by: Praateek <praateekm@gmail.com> * bug fixes Signed-off-by: Praateek <praateekm@gmail.com> * pr comments Signed-off-by: Praateek <praateekm@gmail.com> * add back test html extractor implementations Signed-off-by: Praateek <praateekm@gmail.com> * remove cc example Signed-off-by: Praateek <praateekm@gmail.com> * add column utils Signed-off-by: Praateek <praateekm@gmail.com> * add todos Signed-off-by: Praateek <praateekm@gmail.com> * Add Wikipedia download and extract stage This commit introduces a comprehensive pipeline for downloading and processing Wikipedia dump files within the ray-curator framework. Key components include: - **WikipediaUrlGenerator**: Generates URLs for Wikipedia dump files. - **WikipediaDownloader**: Downloads .bz2 dump files using wget. - **WikipediaIterator**: Parses Wikipedia XML dumps and extracts article content. - **WikipediaExtractor**: Cleans Wikipedia markup and extracts meaningful text. Additionally, an example script demonstrating the usage of the new stage is included, along with tests for each component to ensure functionality and reliability. Documentation for the new stage is also provided to guide users in implementation and usage. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * merge from main Signed-off-by: Praateek <praateekm@gmail.com> * move deps to text Signed-off-by: Praateek <praateekm@gmail.com> * update dev Signed-off-by: Praateek <praateekm@gmail.com> * update pyproject and test.yml Signed-off-by: Praateek <praateekm@gmail.com> * remove cugraph extra pyproject Signed-off-by: Praateek <praateekm@gmail.com> * move text to optional deps Signed-off-by: Praateek <praateekm@gmail.com> * Refactor pyproject.toml: Remove unused dependencies and clean up dev section Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Remove unused Wikipedia example and related README documentation from the download text stages. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add method to fetch JSON dump data for Wikipedia and refactor dump date retrieval logic - Introduced `_get_data_for_dump` method to handle fetching and parsing JSON dump data. - Refactored logic in `_get_wikipedia_urls` to iterate through available dumps and check their status. - Improved error handling for cases where dump data cannot be loaded or is not finished. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add README for custom download pipelines and remove Wikipedia stage documentation - Introduced a new README.md file detailing the structure and implementation of custom download pipelines. - Removed the outdated README.md for the Wikipedia download and extract stage to streamline documentation. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add num_workers_per_node method to DocumentDownloader and WikipediaDownloader - Implemented num_workers_per_node method in DocumentDownloader to define the number of workers per node for downloading tasks. - Overridden num_workers_per_node in WikipediaDownloader to return a fixed value of 1. - Updated xenna_stage_spec method in DocumentDownloadStage to include the number of workers per node. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Update WikipediaDownloader to use 2 workers and change logging level in WikipediaIterator - Modified num_workers_per_node in WikipediaDownloader to return 2, allowing for increased parallelism during downloads. - Changed logging from info to debug level in WikipediaIterator for extracted articles to reduce log verbosity. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> --------- Signed-off-by: Praateek <praateekm@gmail.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> Co-authored-by: Praateek <praateekm@gmail.com> * Fixing tests (#827) * Refactor Wikipedia extraction and URL generation logic - Removed redundant return statement in `WikipediaExtractor` class. - Simplified status check for dump data in `WikipediaUrlGenerator` by directly accessing the dictionary keys. - Updated logging level in tests to ensure accurate assertions on log calls. - Enhanced test cases for URL generation to cover various dump statuses. These changes improve code clarity and maintainability while ensuring robust error handling in the Wikipedia download and extraction process. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add mwparserfromhell dependency to pyproject.toml - Included `mwparserfromhell==0.6.5` in the text dependencies section of `pyproject.toml` to support parsing Wikipedia markup. This addition enhances the functionality of the project by ensuring the necessary tools for processing Wikipedia data are available. Signed-off-by: [Your Name] <your.email@example.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> --------- Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> Signed-off-by: [Your Name] <your.email@example.com> * Update ray version to 2.48 #839 * Re-enable CI/CD for Ray API branch (#840) * CI/CD for Ray API branch Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add text dependencies Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * only run cpu tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * comment instead of delete Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * Ray Video Pipeline : Video Reader (#775) * Add video io reader * Add test * Add VideoReaderStage to video reading pipeline and update VideoDownloadStage to accept VideoTask. Enhance video reading capabilities with new tests for VideoReaderStage. Signed-off-by: Ao Tang <aot@nvidia.com> * Update VideoDownloadStage to support verbose logging and modify video_read_example to include verbose argument. Signed-off-by: Ao Tang <aot@nvidia.com> * Update outputs for VideoDownloadStage and VideoReaderStage to include additional metadata fields. Signed-off-by: Ao Tang <aot@nvidia.com> * Update CI workflow to include video dependencies for testing Signed-off-by: Ao Tang <aot@nvidia.com> * Add tests for video tasks module - Introduced a new test package for tasks with an initial test suite for the video tasks module, including tests for the Clip, ClipStats, Video, VideoMetadata, and VideoTask classes. - Implemented various test cases to validate initialization, property calculations, metadata extraction, and size calculations. This enhances the testing coverage for video-related functionalities in the ray-curator project. Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance video tasks module with additional test cases - Expanded the test suite for the video tasks module by adding new test cases for the Clip, ClipStats, Video, VideoMetadata, and VideoTask classes. - Improved coverage for various functionalities including initialization, property calculations, and metadata extraction. This update strengthens the reliability of video-related features in the ray-curator project. Signed-off-by: Ao Tang <aot@nvidia.com> * Update pyproject.toml to include a trailing comma for pynvml dependency Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video processing stages to introduce a composite VideoReaderDownloadStage - Replaced separate VideoReaderStage and VideoDownloadStage with a new VideoReaderDownloadStage that combines both functionalities. - Updated the video_read_example to utilize the new composite stage. - Adjusted inputs and outputs for VideoDownloadStage to reflect changes in the pipeline. - Added tests for the new VideoReaderDownloadStage to ensure proper functionality and integration. This refactor simplifies the video reading and downloading process within the ray-curator framework. Signed-off-by: Ao Tang <aot@nvidia.com> --------- Signed-off-by: Ao Tang <aot@nvidia.com> * chore: Add new trustees and vetters to the copy-pr-bot configuration (#841) (#842) * chore: Add new trustees and vetters to the copy-pr-bot configuration * chore: Remove empty line in copy-pr-bot configuration * chore: Remove ryantwolf from additional trustees and vetters in copy-pr-bot configuration --------- Signed-off-by: Ao Tang <aot@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: Ao Tang <mike.tang96@gmail.com> * ci: Add community-bot (#846) (#849) Signed-off-by: oliver könig <okoenig@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: oliver könig <okoenig@nvidia.com> * Ray Video Reader Enhancement (#848) * Refactor video reading stages: Rename VideoReaderStage to VideoListStage and update VideoReaderDownloadStage to use the new class. Adjust tests accordingly to reflect the changes in stage names and functionality. Signed-off-by: Ao Tang <aot@nvidia.com> * Rename test_video_reader to test_video_list Signed-off-by: Ao Tang <aot@nvidia.com> * Update VideoListStage name and corresponding tests to reflect new naming convention - Changed the internal name of VideoListStage from "video_reader" to "video_list". - Updated assertions in the test for VideoListStage to match the new name. - Adjusted configuration in the VideoReaderDownloadStage to use "video_list" instead of "video_reader". This ensures consistency across the codebase following the recent refactor. Signed-off-by: Ao Tang <aot@nvidia.com> * Update test assertions in VideoReaderDownloadStage to use "video_list" instead of "video_reader" Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video processing stages: Replace VideoDownloadStage with VideoReaderStage in VideoReaderDownloadStage. Update related tests to reflect the new structure and ensure consistency across the codebase. Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance VideoListStage and VideoReaderStage documentation Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video reading pipeline: Introduce VideoLoadingStage as a composite stage that combines VideoListStage and VideoReaderStage. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove SplitPipeTask from video module and update imports accordingly. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video task imports: Update import statements in video_list, video_loading, video_reader, and related test files to use the new video module structure. Signed-off-by: Ao Tang <aot@nvidia.com> * ruff fix Signed-off-by: Ao Tang <aot@nvidia.com> * Implement FilePartitioningStage: Introduce a new stage for partitioning files into groups based on specified criteria, including a limit on the number of groups. Update VideoLoadingStage to utilize FilePartitioningStage instead of the deprecated VideoListStage. Refactor VideoReaderStage to accept FileGroupTask as input and adjust related tests to ensure functionality and correctness. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video reading stages: Replace VideoLoadingStage with VideoReader as a composite stage that combines FilePartitioningStage and VideoReaderStage. Update related tests to ensure functionality and correctness. Remove deprecated VideoLoadingStage and its associated tests. Signed-off-by: Ao Tang <aot@nvidia.com> * Update video_limit type in VideoReader to support None: Changed the type of video_limit from int to int | None to allow for more flexible configuration. This enhances the usability of the VideoReader class. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor file partitioning limit check Signed-off-by: Ao Tang <aot@nvidia.com> * Remove redundant tests from TestVideoReader: Deleted tests for video limit values, verbose flag, file extensions, and files per partition configuration to streamline the test suite and focus on essential functionality. Signed-off-by: Ao Tang <aot@nvidia.com> --------- Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance FilePartitioningStage to enforce task limit check earlier in the process. (#867) Signed-off-by: Ao Tang <aot@nvidia.com> * Initialize and shutdown ray session in each executor (#844) * Remove pynvml dependency from pyproject.toml (#872) * docs: refactor all the things (#826) (#859) * docs: refactor all the things * remove auto api docs * api docs to gitignore * updated readme * python linting fixes batch 1 * batch 2 * batch 3 * update --------- Signed-off-by: Lawrence Lane <llane@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: L.B. <llane@nvidia.com> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * ci(fix): Use GITHUB_TOKEN for community bot (#853) (#854) * ci(fix): Use GITHUB_TOKEN for community bot * f --------- Signed-off-by: oliver könig <okoenig@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: oliver könig <okoenig@nvidia.com> Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com> * update LLM PII redaction file - fix issue 828 (#868) (#871) * update LLM PII redaction file - fix 828 * Fix ruff check LLM PII redaction file - fix 828 * update LLM PII redaction Enron-file - fix 828 * update LLM-PII redaction README - fix 828 * updated LLM PII redaction Enron-file - fix 828 * updated LLM PII redaction file - fix 828 * Update tutorials/curator-llm-pii/README.md * removed typo from README file - fix 828 * updated LLM redaction tutorial - fix 828 * updated LLM redaction-Enron file - fix 828 * updated LLM redaction-Enron file - fix 828 * Update tutorials/curator-llm-pii/PII-LLM-modification-Enron.ipynb * Update tutorials/curator-llm-pii/PII-LLM-modification-Enron.ipynb --------- Signed-off-by: Adeola Adesoba <aadesoba@nvidia.com> Signed-off-by: aadesoba-nv <aadesoba@nvidia.com> Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: aadesoba-nv <aadesoba@nvidia.com> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * [Tutorials] Lazy import GPU modules in the Llama Nemotron tutorial (#831) (#875) Signed-off-by: Mehran Maghoumi <Maghoumi@users.noreply.github.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: Mehran Maghoumi <Maghoumi@users.noreply.github.com> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * docs: changelog update (#860) (#887) * docs: changelog update * formatting * remove item --------- Signed-off-by: Lawrence Lane <llane@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: L.B. <llane@nvidia.com> * linkfixes (#865) (#882) Signed-off-by: Lawrence Lane <llane@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: L.B. <llane@nvidia.com> Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com> * docs: Fixing version switcher issues (#885) (#886) Signed-off-by: Andrew Schilling <aschilling@nvidia.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: Andrew Schilling <85314306+aschilling-nv@users.noreply.github.com> * [Ray] Download and extract ArXiv (#805) * remove dask arxiv Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * first pass for entire arxiv implementation Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix circular import Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * working module Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add downloader tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove unused noqa Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add test_iterator Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add extractor tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix failing download tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add test_stage Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * sort Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add url generator tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove noqa Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove nemo_curator/download, outdated scripts, outdated examples Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * [Ray] Classifiers (#753) * [Ray] Classifiers Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add utils file Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * commit quality classifier benchmark helpers Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * use basictokenizer as cpu tokenizer, add crossfit config Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * some ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * merge upstream Signed-off-by: Praateek <praateekm@gmail.com> * use _name, remove gpu resources from labeler Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * consolidate praateek's work with distributeddataclassifier for quality classifier Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add content type, domain, multilingual domain, and filter_by support Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * support for fineweb, fineweb mixtral, and fineweb nemotron classifiers Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add prompt task complexity support Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove noqa Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * padding_size does not need to be exposed to user Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * max_seq_length does not need to be exposed to the user, set default micro_batch_sizes Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add max_chars, edit docstring Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * aegis functionality, start working on instruction data guard Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * nit fixes Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add working pytests for all classifiers Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove existing pytest file Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add more comments to tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * address review, add mem conversation, add README Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * move redundant test code Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * model_inference_batch_size and format_name_with_suffix Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add missing hf_token usage, remove test file, restructure dirs and files Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * delete old examples and scripts Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> Signed-off-by: Praateek <praateekm@gmail.com> Co-authored-by: Praateek <praateekm@gmail.com> * [RAY] Add ID Module (#876) * Add id inital working IMP Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> * working add_id Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> * Add ID Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> * Update ray-curator/ray_curator/tasks/tasks.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com> * Add prefix feature, overwrite, warnings Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> * rename id_prefix to user_prefix Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> * Add in test for tasks and fix task id Signed-off-by: VibhuJawa <vibhujawa@gmail.com> --------- Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com> Signed-off-by: VibhuJawa <vibhujawa@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Add video splitting pipeline with fixed stride extraction and transcoding Stage (#783) * Add video splitting pipeline with fixed stride extraction and transcoding stages - Introduced `video_split_clip_example.py` to demonstrate video splitting functionality. - Added `ClipTranscodingStage` and `FixedStrideExtractorStage` for processing video clips. - Implemented command-line arguments for configuring video processing parameters. - Created utility functions for grouping iterables in `grouping.py`. - Added unit tests for the new stages in `test_clip_transcoding_stage.py` and `test_fixed_stride_extractor_stage.py`. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video splitting pipeline to remove debug mode and enhance stage integration Signed-off-by: Ao Tang <aot@nvidia.com> * Add video limit argument to video split clip example Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video processing stages to enhance resource management and integrate new functionalities - Replaced separate VideoReaderStage and VideoDownloadStage with a composite VideoReaderDownloadStage, streamlining the video reading and downloading process. - Updated ClipTranscodingStage to improve GPU resource allocation and added detailed arguments for better configurability. - Adjusted tests to reflect changes in resource management, ensuring accurate assertions on GPU usage. These changes improve the clarity and efficiency of video processing within the ray-curator framework. Signed-off-by: Ao Tang <aot@nvidia.com> * Add mock GPU classes and enhance ClipTranscodingStage tests - Introduced MockGpuInfo and MockGpuResources classes to simulate GPU information and resources for testing. - Updated test_resources_gpu_encoder and test_resources_hwaccel_enabled methods to utilize mocks, ensuring accurate resource assertions without dependency on actual GPU hardware. - Enhanced test_different_encoder_configurations to validate resource requirements for various encoder configurations, including GPU settings. These changes improve the robustness of the ClipTranscodingStage tests by isolating them from hardware dependencies, facilitating easier testing and validation. Signed-off-by: [Your Name] <your.email@example.com> Signed-off-by: Ao Tang <aot@nvidia.com> * Remove deprecated GPU resource tests from ClipTranscodingStage Signed-off-by: Ao Tang <aot@nvidia.com> * Remove unused test for processing in debug mode from ClipTranscodingStage tests Signed-off-by: Ao Tang <aot@nvidia.com> * Add unit tests for grouping utilities in the ray_curator.utils module Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance video processing stages with ray stage specifications - Added `ray_stage_spec` method to `ClipTranscodingStage`, `VideoDownloadStage`, and `VideoReaderStage` to define stage characteristics for Ray integration. - Updated input and output methods in `ClipTranscodingStage` to include additional input parameters. - Modified `SplitPipeTask` to return properties from `data` instead of `video`, ensuring consistency in task data handling. - Added unit tests to verify the correctness of the new `ray_stage_spec` implementations. These changes improve the integration of video processing stages with Ray's architecture and enhance test coverage for the new functionalities. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video processing imports and update pipeline stages Signed-off-by: Ao Tang <aot@nvidia.com> * Remove unused `IS_ACTOR_STAGE` key from `ray_stage_spec` in `ClipTranscodingStage` and clean up commented-out code. This simplifies the stage specification and prepares for future enhancements. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove redundant check for video source bytes in ClipTranscodingStage. This simplifies the process method by eliminating unnecessary error handling when source bytes are not available. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor ClipTranscodingStage to use a class variable for the stage name and implement post-initialization resource setup. Added error handling for None source bytes in the process method. Updated tests to remove redundant checks and ensure proper functionality. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove unnecessary error handling for None source bytes in ClipTranscodingStage's process method, Signed-off-by: Ao Tang <aot@nvidia.com> * remove redudant test Signed-off-by: Ao Tang <aot@nvidia.com> * precommit fix Signed-off-by: Ao Tang <aot@nvidia.com> --------- Signed-off-by: Ao Tang <aot@nvidia.com> Signed-off-by: [Your Name] <your.email@example.com> * docs: ray curator api autodoc updates (#896) Signed-off-by: Lawrence Lane <llane@nvidia.com> * Move all text stages to `stages/text/` (#891) * first pass Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * ruff Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix tests Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix after merge Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * Add Ray Actor Pool Exceuctor (#893) * Initial Minhash implementation on Ray (#837) * Initial minhash logic without Stage API Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update args and support passing in pre-batched files Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Remove old minhash impl Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add Class to do GPU IO for dedup Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add ID Generator class Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Move MinHashActor to a GPUMinHash class and create a GPUMinHash Processing stage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Remove minhash method in favor of minhashProcessingStage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add mkdir logic to the writer Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add file partitioning stage to __init__.py Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update cuda12x extra to deduplication. Bump pynvml to avoid conflicts Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update stage name Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add initial minhash tests Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add rmm pool arg to MinhashStage, default to false in the parent actor Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Move IO and ID generator logic to the Stage rather than the parent GPUMinHash class Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Update GPUMinHash Tests Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Standardize Id generator actor name Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add GPUMinHashStage tests Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Rename GPUMinHashStage to MinHashStage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add marker for GPU tests Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update cpu ci workflow to skip GPU tests Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Skip tests if imports fail Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * move cudf import checks before stage imports Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Use storage options from read_kwargs directly Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> --------- Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> Co-authored-by: Praateek Mahajan <praateekmahajan@users.noreply.github.com> * docs: curate text load data content updates for ray (#895) * docs: load text data article updates Signed-off-by: Lawrence Lane <llane@nvidia.com> * remove "ray-curator" for curator Signed-off-by: Lawrence Lane <llane@nvidia.com> * simplify naming Signed-off-by: Lawrence Lane <llane@nvidia.com> * imports Signed-off-by: Lawrence Lane <llane@nvidia.com> * imports Signed-off-by: Lawrence Lane <llane@nvidia.com> * imports Signed-off-by: Lawrence Lane <llane@nvidia.com> * linkfix Signed-off-by: Lawrence Lane <llane@nvidia.com> * read through Signed-off-by: Lawrence Lane <llane@nvidia.com> * simplification Signed-off-by: Lawrence Lane <llane@nvidia.com> * remove placeholder concept details Signed-off-by: Lawrence Lane <llane@nvidia.com> * pipeline verbiage Signed-off-by: Lawrence Lane <llane@nvidia.com> * initial feedback round Signed-off-by: Lawrence Lane <llane@nvidia.com> * reduce admonition noise Signed-off-by: Lawrence Lane <llane@nvidia.com> * minor updates Signed-off-by: Lawrence Lane <llane@nvidia.com> * minor updates Signed-off-by: Lawrence Lane <llane@nvidia.com> * feedback Signed-off-by: Lawrence Lane <llane@nvidia.com> --------- Signed-off-by: Lawrence Lane <llane@nvidia.com> * Adding function decorator for very simple functions to be converted into stages (#835) * Revert 'Add utility decorators for ProcessingStage creation' (empty cherry-pick) Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add utility decorators for ProcessingStage creation This commit introduces a new module containing the `processing_stage` decorator, which allows users to easily convert plain Python functions into `ProcessingStage` instances. The decorator supports configuration options such as stage name, resource allocation, and batch size. Additionally, unit tests have been added to validate the functionality of the decorator and ensure proper handling of task processing. Signed-off-by: [Your Name] <your.email@example.com> Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * test commit Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * add test_stage_registry, other nits Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * overwrite stage registry Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * ruff Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * propagate _metadata and _stage_perf Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * accept resources dict Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * reformat Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * add process_batch tests Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * ruff Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * remove todo Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * add pipeline example Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> --------- Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> Signed-off-by: [Your Name] <your.email@example.com> Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * Add Text Embedding Model (#899) * Add Ray curator dockerfile and enable testing (#879) * Add Ray curator dockerfile and enable testing Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Fix indentation issues Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update dockerfile and add cuda12x Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update coverage pathes Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update gpu tests runner Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Add gpu testing scripts and update Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Cd into ray-curator for coverage Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Create dev layer and install dev packages Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update coverage paths Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Install opencv Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Address syntax error Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update cv2 ubuntu dependencies Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Fix typo Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Add cudf placeholder test Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Space after import Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Add gpu_only_import Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Remove import utils for now Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Fix spacing Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Skip gpu tests for cpu Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update unit test coverage path Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Skip gpu coverage report for now Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Use pixi Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Fix dockerfile syntax Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Try ffmpeg only Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Add extra index url for pixi Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Address typos Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Install git Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update entrypoint Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Fix typo Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Use env var for dev install Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Resolve syntax error Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Fix env var and verbose install Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update pixi entrypoint and pyproject install Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Trigger entrypoint before tests Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update test entrypoint Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Source entrypoint Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update list of dev install pixi Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Add back cuda12x and index-strategy Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Turn off verbose install Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Skip gpu coverage for now Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Support arm Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Set timeout for dockerbuild and update pyproject Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Remove retry github config Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> --------- Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * ci: Install ray-curator module (#905) * Add ray curator as pypi dependency Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Add package info and test import Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update pyproject.toml Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Copy src for pixi install Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Update test import Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * Revert temp test Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> --------- Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com> * [REVIEW] Add modifers to ray curator (#898) * Inital WIP modifier workflows Signed-off-by: VibhuJawa <vibhujawa@gmail.com> * Moved tests and also moved modifiers to text sub module Signed-off-by: VibhuJawa <vibhujawa@gmail.com> * Add tests for the meta class and modifier and improve docstring Signed-off-by: VibhuJawa <vibhujawa@gmail.com> * Update ray-curator/ray_curator/stages/text/modifiers/slicer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com> * Update ray-curator/ray_curator/stages/text/modifiers/line_remover.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com> * Delete files from dask dir and remove optional download fields Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> * Add pytest as requested Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> --------- Signed-off-by: VibhuJawa <vibhujawa@gmail.com> Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com> Signed-off-by: Vibhu Jawa <vjawa@nvidia.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Allow users to fuse multiple `DocumentFilter` objects into a single `ScoreFilter` stage (#850) * Allow users to fuse multiple `DocumentFilter` objects into a single `ScoreFilter` stage Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * remove old example and scripts file Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add suggestions Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * add init Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * fix csv path Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * clearer error messages Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> --------- Signed-off-by: Sarah Yurick <sarahyurick@gmail.com> * Fix exception when blocksize is set (#892) (#904) If blocksize is set instead of files_per_partition, this line raised an exception. Signed-off-by: Yurii Paniv <mr.robinhad@gmail.com> Signed-off-by: NeMo Bot <nemo-bot@nvidia.com> Co-authored-by: Yurii Paniv <mr.robinhad@gmail.com> * docs: curate text - process data - language dir (#900) * docs: curate text - process data - language dir Signed-off-by: Lawrence Lane <llane@nvidia.com> * remove extra content Signed-off-by: Lawrence Lane <llane@nvidia.com> * another pass Signed-off-by: Lawrence Lane <llane@nvidia.com> * remove pool Signed-off-by: Lawrence Lane <llane@nvidia.com> * formatting Signed-off-by: Lawrence Lane <llane@nvidia.com> * feedback Signed-off-by: Lawrence Lane <llane@nvidia.com> * clarificaiton and alternative as pipeline stage. removed extra section Signed-off-by: Lawrence Lane <llane@nvidia.com> * Update docs/curate-text/process-data/language-management/language.md Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: L.B. <llane@nvidia.com> --------- Signed-off-by: Lawrence Lane <llane@nvidia.com> Signed-off-by: L.B. <llane@nvidia.com> Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> * docs: add README for experimental scripts directory (#910) Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Add IdGenerator to JsonlReader + IdGenerator tests / write_to_disk / from_disk (#907) * Initial buckets to edges stage (#909) * Initial buckets to edges stage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * re-add file utils from lsh pr Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Handle directory cleanup/creation logic in the stage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add tests for buckets to edglist Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Rename doc_id_column to doc_id_field, update storage_options to read/write_kwargs instead Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Fix indentation Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Fix kwargs args Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add copyright headers Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * remove previous curator impl Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> --------- Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * initial Connected components stage Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update actor pool executor to produce as many tasks as actors Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Set actor pool size in addition to raft handle Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Improve IO logic, add 2D partitioning code for subcomms, improved logging Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Use self read_parquet Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * [SemDedup] Add KMeans (#912) * add tests Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Explicitly set batchsize to none Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * update overwrite test to avoid using ray client Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Add headers Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * Remove old cc impl Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com> * S3 Client (#903) * WIP Signed-off-by: Ao Tang <aot@nvidia.com> * WIP Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor S3 client configuration and enhance video reading logging - Updated S3_PROFILE_PATH to use an environment variable for better flexibility in specifying the S3 credentials file location. - Improved logging in VideoReaderStage to provide more informative messages about video byte downloads, including the size of the downloaded video. Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance VideoReader functionality with S3 support and improve validation checks - Updated VideoReader to conditionally use ClientPartitioningStage for S3 paths and FilePartitioningStage for local paths, improving flexibility in handling video sources. - Enhanced validation in VideoTask to check for the existence of input videos when provided as pathlib.Path, ensuring better error handling. - Removed unused methods from S3Client to streamline the codebase. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove redundant exception raising in VideoReaderStage to improve error handling during video reading. This change prevents unnecessary propagation of exceptions while still logging errors effectively. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor ClientPartitioningStage and enhance S3 client configuration - Rearranged import statements for better organization and readability in `client_partitioning.py` and `video_reader.py`. - Updated `S3ClientConfig` and `BaseClientConfig` to use `@dataclass` for improved data handling. - Added comprehensive unit tests for `ClientPartitioningStage`, covering initialization, setup, and processing methods with various scenarios. - Improved error handling and validation in the `_read_list_json` function. This refactor enhances the maintainability and test coverage of the codebase, ensuring better functionality and reliability in handling client partitioning tasks. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove SPDX license comments from S3 client, storage client, and storage utilities files to streamline code readability. This change simplifies the file headers while retaining essential module documentation. Signed-off-by: Ao Tang <aot@nvidia.com> * Use Fsspec instead of boto3 Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor file handling and enhance video reading capabilities - Introduced a new `FSPath` class in `client_utils.py` for improved file operations with fsspec. - Updated `ClientPartitioningStage` and `VideoReaderStage` to utilize the new `FSPath` class for better handling of file paths. - Removed unused imports and streamlined code in `client_partitioning.py` and `video_reader.py`. - Enhanced error handling in `VideoReaderStage` to support various input types for video sources. This refactor improves the maintainability and flexibility of file handling in the video processing pipeline. Signed-off-by: Ao Tang <aot@nvidia.com> * move client_partitioning.py Signed-off-by: Ao Tang <aot@nvidia.com> * ruff check Signed-off-by: Ao Tang <aot@nvidia.com> * Fix broken tests Signed-off-by: Ao Tang <aot@nvidia.com> * Add `as_posix` method to `FSPath` class and implement comprehensive test suite - Introduced `as_posix` method in the `FSPath` class to convert filesystem paths to POSIX format, accommodating various protocols. - Created a new test suite for `FSPath` in `test_client_utils.py`, covering initialization, string representation, file operations, and edge cases. - Enhanced tests for `get_bytes_cat_ranges` to handle different file sizes and error scenarios. This update improves the functionality and test coverage of the `FSPath` class, ensuring robust file handling across different filesystems. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove logging of downloaded video size in VideoReaderStage to streamline error handling and reduce unnecessary output. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor video reading and splitting pipeline examples for improved readability - Reformatted the `create_video_reading_pipeline` and `create_video_splitting_pipeline` functions to enhance code clarity by aligning parameters and removing unnecessary line breaks. - Updated the `VideoReader` and `ClipTranscodingStage` instantiation to follow a consistent style. - Made minor adjustments in the `ClientPartitioningStage` to ensure consistent formatting and improved readability. These changes contribute to a cleaner and more maintainable codebase for video processing pipelines. Signed-off-by: Ao Tang <aot@nvidia.com> --------- Signed-off-by: Ao Tang <aot@nvidia.com> * Add ClipWriterStage to video splitting pipeline Clean (#897) * WIP Signed-off-by: Ao Tang <aot@nvidia.com> * WIP Signed-off-by: Ao Tang <aot@nvidia.com> * Update ClipWriterStage to clarify local storage usage Signed-off-by: [Your Name] <your.email@example.com> Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance video clip processing with new GenericClipWriterStage and required output path argument - Introduced a new GenericClipWriterStage for writing video clips and their metadata, consolidating the writing process and improving resource management. - Updated the video_split_clip_example to require an output clip path, ensuring that users specify where to save the generated clips. - The new stage supports parallel writing of clips and metadata, enhancing performance and flexibility in video processing workflows. Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance ClipWriterStage with additional metadata handling - Improved `ClipWriterStage` to support writing additional metadata during video processing. - Updated related utility functions to accommodate new metadata fields. - Refined unit tests to cover the new functionality and ensure reliability. Signed-off-by: Ao Tang <aot@nvidia.com> * Add ClipWriterStage to video splitting pipeline - Introduced `ClipWriterStage` for writing clips and metadata during video processing. - Updated `video_split_clip_example.py` to include the new stage, allowing for clip writing functionality. - Enhanced command-line argument parsing for output clip path. - Added utility functions for managing storage paths and writing data in various formats. - Implemented unit tests for `ClipWriterStage` to ensure functionality and reliability. Signed-off-by: Ao Tang <aot@nvidia.com> * ruff fix Signed-off-by: Ao Tang <aot@nvidia.com> * ruff format Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor S3 client configuration and enhance video reading logging - Updated S3_PROFILE_PATH to use an environment variable for better flexibility in specifying the S3 credentials file location. - Improved logging in VideoReaderStage to provide more informative messages about video byte downloads, including the size of the downloaded video. Signed-off-by: Ao Tang <aot@nvidia.com> * Enhance VideoReader functionality with S3 support and improve validation checks - Updated VideoReader to conditionally use ClientPartitioningStage for S3 paths and FilePartitioningStage for local paths, improving flexibility in handling video sources. - Enhanced validation in VideoTask to check for the existence of input videos when provided as pathlib.Path, ensuring better error handling. - Removed unused methods from S3Client to streamline the codebase. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove redundant exception raising in VideoReaderStage to improve error handling during video reading. This change prevents unnecessary propagation of exceptions while still logging errors effectively. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor ClientPartitioningStage and enhance S3 client configuration - Rearranged import statements for better organization and readability in `client_partitioning.py` and `video_reader.py`. - Updated `S3ClientConfig` and `BaseClientConfig` to use `@dataclass` for improved data handling. - Added comprehensive unit tests for `ClientPartitioningStage`, covering initialization, setup, and processing methods with various scenarios. - Improved error handling and validation in the `_read_list_json` function. This refactor enhances the maintainability and test coverage of the codebase, ensuring better functionality and reliability in handling client partitioning tasks. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove SPDX license comments from S3 client, storage client, and storage utilities files to streamline code readability. This change simplifies the file headers while retaining essential module documentation. Signed-off-by: Ao Tang <aot@nvidia.com> * Use Fsspec instead of boto3 Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor file handling and enhance video reading capabilities - Introduced a new `FSPath` class in `client_utils.py` for improved file operations with fsspec. - Updated `ClientPartitioningStage` and `VideoReaderStage` to utilize the new `FSPath` class for better handling of file paths. - Removed unused imports and streamlined code in `client_partitioning.py` and `video_reader.py`. - Enhanced error handling in `VideoReaderStage` to support various input types for video sources. This refactor improves the maintainability and flexibility of file handling in the video processing pipeline. Signed-off-by: Ao Tang <aot@nvidia.com> * move client_partitioning.py Signed-off-by: Ao Tang <aot@nvidia.com> * ruff check Signed-off-by: Ao Tang <aot@nvidia.com> * Fix broken tests Signed-off-by: Ao Tang <aot@nvidia.com> * Remove unused `generic_clip_writer.py`, `storage_client.py`, and related utility files; refactor `writer_utils.py` to eliminate storage client dependencies and streamline file writing functions. Update tests to reflect these changes and ensure compatibility with the new structure. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove test file `test_client_utils.py` for the `FSPath` class, cleaning up unused test cases and ensuring the test suite reflects the current codebase structure. Signed-off-by: Ao Tang <aot@nvidia.com> * Refactor ClipWriterStage to remove storage client dependencies and streamline file writing methods. Updated method signatures to eliminate storage client parameters, enhancing code clarity and maintainability. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove unused import of ClipWriterStage in video_split_clip_example.py to streamline the code and improve clarity. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Remove unused `input_s3_profile_name` attribute from `VideoReader` class to streamline the code and improve clarity. Signed-off-by: Ao Tang <aot@nvidia.com> * Remove unused `input_s3_profile_name` attribute from `VideoReader` class to streamline the code and improve clarity. Signed-off-by: Abhinav Garg <abhinavg@stanford.edu> * Refact…
1 parent b44ca25 commit 0b22ceb

File tree

21 files changed

+1057
-164
lines changed

21 files changed

+1057
-164
lines changed

ray-curator/ray_curator/backends/experimental/ray_actor_pool/shuffle_adapter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
from ray_curator.stages.shuffler.stage import ShuffleStage
1717

1818

19-
@ray.remote
19+
# TODO: Remove once UCX memory usage with GPU staging buffers is fixed.
20+
@ray.remote(runtime_env={"env_vars": {"UCX_RNDV_FRAG_MEM_TYPES": "host"}})
2021
class ShuffleStageAdapter(BaseStageAdapter):
2122
"""Ray actor that wraps a shuffle stage and its actor.
2223
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from ray_curator.stages.deduplication.fuzzy.workflow import FuzzyDeduplicationWorkflow
2+
3+
__all__ = ["FuzzyDeduplicationWorkflow"]

ray-curator/ray_curator/stages/deduplication/fuzzy/buckets_to_edges.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from ray_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR
2525
from ray_curator.stages.resources import Resources
2626
from ray_curator.tasks import FileGroupTask
27-
from ray_curator.utils.file_utils import delete_dir, get_fs, is_not_empty
27+
from ray_curator.utils.file_utils import create_or_overwrite_dir, get_fs
2828

2929

3030
class BucketsToEdgesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
@@ -34,7 +34,7 @@ class BucketsToEdgesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
3434
3535
Args:
3636
doc_id_field: The field name containing the document ids for each bucket.
37-
output_dir: The directory to write the output file to.
37+
output_path: The directory to write the output file to.
3838
read_kwargs: Keyword arguments to pass for reading the input files.
3939
Only the storage_options key is supported for now.
4040
write_kwargs: Keyword arguments to pass for writing the output files.
@@ -46,7 +46,7 @@ class BucketsToEdgesStage(ProcessingStage[FileGroupTask, FileGroupTask]):
4646

4747
def __init__(
4848
self,
49-
output_dir: str,
49+
output_path: str,
5050
doc_id_field: str = CURATOR_DEDUP_ID_STR,
5151
read_kwargs: dict[str, Any] | None = None,
5252
write_kwargs: dict[str, Any] | None = None,
@@ -57,14 +57,11 @@ def __init__(
5757
self.read_storage_options = read_kwargs.get("storage_options") if read_kwargs is not None else None
5858
self.write_storage_options = write_kwargs.get("storage_options") if write_kwargs is not None else None
5959

60-
self.output_fs = get_fs(output_dir, self.write_storage_options)
61-
self.output_dir = self.output_fs.sep.join([output_dir, self.name])
60+
self.output_fs = get_fs(output_path, self.write_storage_options)
61+
self.output_path = self.output_fs.sep.join([output_path, self.name])
6262

6363
# Handle output directory cleanup logic
64-
if is_not_empty(self.output_dir, self.output_fs):
65-
logger.warning(f"Output directory {self.output_dir} is not empty. Deleting it.")
66-
delete_dir(self.output_dir, self.output_fs)
67-
self.output_fs.mkdirs(self.output_dir, exist_ok=True)
64+
create_or_overwrite_dir(self.output_path, fs=self.output_fs)
6865

6966
def _check_io_kwargs(self, kwargs: dict[str, Any] | None) -> None:
7067
if kwargs is not None:
@@ -81,7 +78,7 @@ def process(self, task: FileGroupTask) -> FileGroupTask:
8178
edges = [list(edge) for edge in edges]
8279
edges = pa.Table.from_pandas(pd.DataFrame(edges, columns=[f"{self.doc_id_field}_x", f"{self.doc_id_field}_y"]))
8380

84-
output_path = self.output_fs.sep.join([self.output_dir, f"{task.task_id}.parquet"])
81+
output_path = self.output_fs.sep.join([self.output_path, f"{task._uuid}.parquet"])
8582
pq.write_table(edges, output_path, filesystem=self.output_fs)
8683
return FileGroupTask(
8784
task_id=f"{task.task_id}",

ray-curator/ray_curator/stages/deduplication/fuzzy/connected_components.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
from ray_curator.backends.experimental.utils import RayStageSpecKeys
2525
from ray_curator.stages.base import ProcessingStage
26+
from ray_curator.stages.deduplication.fuzzy.utils import CURATOR_FUZZY_DUPLICATE_GROUP_FIELD
2627
from ray_curator.stages.deduplication.id_generator import (
2728
CURATOR_DEDUP_ID_STR,
2829
)
@@ -38,23 +39,23 @@
3839
class ConnectedComponentsStage(ProcessingStage[FileGroupTask, FileGroupTask], DeduplicationIO):
3940
def __init__(
4041
self,
41-
output_dir: str,
42-
source_column: str = f"{CURATOR_DEDUP_ID_STR}_x",
43-
destination_column: str = f"{CURATOR_DEDUP_ID_STR}_y",
42+
output_path: str,
43+
source_field: str = f"{CURATOR_DEDUP_ID_STR}_x",
44+
destination_field: str = f"{CURATOR_DEDUP_ID_STR}_y",
4445
read_kwargs: dict | None = None,
4546
write_kwargs: dict | None = None,
4647
):
4748
"""
4849
Args:
49-
output_dir: The directory to write the resulting connected components to.
50-
source_column: The column name containing the document ids of the source of the edge.
51-
destination_column: The column name containing the document ids of the destination of the edge.
50+
output_path: The path to write the resulting connected components to.
51+
source_field: The field name containing the document ids of the source of the edge.
52+
destination_field: The field name containing the document ids of the destination of the edge.
5253
read_kwargs: Keyword arguments to pass for reading the input files.
5354
write_kwargs: Keyword arguments to pass for writing the output files.
5455
"""
5556

56-
self.source_column = source_column
57-
self.destination_column = destination_column
57+
self.source_field = source_field
58+
self.destination_field = destination_field
5859
self.read_kwargs = read_kwargs if read_kwargs is not None else {}
5960
self.write_kwargs = write_kwargs if write_kwargs is not None else {}
6061

@@ -63,9 +64,9 @@ def __init__(
6364
self._batch_size = None
6465

6566
# Handle output directory cleanup logic
66-
self.output_fs = get_fs(output_dir, self.write_kwargs.get("storage_options"))
67-
self.output_dir = self.output_fs.sep.join([output_dir, self.name])
68-
create_or_overwrite_dir(self.output_dir, self.output_fs)
67+
self.output_fs = get_fs(output_path, self.write_kwargs.get("storage_options"))
68+
self.output_path = self.output_fs.sep.join([output_path, self.name])
69+
create_or_overwrite_dir(self.output_path, fs=self.output_fs)
6970

7071
def setup(self, _worker_metadata: "WorkerMetadata | None" = None) -> None:
7172
if not hasattr(self, "_raft_handle"):
@@ -159,34 +160,34 @@ def process(self, task: FileGroupTask) -> FileGroupTask:
159160

160161
def process_batch(self, tasks: list[FileGroupTask]) -> list[FileGroupTask]:
161162
"""
162-
Process an input file, compute minhashes, and write results to an output file.
163-
Automatically adds a unique _curator_id field to each document if not present.
163+
Process a batch of input files containing edges between documents.
164+
Compute the weakly connected components of the graph and write a mapping of document ids to their connected component id.
164165
165166
Parameters
166167
----------
167-
infiles: str, list[str]
168-
Path to input file (JSONL format) or list of paths
169-
outfile: str
170-
Path to output file (Parquet format)
171-
columns: list, optional
172-
Columns to read from input file
168+
tasks: list[FileGroupTask]
169+
A list of FileGroupTasks containing the input files.
170+
Returns
171+
-------
172+
list[FileGroupTask]
173+
A list of FileGroupTasks containing the output doc_id to connected component id mapping.
173174
"""
174175
input_files = []
175176
for task in tasks:
176177
input_files.extend(task.data)
177-
output_file = self.output_fs.sep.join([self.output_dir, f"{tasks[0].task_id}.parquet"])
178-
edgelist_columns = [self.source_column, self.destination_column]
178+
output_file = self.output_fs.sep.join([self.output_path, f"{tasks[0]._uuid}.parquet"])
179+
edgelist_columns = [self.source_field, self.destination_field]
179180
dfs = []
180181
for input_file in input_files:
181182
dfs.append(self.read_parquet(input_file, columns=edgelist_columns, **self.read_kwargs))
182183
df = cudf.concat(dfs)
183184
# remove duplicate edges
184185
df = df.drop_duplicates(subset=edgelist_columns, ignore_index=True)
185-
vertices, labels = self.weakly_connected_components(df, self.source_column, self.destination_column)
186+
vertices, labels = self.weakly_connected_components(df, self.source_field, self.destination_field)
186187
df = cudf.DataFrame(
187188
{
188189
CURATOR_DEDUP_ID_STR: vertices,
189-
"_duplicate_group_id": labels,
190+
CURATOR_FUZZY_DUPLICATE_GROUP_FIELD: labels,
190191
}
191192
)
192193
self.write_parquet(df=df, filepath=output_file, index=False, **self.write_kwargs)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import TYPE_CHECKING, Any, Literal
16+
17+
from ray_curator.stages.deduplication.fuzzy.utils import CURATOR_FUZZY_DUPLICATE_GROUP_FIELD
18+
from ray_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR
19+
from ray_curator.stages.shuffler.rapidsmpf_shuffler import pylibcudf_to_cudf_dataframe
20+
from ray_curator.stages.shuffler.stage import ShuffleStage
21+
from ray_curator.tasks import FileGroupTask
22+
from ray_curator.utils.file_utils import get_fs
23+
24+
if TYPE_CHECKING:
25+
import cudf
26+
27+
DUPLICATE_IDS_SUBDIR = "FuzzyDuplicateIds"
28+
29+
30+
class IdentifyDuplicatesStage(ShuffleStage):
31+
"""
32+
Stage that generates removal IDs for fuzzy deduplication.
33+
The approach involves shuffling the data based on the duplicate group field similar to grouping by the group field.
34+
followed by selecting one document per group.
35+
Currently the removal strategy is to randomly keep one document per group.
36+
37+
Parameters
38+
----------
39+
duplicate_group_field
40+
Column name representing the group id for a document.
41+
total_nparts
42+
Total number of output partitions. If None, will be set automatically by the executor.
43+
output_path
44+
Path to write output files.
45+
read_kwargs
46+
Keyword arguments for cudf.read_parquet method.
47+
write_kwargs
48+
Keyword arguments for cudf.to_parquet method.
49+
rmm_pool_size
50+
Size of the RMM GPU memory pool in bytes.
51+
If "auto", the memory pool is set to 90% of the free GPU memory.
52+
If None, the memory pool is set to 50% of the free GPU memory that can expand if needed.
53+
spill_memory_limit
54+
Device memory limit in bytes for spilling to host.
55+
If "auto", the limit is set to 80% of the RMM pool size.
56+
If None spilling is disabled.
57+
enable_statistics
58+
Whether the underlying rapidsmpf shuffler should collect shuffle statistics.
59+
"""
60+
61+
_name = "IdentifyDuplicates"
62+
63+
def __init__( # noqa: PLR0913
64+
self,
65+
duplicate_group_field: str = CURATOR_FUZZY_DUPLICATE_GROUP_FIELD,
66+
document_id_field: str = CURATOR_DEDUP_ID_STR,
67+
total_nparts: int | None = None,
68+
output_path: str = "./",
69+
read_kwargs: dict[str, Any] | None = None,
70+
write_kwargs: dict[str, Any] | None = None,
71+
rmm_pool_size: int | Literal["auto"] | None = "auto",
72+
spill_memory_limit: int | Literal["auto"] | None = "auto",
73+
enable_statistics: bool = False,
74+
):
75+
self.duplicate_group_field = duplicate_group_field
76+
self.document_id_field = document_id_field
77+
self.output_fs = get_fs(
78+
output_path, storage_options=read_kwargs.get("storage_options") if read_kwargs is not None else None
79+
)
80+
self.output_path = self.output_fs.sep.join([output_path, DUPLICATE_IDS_SUBDIR])
81+
self.write_kwargs = write_kwargs
82+
83+
super().__init__(
84+
shuffle_on=[duplicate_group_field],
85+
total_nparts=total_nparts,
86+
output_path=self.output_path,
87+
read_kwargs=read_kwargs,
88+
write_kwargs=write_kwargs,
89+
rmm_pool_size=rmm_pool_size,
90+
spill_memory_limit=spill_memory_limit,
91+
enable_statistics=enable_statistics,
92+
)
93+
94+
def _get_removal_ids(self, df: "cudf.DataFrame") -> "cudf.DataFrame":
95+
"""
96+
Get the removal ids for the given dataframe.
97+
"""
98+
removal_ids = df[df[self.duplicate_group_field].duplicated(keep="first")][self.document_id_field]
99+
removal_ids = removal_ids.sort_values(ignore_index=True)
100+
return removal_ids.to_frame()
101+
102+
def process(self, task: FileGroupTask) -> FileGroupTask:
103+
return super().process(task)
104+
105+
def ray_stage_spec(self) -> dict[str, Any]:
106+
return super().ray_stage_spec()
107+
108+
def read_and_insert(self, task: FileGroupTask) -> FileGroupTask:
109+
super().read_and_insert(task)
110+
return task
111+
112+
def insert_finished(self) -> None:
113+
super().insert_finished()
114+
115+
def extract_and_write(self) -> list[FileGroupTask]:
116+
self._check_actor_obj()
117+
write_kwargs = self.write_kwargs.copy()
118+
write_kwargs["index"] = write_kwargs.get("index", False)
119+
120+
result_tasks = []
121+
for partition_id, partition in self._actor_obj.extract():
122+
shuffled_partition_df = pylibcudf_to_cudf_dataframe(partition, column_names=self.output_columns)
123+
num_groups = shuffled_partition_df[self.duplicate_group_field].nunique()
124+
removal_ids = self._get_removal_ids(shuffled_partition_df)
125+
126+
output_file = self.output_fs.sep.join([self.output_path, f"part.{partition_id}.parquet"])
127+
# If user has not specified row_group_size_rows, set it to the lower of 10% of the number of removal ids or 1M (default) or a minimum of 1k (for small datasets)
128+
write_kwargs["row_group_size_rows"] = write_kwargs.get(
129+
"row_group_size_rows", max(1000, min(len(removal_ids) // 10, 1000 * 1000))
130+
)
131+
removal_ids.to_parquet(output_file, **write_kwargs)
132+
result_tasks.append(
133+
FileGroupTask(
134+
task_id=partition_id,
135+
dataset_name=self.dataset_name + f"{self.name}",
136+
data=[output_file],
137+
_metadata={
138+
"partition_index": partition_id,
139+
"num_groups": num_groups,
140+
"num_removal_ids": len(removal_ids),
141+
},
142+
)
143+
)
144+
return result_tasks

0 commit comments

Comments
 (0)