From ff8c05b4158a34bce8af5785dd2099521199863c Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 15:39:55 +0000 Subject: [PATCH 01/15] sdks/python: properly access dimensions in `OpenAITextEmbedding` --- sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py index a162c333b199..3d0cfb96fee4 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py @@ -108,7 +108,7 @@ def request( "input": batch, } if self.dimensions: - kwargs["dimensions"] = [str(self.dimensions)] + kwargs["dimensions"] = self.dimensions if self.user: kwargs["user"] = self.user @@ -139,7 +139,7 @@ def __init__( """ Embedding Config for OpenAI Text Embedding models. Text Embeddings are generated for a batch of text using the OpenAI API. - + Args: model_name: Name of the OpenAI embedding model columns: The columns where the embeddings will be stored in the output From 71c58418a40a3cc1862ccee1e5b380aff4cf1d44 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 15:53:21 +0000 Subject: [PATCH 02/15] sdks/python: properly scale embeddings on 0 to 1 range in OpenAI --- .../ml/transforms/embeddings/open_ai_it_test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py index 118c656c33c3..99b4394a5a3d 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py @@ -32,6 +32,7 @@ # pylint: disable=ungrouped-imports try: import tensorflow_transform as tft + from apache_beam.ml.transforms.tft import ScaleTo01 except ImportError: tft = None @@ -76,6 +77,7 @@ def test_embeddings_with_scale_to_0_1(self): columns=[test_query_column], api_key=self.api_key, ) + scale_config = ScaleTo01(columns=['embedding']) with beam.Pipeline() as pipeline: transformed_pcoll = ( pipeline @@ -84,10 +86,12 @@ def test_embeddings_with_scale_to_0_1(self): }]) | "MLTransform" >> MLTransform( write_artifact_location=self.artifact_location).with_transform( - embedding_config)) + embedding_config).with_transform(scale_config)) def assert_element(element): - assert max(element.feature_1) == 1 + embedding_values = element.embedding + assert 0 <= max(embedding_values) <= 1 + assert 0 <= min(embedding_values) <= 1 _ = (transformed_pcoll | beam.Map(assert_element)) From 1315494aaee419e6578a68c2e8fbee843f6a2f10 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 15:56:58 +0000 Subject: [PATCH 03/15] sdks/python: properly import `OpenAITextEmbeddings` in the test file --- .../ml/transforms/embeddings/open_ai_it_test.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py index 99b4394a5a3d..dc4f6a22bb0c 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py @@ -23,11 +23,7 @@ from apache_beam.ml.inference.base import RunInference from apache_beam.ml.transforms import base from apache_beam.ml.transforms.base import MLTransform - -try: - from sdks.python.apache_beam.ml.transforms.embeddings.open_ai import OpenAITextEmbeddings -except ImportError: - OpenAITextEmbeddings = None +from apache_beam.ml.transforms.embeddings.open_ai import OpenAITextEmbeddings # pylint: disable=ungrouped-imports try: @@ -190,7 +186,7 @@ def test_with_int_data_types(self): write_artifact_location=self.artifact_location).with_transform( embedding_config)) - def test_with_artifact_location(self): # pylint: disable=line-too-long + def test_with_artifact_location(self): """Local artifact location test""" secondary_artifact_location = tempfile.mkdtemp( prefix='_openai_secondary_test') @@ -235,7 +231,7 @@ def assert_element(element): # Clean up the temporary directory shutil.rmtree(secondary_artifact_location) - def test_mltransform_to_ptransform_with_openai(self): # pylint: disable=line-too-long + def test_mltransform_to_ptransform_with_openai(self): transforms = [ OpenAITextEmbeddings( columns=['x'], From 9031331b3c762dd30f47ffaffe0862151308d12f Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:02:32 +0000 Subject: [PATCH 04/15] sdks/python: add openai as extra dependency --- sdks/python/setup.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index e7ffc0c9780c..7e082321d8fa 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -161,6 +161,7 @@ def cythonize(*args, **kwargs): ] milvus_dependency = ['pymilvus>=2.5.10,<3.0.0'] +openai_dependency = ['openai>=1.0.0,<2.0.0'] def find_by_ext(root_dir, ext): @@ -418,9 +419,8 @@ def get_portability_package_data(): 'docutils>=0.18.1', 'markdown', 'pandas<2.3.0', - 'openai', 'virtualenv-clone>=0.5,<1.0', - ], + ] + openai_dependency, 'test': [ 'cloud-sql-python-connector[pg8000]>=1.0.0,<2.0.0', 'docstring-parser>=0.15,<1.0', @@ -596,7 +596,8 @@ def get_portability_package_data(): ], 'xgboost': ['xgboost>=1.6.0,<2.1.3', 'datatable==1.0.0'], 'tensorflow-hub': ['tensorflow-hub>=0.14.0,<0.16.0'], - 'milvus': milvus_dependency + 'milvus': milvus_dependency, + 'openai': openai_dependency }, zip_safe=False, # PyPI package information. From 466d5337297901c53de4e6f9e34c58638fd60ad3 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:15:04 +0000 Subject: [PATCH 05/15] sdks/python; add `OpenAITextEmbeddings` to RAG module --- .../apache_beam/ml/rag/embeddings/open_ai.py | 84 +++++++++++ .../ml/rag/embeddings/open_ai_test.py | 134 ++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 sdks/python/apache_beam/ml/rag/embeddings/open_ai.py create mode 100644 sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py new file mode 100644 index 000000000000..e0ff4173cfcf --- /dev/null +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""RAG-specific embedding implementations using OpenAI models.""" + +from typing import Optional + +import openai + +import apache_beam as beam +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.rag.embeddings.base import create_rag_adapter +from apache_beam.ml.rag.types import Chunk +from apache_beam.ml.transforms.base import EmbeddingsManager +from apache_beam.ml.transforms.base import _TextEmbeddingHandler +from apache_beam.ml.transforms.embeddings.open_ai import ( + _OpenAITextEmbeddingHandler, +) + +__all__ = ['OpenAITextEmbeddings'] + + +class OpenAITextEmbeddings(EmbeddingsManager): + def __init__( + self, + model_name: str, + *, + api_key: Optional[str] = None, + organization: Optional[str] = None, + dimensions: Optional[int] = None, + user: Optional[str] = None, + max_batch_size: Optional[int] = None, + **kwargs): + """Utilizes OpenAI text embeddings for semantic search and RAG pipelines. + + Args: + model_name: Name of the OpenAI embedding model + api_key: OpenAI API key + organization: OpenAI organization ID + dimensions: Specific embedding dimensions to use (if supported) + user: End-user identifier for tracking and rate limit calculations + max_batch_size: Maximum batch size for requests to OpenAI API + **kwargs: Additional arguments passed to EmbeddingsManager including + ModelHandler inference_args. + """ + super().__init__(type_adapter=create_rag_adapter(), **kwargs) + self.model_name = model_name + self.api_key = api_key + self.organization = organization + self.dimensions = dimensions + self.user = user + self.max_batch_size = max_batch_size + + def get_model_handler(self): + """Returns model handler configured with RAG adapter.""" + return _OpenAITextEmbeddingHandler( + model_name=self.model_name, + api_key=self.api_key, + organization=self.organization, + dimensions=self.dimensions, + user=self.user, + max_batch_size=self.max_batch_size, + ) + + def get_ptransform_for_processing( + self, **kwargs + ) -> beam.PTransform[beam.PCollection[Chunk], beam.PCollection[Chunk]]: + """Returns PTransform that uses the RAG adapter.""" + return RunInference( + model_handler=_TextEmbeddingHandler(self), + inference_args=self.inference_args).with_output_types(Chunk) diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py new file mode 100644 index 000000000000..cd0e50772720 --- /dev/null +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py @@ -0,0 +1,134 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +import tempfile +import unittest + +import apache_beam as beam +from apache_beam.ml.rag.types import Chunk +from apache_beam.ml.rag.types import Content +from apache_beam.ml.rag.types import Embedding +from apache_beam.ml.transforms.base import MLTransform +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.ml.rag.embeddings.open_ai import OpenAITextEmbeddings + +def chunk_approximately_equals(expected, actual): + """Compare embeddings allowing for numerical differences.""" + if not isinstance(expected, Chunk) or not isinstance(actual, Chunk): + return False + + return ( + expected.id == actual.id and expected.metadata == actual.metadata and + expected.content == actual.content and + len(expected.embedding.dense_embedding) == len( + actual.embedding.dense_embedding) and + all(isinstance(x, float) for x in actual.embedding.dense_embedding)) + + +class OpenAITextEmbeddingsTest(unittest.TestCase): + def setUp(self): + self.artifact_location = tempfile.mkdtemp(prefix='openai_') + self.test_chunks = [ + Chunk( + content=Content(text="This is a test sentence."), + id="1", + metadata={ + "source": "test.txt", "language": "en" + }), + Chunk( + content=Content(text="Another example."), + id="2", + metadata={ + "source": "test.txt", "language": "en" + }) + ] + + def tearDown(self) -> None: + shutil.rmtree(self.artifact_location) + + def test_embedding_pipeline(self): + expected = [ + Chunk( + id="1", + embedding=Embedding(dense_embedding=[0.0] * 1536), + metadata={ + "source": "test.txt", "language": "en" + }, + content=Content(text="This is a test sentence.")), + Chunk( + id="2", + embedding=Embedding(dense_embedding=[0.0] * 1536), + metadata={ + "source": "test.txt", "language": "en" + }, + content=Content(text="Another example.")) + ] + + embedder = OpenAITextEmbeddings( + model_name="text-embedding-3-small", + dimensions=1536, + api_key=os.environ.get("OPENAI_API_KEY")) + + with TestPipeline() as p: + embeddings = ( + p + | beam.Create(self.test_chunks) + | MLTransform(write_artifact_location=self.artifact_location). + with_transform(embedder)) + + assert_that( + embeddings, equal_to(expected, equals_fn=chunk_approximately_equals)) + + def test_embedding_pipeline_with_dimensions(self): + expected = [ + Chunk( + id="1", + embedding=Embedding(dense_embedding=[0.0] * 512), + metadata={ + "source": "test.txt", "language": "en" + }, + content=Content(text="This is a test sentence.")), + Chunk( + id="2", + embedding=Embedding(dense_embedding=[0.0] * 512), + metadata={ + "source": "test.txt", "language": "en" + }, + content=Content(text="Another example.")) + ] + + embedder = OpenAITextEmbeddings( + model_name="text-embedding-3-small", + dimensions=512, + api_key=os.environ.get("OPENAI_API_KEY")) + + with TestPipeline() as p: + embeddings = ( + p + | beam.Create(self.test_chunks) + | MLTransform(write_artifact_location=self.artifact_location). + with_transform(embedder)) + + assert_that( + embeddings, equal_to(expected, equals_fn=chunk_approximately_equals)) + + +if __name__ == '__main__': + unittest.main() From ab0aea8200282a5e3f1833f240f9477c1dd723e7 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:20:11 +0000 Subject: [PATCH 06/15] sdks/python: reuse `chunk_approximately_equals` function across RAG --- .../ml/rag/embeddings/huggingface_test.py | 14 +------------- .../apache_beam/ml/rag/embeddings/open_ai_test.py | 14 +------------- .../ml/rag/embeddings/vertex_ai_test.py | 14 +------------- 3 files changed, 3 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py b/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py index f0b9316dcee8..a993be4775ef 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py @@ -31,6 +31,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals # pylint: disable=unused-import try: @@ -40,19 +41,6 @@ SENTENCE_TRANSFORMERS_AVAILABLE = False -def chunk_approximately_equals(expected, actual): - """Compare embeddings allowing for numerical differences.""" - if not isinstance(expected, Chunk) or not isinstance(actual, Chunk): - return False - - return ( - expected.id == actual.id and expected.metadata == actual.metadata and - expected.content == actual.content and - len(expected.embedding.dense_embedding) == len( - actual.embedding.dense_embedding) and - all(isinstance(x, float) for x in actual.embedding.dense_embedding)) - - @pytest.mark.uses_transformers @unittest.skipIf( not SENTENCE_TRANSFORMERS_AVAILABLE, "sentence-transformers not available") diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py index cd0e50772720..8eb6451fb765 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import os import shutil import tempfile @@ -28,18 +27,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.ml.rag.embeddings.open_ai import OpenAITextEmbeddings - -def chunk_approximately_equals(expected, actual): - """Compare embeddings allowing for numerical differences.""" - if not isinstance(expected, Chunk) or not isinstance(actual, Chunk): - return False - - return ( - expected.id == actual.id and expected.metadata == actual.metadata and - expected.content == actual.content and - len(expected.embedding.dense_embedding) == len( - actual.embedding.dense_embedding) and - all(isinstance(x, float) for x in actual.embedding.dense_embedding)) +from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals class OpenAITextEmbeddingsTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py b/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py index 320a562d5009..3743cdf188ef 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py @@ -28,6 +28,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals # pylint: disable=ungrouped-imports try: @@ -38,19 +39,6 @@ VERTEX_AI_AVAILABLE = False -def chunk_approximately_equals(expected, actual): - """Compare embeddings allowing for numerical differences.""" - if not isinstance(expected, Chunk) or not isinstance(actual, Chunk): - return False - - return ( - expected.id == actual.id and expected.metadata == actual.metadata and - expected.content == actual.content and - len(expected.embedding.dense_embedding) == len( - actual.embedding.dense_embedding) and - all(isinstance(x, float) for x in actual.embedding.dense_embedding)) - - @unittest.skipIf( not VERTEX_AI_AVAILABLE, "Vertex AI dependencies not available") class VertexAITextEmbeddingsTest(unittest.TestCase): From 8a151862da31f8480e492055446ca126ee41ae78 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:22:38 +0000 Subject: [PATCH 07/15] CHANGES.md: update release notes --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index e8f6f79d4dbf..3fe602d8f36c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* OpenAI embedding model added to Beam RAG module ([#36081](https://github.com/apache/beam/pull/36081)). ## Breaking Changes From ae9b0763344935b469228c9d229e0d82afd3c732 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:30:00 +0000 Subject: [PATCH 08/15] sdks/python: dont skip `OpenAIEmbeddingsTest` openAI supposed to exist --- .../apache_beam/ml/transforms/embeddings/open_ai_it_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py index dc4f6a22bb0c..deca2671ecca 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai_it_test.py @@ -37,8 +37,6 @@ model_name: str = "text-embedding-3-small" -@unittest.skipIf( - OpenAITextEmbeddings is None, 'OpenAI Python SDK is not installed.') class OpenAIEmbeddingsTest(unittest.TestCase): def setUp(self) -> None: self.artifact_location = tempfile.mkdtemp(prefix='_openai_test') From 0bba94365ef61694dafda65e65e4e1cd2f70ec1b Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:46:31 +0000 Subject: [PATCH 09/15] sdks/python: fix formatting issues --- sdks/python/apache_beam/ml/rag/embeddings/open_ai.py | 3 +-- sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py index e0ff4173cfcf..d264e8e565dd 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py @@ -27,8 +27,7 @@ from apache_beam.ml.transforms.base import EmbeddingsManager from apache_beam.ml.transforms.base import _TextEmbeddingHandler from apache_beam.ml.transforms.embeddings.open_ai import ( - _OpenAITextEmbeddingHandler, -) + _OpenAITextEmbeddingHandler, ) __all__ = ['OpenAITextEmbeddings'] diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py index 8eb6451fb765..f8ab17493414 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py @@ -70,9 +70,9 @@ def test_embedding_pipeline(self): ] embedder = OpenAITextEmbeddings( - model_name="text-embedding-3-small", - dimensions=1536, - api_key=os.environ.get("OPENAI_API_KEY")) + model_name="text-embedding-3-small", + dimensions=1536, + api_key=os.environ.get("OPENAI_API_KEY")) with TestPipeline() as p: embeddings = ( From f00f5c0512dcb9f55ea3161aea01af119f925948 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 16:46:45 +0000 Subject: [PATCH 10/15] CHANGES.md: make PreCommit Whitespace workflow pass --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 3fe602d8f36c..cfd49085bcb0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* OpenAI embedding model added to Beam RAG module ([#36081](https://github.com/apache/beam/pull/36081)). +* OpenAI embedding model added to Beam RAG module ([#36081](https://github.com/apache/beam/pull/36081)) ## Breaking Changes From 973a6338e97055bc61c7710e34228db9a11b44e5 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 19:34:21 +0000 Subject: [PATCH 11/15] sdks/python: add missing `test_utils.py` to RAG embeddings --- .../ml/rag/embeddings/test_utils.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 sdks/python/apache_beam/ml/rag/embeddings/test_utils.py diff --git a/sdks/python/apache_beam/ml/rag/embeddings/test_utils.py b/sdks/python/apache_beam/ml/rag/embeddings/test_utils.py new file mode 100644 index 000000000000..7443a335b9be --- /dev/null +++ b/sdks/python/apache_beam/ml/rag/embeddings/test_utils.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility functions for RAG embeddings.""" + +from apache_beam.ml.rag.types import Chunk + + +def chunk_approximately_equals(expected, actual): + """Compare embeddings allowing for numerical differences.""" + if not isinstance(expected, Chunk) or not isinstance(actual, Chunk): + return False + + return ( + expected.id == actual.id and expected.metadata == actual.metadata and + expected.content == actual.content and + len(expected.embedding.dense_embedding) == len( + actual.embedding.dense_embedding) and + all(isinstance(x, float) for x in actual.embedding.dense_embedding)) From 60edb284d82000d729b2717bf0636d8f3464637a Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Sun, 7 Sep 2025 19:42:45 +0000 Subject: [PATCH 12/15] sdks/python: fix linting issues --- sdks/python/apache_beam/ml/rag/embeddings/open_ai.py | 2 -- sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py index d264e8e565dd..4e150a2d05ce 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py @@ -18,8 +18,6 @@ from typing import Optional -import openai - import apache_beam as beam from apache_beam.ml.inference.base import RunInference from apache_beam.ml.rag.embeddings.base import create_rag_adapter diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py index 3d0cfb96fee4..a8cb43dc6f4b 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py @@ -103,7 +103,7 @@ def request( ) -> Iterable: """Makes a request to OpenAI embedding API and returns embeddings.""" # Prepare arguments for the API call - kwargs = { + kwargs: dict[str, Any] = { "model": self.model_name, "input": batch, } From 0fe9e6fe19581e38372c14515f7e36435f00154b Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Mon, 8 Sep 2025 00:55:27 +0000 Subject: [PATCH 13/15] sdks/python: add open API dependency to tests --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 7e082321d8fa..814e5f9fdcf4 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -449,7 +449,7 @@ def get_portability_package_data(): 'pg8000>=1.31.1', "PyMySQL>=1.1.0", 'oracledb>=3.1.1' - ] + milvus_dependency, + ] + milvus_dependency + openai_dependency, 'gcp': [ 'cachetools>=3.1.0,<7', 'google-api-core>=2.0.0,<3', From e9c45c2010b85cf84b1c48ea47b1aa47525f0a88 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Mon, 8 Sep 2025 01:12:32 +0000 Subject: [PATCH 14/15] CHANGES.md: ref issue instead of PR --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index cfd49085bcb0..bf2fe433d7b2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,7 +74,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* OpenAI embedding model added to Beam RAG module ([#36081](https://github.com/apache/beam/pull/36081)) +* OpenAI embedding model added to Beam RAG module ([#36083](https://github.com/apache/beam/issues/36083)). ## Breaking Changes From b260dd5a0b8c167c77d18ec34d5fafc038e5ee15 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Mon, 8 Sep 2025 01:50:43 +0000 Subject: [PATCH 15/15] sdks/python: fix linting issues --- sdks/python/apache_beam/ml/inference/vllm_inference.py | 5 +++-- .../apache_beam/ml/rag/embeddings/huggingface_test.py | 2 +- sdks/python/apache_beam/ml/rag/embeddings/open_ai.py | 3 +-- sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py | 4 ++-- .../python/apache_beam/ml/rag/embeddings/vertex_ai_test.py | 2 +- .../python/apache_beam/ml/transforms/embeddings/open_ai.py | 7 ++++--- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index 0bb6ccd6108e..bdbee9e51fd5 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -31,12 +31,13 @@ from typing import Any from typing import Optional +from openai import AsyncOpenAI +from openai import OpenAI + from apache_beam.io.filesystems import FileSystems from apache_beam.ml.inference.base import ModelHandler from apache_beam.ml.inference.base import PredictionResult from apache_beam.utils import subprocess_server -from openai import AsyncOpenAI -from openai import OpenAI try: # VLLM logging config breaks beam logging. diff --git a/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py b/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py index a993be4775ef..8c91adafe543 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/huggingface_test.py @@ -24,6 +24,7 @@ import apache_beam as beam from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings +from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals from apache_beam.ml.rag.types import Chunk from apache_beam.ml.rag.types import Content from apache_beam.ml.rag.types import Embedding @@ -31,7 +32,6 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals # pylint: disable=unused-import try: diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py index 4e150a2d05ce..1dbf168a3a02 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai.py @@ -24,8 +24,7 @@ from apache_beam.ml.rag.types import Chunk from apache_beam.ml.transforms.base import EmbeddingsManager from apache_beam.ml.transforms.base import _TextEmbeddingHandler -from apache_beam.ml.transforms.embeddings.open_ai import ( - _OpenAITextEmbeddingHandler, ) +from apache_beam.ml.transforms.embeddings.open_ai import _OpenAITextEmbeddingHandler __all__ = ['OpenAITextEmbeddings'] diff --git a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py index f8ab17493414..f263d45aae60 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/open_ai_test.py @@ -19,6 +19,8 @@ import unittest import apache_beam as beam +from apache_beam.ml.rag.embeddings.open_ai import OpenAITextEmbeddings +from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals from apache_beam.ml.rag.types import Chunk from apache_beam.ml.rag.types import Content from apache_beam.ml.rag.types import Embedding @@ -26,8 +28,6 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.ml.rag.embeddings.open_ai import OpenAITextEmbeddings -from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals class OpenAITextEmbeddingsTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py b/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py index 3743cdf188ef..366975a5a76a 100644 --- a/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py +++ b/sdks/python/apache_beam/ml/rag/embeddings/vertex_ai_test.py @@ -21,6 +21,7 @@ import unittest import apache_beam as beam +from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals from apache_beam.ml.rag.types import Chunk from apache_beam.ml.rag.types import Content from apache_beam.ml.rag.types import Embedding @@ -28,7 +29,6 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.ml.rag.embeddings.test_utils import chunk_approximately_equals # pylint: disable=ungrouped-imports try: diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py index a8cb43dc6f4b..121fa9839ef7 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/open_ai.py @@ -21,16 +21,17 @@ from typing import TypeVar from typing import Union -import apache_beam as beam import openai +from openai import APIError +from openai import RateLimitError + +import apache_beam as beam from apache_beam.ml.inference.base import RemoteModelHandler from apache_beam.ml.inference.base import RunInference from apache_beam.ml.transforms.base import EmbeddingsManager from apache_beam.ml.transforms.base import _TextEmbeddingHandler from apache_beam.pvalue import PCollection from apache_beam.pvalue import Row -from openai import APIError -from openai import RateLimitError __all__ = ["OpenAITextEmbeddings"]