
Commit 955521a

feat: Allow transformations on writes to output list of entities (#5209)
* feat: Adding Docling RAG demo
* updated demo
* cleaned up notebook
* adding chunk id
* adding quickstart demo that is WIP and updating docling-demo to export unique chunk-id
* adding current tentative example repo
* adding current temporary work
* updating demo script to rename things
* updated quickstart
* added comment
* checking in progress
* checking in progress for now, still have some issues with vector retrieval
* okay think i have most things working
* removing commenting and unnecessary code
* uploading demo
* uploading other files
* updated repo example
* checking in current notebook, almost there
* fixed linter
* fixed transformation logic
* removed print
* added README with description
* removing print
* updating
* updating metadata file
* updated readme and adding dataset
* removing files

Signed-off-by: Francisco Javier Arceo <[email protected]>
1 parent ab35b0b · commit 955521a

File tree

3 files changed: +138 −33 lines changed

sdk/python/feast/feature_store.py
sdk/python/tests/unit/online_store/test_online_writes.py
sdk/python/tests/unit/test_on_demand_python_transformation.py

3 files changed

+138
-33
lines changed

sdk/python/feast/feature_store.py

Lines changed: 14 additions & 8 deletions
@@ -1567,14 +1567,20 @@ def _get_feature_view_and_df_for_online_write(
                 else df.to_dict(orient="list")
             )
             if feature_view.singleton:
-                transformed_data = df.apply(
-                    feature_view.feature_transformation.udf, axis=1
-                )
-                transformed_data = pd.DataFrame(
-                    transformed_data.to_list()
-                ).applymap(
-                    lambda x: x[0] if isinstance(x, list) and len(x) == 1 else x
-                )
+                transformed_rows = []
+
+                for i, row in df.iterrows():
+                    output = feature_view.feature_transformation.udf(row.to_dict())
+                    if i == 0:
+                        transformed_rows = output
+                    else:
+                        for k in output:
+                            if isinstance(output[k], list):
+                                transformed_rows[k].extend(output[k])
+                            else:
+                                transformed_rows[k].append(output[k])
+
+                transformed_data = pd.DataFrame(transformed_rows)
             else:
                 transformed_data = feature_view.feature_transformation.udf(
                     input_dict
sdk/python/tests/unit/online_store/test_online_writes.py

Lines changed: 92 additions & 2 deletions
@@ -18,12 +18,21 @@
 from datetime import datetime, timedelta
 from typing import Any

-from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig
+import pandas as pd
+
+from feast import (
+    Entity,
+    FeatureStore,
+    FeatureView,
+    FileSource,
+    RepoConfig,
+    RequestSource,
+)
 from feast.driver_test_data import create_driver_hourly_stats_df
 from feast.field import Field
 from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
 from feast.on_demand_feature_view import on_demand_feature_view
-from feast.types import Float32, Float64, Int64
+from feast.types import Array, Float32, Float64, Int64, PdfBytes, String, ValueType


 class TestOnlineWrites(unittest.TestCase):
@@ -144,3 +153,84 @@ def test_online_retrieval(self):
                 "conv_rate_plus_acc",
             ]
         )
+
+
+class TestOnlineWritesWithTransform(unittest.TestCase):
+    def test_transform_on_write_pdf(self):
+        with tempfile.TemporaryDirectory() as data_dir:
+            self.store = FeatureStore(
+                config=RepoConfig(
+                    project="test_write_to_online_store_with_transform",
+                    registry=os.path.join(data_dir, "registry.db"),
+                    provider="local",
+                    entity_key_serialization_version=2,
+                    online_store=SqliteOnlineStoreConfig(
+                        path=os.path.join(data_dir, "online.db")
+                    ),
+                )
+            )
+
+            chunk = Entity(
+                name="chunk_id",
+                description="Chunk ID",
+                value_type=ValueType.STRING,
+                join_keys=["chunk_id"],
+            )
+
+            document = Entity(
+                name="document_id",
+                description="Document ID",
+                value_type=ValueType.STRING,
+                join_keys=["document_id"],
+            )
+
+            input_request_pdf = RequestSource(
+                name="pdf_request_source",
+                schema=[
+                    Field(name="document_id", dtype=String),
+                    Field(name="pdf_bytes", dtype=PdfBytes),
+                    Field(name="file_name", dtype=String),
+                ],
+            )
+
+            @on_demand_feature_view(
+                entities=[chunk, document],
+                sources=[input_request_pdf],
+                schema=[
+                    Field(name="document_id", dtype=String),
+                    Field(name="chunk_id", dtype=String),
+                    Field(name="chunk_text", dtype=String),
+                    Field(
+                        name="vector",
+                        dtype=Array(Float32),
+                        vector_index=True,
+                        vector_search_metric="L2",
+                    ),
+                ],
+                mode="python",
+                write_to_online_store=True,
+                singleton=True,
+            )
+            def transform_pdf_on_write_view(inputs: dict[str, Any]) -> dict[str, Any]:
+                k = 10
+                return {
+                    "document_id": ["doc_1", "doc_2"],
+                    "chunk_id": ["chunk-1", "chunk-2"],
+                    "vector": [[0.5] * k, [0.4] * k],
+                    "chunk_text": ["chunk text 1", "chunk text 2"],
+                }
+
+            self.store.apply([chunk, document, transform_pdf_on_write_view])
+
+            sample_pdf = b"%PDF-1.3\n3 0 obj\n<</Type /Page\n/Parent 1 0 R\n/Resources 2 0 R\n/Contents 4 0 R>>\nendobj\n4 0 obj\n<</Filter /FlateDecode /Length 115>>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<</Type /Pages\n/Kids [3 0 R ]\n/Count 1\n/MediaBox [0 0 595.28 841.89]\n>>\nendobj\n5 0 obj\n<</Type /Font\n/BaseFont /Helvetica\n/Subtype /Type1\n/Encoding /WinAnsiEncoding\n>>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title. And this is another sentence. Finally, this is the third sentence.)\n/Author (Francisco Javier Arceo)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
+            sample_input = {
+                "pdf_bytes": sample_pdf,
+                "file_name": "sample_pdf",
+                "document_id": "doc_1",
+            }
+            input_df = pd.DataFrame([sample_input])
+
+            self.store.write_to_online_store(
+                feature_view_name="transform_pdf_on_write_view",
+                df=input_df,
+            )
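This test stops at the write. As a hedged sketch of how the written rows could be read back, mirroring the get_online_features pattern used in the docling test below; the feature reference strings and the "chunk-1" entity row are assumptions based on the view schema and the UDF output defined above:

# Hypothetical read-back after the write above; feature references assume
# the transform_pdf_on_write_view schema, and "chunk-1" is a chunk_id the
# transform emits.
online = self.store.get_online_features(
    features=[
        "transform_pdf_on_write_view:chunk_text",
        "transform_pdf_on_write_view:vector",
    ],
    entity_rows=[{"document_id": "doc_1", "chunk_id": "chunk-1"}],
).to_dict()
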

sdk/python/tests/unit/test_on_demand_python_transformation.py

Lines changed: 32 additions & 23 deletions
@@ -1314,7 +1314,7 @@ def test_docling_transform(self):

         EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
         VECTOR_LEN = 10
-        MAX_TOKENS = 64  # Small token limit for demonstration
+        MAX_TOKENS = 5  # Small token limit for demonstration
         tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
         chunker = HybridChunker(
             tokenizer=tokenizer, max_tokens=MAX_TOKENS, merge_peers=True
@@ -1421,37 +1421,49 @@ def docling_transform_docs(inputs: dict[str, Any]):
             sample_input
         )

-        self.store.apply([docling_transform_docs])
+        sample_inputs = {}
+        for key in sample_input.keys():
+            sample_inputs[key] = [sample_input[key], sample_input_2[key]]

         assert docling_output == {
-            "document_id": ["doc_1"],
-            "chunk_id": ["chunk-0"],
-            "vector": [[0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5]],
+            "document_id": ["doc_1", "doc_1", "doc_1"],
+            "chunk_id": ["chunk-0", "chunk-1", "chunk-2"],
+            "vector": [
+                [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
+                [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
+                [0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
+            ],
             "chunk_text": [
-                "Let's have fun with Natural Language Processing on PDFs."
+                "Let's have fun",
+                "with Natural Language Processing on",
+                "PDFs.",
             ],
         }

-        input_df = pd.DataFrame([sample_input])
-        self.store.write_to_online_store("docling_transform_docs", input_df)
+        self.store.apply([docling_transform_docs])
+
+        multiple_inputs_df = pd.DataFrame(sample_inputs)
+        self.store.write_to_online_store(
+            "docling_transform_docs", multiple_inputs_df
+        )

         conn = self.store._provider._online_store._conn
         document_table = self.store._provider._online_store._conn.execute(
-            "SELECT name FROM sqlite_master WHERE type='table' and name like '%docling%';"
+            "SELECT name FROM sqlite_master WHERE type='table' and name like '%docling_transform%';"
         ).fetchall()[0][0]
         written_data = pd.read_sql_query(f"select * from {document_table}", conn)
-        assert (
+        assert sorted(
             written_data[written_data["feature_name"] == "document_id"][
                 "vector_value"
-            ].values[0]
-            == "doc_1"
-        )
-        assert (
-            written_data[written_data["feature_name"] == "chunk_id"][
-                "vector_value"
-            ].values[0]
-            == "chunk-0"
-        )
+            ]
+            .unique()
+            .tolist()
+        ) == ["doc_1", "doc_2"]
+        assert sorted(
+            written_data[written_data["feature_name"] == "chunk_id"]["vector_value"]
+            .unique()
+            .tolist()
+        ) == ["chunk-0", "chunk-1", "chunk-2"]

         online_features = self.store.get_online_features(
             features=[
@@ -1461,14 +1473,11 @@ def docling_transform_docs(inputs: dict[str, Any]):
             ],
             entity_rows=[{"document_id": "doc_1", "chunk_id": "chunk-0"}],
         ).to_dict()
+
         online_features == {
             "document_id": ["doc_1"],
             "chunk_id": ["chunk-0"],
             "chunk_text": [
                 "Let's have fun with Natural Language Processing on PDFs."
             ],
         }
-
-        multiple_inputs_df = pd.DataFrame([sample_input, sample_input_2])
-        # note this test needs to be updated with writing to the online store to verify this behavior works
-        assert multiple_inputs_df is not None
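The sample_inputs construction in this diff is a column-wise merge of two per-row dicts into one dict of lists. A small standalone sketch with placeholder values (the real test uses full PDF payloads; the field names mirror the test above):

import pandas as pd

# Placeholder stand-ins for sample_input / sample_input_2 in the test above.
sample_input = {"document_id": "doc_1", "pdf_bytes": b"%PDF-1.3", "file_name": "a.pdf"}
sample_input_2 = {"document_id": "doc_2", "pdf_bytes": b"%PDF-1.3", "file_name": "b.pdf"}

# One list per field, one element per input row -- equivalent to the test's loop.
sample_inputs = {k: [sample_input[k], sample_input_2[k]] for k in sample_input}
multiple_inputs_df = pd.DataFrame(sample_inputs)  # two rows, three columns
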
