From 9981f89b3bcbd1533b333f3fef342ce36d785f71 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Fri, 11 Oct 2024 16:55:24 +0300 Subject: [PATCH 1/7] mongodb kickstart + openai embedding enrichment --- .../mongo_vector_search_example/main.py | 63 +++++++++++++++++++ .../instrumentation/__init__.py | 2 + .../instrumentation/openai/patch.py | 14 +++++ .../instrumentation/pymongo/__init__.py | 5 ++ .../pymongo/instrumentation.py | 45 +++++++++++++ .../instrumentation/pymongo/patch.py | 8 +++ src/langtrace_python_sdk/langtrace.py | 2 + 7 files changed, 139 insertions(+) create mode 100644 src/examples/mongo_vector_search_example/main.py create mode 100644 src/langtrace_python_sdk/instrumentation/pymongo/__init__.py create mode 100644 src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py create mode 100644 src/langtrace_python_sdk/instrumentation/pymongo/patch.py diff --git a/src/examples/mongo_vector_search_example/main.py b/src/examples/mongo_vector_search_example/main.py new file mode 100644 index 00000000..9b58d475 --- /dev/null +++ b/src/examples/mongo_vector_search_example/main.py @@ -0,0 +1,63 @@ +import pymongo +import os +from dotenv import load_dotenv +from openai import OpenAI +from langtrace_python_sdk import langtrace + +load_dotenv() +langtrace.init() +MODEL = "text-embedding-ada-002" +openai_client = OpenAI() +client = pymongo.MongoClient(os.environ["MONGO_URI"]) + + +# Define a function to generate embeddings +def get_embedding(text): + """Generates vector embeddings for the given text.""" + embedding = ( + openai_client.embeddings.create(input=[text], model=MODEL).data[0].embedding + ) + return embedding + + +def vector_query(): + db = client["sample_mflix"] + + embedded_movies_collection = db["embedded_movies"] + + # define pipeline + pipeline = [ + { + "$vectorSearch": { + "index": "vector_index", + "path": "plot_embedding", + "queryVector": get_embedding( + "A movie about a hacker that had a really rough childhood and been trying to convince his father otherwise." + ), + # "numCandidates": 150, + "exact": True, + "limit": 10, + } + }, + { + "$project": { + "_id": 0, + "plot": 1, + "title": 1, + "score": {"$meta": "vectorSearchScore"}, + } + }, + ] + + result = embedded_movies_collection.aggregate(pipeline) + for doc in result: + print(doc) + + +if __name__ == "__main__": + try: + vector_query() + except Exception as e: + print(e) + finally: + client.close() diff --git a/src/langtrace_python_sdk/instrumentation/__init__.py b/src/langtrace_python_sdk/instrumentation/__init__.py index 369c5ac9..8d1609ce 100644 --- a/src/langtrace_python_sdk/instrumentation/__init__.py +++ b/src/langtrace_python_sdk/instrumentation/__init__.py @@ -20,6 +20,7 @@ from .mistral import MistralInstrumentation from .embedchain import EmbedchainInstrumentation from .litellm import LiteLLMInstrumentation +from .pymongo import PyMongoInstrumentation __all__ = [ "AnthropicInstrumentation", @@ -44,4 +45,5 @@ "VertexAIInstrumentation", "GeminiInstrumentation", "MistralInstrumentation", + "PyMongoInstrumentation", ] diff --git a/src/langtrace_python_sdk/instrumentation/openai/patch.py b/src/langtrace_python_sdk/instrumentation/openai/patch.py index d2902aa5..1afa071a 100644 --- a/src/langtrace_python_sdk/instrumentation/openai/patch.py +++ b/src/langtrace_python_sdk/instrumentation/openai/patch.py @@ -27,6 +27,7 @@ set_event_completion, StreamWrapper, set_span_attributes, + set_usage_attributes, ) from langtrace_python_sdk.types import NOT_GIVEN @@ -446,6 +447,14 @@ def traced_method( span_attributes[SpanAttributes.LLM_REQUEST_EMBEDDING_INPUTS] = json.dumps( [kwargs.get("input", "")] ) + span_attributes[SpanAttributes.LLM_PROMPTS] = json.dumps( + [ + { + "role": "user", + "content": kwargs.get("input"), + } + ] + ) attributes = LLMSpanAttributes(**filter_valid_attributes(span_attributes)) @@ -459,6 +468,11 @@ def traced_method( try: # Attempt to call the original method result = wrapped(*args, **kwargs) + usage = getattr(result, "usage", None) + if usage: + set_usage_attributes( + span, {"prompt_tokens": getattr(usage, "prompt_tokens", 0)} + ) span.set_status(StatusCode.OK) return result except Exception as err: diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/__init__.py b/src/langtrace_python_sdk/instrumentation/pymongo/__init__.py new file mode 100644 index 00000000..c197384c --- /dev/null +++ b/src/langtrace_python_sdk/instrumentation/pymongo/__init__.py @@ -0,0 +1,5 @@ +from .instrumentation import PyMongoInstrumentation + +__all__ = [ + "PyMongoInstrumentation", +] diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py new file mode 100644 index 00000000..09013359 --- /dev/null +++ b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py @@ -0,0 +1,45 @@ +""" +Copyright (c) 2024 Scale3 Labs + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.trace import get_tracer + +from typing import Collection +from importlib_metadata import version as v +from wrapt import wrap_function_wrapper as _W +from .patch import generic_patch + + +class PyMongoInstrumentation(BaseInstrumentor): + """ + The PyMongoInstrumentation class represents the PyMongo instrumentation + """ + + def instrumentation_dependencies(self) -> Collection[str]: + return ["pymongo >= 4.0.0"] + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, "", tracer_provider) + version = v("pymongo") + _W( + module="pymongo.collection", + name="Collection.find", + wrapper=generic_patch(version, tracer), + ) + + def _uninstrument(self, **kwargs): + pass diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py new file mode 100644 index 00000000..6e5447cf --- /dev/null +++ b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py @@ -0,0 +1,8 @@ +def generic_patch(version, tracer): + def traced_method(wrapped, instance, args, kwargs): + print("kwargs", kwargs) + print("args", args) + print("instance", instance) + wrapped(*args, **kwargs) + + return traced_method diff --git a/src/langtrace_python_sdk/langtrace.py b/src/langtrace_python_sdk/langtrace.py index 6ca027a4..5c63c4cb 100644 --- a/src/langtrace_python_sdk/langtrace.py +++ b/src/langtrace_python_sdk/langtrace.py @@ -58,6 +58,7 @@ AutogenInstrumentation, VertexAIInstrumentation, WeaviateInstrumentation, + PyMongoInstrumentation, ) from langtrace_python_sdk.types import DisableInstrumentations, InstrumentationMethods from langtrace_python_sdk.utils import ( @@ -230,6 +231,7 @@ def init( "google-generativeai": GeminiInstrumentation(), "mistralai": MistralInstrumentation(), "autogen": AutogenInstrumentation(), + "pymongo": PyMongoInstrumentation(), } init_instrumentations(config.disable_instrumentations, all_instrumentations) From 86fdc79f312ba5d6482baa18f99e53c58dc64ef2 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Fri, 11 Oct 2024 17:00:21 +0300 Subject: [PATCH 2/7] cleanup --- .../instrumentation/pymongo/instrumentation.py | 9 +-------- .../instrumentation/pymongo/patch.py | 3 --- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py index 09013359..befa7976 100644 --- a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py @@ -32,14 +32,7 @@ def instrumentation_dependencies(self) -> Collection[str]: return ["pymongo >= 4.0.0"] def _instrument(self, **kwargs): - tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer(__name__, "", tracer_provider) - version = v("pymongo") - _W( - module="pymongo.collection", - name="Collection.find", - wrapper=generic_patch(version, tracer), - ) + pass def _uninstrument(self, **kwargs): pass diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py index 6e5447cf..4398161e 100644 --- a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py +++ b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py @@ -1,8 +1,5 @@ def generic_patch(version, tracer): def traced_method(wrapped, instance, args, kwargs): - print("kwargs", kwargs) - print("args", args) - print("instance", instance) wrapped(*args, **kwargs) return traced_method From 15267a2981a667989a69219a9d8526881c3067fa Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Fri, 11 Oct 2024 17:00:50 +0300 Subject: [PATCH 3/7] bump version --- src/langtrace_python_sdk/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index 131942e7..8d1c8625 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.0.2" +__version__ = "3.0.3" From 2e864e7141ec26df9229c5fd56c7f96ff48fcf35 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Sun, 13 Oct 2024 16:35:46 +0300 Subject: [PATCH 4/7] add pymongo methods to instrument --- .../constants/instrumentation/pymongo.py | 6 ++++++ .../instrumentation/pymongo/instrumentation.py | 11 ++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 src/langtrace_python_sdk/constants/instrumentation/pymongo.py diff --git a/src/langtrace_python_sdk/constants/instrumentation/pymongo.py b/src/langtrace_python_sdk/constants/instrumentation/pymongo.py new file mode 100644 index 00000000..43b24d4a --- /dev/null +++ b/src/langtrace_python_sdk/constants/instrumentation/pymongo.py @@ -0,0 +1,6 @@ +APIS = { + "AGGREGATE": { + "METHOD": "aggregate", + "OPERATION": "aggregate", + }, +} diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py index befa7976..cb10dfd8 100644 --- a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py @@ -21,6 +21,7 @@ from importlib_metadata import version as v from wrapt import wrap_function_wrapper as _W from .patch import generic_patch +from langtrace_python_sdk.constants.instrumentation.pymongo import APIS class PyMongoInstrumentation(BaseInstrumentor): @@ -32,7 +33,15 @@ def instrumentation_dependencies(self) -> Collection[str]: return ["pymongo >= 4.0.0"] def _instrument(self, **kwargs): - pass + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, "", tracer_provider) + version = v("pymongo") + for api in APIS.values(): + _W( + module="pymongo.collection", + name=f"Collection.{api['METHOD']}", + wrapper=generic_patch(version, tracer), + ) def _uninstrument(self, **kwargs): pass From 377c95d240b53301e69b16d5a16c13fb929a89f2 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Thu, 24 Oct 2024 11:38:21 +0300 Subject: [PATCH 5/7] mongodb vector search instrument --- .../mongo_vector_search_example/main.py | 18 +++-- .../constants/instrumentation/common.py | 1 + .../constants/instrumentation/pymongo.py | 4 +- .../pymongo/instrumentation.py | 6 +- .../instrumentation/pymongo/patch.py | 72 ++++++++++++++++++- 5 files changed, 85 insertions(+), 16 deletions(-) diff --git a/src/examples/mongo_vector_search_example/main.py b/src/examples/mongo_vector_search_example/main.py index 9b58d475..61725b8c 100644 --- a/src/examples/mongo_vector_search_example/main.py +++ b/src/examples/mongo_vector_search_example/main.py @@ -1,11 +1,11 @@ +from langtrace_python_sdk import langtrace, with_langtrace_root_span import pymongo import os from dotenv import load_dotenv from openai import OpenAI -from langtrace_python_sdk import langtrace load_dotenv() -langtrace.init() +langtrace.init(write_spans_to_console=False, batch=False) MODEL = "text-embedding-ada-002" openai_client = OpenAI() client = pymongo.MongoClient(os.environ["MONGO_URI"]) @@ -20,22 +20,19 @@ def get_embedding(text): return embedding +@with_langtrace_root_span("mongo-vector-search") def vector_query(): db = client["sample_mflix"] embedded_movies_collection = db["embedded_movies"] - # define pipeline pipeline = [ { "$vectorSearch": { "index": "vector_index", "path": "plot_embedding", - "queryVector": get_embedding( - "A movie about a hacker that had a really rough childhood and been trying to convince his father otherwise." - ), - # "numCandidates": 150, - "exact": True, + "queryVector": get_embedding("time travel"), + "numCandidates": 150, "limit": 10, } }, @@ -51,13 +48,14 @@ def vector_query(): result = embedded_movies_collection.aggregate(pipeline) for doc in result: - print(doc) + # print(doc) + pass if __name__ == "__main__": try: vector_query() except Exception as e: - print(e) + print("error", e) finally: client.close() diff --git a/src/langtrace_python_sdk/constants/instrumentation/common.py b/src/langtrace_python_sdk/constants/instrumentation/common.py index 53ff5d65..51188fdb 100644 --- a/src/langtrace_python_sdk/constants/instrumentation/common.py +++ b/src/langtrace_python_sdk/constants/instrumentation/common.py @@ -34,6 +34,7 @@ "EMBEDCHAIN": "Embedchain", "AUTOGEN": "Autogen", "XAI": "XAI", + "MONGODB": "MongoDB", } LANGTRACE_ADDITIONAL_SPAN_ATTRIBUTES_KEY = "langtrace_additional_attributes" diff --git a/src/langtrace_python_sdk/constants/instrumentation/pymongo.py b/src/langtrace_python_sdk/constants/instrumentation/pymongo.py index 43b24d4a..d65d8052 100644 --- a/src/langtrace_python_sdk/constants/instrumentation/pymongo.py +++ b/src/langtrace_python_sdk/constants/instrumentation/pymongo.py @@ -1,6 +1,8 @@ APIS = { "AGGREGATE": { - "METHOD": "aggregate", + "MODULE": "pymongo.collection", + "METHOD": "Collection.aggregate", "OPERATION": "aggregate", + "SPAN_NAME": "MongoDB Aggregate", }, } diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py index cb10dfd8..762394e6 100644 --- a/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py +++ b/src/langtrace_python_sdk/instrumentation/pymongo/instrumentation.py @@ -38,9 +38,9 @@ def _instrument(self, **kwargs): version = v("pymongo") for api in APIS.values(): _W( - module="pymongo.collection", - name=f"Collection.{api['METHOD']}", - wrapper=generic_patch(version, tracer), + module=api["MODULE"], + name=api["METHOD"], + wrapper=generic_patch(api["SPAN_NAME"], version, tracer), ) def _uninstrument(self, **kwargs): diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py index 4398161e..9bf089f0 100644 --- a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py +++ b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py @@ -1,5 +1,73 @@ -def generic_patch(version, tracer): +from langtrace_python_sdk.utils.llm import ( + get_langtrace_attributes, + get_span_name, + set_span_attributes, + set_span_attribute, +) +from langtrace_python_sdk.utils import deduce_args_and_kwargs +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.trace import SpanKind +from langtrace_python_sdk.constants.instrumentation.common import SERVICE_PROVIDERS +from langtrace.trace_attributes import DatabaseSpanAttributes + +import json + + +def generic_patch(name, version, tracer): def traced_method(wrapped, instance, args, kwargs): - wrapped(*args, **kwargs) + database = instance.database.__dict__ + span_attributes = { + **get_langtrace_attributes( + version=version, + service_provider=SERVICE_PROVIDERS["MONGODB"], + vendor_type="vectordb", + ), + "db.system": "mongodb", + "db.query": "aggregate", + } + + attributes = DatabaseSpanAttributes(**span_attributes) + + with tracer.start_as_current_span( + get_span_name(name), kind=SpanKind.CLIENT + ) as span: + if span.is_recording(): + set_input_attributes( + span, deduce_args_and_kwargs(wrapped, *args, **kwargs) + ) + set_span_attributes(span, attributes) + + try: + result = wrapped(*args, **kwargs) + print(result) + for doc in result: + if span.is_recording(): + span.add_event( + name="db.query.match", + attributes={**doc}, + ) + return result + except Exception as err: + # Record the exception in the span + span.record_exception(err) + + # Set the span status to indicate an error + span.set_status(Status(StatusCode.ERROR, str(err))) + + # Reraise the exception to ensure it's not swallowed + raise return traced_method + + +def set_input_attributes(span, args): + pipeline = args.get("pipeline", None) + for stage in pipeline: + for k, v in stage.items(): + if k == "$vectorSearch": + set_span_attribute(span, "db.index", v.get("index", None)) + set_span_attribute(span, "db.path", v.get("path", None)) + set_span_attribute(span, "db.top_k", v.get("numCandidates")) + set_span_attribute(span, "db.limit", v.get("limit")) + else: + set_span_attribute(span, k, json.dumps(v)) From eebdf85302de14a760783c93c66088b4e6bf5ac8 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Tue, 12 Nov 2024 14:13:27 +0200 Subject: [PATCH 6/7] use abstracted handle error --- .../instrumentation/pymongo/patch.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py index 9bf089f0..dd0a59dc 100644 --- a/src/langtrace_python_sdk/instrumentation/pymongo/patch.py +++ b/src/langtrace_python_sdk/instrumentation/pymongo/patch.py @@ -4,8 +4,7 @@ set_span_attributes, set_span_attribute, ) -from langtrace_python_sdk.utils import deduce_args_and_kwargs -from opentelemetry.trace.status import Status, StatusCode +from langtrace_python_sdk.utils import deduce_args_and_kwargs, handle_span_error from opentelemetry.trace import SpanKind from langtrace_python_sdk.constants.instrumentation.common import SERVICE_PROVIDERS from langtrace.trace_attributes import DatabaseSpanAttributes @@ -48,13 +47,7 @@ def traced_method(wrapped, instance, args, kwargs): ) return result except Exception as err: - # Record the exception in the span - span.record_exception(err) - - # Set the span status to indicate an error - span.set_status(Status(StatusCode.ERROR, str(err))) - - # Reraise the exception to ensure it's not swallowed + handle_span_error(span, err) raise return traced_method From cf1b758f6906466b05b774b70da3b5d81114a393 Mon Sep 17 00:00:00 2001 From: Ali Waleed Date: Tue, 12 Nov 2024 14:13:37 +0200 Subject: [PATCH 7/7] bump --- src/langtrace_python_sdk/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/langtrace_python_sdk/version.py b/src/langtrace_python_sdk/version.py index 3e2d550b..80014d0e 100644 --- a/src/langtrace_python_sdk/version.py +++ b/src/langtrace_python_sdk/version.py @@ -1 +1 @@ -__version__ = "3.3.2" +__version__ = "3.3.3"