Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from logfire.sampling._tail_sampling import TailSamplingProcessor
from logfire.version import VERSION

from ..experimental.uploaders import BaseUploader
from ..propagate import NoExtractTraceContextPropagator, WarnOnExtractTraceContextPropagator
from ..types import ExceptionCallback
from .client import InvalidProjectName, LogfireClient, ProjectAlreadyExists
Expand Down Expand Up @@ -192,6 +193,9 @@ class AdvancedOptions:

This is experimental and may be modified or removed."""

uploader: BaseUploader | None = None
"""Very experimental blob storage uploader."""

def generate_base_url(self, token: str) -> str:
if self.base_url is not None:
return self.base_url
Expand Down Expand Up @@ -862,7 +866,7 @@ def _initialize(self) -> None:
root_processor = TailSamplingProcessor(root_processor, self.sampling.tail)
tracer_provider.add_span_processor(
CheckSuppressInstrumentationProcessorWrapper(
MainSpanProcessorWrapper(root_processor, self.scrubber),
MainSpanProcessorWrapper(root_processor, self.scrubber, self.advanced.uploader),
)
)

Expand Down Expand Up @@ -1030,7 +1034,8 @@ def check_token():

main_multiprocessor.add_span_processor(
PendingSpanProcessor(
self.advanced.id_generator, MainSpanProcessorWrapper(pending_multiprocessor, self.scrubber)
self.advanced.id_generator,
MainSpanProcessorWrapper(pending_multiprocessor, self.scrubber, self.advanced.uploader),
)
)

Expand Down
41 changes: 41 additions & 0 deletions logfire/_internal/exporters/processor_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import base64
import binascii
import json
from contextlib import suppress
from dataclasses import dataclass
Expand All @@ -13,6 +15,7 @@

import logfire

from ...experimental.uploaders import BaseUploader, UploadItem
from ..constants import (
ATTRIBUTES_JSON_SCHEMA_KEY,
ATTRIBUTES_LOG_LEVEL_NUM_KEY,
Expand Down Expand Up @@ -64,6 +67,7 @@ class MainSpanProcessorWrapper(WrapperSpanProcessor):
"""

scrubber: BaseScrubber
uploader: BaseUploader | None

def on_start(
self,
Expand All @@ -86,10 +90,47 @@ def on_end(self, span: ReadableSpan) -> None:
_transform_google_genai_span(span_dict)
_transform_litellm_span(span_dict)
_default_gen_ai_response_model(span_dict)
self._upload_gen_ai_blobs(span_dict)
self.scrubber.scrub_span(span_dict)
span = ReadableSpan(**span_dict)
super().on_end(span)

def _upload_gen_ai_blobs(self, span: ReadableSpanDict) -> None:
if not self.uploader:
return

for attr_name in ['pydantic_ai.all_messages', 'gen_ai.input.messages', 'gen_ai.output.messages']:
attr_value = span['attributes'].get(attr_name)
if not (attr_value and isinstance(attr_value, str)):
continue
try:
messages = json.loads(attr_value)
except json.JSONDecodeError: # pragma: no cover
continue
for message in messages:
parts = message.get('parts', [])
for i, part in enumerate(parts):
# TODO otel semantic type
if part.get('type') != 'binary' or 'content' not in part:
continue
data = part['content']
if not isinstance(data, str): # pragma: no cover
continue

try:
value = base64.b64decode(data, validate=True)
except binascii.Error: # pragma: no cover
value = data.encode()

media_type = part.get('media_type')
upload_item = UploadItem.create(value, timestamp=span['start_time'], media_type=media_type)

self.uploader.upload(upload_item)

# TODO keep part, remove content, add new key, make frontend work
parts[i] = dict(type='image-url', url=self.uploader.get_attribute_value(upload_item))
span['attributes'] = {**span['attributes'], attr_name: json.dumps(messages)}


def _set_error_level_and_status(span: ReadableSpanDict) -> None:
"""Default the log level to error if the status code is error, and vice versa.
Expand Down
6 changes: 5 additions & 1 deletion logfire/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ def canonicalize_exception_traceback(exc: BaseException, seen: set[int] | None =


def sha256_string(s: str) -> str:
return sha256_bytes(s.encode('utf-8'))


def sha256_bytes(b: bytes) -> str:
hasher = hashlib.sha256()
hasher.update(s.encode('utf-8'))
hasher.update(b)
return hasher.hexdigest()
49 changes: 49 additions & 0 deletions logfire/experimental/uploaders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

import datetime
from abc import ABC, abstractmethod
from dataclasses import dataclass

from logfire._internal.constants import ONE_SECOND_IN_NANOSECONDS
from logfire._internal.utils import JsonValue, sha256_bytes


@dataclass
class UploadItem:
"""An item to upload."""

key: str
value: bytes
media_type: str | None = None

@classmethod
def create(cls, value: bytes, *, timestamp: int | None, media_type: str | None = None) -> UploadItem:
"""Create an UploadItem with a generated key.

Use this instead of constructing directly.
"""
parts = [sha256_bytes(value)]

if media_type: # pragma: no branch
parts.append(media_type)

if timestamp is None: # pragma: no cover
date = datetime.date.today()
else:
date = datetime.datetime.fromtimestamp(timestamp / ONE_SECOND_IN_NANOSECONDS).date()
parts.append(date.isoformat())

key = '/'.join(parts[::-1])
return cls(key=key, value=value, media_type=media_type)


class BaseUploader(ABC):
"""Abstract base class for uploaders."""

@abstractmethod
def upload(self, item: UploadItem) -> None:
"""Upload the given item."""

@abstractmethod
def get_attribute_value(self, item: UploadItem) -> JsonValue:
"""Return a reference to the uploaded item, e.g. a URL or path."""
25 changes: 25 additions & 0 deletions logfire/experimental/uploaders/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

from io import BytesIO

from google.cloud import storage

from logfire.experimental.uploaders import BaseUploader, UploadItem


class GcsUploader(BaseUploader):
"""Google Cloud Storage uploader."""

def __init__(self, bucket_name: str, client: storage.Client | None = None):
self.bucket_name = bucket_name
self.client = client or storage.Client()
self.bucket: storage.Bucket = self.client.bucket(bucket_name) # pyright: ignore [reportUnknownMemberType]

def upload(self, item: UploadItem):
"""Upload the given item to GCS."""
blob: storage.Blob = self.bucket.blob(item.key) # pyright: ignore [reportUnknownMemberType]
blob.upload_from_file(BytesIO(item.value), content_type=item.media_type) # pyright: ignore [reportUnknownMemberType]

def get_attribute_value(self, item: UploadItem):
"""Return the GCS authenticated URL for the uploaded item."""
return f'https://storage.cloud.google.com/{self.bucket_name}/{item.key}'
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ dev = [
"openinference-instrumentation-litellm >= 0",
"litellm >= 0",
"pip >= 0",
"google-cloud-storage>=3.4.1",
]
docs = [
"black>=23.12.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":[{"text":"What is this?","type":"text"},{"image_url":{"url":"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAACklEQVR4nGMAAQAABQABDQottAAAAABJRU5ErkJggg=="},"type":"image_url"}]}],"model":"gpt-4o","stream":false}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '271'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- pydantic-ai/1.6.0
x-stainless-arch:
- arm64
x-stainless-async:
- async:asyncio
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 2.6.1
x-stainless-read-timeout:
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA4xTyW7bQAy9+yuIuaQFbMO7E1976IL2UqTooQkEZkRJrGfLDFUnCALkD/NLxchO
5LQp0Isg8PG9edzuBgCKS7UBpRsUbYMZvftuv37eLs+/bD3H+c2Z/fThm9ml67XHKGqYGf7qJ2l5
Yo21t8GQsHd7WEdCoaw6Xa+mZ7PT9XzZAdaXZDKtDjJa+NFsMluMJqejyepAbDxrSmoDPwYAAHfd
N1t0Jd2oDUyGTxFLKWFNavOcBKCiNzmiMCVOgm5v9wBq74Rc5/q8IWCLNUFq/C6BNATG7yhqTATv
I9EWDIlQhAsV+ELBm8eHt2N4fABOgGBRGrIorNGA9q57CyKFSImcsKs7yYjCHnwFCJqjNnSSup/W
VhTJaQLxwJKgZLQkFIeAIUR/wxaFzC3QdYsmJ83H08V0eTaGj3KSYMdlRttEJbA7cpMAXQlJcxYf
HxcfqWoT5t671pgjAJ3zkn26ru2XB+T+udHG1yH6q/QHVVXsODVFJEze5aYm8UF16P0A4LIbaPti
RipEb4MU4rfUPTdbrfZ6ql+hHl2sD6B4QdPH59P58BW9oiRBNuloJZRG3VDZU/v9wbZkfwQMjqr+
281r2vvK2dX/I98DWlMQKosQqWT9suI+LVK+sH+lPXe5M6wSxV+sqRCmmCdRUoWtOdxquk1CtqjY
1RRD5P0FVKHQV9V0fbpcrtZqcD/4DQAA//8DADK3eBAKBAAA
headers:
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Fri, 31 Oct 2025 16:38:57 GMT
Server:
- cloudflare
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-processing-ms:
- '1643'
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1823'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-input-images:
- '50000'
x-ratelimit-limit-requests:
- '10000'
x-ratelimit-limit-tokens:
- '2000000'
x-ratelimit-remaining-input-images:
- '49999'
x-ratelimit-remaining-requests:
- '9999'
x-ratelimit-remaining-tokens:
- '1999229'
x-ratelimit-reset-input-images:
- 1ms
x-ratelimit-reset-requests:
- 6ms
x-ratelimit-reset-tokens:
- 23ms
status:
code: 200
message: OK
version: 1
Loading
Loading