Skip to content

Commit a91f24a

Browse files
Merge pull request #34 from Clarifai/DEVX-453-Multi-Processing-Support-for-Data-Ingestion-Pipeline
[DEVX-453]: Added Multi-Processing support for Data Ingestion Pipeline
2 parents 13cf2e7 + ba82a82 commit a91f24a

File tree

3 files changed

+29
-15
lines changed

3 files changed

+29
-15
lines changed

clarifai_datautils/multimodal/pipeline/base.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from concurrent.futures import ThreadPoolExecutor
23
from typing import List, Type
34

45
from tqdm import tqdm
@@ -34,14 +35,16 @@ def run(self,
3435
files: str = None,
3536
folder: str = None,
3637
show_progress: bool = True,
37-
loader: bool = True):
38+
loader: bool = True,
39+
num_workers: int = 4):
3840
"""Runs the Data Ingestion pipeline.
3941
4042
Args:
4143
files Any[str,List]: List of files to process.
4244
folder (str): Folder containing the files.
4345
show_progress (bool): Whether to show progress bar.
4446
loader (bool): Whether to return a Clarifai Dataloader Object to pass to SDK Dataset Upload Functionality.
47+
num_workers (int): Number of workers to use for parallel processing.
4548
4649
Returns:
4750
List of transformed elements or ClarifaiDataLoader object.
@@ -57,22 +60,31 @@ def run(self,
5760

5861
# Get files
5962
if files is not None:
60-
self.elements = [files] if isinstance(files, str) else files
61-
assert isinstance(self.elements, list), ' Files should be a list of strings.'
63+
all_files = [files] if isinstance(files, str) else files
64+
assert isinstance(all_files, list), 'Files should be a list of strings.'
6265
elif folder is not None:
63-
self.elements = [os.path.join(folder, f) for f in os.listdir(folder)]
66+
all_files = [os.path.join(folder, f) for f in os.listdir(folder)]
6467

65-
# Apply transformations
66-
#TODO: num_workers support
67-
if show_progress:
68-
with tqdm(total=len(self.transformations), desc='Applying Transformations') as progress:
69-
for transform in self.transformations:
70-
self.elements = transform(self.elements)
71-
progress.update()
68+
self.elements = []
7269

73-
else:
70+
# Apply transformations
71+
def transform_file(file_elements):
7472
for transform in self.transformations:
75-
self.elements = transform(self.elements)
73+
file_elements = transform(file_elements)
74+
self.elements.extend(file_elements)
75+
76+
# num_workers support
77+
with ThreadPoolExecutor(max_workers=num_workers) as executor:
78+
if show_progress:
79+
with tqdm(total=len(all_files), desc='Transforming Files') as progress:
80+
futures = [executor.submit(transform_file, [file]) for file in all_files]
81+
for job in futures:
82+
job.result()
83+
progress.update()
84+
else:
85+
futures = [executor.submit(transform_file, [file]) for file in all_files]
86+
for job in futures:
87+
job.result()
7688

7789
if loader:
7890
if self.transformations[0].__class__.__name__ == 'PDFPartitionMultimodal':

tests/pipelines/test_pdf_pipelines.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ def test_pipeline_run(self,):
3232
])
3333
elements = pipeline.run(files=PDF_FILE_PATH)
3434
assert len(elements) == 3
35-
assert elements[0].text[:9] == 'MAIN GAME'
35+
assert (elements[0].text[:9] == 'MAIN GAME') and (
36+
elements[1].text[1:12] == 'Transcribed') and (elements[2].text[:10] == 'Thankfully')
3637
assert elements[0].metadata['filename'] == 'DA-1p.pdf'
3738
assert elements[0].metadata['page_number'] == 1
3839
assert elements[0].metadata['text_after'] == 'our assault."'

tests/pipelines/test_text_pipelines.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ def test_pipeline_run(self,):
3636
])
3737
elements = pipeline.run(files=TEXT_FILE_PATH)
3838
assert len(elements) == 4
39-
assert elements[0].text[:9] == 'CHAPTER I'
39+
assert (elements[0].text[:9] == 'CHAPTER I') and (elements[1].text[:3] == 'All') and (
40+
elements[2].text[1:6] == 'First') and (elements[3].text[1:5] == 'What')
4041
assert elements[0].metadata['filename'] == 'book-war-and-peace-1p.txt'
4142
assert elements[0].metadata['text_after'] == 'used only by the elite.'
4243

0 commit comments

Comments
 (0)