Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ dev = [
"google-generativeai",
"google-cloud-aiplatform",
"mistralai",
"embedchain",
"typing-extensions",
"embedchain"
]

test = ["pytest", "pytest-vcr", "pytest-asyncio"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.trace import get_tracer
from .patch import patch_vertexai
from langtrace_python_sdk.utils import is_package_installed


class VertexAIInstrumentation(BaseInstrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
if is_package_installed("vertexai"):
return ["vertexai >= 1.0.0"]

return ["google-cloud-aiplatform >= 1.0.0"]

def _instrument(self, **kwargs):
trace_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, "", trace_provider)
version = v("google-cloud-aiplatform")
version = (
v("vertexai")
if is_package_installed("vertexai")
else v("google-cloud-aiplatform")
)

for _, api_config in APIS.items():

Expand Down
23 changes: 10 additions & 13 deletions src/langtrace_python_sdk/instrumentation/vertexai/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,27 +103,24 @@ def is_streaming_response(response):

def get_llm_model(instance):
if hasattr(instance, "_model_name"):
return instance._model_name.replace("models/", "")
return instance._model_name.replace("publishers/google/models/", "")
return getattr(instance, "_model_id", "unknown")


def serialize_prompts(args, kwargs):
prompt = ""
if args is not None and len(args) > 0:
if args and len(args) > 0:
prompt_parts = []
for arg in args:
if isinstance(arg, str):
prompt = f"{prompt}{arg}\n"
prompt_parts.append(arg)
elif isinstance(arg, list):
for subarg in arg:
if type(subarg).__name__ == "Part":
prompt = f"{prompt}{json.dumps(subarg.to_dict())}\n"
prompt_parts.append(json.dumps(subarg.to_dict()))
else:
prompt = f"{prompt}{subarg}\n"
prompt_parts.append(str(subarg))

return [{"role": "user", "content": "\n".join(prompt_parts)}]
else:
prompt = [
{
"role": "user",
"content": kwargs.get("prompt") or kwargs.get("message"),
}
]
return prompt
content = kwargs.get("prompt") or kwargs.get("message")
return [{"role": "user", "content": content}] if content else []
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ def _instrument(self, **kwargs):
generic_query_patch(api_name, version, tracer),
)
elif api_config.get("OPERATION") == "create":
print(
api_config["MODULE"],
api_config["METHOD"],
)
wrap_function_wrapper(
api_config["MODULE"],
api_config["METHOD"],
Expand Down
206 changes: 123 additions & 83 deletions src/langtrace_python_sdk/langtrace.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import os
import sys
from typing import Any, Optional
import sentry_sdk
import logging
from typing import Dict, Optional, Any
from colorama import Fore
from langtrace_python_sdk.constants import LANGTRACE_SDK_NAME, SENTRY_DSN
from opentelemetry import trace
Expand Down Expand Up @@ -57,74 +59,151 @@
VertexAIInstrumentation,
WeaviateInstrumentation,
)
from langtrace_python_sdk.types import (
DisableInstrumentations,
InstrumentationMethods,
InstrumentationType,
)
from langtrace_python_sdk.types import DisableInstrumentations, InstrumentationMethods
from langtrace_python_sdk.utils import (
check_if_sdk_is_outdated,
get_sdk_version,
is_package_installed,
validate_instrumentations,
)
from langtrace_python_sdk.utils.langtrace_sampler import LangtraceSampler
import sentry_sdk
from sentry_sdk.types import Event, Hint

logging.disable(level=logging.INFO)


class LangtraceConfig:
def __init__(self, **kwargs):
self.api_key = kwargs.get("api_key")
self.batch = kwargs.get("batch", True)
self.write_spans_to_console = kwargs.get("write_spans_to_console", False)
self.custom_remote_exporter = kwargs.get("custom_remote_exporter")
self.api_host = kwargs.get("api_host", LANGTRACE_REMOTE_URL)
self.disable_instrumentations = kwargs.get("disable_instrumentations")
self.disable_tracing_for_functions = kwargs.get("disable_tracing_for_functions")
self.service_name = kwargs.get("service_name")
self.disable_logging = kwargs.get("disable_logging", False)
self.headers = kwargs.get("headers", {})


def get_host(config: LangtraceConfig) -> str:
return (
os.environ.get("LANGTRACE_API_HOST")
or os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
or config.api_host
or LANGTRACE_REMOTE_URL
)


def setup_tracer_provider(config: LangtraceConfig, host: str) -> TracerProvider:
sampler = LangtraceSampler(disabled_methods=config.disable_tracing_for_functions)
resource = Resource.create(
attributes={
SERVICE_NAME: os.environ.get("OTEL_SERVICE_NAME")
or config.service_name
or sys.argv[0]
}
)
return TracerProvider(resource=resource, sampler=sampler)


def get_exporter(config: LangtraceConfig, host: str):
if config.custom_remote_exporter:
return config.custom_remote_exporter

return LangTraceExporter(host, config.api_key, config.disable_logging)


def add_span_processor(provider: TracerProvider, config: LangtraceConfig, exporter):
if config.write_spans_to_console:
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
print(Fore.BLUE + "Writing spans to console" + Fore.RESET)

elif config.custom_remote_exporter or get_host(config) != LANGTRACE_REMOTE_URL:
processor = (
BatchSpanProcessor(exporter)
if config.batch
else SimpleSpanProcessor(exporter)
)
provider.add_span_processor(processor)
print(
Fore.BLUE
+ f"Exporting spans to custom host: {get_host(config)}.."
+ Fore.RESET
)
else:
provider.add_span_processor(BatchSpanProcessor(exporter))
print(Fore.BLUE + "Exporting spans to Langtrace cloud.." + Fore.RESET)


def init_sentry(config: LangtraceConfig, host: str):
if os.environ.get("LANGTRACE_ERROR_REPORTING", "True") == "True":
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=1.0,
profiles_sample_rate=1.0,
before_send=before_send,
)
sdk_options = {
"service_name": os.environ.get("OTEL_SERVICE_NAME")
or config.service_name
or sys.argv[0],
"disable_logging": config.disable_logging,
"disable_instrumentations": config.disable_instrumentations,
"disable_tracing_for_functions": config.disable_tracing_for_functions,
"batch": config.batch,
"write_spans_to_console": config.write_spans_to_console,
"custom_remote_exporter": config.custom_remote_exporter,
"sdk_name": LANGTRACE_SDK_NAME,
"sdk_version": get_sdk_version(),
"api_host": host,
}
sentry_sdk.set_context("sdk_init_options", sdk_options)


def init(
api_key: str = None,
api_key: Optional[str] = None,
batch: bool = True,
write_spans_to_console: bool = False,
custom_remote_exporter=None,
custom_remote_exporter: Optional[Any] = None,
api_host: Optional[str] = LANGTRACE_REMOTE_URL,
disable_instrumentations: Optional[DisableInstrumentations] = None,
disable_tracing_for_functions: Optional[InstrumentationMethods] = None,
service_name: Optional[str] = None,
disable_logging=False,
disable_logging: bool = False,
headers: Dict[str, str] = {},
):
if disable_logging:
logging.disable(level=logging.INFO)
check_if_sdk_is_outdated()
config = LangtraceConfig(
api_key=api_key,
batch=batch,
write_spans_to_console=write_spans_to_console,
custom_remote_exporter=custom_remote_exporter,
api_host=api_host,
disable_instrumentations=disable_instrumentations,
disable_tracing_for_functions=disable_tracing_for_functions,
service_name=service_name,
disable_logging=disable_logging,
headers=headers,
)

if config.disable_logging:
sys.stdout = open(os.devnull, "w")

host = (
os.environ.get("LANGTRACE_API_HOST", None)
or os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None)
or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", None)
or api_host
or LANGTRACE_REMOTE_URL
)
check_if_sdk_is_outdated()
host = get_host(config)
print(Fore.GREEN + "Initializing Langtrace SDK.." + Fore.RESET)
print(
Fore.WHITE
+ "⭐ Leave our github a star to stay on top of our updates - https://github.com/Scale3-Labs/langtrace"
+ Fore.RESET
)
sampler = LangtraceSampler(disabled_methods=disable_tracing_for_functions)
resource = Resource.create(
attributes={
SERVICE_NAME: os.environ.get("OTEL_SERVICE_NAME")
or service_name
or sys.argv[0]
}
)
provider = TracerProvider(resource=resource, sampler=sampler)

remote_write_exporter = (
LangTraceExporter(
api_key=api_key, api_host=host, disable_logging=disable_logging
)
if custom_remote_exporter is None
else custom_remote_exporter
)
console_exporter = ConsoleSpanExporter()
batch_processor_remote = BatchSpanProcessor(remote_write_exporter)
simple_processor_remote = SimpleSpanProcessor(remote_write_exporter)
simple_processor_console = SimpleSpanProcessor(console_exporter)
provider = setup_tracer_provider(config, host)
exporter = get_exporter(config, host)

os.environ["LANGTRACE_API_HOST"] = host.replace("/api/trace", "")
# Initialize tracer
trace.set_tracer_provider(provider)
all_instrumentations = {
"openai": OpenAIInstrumentation(),
Expand All @@ -146,57 +225,18 @@ def init(
"ollama": OllamaInstrumentor(),
"dspy-ai": DspyInstrumentation(),
"crewai": CrewAIInstrumentation(),
"vertexai": VertexAIInstrumentation(),
"google-cloud-aiplatform": VertexAIInstrumentation(),
"google-generativeai": GeminiInstrumentation(),
"mistralai": MistralInstrumentation(),
"autogen": AutogenInstrumentation(),
}

init_instrumentations(disable_instrumentations, all_instrumentations)
if write_spans_to_console:
print(Fore.BLUE + "Writing spans to console" + Fore.RESET)
provider.add_span_processor(simple_processor_console)

elif custom_remote_exporter is not None:
print(Fore.BLUE + "Exporting spans to custom remote exporter.." + Fore.RESET)
if batch:
provider.add_span_processor(batch_processor_remote)
else:
provider.add_span_processor(simple_processor_remote)

elif host != LANGTRACE_REMOTE_URL:
print(Fore.BLUE + f"Exporting spans to custom host: {host}.." + Fore.RESET)
if batch:
provider.add_span_processor(batch_processor_remote)
else:
provider.add_span_processor(simple_processor_remote)
else:
print(Fore.BLUE + "Exporting spans to Langtrace cloud.." + Fore.RESET)
provider.add_span_processor(batch_processor_remote)
init_instrumentations(config.disable_instrumentations, all_instrumentations)
add_span_processor(provider, config, exporter)

sys.stdout = sys.__stdout__
if os.environ.get("LANGTRACE_ERROR_REPORTING", "True") == "True":
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=1.0,
profiles_sample_rate=1.0,
before_send=before_send,
)
sdk_options = {
"service_name": os.environ.get("OTEL_SERVICE_NAME")
or service_name
or sys.argv[0],
"disable_logging": disable_logging,
"disable_instrumentations": disable_instrumentations,
"disable_tracing_for_functions": disable_tracing_for_functions,
"batch": batch,
"write_spans_to_console": write_spans_to_console,
"custom_remote_exporter": custom_remote_exporter,
"sdk_name": LANGTRACE_SDK_NAME,
"sdk_version": get_sdk_version(),
"api_host": host,
}
sentry_sdk.set_context("sdk_init_options", sdk_options)
init_sentry(config, host)


def before_send(event: Event, hint: Hint):
Expand Down
2 changes: 1 addition & 1 deletion src/langtrace_python_sdk/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.3.27"
__version__ = "2.3.28"
Loading