Skip to content

Commit 44d5db1

Browse files
[DS-21] specify fields to remain in milvus stager (#37)
* Add field filtering for Milvus stager * Add changelog entry and bump version
1 parent 744f6a8 commit 44d5db1

File tree

4 files changed

+201
-14
lines changed

4 files changed

+201
-14
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 0.0.8
2+
3+
### Enhancements
4+
5+
* **Add fields_to_include option for Milvus Stager** Adds support for filtering which fields remain in the document, so the user can align the document structure with the collection schema.
6+
* **Add flatten_metadata option for Milvus Stager** Flattening metadata is now an optional step (enabled by default) in processing the document.
7+
18
## 0.0.7
29

310
### Enhancements
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import pytest
2+
3+
from unstructured_ingest.v2.processes.connectors.milvus import (
4+
MilvusUploadStager,
5+
MilvusUploadStagerConfig,
6+
)
7+
8+
9+
@pytest.mark.parametrize(
10+
("given_element", "given_field_include_list", "then_element", "then_error"),
11+
[
12+
({"x": "y"}, None, {"x": "y", "is_continuation": False}, False),
13+
(
14+
{
15+
"type": "UncategorizedText",
16+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
17+
},
18+
["type", "element_id"],
19+
{
20+
"type": "UncategorizedText",
21+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
22+
},
23+
False,
24+
),
25+
(
26+
{
27+
"type": "UncategorizedText",
28+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
29+
"parent_id": "qwerty",
30+
},
31+
["type", "element_id"],
32+
{
33+
"type": "UncategorizedText",
34+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
35+
},
36+
False,
37+
),
38+
(
39+
{
40+
"type": "UncategorizedText",
41+
"parent_id": "qwerty",
42+
},
43+
["type", "element_id"],
44+
{},
45+
True,
46+
),
47+
],
48+
)
49+
def test_milvus_stager_processes_elements_correctly(
50+
given_element, given_field_include_list, then_element, then_error
51+
):
52+
config = MilvusUploadStagerConfig(fields_to_include=given_field_include_list)
53+
stager = MilvusUploadStager(upload_stager_config=config)
54+
55+
if then_error:
56+
with pytest.raises(KeyError):
57+
stager.conform_dict(data=given_element)
58+
else:
59+
stager.conform_dict(data=given_element)
60+
assert given_element == then_element
61+
62+
63+
@pytest.mark.parametrize(
64+
(
65+
"given_flatten_metadata",
66+
"given_element",
67+
"then_element",
68+
),
69+
[
70+
(
71+
True,
72+
{
73+
"type": "UncategorizedText",
74+
"metadata": {
75+
"filename": "fake-memo.pdf",
76+
},
77+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
78+
},
79+
{
80+
"type": "UncategorizedText",
81+
"filename": "fake-memo.pdf",
82+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
83+
"is_continuation": False,
84+
},
85+
),
86+
(
87+
False,
88+
{
89+
"type": "UncategorizedText",
90+
"metadata": {
91+
"filename": "fake-memo.pdf",
92+
},
93+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
94+
},
95+
{
96+
"type": "UncategorizedText",
97+
"metadata": {
98+
"filename": "fake-memo.pdf",
99+
},
100+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
101+
"is_continuation": False,
102+
},
103+
),
104+
],
105+
)
106+
def test_milvus_stager_processes_metadata_correctly(
107+
given_flatten_metadata, given_element, then_element
108+
):
109+
config = MilvusUploadStagerConfig(flatten_metadata=given_flatten_metadata)
110+
stager = MilvusUploadStager(upload_stager_config=config)
111+
112+
stager.conform_dict(data=given_element)
113+
assert given_element == then_element
114+
115+
116+
@pytest.mark.parametrize(
117+
(
118+
"given_field_include_list",
119+
"given_element",
120+
"then_element",
121+
),
122+
[
123+
(
124+
["type", "filename", "element_id"],
125+
{
126+
"type": "UncategorizedText",
127+
"metadata": {
128+
"filename": "fake-memo.pdf",
129+
},
130+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
131+
},
132+
{
133+
"type": "UncategorizedText",
134+
"filename": "fake-memo.pdf",
135+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
136+
},
137+
),
138+
(
139+
["type", "element_id"],
140+
{
141+
"type": "UncategorizedText",
142+
"metadata": {
143+
"filename": "fake-memo.pdf",
144+
},
145+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
146+
},
147+
{
148+
"type": "UncategorizedText",
149+
"element_id": "be34cd2d71310ec72bfef3d1be2b2b36",
150+
},
151+
),
152+
],
153+
)
154+
def test_milvus_stager_processes_metadata_correctly_when_using_include_list(
155+
given_field_include_list, given_element, then_element
156+
):
157+
config = MilvusUploadStagerConfig(
158+
flatten_metadata=True, fields_to_include=given_field_include_list
159+
)
160+
stager = MilvusUploadStager(upload_stager_config=config)
161+
162+
stager.conform_dict(data=given_element)
163+
assert given_element == then_element

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.7" # pragma: no cover
1+
__version__ = "0.0.8" # pragma: no cover

unstructured_ingest/v2/processes/connectors/milvus.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,15 @@ def get_client(self) -> "MilvusClient":
6767

6868

6969
class MilvusUploadStagerConfig(UploadStagerConfig):
70-
pass
70+
71+
fields_to_include: Optional[list[str]] = None
72+
"""If set - list of fields to include in the output.
73+
Unspecified fields are removed from the elements.
74+
This action takes place after metadata flattening.
75+
Missing fields will cause the stager to raise KeyError."""
76+
77+
flatten_metadata: bool = True
78+
"""If set - flatten "metadata" key and put contents directly into data"""
7179

7280

7381
@dataclass
@@ -85,8 +93,26 @@ def parse_date_string(date_string: str) -> float:
8593
pass
8694
return parser.parse(date_string).timestamp()
8795

88-
@classmethod
89-
def conform_dict(cls, data: dict) -> None:
96+
def conform_dict(self, data: dict) -> None:
97+
if self.upload_stager_config.flatten_metadata and (metadata := data.pop("metadata", None)):
98+
data.update(flatten_dict(metadata, keys_to_omit=["data_source_record_locator"]))
99+
100+
# TODO: milvus sdk doesn't seem to support defaults via the schema yet,
101+
# remove once that gets updated
102+
defaults = {"is_continuation": False}
103+
for default in defaults:
104+
if default not in data:
105+
data[default] = defaults[default]
106+
107+
if self.upload_stager_config.fields_to_include:
108+
data_keys = set(data.keys())
109+
for data_key in data_keys:
110+
if data_key not in self.upload_stager_config.fields_to_include:
111+
data.pop(data_key)
112+
for field_include_key in self.upload_stager_config.fields_to_include:
113+
if field_include_key not in data:
114+
raise KeyError(f"Field '{field_include_key}' is missing in data!")
115+
90116
datetime_columns = [
91117
"data_source_date_created",
92118
"data_source_date_modified",
@@ -96,21 +122,12 @@ def conform_dict(cls, data: dict) -> None:
96122

97123
json_dumps_fields = ["languages", "data_source_permissions_data"]
98124

99-
# TODO: milvus sdk doesn't seem to support defaults via the schema yet,
100-
# remove once that gets updated
101-
defaults = {"is_continuation": False}
102-
103-
if metadata := data.pop("metadata", None):
104-
data.update(flatten_dict(metadata, keys_to_omit=["data_source_record_locator"]))
105125
for datetime_column in datetime_columns:
106126
if datetime_column in data:
107-
data[datetime_column] = cls.parse_date_string(data[datetime_column])
127+
data[datetime_column] = self.parse_date_string(data[datetime_column])
108128
for json_dumps_field in json_dumps_fields:
109129
if json_dumps_field in data:
110130
data[json_dumps_field] = json.dumps(data[json_dumps_field])
111-
for default in defaults:
112-
if default not in data:
113-
data[default] = defaults[default]
114131

115132
def run(
116133
self,

0 commit comments

Comments
 (0)