diff --git a/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml b/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml new file mode 100644 index 0000000000..0f6aae0d76 --- /dev/null +++ b/launcher_scripts/conf/data_curation/common_crawl/add_id/add_id.yaml @@ -0,0 +1,13 @@ +run: + name: 'add-id' + results_dir: ${data_curation.run.results_dir}/${.name} + dependency: "singleton" + time_limit: "01:00:00" + nodes: 1 + node_type: cpu + +output_data_dir: ${.run.results_dir}/id_added +id_field_name: adlr_id +id_prefix: doc_id +input_file_type: jsonl +output_file_type: jsonl \ No newline at end of file diff --git a/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml b/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml index 0baa673b91..29bc048dd0 100644 --- a/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml +++ b/launcher_scripts/conf/data_curation/common_crawl/curate_common_crawl.yaml @@ -30,6 +30,7 @@ filter_quality: - quality_filtering fuzzy_deduplication: + - add_id - compute_minhashes - minhash_buckets - jaccard_map_buckets @@ -58,6 +59,7 @@ defaults: - common_crawl/connected_component/connected_component - common_crawl/write_deduped_result_with_text/write_deduped_result_with_text - common_crawl/verify_all_pairs_jaccard/verify_all_pairs_jaccard + - common_crawl/add_id/add_id special: choose_language: diff --git a/launcher_scripts/nemo_launcher/core/data_curation_stages.py b/launcher_scripts/nemo_launcher/core/data_curation_stages.py index f3524e222b..2d306f84a9 100644 --- a/launcher_scripts/nemo_launcher/core/data_curation_stages.py +++ b/launcher_scripts/nemo_launcher/core/data_curation_stages.py @@ -248,7 +248,7 @@ def run(self): class QualityFiltering(DataCurationSubStage): - """ DataCurationSubStage for performing quality filtering on documents """ + """DataCurationSubStage for performing quality filtering on documents""" def __init__(self, cfg, memory): 
super().__init__(cfg, memory) @@ -259,7 +259,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -428,7 +428,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -488,7 +488,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -544,7 +544,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -589,7 +589,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -639,7 +639,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = 
self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -698,7 +698,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg # Write out the filter configuration as a separate config file @@ -770,6 +770,7 @@ def __init__(self, cfg): "connected_component": ConnectedComponent, "write_deduped_result_with_text": WriteDedupedResultWithText, "verify_all_pairs_jaccard": VerifyAllPairsJaccard, + "add_id": AddId, } def setup_stage_vars(self, cfg): @@ -840,7 +841,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -885,7 +886,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -928,7 +929,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = 
self.stage_cfg command_groups = [[]] @@ -969,7 +970,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1011,7 +1012,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1052,7 +1053,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1092,7 +1093,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1132,7 +1133,7 @@ def setup_stage_vars(self, cfg): self.stage_cfg = self._get_sub_stage_confg(self.stage_name) def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: - """ Builds the command groups for the current stage """ + """Builds the command groups for the current stage""" stage_cfg = self.stage_cfg command_groups = [[]] @@ -1159,3 +1160,47 @@ def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]: command_groups = clean_command_groups(command_groups) return command_groups + + 
class AddId(DataCurationSubStage):
    """DataCurationSubStage that runs the ``add_id`` command on the current data directory"""

    def __init__(self, cfg, memory):
        super().__init__(cfg, memory)

    def setup_stage_vars(self, cfg):
        """Setup the stage vars, i.e. stage name and stage cfg"""
        self.stage_name = "add_id"
        self.stage_cfg = self._get_sub_stage_confg(self.stage_name)

    def make_stage_command_groups(self, stage_cfg_path: Path) -> List[List[str]]:
        """Builds the command groups for the current stage

        Writes an ``add_id.sh`` run script into the stage's log folder and
        wraps it with ``make_dask_command_string``.

        Side effect: ``self.memory.data_dir`` is read as this stage's input
        directory and then overwritten with this stage's ``output_data_dir``.

        Args:
            stage_cfg_path: Path to the stage config file (not used by this
                sub-stage; kept for interface compatibility).

        Returns:
            The cleaned command groups, holding the single dask command
            that executes the generated run script.
        """
        stage_cfg = self.stage_cfg
        output_data_dir = stage_cfg.get("output_data_dir")

        # Create the list of arguments for the add_id command
        args = create_args_list(
            replace_underscore=True,
            input_data_dir=self.memory.data_dir,
            output_data_dir=output_data_dir,
            id_field_name=stage_cfg.get("id_field_name"),
            id_prefix=stage_cfg.get("id_prefix"),
            input_file_type=stage_cfg.get("input_file_type"),
            output_file_type=stage_cfg.get("output_file_type"),
            scheduler_file=self.log_folder / "scheduler.json",
        )

        # Record this stage's output as the current data directory; this
        # stage itself took its input from the same attribute above.
        self.memory.data_dir = output_data_dir

        # One argument per line, joined with shell line continuations
        runscript = " \\\n ".join(["add_id", *args])
        runscript_path = os.path.join(self.log_folder, "add_id.sh")

        with open(runscript_path, "w") as f:
            f.write(runscript)

        # A single command in a single group; no joining is needed
        command_groups = [[self.make_dask_command_string(runscript_path)]]

        return clean_command_groups(command_groups)