Skip to content

Commit adc9e24

Browse files
Merge pull request #5 from Clarifai/DEVX-375-Data_Ingestion_Pipeline
[DEVX-375]: Data Ingestion Pipeline
2 parents 7b79489 + cbeadd6 commit adc9e24

25 files changed

+1295
-50
lines changed

README.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ This is a collection of utilities for handling various types of multimedia data.
1717
* **[Getting Started](#getting-started)**
1818
* **[Features](#features)**
1919
* [Image Utils](#image-utils)
20+
* [Data Ingestion Pipeline](#ingestion-pipeline)
2021
* **[Usage](#usage)**
2122
* **[Examples](#more-examples)**
2223

@@ -58,7 +59,9 @@ annotated_dataset = ImageAnnotations.import_from(path= 'folder_path', format= 'a
5859
- Load various annotated image datasets and export to clarifai Platform
5960
- Convert from one annotation format to other supported annotation formats
6061

61-
62+
### Data Ingestion Pipeline
63+
- Easy-to-use pipelines to load data from files and ingest it into the Clarifai platform.
64+
- Load text files (PDF, DOC, etc.), then transform, chunk, and upload them to the Clarifai Platform
6265

6366
## Usage
6467
### Image Annotation Loader
@@ -81,6 +84,30 @@ coco_dataset.get_info()
8184
coco_dataset.export_to('voc_detection')
8285
```
8386

87+
88+
### Data Ingestion Pipelines
89+
```python
90+
from clarifai_datautils.text import Pipeline, PDFPartition
91+
from clarifai_datautils.text.pipeline.cleaners import Clean_extra_whitespace
92+
93+
# Define the pipeline
94+
pipeline = Pipeline(
95+
name='pipeline-1',
96+
transformations=[
97+
PDFPartition(chunking_strategy = "by_title",max_characters = 1024),
98+
Clean_extra_whitespace()
99+
]
100+
)
101+
102+
103+
# Using SDK to upload
104+
from clarifai.client import Dataset
105+
dataset = Dataset(dataset_url)
106+
dataset.upload_dataset(pipeline.run(files = file_path, loader = True))
107+
108+
```
109+
110+
84111
## More Examples
85112

86113
See many more code examples in this [repo](https://github.com/Clarifai/examples).

clarifai_datautils/image/annotation_conversion/base.py renamed to clarifai_datautils/base/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from typing import TypeVar, Union
22

3-
from .features import (VisualClassificationFeatures, VisualDetectionFeatures,
4-
VisualSegmentationFeatures)
3+
from clarifai_datautils.constants.base import DATASET_UPLOAD_TASKS
54

6-
DATASET_UPLOAD_TASKS = ["visual_classification", "visual_detection", "visual_segmentation"]
5+
from .features import (TextFeatures, VisualClassificationFeatures, VisualDetectionFeatures,
6+
VisualSegmentationFeatures)
77

88
OutputFeaturesType = TypeVar(
99
'OutputFeaturesType',
10-
bound=Union[VisualClassificationFeatures, VisualDetectionFeatures, VisualSegmentationFeatures])
10+
bound=Union[VisualClassificationFeatures, VisualDetectionFeatures, VisualSegmentationFeatures,
11+
TextFeatures])
1112

1213

1314
class ClarifaiDataLoader:

clarifai_datautils/image/annotation_conversion/features.py renamed to clarifai_datautils/base/features.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class VisualClassificationFeatures:
1212
id: Optional[int] = None # image_id
1313
metadata: Optional[dict] = None
1414
image_bytes: Optional[bytes] = None
15+
label_ids: Optional[List[str]] = None
1516

1617

1718
@dataclass
@@ -24,6 +25,7 @@ class VisualDetectionFeatures:
2425
id: Optional[int] = None # image_id
2526
metadata: Optional[dict] = None
2627
image_bytes: Optional[bytes] = None
28+
label_ids: Optional[List[str]] = None
2729

2830

2931
@dataclass
@@ -36,3 +38,14 @@ class VisualSegmentationFeatures:
3638
id: Optional[int] = None # image_id
3739
metadata: Optional[dict] = None
3840
image_bytes: Optional[bytes] = None
41+
label_ids: Optional[List[str]] = None
42+
43+
44+
@dataclass
class TextFeatures:
    """Text classification datasets preprocessing output features."""
    text: str  # raw text content of the sample
    labels: List[Union[str, int]]  # str or int entries, to cater for multi-class tasks
    id: Optional[int] = None  # text_id
    metadata: Optional[dict] = None
    label_ids: Optional[List[str]] = None
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
class DATASET_UPLOAD_TASKS:
    """Canonical task-name constants for dataset uploads.

    Used as class-level attributes (e.g. ``DATASET_UPLOAD_TASKS.VISUAL_CLASSIFICATION``);
    no instance is required.
    """
    VISUAL_CLASSIFICATION: str = "visual_classification"
    VISUAL_DETECTION: str = "visual_detection"
    VISUAL_SEGMENTATION: str = "visual_segmentation"
    TEXT_CLASSIFICATION: str = "text_classification"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Default maximum chunk size (in characters) used by text partitioners.
MAX_CHARACTERS = 500

# NOTE(review): usage of these two is not visible here — presumably limits for
# traversing/skipping document nodes during ingestion; confirm against callers.
MAX_NODES = 10
SKIP_NODES = 1

clarifai_datautils/image/annotation_conversion/annotations.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
from datumaro.components.errors import (DatasetError, DatasetImportError, DatasetNotFoundError,
55
MultipleFormatsMatchError)
66

7+
from clarifai_datautils.base import ClarifaiDataLoader
78
from clarifai_datautils.constants.annotations import (IMAGE_ANNOTATION_FORMATS,
89
IMAGE_ANNOTATION_FORMATS_TO_TASKS,
910
IMAGE_FORMAT_MAP)
11+
from clarifai_datautils.constants.base import DATASET_UPLOAD_TASKS
1012
from clarifai_datautils.errors import AnnotationsDatasetError, AnnotationsFormatError
11-
from clarifai_datautils.image.annotation_conversion.base import ClarifaiDataLoader
1213
from clarifai_datautils.image.annotation_conversion.loaders import (ClassificationDataLoader,
1314
DetectionDataLoader,
1415
SegmentationDataLoader)
@@ -165,11 +166,11 @@ def dataloader(self) -> ClarifaiDataLoader:
165166
>>> format = ImageAnnotations.import_from(path=folder_path, format = 'coco_detection')
166167
>>> clarifai_dataset_loader = format.dataloader
167168
"""
168-
if self.task == 'visual_classification':
169+
if self.task == DATASET_UPLOAD_TASKS.VISUAL_CLASSIFICATION:
169170
return ClassificationDataLoader(self._dataset)
170-
elif self.task == 'visual_detection':
171+
elif self.task == DATASET_UPLOAD_TASKS.VISUAL_DETECTION:
171172
return DetectionDataLoader(self._dataset)
172-
elif self.task == 'visual_segmentation':
173+
elif self.task == DATASET_UPLOAD_TASKS.VISUAL_SEGMENTATION:
173174
return SegmentationDataLoader(self._dataset)
174175

175176
def __str__(self) -> str:

clarifai_datautils/image/annotation_conversion/loaders.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
import numpy as np
66
from datumaro.components.annotation import AnnotationType
77
from datumaro.components.media import ImageFromNumpy
8+
from clarifai_datautils.constants.base import DATASET_UPLOAD_TASKS
89

9-
from .base import ClarifaiDataLoader
10-
from .features import (VisualClassificationFeatures, VisualDetectionFeatures,
11-
VisualSegmentationFeatures)
10+
from ...base import ClarifaiDataLoader
11+
from ...base.features import (VisualClassificationFeatures, VisualDetectionFeatures,
12+
VisualSegmentationFeatures)
1213

1314
delimiters = [",", "|", ";", "/", "\\", ":", " "]
1415

@@ -35,7 +36,7 @@ def __init__(self, annotation_object):
3536

3637
@property
3738
def task(self):
38-
return "visual_classification"
39+
return DATASET_UPLOAD_TASKS.VISUAL_CLASSIFICATION
3940

4041
def __getitem__(self, index: int):
4142
dataset_item = self.annotation_object.get(
@@ -90,7 +91,7 @@ def __init__(self, annotation_object):
9091

9192
@property
9293
def task(self):
93-
return "visual_detection"
94+
return DATASET_UPLOAD_TASKS.VISUAL_DETECTION
9495

9596
def __getitem__(self, index: int):
9697
dataset_item = self.annotation_object.get(
@@ -170,7 +171,7 @@ def __init__(self, annotation_object):
170171

171172
@property
172173
def task(self):
173-
return "visual_segmentation"
174+
return DATASET_UPLOAD_TASKS.VISUAL_SEGMENTATION
174175

175176
def __getitem__(self, index: int):
176177
dataset_item = self.annotation_object.get(
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from clarifai_datautils.text.pipeline.base import Pipeline
2+
from clarifai_datautils.text.pipeline.PDF import PDFPartition
3+
from clarifai_datautils.text.pipeline.Text import TextPartition
4+
5+
__all__ = ['Pipeline', 'PDFPartition', 'TextPartition']
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from typing import List
2+
3+
from unstructured.partition.pdf import partition_pdf
4+
5+
from clarifai_datautils.constants.pipeline import MAX_CHARACTERS
6+
7+
from .base import BaseTransform
8+
9+
10+
class PDFPartition(BaseTransform):
    """Partitions PDF files into chunked text elements using unstructured's ``partition_pdf``."""

    def __init__(self,
                 ocr: bool = False,
                 chunking_strategy: str = "basic",
                 max_characters: int = MAX_CHARACTERS,
                 overlap=None,
                 overlap_all: bool = True,
                 **kwargs):
        """Initializes a PDFPartition object.

        Args:
            ocr (bool): Whether to use OCR. Currently ignored — the partition
                strategy is always "fast" (see TODO below).
            chunking_strategy (str): Chunking strategy; either "basic" or "by_title".
            max_characters (int): Maximum number of characters in a chunk.
            overlap (int): Number of characters to overlap between chunks.
            overlap_all (bool): Whether to overlap all chunks.
            kwargs: Additional keyword arguments forwarded to ``partition_pdf``.

        Raises:
            ValueError: If ``chunking_strategy`` is neither "basic" nor "by_title".
        """
        if chunking_strategy not in ("basic", "by_title"):
            raise ValueError("chunking_strategy should be either 'basic' or 'by_title'.")
        self.chunking_strategy = chunking_strategy
        # TODO: honor `ocr` ("ocr" strategy) and add a "hi_res" strategy;
        # until then every file is partitioned with the "fast" strategy.
        self.strategy = "fast"
        self.max_characters = max_characters
        self.overlap = overlap
        self.overlap_all = overlap_all
        self.kwargs = kwargs

    def __call__(self, elements: List[str]) -> List[str]:
        """Applies the transformation: partitions each PDF path into text elements.

        Args:
            elements (List[str]): List of PDF file paths.

        Returns:
            List of partitioned text elements aggregated across all files.
        """
        file_elements = []
        for filename in elements:
            file_element = partition_pdf(
                filename=filename,
                strategy=self.strategy,
                chunking_strategy=self.chunking_strategy,
                max_characters=self.max_characters,
                overlap=self.overlap,
                overlap_all=self.overlap_all,
                **self.kwargs)
            file_elements.extend(file_element)
            # Release per-file chunk list promptly; helps peak memory on large batches.
            del file_element

        return file_elements
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Data Ingestion Pipeline
2+
Load text files (PDF, DOC, etc.), then transform, chunk, and upload them to the Clarifai Platform
3+
4+
## Features
5+
6+
- File Partitioning
7+
- Cleaning Chunks
8+
- Metadata Extraction
9+
10+
11+
## Usage
12+
13+
```python
14+
from clarifai_datautils.text import Pipeline, PDFPartition
15+
from clarifai_datautils.text.pipeline.cleaners import Clean_extra_whitespace
16+
17+
# Define the pipeline
18+
pipeline = Pipeline(
19+
name='pipeline-1',
20+
transformations=[
21+
PDFPartition(chunking_strategy = "by_title",max_characters = 1024),
22+
Clean_extra_whitespace()
23+
]
24+
)
25+
26+
27+
# Using SDK to upload
28+
from clarifai.client import Dataset
29+
dataset = Dataset(dataset_url)
30+
dataset.upload_dataset(pipeline.run(files = file_path, loader = True))
31+
32+
```
33+
34+
## Supported File Formats
35+
- PDF
36+
- Text(.txt)
37+
38+
39+
## Resources
40+
This functionality makes use of the [Unstructured Framework](https://github.com/Unstructured-IO/unstructured)

0 commit comments

Comments
 (0)