Skip to content

Commit 003ee05

Browse files
[DEVX-453]: Added Multi-Processing support for Data Ingestion Pipeline
1 parent 765e959 commit 003ee05

File tree

1 file changed

+25
-13
lines changed
  • clarifai_datautils/multimodal/pipeline

1 file changed

+25
-13
lines changed

clarifai_datautils/multimodal/pipeline/base.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from concurrent.futures import ThreadPoolExecutor
23
from typing import List, Type
34

45
from tqdm import tqdm
@@ -34,14 +35,16 @@ def run(self,
3435
files: str = None,
3536
folder: str = None,
3637
show_progress: bool = True,
37-
loader: bool = True):
38+
loader: bool = True,
39+
num_workers: int = 4):
3840
"""Runs the Data Ingestion pipeline.
3941
4042
Args:
4143
files Any[str,List]: List of files to process.
4244
folder (str): Folder containing the files.
4345
show_progress (bool): Whether to show progress bar.
4446
loader (bool): Whether to return a Clarifai Dataloader Object to pass to SDK Dataset Upload Functionality.
47+
num_workers (int): Number of workers to use for parallel processing.
4548
4649
Returns:
4750
List of transformed elements or ClarifaiDataLoader object.
@@ -57,22 +60,31 @@ def run(self,
5760

5861
# Get files
5962
if files is not None:
60-
self.elements = [files] if isinstance(files, str) else files
61-
assert isinstance(self.elements, list), ' Files should be a list of strings.'
63+
all_files = [files] if isinstance(files, str) else files
64+
assert isinstance(all_files, list), 'Files should be a list of strings.'
6265
elif folder is not None:
63-
self.elements = [os.path.join(folder, f) for f in os.listdir(folder)]
66+
all_files = [os.path.join(folder, f) for f in os.listdir(folder)]
6467

65-
# Apply transformations
66-
#TODO: num_workers support
67-
if show_progress:
68-
with tqdm(total=len(self.transformations), desc='Applying Transformations') as progress:
69-
for transform in self.transformations:
70-
self.elements = transform(self.elements)
71-
progress.update()
68+
self.elements = []
7269

73-
else:
70+
# Apply transformations
71+
def transform_file(file_elements):
7472
for transform in self.transformations:
75-
self.elements = transform(self.elements)
73+
file_elements = transform(file_elements)
74+
self.elements.extend(file_elements)
75+
76+
# num_workers support
77+
with ThreadPoolExecutor(max_workers=num_workers) as executor:
78+
if show_progress:
79+
with tqdm(total=len(all_files), desc='Transforming Files') as progress:
80+
futures = [executor.submit(transform_file, [file]) for file in all_files]
81+
for job in futures:
82+
job.result()
83+
progress.update()
84+
else:
85+
futures = [executor.submit(transform_file, [file]) for file in all_files]
86+
for job in futures:
87+
job.result()
7688

7789
if loader:
7890
if self.transformations[0].__class__.__name__ == 'PDFPartitionMultimodal':

0 commit comments

Comments
 (0)