diff --git a/sdp/processors/base_processor.py b/sdp/processors/base_processor.py
index 6fc22ee8..fe1eb4c3 100644
--- a/sdp/processors/base_processor.py
+++ b/sdp/processors/base_processor.py
@@ -166,28 +166,43 @@ def _process_with_dask(self, metrics):
         logger.info(f"Using Dask client with dashboard at: {client.dashboard_link}")
 
         # Delegate manifest reading to read_manifest() which returns a Dask bag.
-        bag = self.read_manifest()
-
-        if not isinstance(bag, db.Bag):
-            bag = db.from_sequence(bag)
-        total_entries = bag.count().compute()
-
-        if total_entries == 0:
-            logger.info("No entries found in the manifest input. Proceeding to create an empty output manifest.")
-            results = []
-        else:
-            processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten()
-            results = processed_bag.compute()
 
         with open(self.output_manifest_file, "wt", encoding="utf8") as fout:
-            for entry in results:
-                metrics.append(entry.metrics)
-                if entry.data is not None:
-                    json.dump(entry.data, fout, ensure_ascii=False)
-                    fout.write("\n")
-                    self.number_of_entries += 1
-                    self.total_duration += entry.data.get("duration", 0)
-        logger.info(f"Processed {total_entries} entries using Dask.")
+            for manifest_chunk in self._chunk_manifest():
+                chunk_bag = db.from_sequence(manifest_chunk)
+                processed_chunk = chunk_bag.map(self.process_dataset_entry).flatten().compute()
+
+                # Write results from this chunk to the output file
+                for entry in processed_chunk:
+                    metrics.append(entry.metrics)
+                    if entry.data is not None:
+                        json.dump(entry.data, fout, ensure_ascii=False)
+                        fout.write("\n")
+                        self.number_of_entries += 1
+                        self.total_duration += entry.data.get("duration", 0)
+
+        # bag = self.read_manifest()
+
+        # if not isinstance(bag, db.Bag):
+        #     bag = db.from_sequence(bag)
+        # total_entries = bag.count().compute()
+
+        # if total_entries == 0:
+        #     logger.info("No entries found in the manifest input. Proceeding to create an empty output manifest.")
+        #     results = []
+        # else:
+        #     processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten()
+        #     results = processed_bag.compute()
+
+        # with open(self.output_manifest_file, "wt", encoding="utf8") as fout:
+        #     for entry in results:
+        #         metrics.append(entry.metrics)
+        #         if entry.data is not None:
+        #             json.dump(entry.data, fout, ensure_ascii=False)
+        #             fout.write("\n")
+        #             self.number_of_entries += 1
+        #             self.total_duration += entry.data.get("duration", 0)
+        # logger.info(f"Processed {total_entries} entries using Dask.")
 
     def _process_with_multiprocessing(self, metrics):
         with open(self.output_manifest_file, "wt", encoding="utf8") as fout:
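
The new loop calls `self._chunk_manifest()`, whose definition is not part of this hunk. For context, below is a minimal sketch of what such a generator could look like, assuming the processor exposes a chunk-size attribute (here named `in_memory_chunksize`, which is an assumption, as is the exact helper body); it is not necessarily the repository's actual implementation.

```python
# Hypothetical sketch of a _chunk_manifest() helper on the processor class.
# Assumption: self.read_manifest() yields manifest entries lazily and
# self.in_memory_chunksize bounds how many entries are held at once.
def _chunk_manifest(self):
    """Yield lists of manifest entries so only one chunk lives in memory."""
    chunk = []
    for entry in self.read_manifest():
        chunk.append(entry)
        if len(chunk) >= self.in_memory_chunksize:
            yield chunk
            chunk = []
    if chunk:
        # Flush the final, possibly smaller, chunk.
        yield chunk
```

With a helper of this shape, each chunk is turned into a small Dask bag via `db.from_sequence(manifest_chunk)` and computed eagerly, so output rows are written per chunk instead of materializing the entire processed manifest at once.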