diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py
index 19694290..9b55e6f9 100644
--- a/sdp/processors/__init__.py
+++ b/sdp/processors/__init__.py
@@ -14,6 +14,10 @@
 
 # let's import all supported processors here to simplify target specification
 
+from sdp.processors.group_processors import (
+    GroupProcessors,
+)
+
 from sdp.processors.datasets.coraa.create_initial_manifest import (
     CreateInitialManifestCORAA,
 )
diff --git a/sdp/processors/group_processors.py b/sdp/processors/group_processors.py
new file mode 100644
index 00000000..fb0cb748
--- /dev/null
+++ b/sdp/processors/group_processors.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sdp.processors.base_processor import BaseProcessor
+from sdp.utils.chunk_processing import ChunkProcessingPipeline
+
+
+class GroupProcessors(BaseProcessor):
+    def __init__(
+        self,
+        output_manifest_file: str,
+        input_manifest_file: str,
+        chunksize: int = 500,
+        **processors_cfg,
+    ):
+        super().__init__(
+            output_manifest_file=output_manifest_file,
+            input_manifest_file=input_manifest_file,
+        )
+
+        self.initial_manifest_file = input_manifest_file
+        self.chunksize = chunksize
+        self.processors_cfg = processors_cfg["processors"]
+
+    def test(self):
+        pass
+
+    def process(self):
+        chunked_pipeline = ChunkProcessingPipeline(
+            initial_manifest_file=self.initial_manifest_file,
+            last_output_manifest_file=self.output_manifest_file,
+            chunksize=self.chunksize,
+            processors_cfgs=self.processors_cfg,
+        )
+
+        chunked_pipeline.run()
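
GroupProcessors simply hands its nested `processors` configs to ChunkProcessingPipeline. A minimal sketch of driving it directly from Python (the sub-processor target, alphabet, and file paths below are illustrative assumptions, not part of this change):

    from sdp.processors import GroupProcessors

    group = GroupProcessors(
        input_manifest_file="data/input_manifest.json",
        output_manifest_file="data/output_manifest.json",
        chunksize=500,
        processors=[
            {
                # hypothetical sub-processor; any SDP processor config fits here
                "_target_": "sdp.processors.DropNonAlphabet",
                "alphabet": " abcdefghijklmnopqrstuvwxyz",
            },
        ],
    )
    group.process()
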
diff --git a/sdp/run_processors.py b/sdp/run_processors.py
index b9002de6..c3596146 100644
--- a/sdp/run_processors.py
+++ b/sdp/run_processors.py
@@ -22,11 +22,13 @@
 from omegaconf import OmegaConf, open_dict
 
 from sdp.logging import logger
+from sdp.utils.chunk_processing import get_last_output_manifest_file_in_group
 
 # registering new resolvers to simplify config files
 OmegaConf.register_new_resolver("subfield", lambda node, field: node[field])
 OmegaConf.register_new_resolver("not", lambda x: not x)
 OmegaConf.register_new_resolver("equal", lambda field, value: field == value)
+OmegaConf.register_new_resolver("inside_group", lambda value: f"${{{value.replace('processors.', '')}}}")
 
 
 # customizing logger
@@ -128,6 +130,12 @@
         # and "input_manifest_file" keys, which can be optional. In case they
         # are missing, we create tmp files here for them
         # (1) first use a temporary file for the "output_manifest_file" if it is unspecified
+        if processor_cfg["_target_"] == "sdp.processors.GroupProcessors" and "output_manifest_file" not in processor_cfg:
+            last_output_manifest_file_in_group = get_last_output_manifest_file_in_group(processor_cfg['processors'])
+            if last_output_manifest_file_in_group:
+                with open_dict(processor_cfg):
+                    processor_cfg["output_manifest_file"] = last_output_manifest_file_in_group
+
         if "output_manifest_file" not in processor_cfg:
             tmp_file_path = os.path.join(tmp_dir, str(uuid.uuid4()))
             with open_dict(processor_cfg):
@@ -139,7 +147,7 @@
             with open_dict(processors_cfgs[idx + 1]):
                 processors_cfgs[idx + 1]["input_manifest_file"] = processor_cfg["output_manifest_file"]
 
-        processor = hydra.utils.instantiate(processor_cfg)
+        processor = hydra.utils.instantiate(processor_cfg, _recursive_=False)
         # running runtime tests to fail right-away if something is not
         # matching users expectations
         processor.test()
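
The new `inside_group` resolver rewrites a top-level reference such as `processors.3.output_manifest_file` into the interpolation `${3.output_manifest_file}`, so it can be resolved later against the group's own processor list (see `set_chunk_configs` in chunk_processing.py below). A small sketch of that rewriting, with an assumed input string:

    # same lambda as registered above, shown standalone
    inside_group = lambda value: f"${{{value.replace('processors.', '')}}}"
    print(inside_group("processors.3.output_manifest_file"))  # ${3.output_manifest_file}
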
diff --git a/sdp/utils/chunk_processing.py b/sdp/utils/chunk_processing.py
new file mode 100644
index 00000000..5b1644b2
--- /dev/null
+++ b/sdp/utils/chunk_processing.py
@@ -0,0 +1,219 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import copy
+import os
+import tempfile
+import uuid
+from typing import Any
+
+import hydra
+from omegaconf import OmegaConf
+
+from sdp.logging import logger
+from sdp.utils.common import read_manifest, write_manifest
+
+
+def get_last_output_manifest_file_in_group(group_processors_cfg):
+    return group_processors_cfg[-1].get("output_manifest_file", None)
+
+
+class ChunkedProcessor:
+    def __init__(
+        self,
+        chunk_input_file: str,
+        chunk_output_file: str,
+        output_manifest_file: str,
+        **processor_kwargs: Any,
+    ) -> None:
+        self.processor_cfg = processor_kwargs
+        self.chunk_input_file = chunk_input_file
+        self.chunk_output_file = chunk_output_file
+        self.agg_output_manifest_file = output_manifest_file
+
+        self.processor = None
+
+    def build_processor(self):
+        if "input_manifest_file" in self.processor_cfg:
+            logger.warning(
+                f"A processor inside a chunked pipeline can't have an `input_manifest_file` argument [{self.processor_cfg['_target_']}: {self.processor_cfg['input_manifest_file']}]. It will be changed to the value of `chunk_input_file` ({self.chunk_input_file})."
+            )
+
+        self.processor_cfg["input_manifest_file"] = self.chunk_input_file
+        self.processor_cfg["output_manifest_file"] = self.chunk_output_file
+
+        self.processor = hydra.utils.instantiate(self.processor_cfg, _recursive_=False)
+        self.processor.test()
+
+    def append_chunk_to_agg_output(self):
+        samples = [sample for sample in read_manifest(self.chunk_output_file)]
+
+        write_manifest(
+            samples=samples,
+            manifest_filepath=self.agg_output_manifest_file,
+            mode="a",
+        )
+        logger.info(
+            f"Chunk output of processor `{self.processor_cfg['_target_']}` added to {self.agg_output_manifest_file}."
+        )
+
+    def process(self):
+        logger.info('=> Running processor "%s"', self.processor)
+        self.processor.process()
+
+
+class ChunkRunner:
+    def __init__(
+        self,
+        initial_manifest_chunk_file: str,
+        chunk_steps_dir: str,
+        processors_cfgs: list[dict],
+        aggregation_at_end: bool = True,
+    ):
+        self.initial_manifest_chunk_file = initial_manifest_chunk_file
+        self.chunk_steps_dir = chunk_steps_dir
+        # deep copy so per-chunk `chunk_*` paths never leak into the shared
+        # configs that are reused for the next chunk
+        self.chunk_processors_cfgs = copy.deepcopy(processors_cfgs)
+        self.processors = None
+        self.aggregation_at_end = aggregation_at_end
+
+    def prepare(self):
+        os.makedirs(self.chunk_steps_dir, exist_ok=True)
+
+    def set_chunk_configs(self):
+        if "chunk_input_file" in self.chunk_processors_cfgs[0]:
+            logger.warning(
+                f"`chunk_input_file` can't be set for the 1st processor in chunked pipeline processing. The value will be set to the path of the manifest chunk file ({self.initial_manifest_chunk_file})."
+            )
+
+        self.chunk_processors_cfgs[0][
+            "chunk_input_file"
+        ] = self.initial_manifest_chunk_file
+        self.chunk_processors_cfgs[0].setdefault(
+            "chunk_output_file", os.path.join(self.chunk_steps_dir, str(uuid.uuid4()))
+        )
+
+        for i, processor_cfg in enumerate(self.chunk_processors_cfgs[1:]):
+            processor_cfg.setdefault(
+                "chunk_input_file",
+                self.chunk_processors_cfgs[i]["chunk_output_file"],
+            )
+            processor_cfg.setdefault(
+                "chunk_output_file",
+                os.path.join(self.chunk_steps_dir, str(uuid.uuid4())),
+            )
+
+        self.chunk_processors_cfgs = OmegaConf.to_container(
+            OmegaConf.create(self.chunk_processors_cfgs), resolve=True
+        )
+
+        logger.info(
+            f"Chunk hydra config:\n{OmegaConf.to_yaml(self.chunk_processors_cfgs)}"
+        )
+
+    def build_processors(self):
+        self.processors = []
+        for processor_cfg in self.chunk_processors_cfgs:
+            processor = ChunkedProcessor(**processor_cfg)
+            processor.build_processor()
+            self.processors.append(processor)
+
+    def run_processors(self):
+        for processor in self.processors:
+            logger.info('=> Running processor "%s"', processor)
+            processor.process()
+
+            if not self.aggregation_at_end:
+                processor.append_chunk_to_agg_output()
+
+    def process(self):
+        self.prepare()
+        self.set_chunk_configs()
+        self.build_processors()
+        self.run_processors()
+
+        if self.aggregation_at_end:
+            logger.info("Appending chunk outputs to `output_manifest_file`...")
+            for processor in self.processors:
+                processor.append_chunk_to_agg_output()
+
+
+class ChunkProcessingPipeline:
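+    """Runs a group of processors over the initial manifest in chunks.
+
+    The manifest is split into chunks of ``chunksize`` entries; each chunk is
+    written to a temporary file and pushed through every processor in turn.
+    Each processor's per-chunk output is appended to its aggregate
+    ``output_manifest_file`` (after the whole run, or after every chunk when
+    ``aggregation_at_end`` is False); the last processor's aggregate output
+    is ``last_output_manifest_file``.
+    """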
+    def __init__(
+        self,
+        initial_manifest_file: str,
+        last_output_manifest_file: str,
+        processors_cfgs: list[dict],
+        chunksize: int = 100,
+        aggregation_at_end: bool = True,
+        light_logging: bool = True,
+    ):
+        self.initial_manifest_file = initial_manifest_file
+        self.last_output_manifest_file = last_output_manifest_file
+        self.chunksize = chunksize
+        self.processors_cfgs = processors_cfgs
+        self.aggregation_at_end = aggregation_at_end
+
+        self.tmp_dir = None
+
+    def prepare(self):
+        for processor_cfg in self.processors_cfgs[:-1]:
+            if "output_manifest_file" not in processor_cfg:
+                processor_cfg["output_manifest_file"] = os.path.join(
+                    self.tmp_dir, str(uuid.uuid4())
+                )
+            os.makedirs(
+                os.path.dirname(processor_cfg["output_manifest_file"]), exist_ok=True
+            )
+            write_manifest(processor_cfg["output_manifest_file"])
+
+        if "output_manifest_file" not in self.processors_cfgs[-1]:
+            self.processors_cfgs[-1]["output_manifest_file"] = self.last_output_manifest_file
+        os.makedirs(
+            os.path.dirname(self.last_output_manifest_file), exist_ok=True
+        )
+        write_manifest(self.last_output_manifest_file)
+
+    def chunk_manifest(self):
+        """Splits the manifest into smaller chunks defined by ``chunksize``."""
+        manifest_chunk = []
+        for idx, data_entry in enumerate(read_manifest(self.initial_manifest_file), 1):
+            manifest_chunk.append(data_entry)
+            if idx % self.chunksize == 0:
+                yield manifest_chunk
+                manifest_chunk = []
+        if len(manifest_chunk) > 0:
+            yield manifest_chunk
+
+    def run(self):
+        with tempfile.TemporaryDirectory() as pipeline_tmp_dir:
+            self.tmp_dir = pipeline_tmp_dir
+            self.prepare()
+
+            chunk_no = 1
+            for chunk_samples in self.chunk_manifest():
+                logger.info(f"Starting batch #{chunk_no} processing...".center(50, "-"))
+
+                with tempfile.TemporaryDirectory() as chunk_tmp_dir:
+                    initial_chunk_file = os.path.join(chunk_tmp_dir, str(uuid.uuid4()))
+                    write_manifest(
+                        manifest_filepath=initial_chunk_file,
+                        samples=chunk_samples,
+                    )
+
+                    chunk = ChunkRunner(
+                        initial_manifest_chunk_file=initial_chunk_file,
+                        chunk_steps_dir=chunk_tmp_dir,
+                        processors_cfgs=self.processors_cfgs,
+                        aggregation_at_end=self.aggregation_at_end,
+                    )
+                    chunk.process()
+
+                logger.info(f"Batch #{chunk_no} processing finished.".center(50, "-"))
+                chunk_no += 1
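
`chunk_manifest` above yields full chunks of `chunksize` entries plus one final partial chunk. The same splitting logic as a standalone sketch (the sample data is assumed):

    def chunk(entries, chunksize):
        buf = []
        for entry in entries:
            buf.append(entry)
            if len(buf) == chunksize:
                yield buf
                buf = []
        if buf:  # final partial chunk
            yield buf

    print([len(c) for c in chunk(range(5), 2)])  # [2, 2, 1]
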
diff --git a/sdp/utils/common.py b/sdp/utils/common.py
index a47552f1..2960f4da 100644
--- a/sdp/utils/common.py
+++ b/sdp/utils/common.py
@@ -16,6 +16,7 @@
 import os
 import subprocess
 import tarfile
+from tqdm import tqdm
 import urllib
 import zipfile
 from pathlib import Path
@@ -36,22 +37,70 @@ def load_manifest(manifest: Path) -> List[Dict[str, Union[str, float]]]:
     return result
+
+
+def read_manifest(manifest_filepath: str | Path):
+    """
+    Generator function that yields samples from a manifest file.
+
+    Args:
+        manifest_filepath (str | Path): The path to the manifest file.
+
+    Yields:
+        dict: A sample from the manifest file.
+    """
+    with open(manifest_filepath, "rt", encoding="utf8") as fin:
+        for line in fin:
+            yield json.loads(line)
+
+
+def write_manifest(manifest_filepath: str, samples: list[dict] = [], mode: str = "w"):
+    """
+    Writes or appends samples to a manifest file.
+
+    Args:
+        manifest_filepath (str): The path to the manifest file.
+        samples (list[dict]): The list of samples to write to the manifest file.
+        mode (str, optional): The mode in which to open the file. Defaults to "w".
+            Use "w" for writing (overwriting) and "a" for appending.
+
+    """
+    if mode not in ["w", "a"]:
+        raise ValueError(
+            f"mode can't be `{mode}`. Use `w` for writing or `a` for appending."
+        )
+
+    os.makedirs(os.path.dirname(os.path.abspath(manifest_filepath)), exist_ok=True)
+
+    with open(manifest_filepath, mode=mode, encoding="utf8") as manifest:
+        for sample in tqdm(samples, desc="Writing samples..."):
+            line = json.dumps(sample)
+            manifest.write(f"{line}\n")
+    logger.info(f"Manifest is saved: {manifest_filepath}")
 
 
 def download_file(source_url: str, target_directory: str, verbose=True):
     # make sure target_directory is an absolute path to avoid bugs when we change directories to download data later
     target_directory = os.path.abspath(target_directory)
 
     if verbose:
-        logger.info(f"Trying to download data from {source_url} and save it in this directory: {target_directory}")
+        logger.info(
+            f"Trying to download data from {source_url} and save it in this directory: {target_directory}"
+        )
     filename = os.path.basename(urllib.parse.urlparse(source_url).path)
     target_filepath = os.path.join(target_directory, filename)
 
     if os.path.exists(target_filepath):
         if verbose:
-            logger.info(f"Found file {target_filepath} => will not be attempting download from {source_url}")
+            logger.info(
+                f"Found file {target_filepath} => will not be attempting download from {source_url}"
+            )
     else:
         logger.info(f"Not found file {target_filepath}")
-        original_dir = os.getcwd()  # record current working directory so can cd back to it
-        os.chdir(target_directory)  # cd to target dir so that temporary download file will be saved in target dir
+        original_dir = (
+            os.getcwd()
+        )  # record current working directory so can cd back to it
+        os.chdir(
+            target_directory
+        )  # cd to target dir so that temporary download file will be saved in target dir
 
         wget.download(source_url, target_directory)
 
@@ -63,8 +112,12 @@ def download_file(source_url: str, target_directory: str, verbose=True):
     return target_filepath
 
 
-def extract_archive(archive_path: str, extract_path: str, force_extract: bool = False) -> str:
-    logger.info(f"Attempting to extract all contents from tar file {archive_path} and save in {extract_path}")
+def extract_archive(
+    archive_path: str, extract_path: str, force_extract: bool = False
+) -> str:
+    logger.info(
+        f"Attempting to extract all contents from tar file {archive_path} and save in {extract_path}"
+    )
     if not force_extract:
         if tarfile.is_tarfile(archive_path):
             with tarfile.open(archive_path, "r") as archive:
@@ -73,12 +126,16 @@ def extract_archive(archive_path: str, extract_path: str, force_extract: bool =
             with zipfile.ZipFile(archive_path, "r") as archive:
                 archive_extracted_dir = archive.namelist()[0]
         else:
-            raise RuntimeError(f"Unknown archive format: {archive_path}. We only support tar and zip archives.")
+            raise RuntimeError(
+                f"Unknown archive format: {archive_path}. We only support tar and zip archives."
+            )
 
     archive_contents_dir = os.path.join(extract_path, archive_extracted_dir)
 
     if not force_extract and os.path.exists(archive_contents_dir):
-        logger.info(f"Directory {archive_contents_dir} already exists => will not attempt to extract file")
+        logger.info(
+            f"Directory {archive_contents_dir} already exists => will not attempt to extract file"
+        )
     else:
         if tarfile.is_tarfile(archive_path):
             with tarfile.open(archive_path, "r") as archive:
@@ -94,15 +151,30 @@ def extract_archive(archive_path: str, extract_path: str, force_extract: bool =
 
 
 def ffmpeg_convert(jpg: str, wav: str, ar: int = 0, ac: int = 1):
-    process_args = ["ffmpeg", "-nostdin", "-i", jpg, '-ac', str(ac), "-map", "0:a", "-c:a", "pcm_s16le", "-y", wav]
+    process_args = [
+        "ffmpeg",
+        "-nostdin",
+        "-i",
+        jpg,
+        "-ac",
+        str(ac),
+        "-map",
+        "0:a",
+        "-c:a",
+        "pcm_s16le",
+        "-y",
+        wav,
+    ]
     if ar:
         process_args = process_args[:-1]
         process_args.extend(["-ar", str(ar), wav])
-    return subprocess.run(process_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+    return subprocess.run(
+        process_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
+    )
 
 
 def extract_tar_with_strip_components(tar_path, extract_path, strip_components=1):
-    with tarfile.open(tar_path, 'r') as tar:
+    with tarfile.open(tar_path, "r") as tar:
         members = tar.getmembers()
         for member in members:
             components = member.name.split(os.path.sep)
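
For reference, the two new manifest helpers compose into a simple JSON-lines round trip; a small usage sketch (paths and sample fields are assumptions):

    from sdp.utils.common import read_manifest, write_manifest

    samples = [{"audio_filepath": "a.wav", "text": "hello"}]
    write_manifest("tmp/manifest.json", samples=samples)            # overwrite
    write_manifest("tmp/manifest.json", samples=samples, mode="a")  # append
    assert len(list(read_manifest("tmp/manifest.json"))) == 2
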