
Commit 50cecde

Merge pull request #35 from Clarifai/DEVX-452-Add-Schema-for-Data-Ingestion-Pipeline
[DEVX-452]: Added Schema for Data Ingestion Pipeline Transformations
2 parents a91f24a + 0d166e3 commit 50cecde

File tree: 4 files changed (+74, -7 lines)


clarifai_datautils/multimodal/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 from clarifai_datautils.multimodal.pipeline.base import Pipeline
 from clarifai_datautils.multimodal.pipeline.Docx import DocxPartition
 from clarifai_datautils.multimodal.pipeline.Markdown import MarkdownPartition
-from clarifai_datautils.multimodal.pipeline.PDF import PDFPartition
+from clarifai_datautils.multimodal.pipeline.PDF import PDFPartition, PDFPartitionMultimodal
 from clarifai_datautils.multimodal.pipeline.Text import TextPartition

 __all__ = [
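With this change the multimodal PDF partitioner is re-exported from the package root alongside the existing partitioners. A minimal sketch (not part of the commit) of the new import surface; the diff does not show PDFPartitionMultimodal's constructor arguments, so none are assumed here:

# Sketch: both PDF partitioners are now importable from the package root.
from clarifai_datautils.multimodal import PDFPartition, PDFPartitionMultimodal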

clarifai_datautils/multimodal/pipeline/base.py

Lines changed: 18 additions & 5 deletions
@@ -1,13 +1,29 @@
 import os
 from concurrent.futures import ThreadPoolExecutor
 from typing import List, Type
+from schema import And, Schema

 from tqdm import tqdm

 from .basetransform import BaseTransform
 from .loaders import MultiModalLoader, TextDataLoader


+def get_schema() -> Schema:
+  """Initialize the schema for Data Ingestion Pipeline transformations.
+
+  This schema validates:
+
+  - transformations must be a list
+  - The first item in the list must be one of the following: PDFPartition, TextPartition, PDFPartitionMultimodal, DocxPartition, MarkdownPartition
+  - Each item in the list must be a BaseTransform instance
+
+  Returns:
+    Schema: The schema for transformations.
+  """
+  return Schema(And(list, lambda x: x[0].__class__.__name__ in ['PDFPartition', 'TextPartition', 'PDFPartitionMultimodal', 'DocxPartition', 'MarkdownPartition'], lambda x: all(isinstance(item, BaseTransform) for item in x)), error="Invalid transformations data.")
+
+
 class Pipeline:
   """Text processing pipeline object from files"""

@@ -25,11 +41,8 @@ def __init__(
     """
     self.name = name
     self.transformations = transformations
-    for transform in self.transformations:
-      if not isinstance(transform, BaseTransform):
-        raise ValueError('All transformations should be of type BaseTransform.')
-
-    #TODO: Schema for transformations
+    self.transformation_schema = get_schema()
+    self.transformation_schema.validate(self.transformations)

   def run(self,
           files: str = None,
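Taken together, Pipeline.__init__ now delegates validation to the schema instead of the old per-item isinstance loop. Below is a minimal sketch (not part of the commit) of how that behaves at construction time, reusing the transformation classes and arguments that appear in the tests further down:

# Sketch only: mirrors the new validation path using classes from the tests below.
from schema import SchemaError

from clarifai_datautils.multimodal import PDFPartition, Pipeline
from clarifai_datautils.multimodal.pipeline.cleaners import Clean_extra_whitespace

# Valid: a list whose first item is an allowed partitioner and whose items are all BaseTransform instances.
pipeline = Pipeline(
    name='docs-pipeline',
    transformations=[
        PDFPartition(max_characters=1024, overlap=None),
        Clean_extra_whitespace(),
    ])

# Invalid: a cleaner in the first position fails the schema check and raises SchemaError.
try:
  Pipeline(name='bad-order', transformations=[Clean_extra_whitespace()])
except SchemaError as e:
  print(e)  # the schema reports "Invalid transformations data."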

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -3,3 +3,4 @@ llama-index-llms-clarifai==0.1.2
 pi_heif==0.18.0
 markdown==3.7
 python-docx==1.1.2
+schema==0.7.5

tests/pipelines/test_ready_to_use_pipelines.py

Lines changed: 54 additions & 1 deletion
@@ -1,12 +1,30 @@
 import os.path as osp
+from typing import List

-from clarifai_datautils.multimodal import Pipeline
+import pytest
+from schema import SchemaError
+
+from clarifai_datautils.multimodal import PDFPartition, Pipeline
+from clarifai_datautils.multimodal.pipeline.cleaners import Clean_extra_whitespace
+from clarifai_datautils.multimodal.pipeline.extractors import (ExtractDateTimeTz,
+                                                               ExtractEmailAddress)

 PDF_FILE_PATH = osp.abspath(osp.join(osp.dirname(__file__), "assets", "DA-1p.pdf"))
 TEXT_FILE_PATH = osp.abspath(
     osp.join(osp.dirname(__file__), "assets", "book-war-and-peace-1p.txt"))


+class Test_transformation():
+
+  def __init__(self):
+    pass
+
+  def __call__(self,) -> List:
+    """Applies the transformation.
+    """
+    pass
+
+
 class TestReadyToUsePipelines:
   """Tests for ready to use pipelines."""

@@ -77,3 +95,38 @@ def test_pipeline_standard_markdown(self,):
     assert pipeline.transformations[0].__class__.__name__ == 'MarkdownPartition'
     assert pipeline.transformations[1].__class__.__name__ == 'Clean_extra_whitespace'
     assert pipeline.transformations[2].__class__.__name__ == 'Group_broken_paragraphs'
+
+  def test_schema_error(self):
+    # Incorrect type of transformations object
+    with pytest.raises(SchemaError):
+      _ = Pipeline(
+          name='test-1',
+          transformations=(
+              PDFPartition(max_characters=1024, overlap=None),
+              Clean_extra_whitespace(),
+              ExtractDateTimeTz(),
+              ExtractEmailAddress(),
+          ))
+
+    # Incorrect first transformation
+    with pytest.raises(SchemaError):
+      _ = Pipeline(
+          name='test-2',
+          transformations=[
+              Clean_extra_whitespace(),
+              PDFPartition(max_characters=1024, overlap=None),
+              ExtractDateTimeTz(),
+              ExtractEmailAddress(),
+          ])
+
+    # Incorrect instance of transformation
+    with pytest.raises(SchemaError):
+      _ = Pipeline(
+          name='test-3',
+          transformations=[
+              PDFPartition(max_characters=1024, overlap=None),
+              Clean_extra_whitespace(),
+              ExtractDateTimeTz(),
+              ExtractEmailAddress(),
+              Test_transformation(),
+          ])
