Skip to content

Commit c8bc53c

Browse files
authored
feat(sdk): support multiple span processors (#3207)
1 parent c0dd1c8 commit c8bc53c

File tree

6 files changed

+362
-83
lines changed

6 files changed

+362
-83
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
2+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
3+
from traceloop.sdk import Traceloop
4+
from traceloop.sdk.decorators import task
5+
from openai import OpenAI
6+
import os
7+
8+
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
9+
10+
11+
traceloop_processor = Traceloop.get_default_span_processor(disable_batch=True)
12+
13+
console_processor = SimpleSpanProcessor(ConsoleSpanExporter())
14+
15+
Traceloop.init(processor=[traceloop_processor, console_processor])
16+
17+
18+
@task(name="joke_creation", version=1)
19+
def create_joke():
20+
completion = client.chat.completions.create(
21+
model="gpt-3.5-turbo",
22+
messages=[{"role": "user", "content": "Tell me a joke about opentelemetry multiple span processors"}],
23+
)
24+
25+
result = completion.choices[0].message.content
26+
print(result)
27+
return result
28+
29+
30+
def main():
31+
create_joke()
32+
33+
34+
if __name__ == "__main__":
35+
main()

packages/traceloop-sdk/tests/conftest.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,63 @@ def exporter_with_no_metrics():
155155
if _trace_wrapper_instance:
156156
TracerWrapper.instance = _trace_wrapper_instance
157157
os.environ["TRACELOOP_METRICS_ENABLED"] = "true"
158+
159+
160+
@pytest.fixture
161+
def exporters_with_multiple_span_processors():
162+
# Clear singleton if existed
163+
if hasattr(TracerWrapper, "instance"):
164+
_trace_wrapper_instance = TracerWrapper.instance
165+
del TracerWrapper.instance
166+
167+
class CustomSpanProcessor(SimpleSpanProcessor):
168+
def on_start(self, span, parent_context=None):
169+
span.set_attribute("custom_processor", "enabled")
170+
span.set_attribute("processor_type", "custom")
171+
172+
class MetricsSpanProcessor(SimpleSpanProcessor):
173+
def __init__(self, exporter):
174+
super().__init__(exporter)
175+
self.span_count = 0
176+
177+
def on_start(self, span, parent_context=None):
178+
self.span_count += 1
179+
span.set_attribute("metrics_processor", "enabled")
180+
span.set_attribute("span_count", self.span_count)
181+
182+
# Create exporters for different processors
183+
default_exporter = InMemorySpanExporter()
184+
custom_exporter = InMemorySpanExporter()
185+
metrics_exporter = InMemorySpanExporter()
186+
187+
# Get the default Traceloop processor
188+
default_processor = Traceloop.get_default_span_processor(
189+
disable_batch=True,
190+
exporter=default_exporter
191+
)
192+
193+
# Create custom processors
194+
custom_processor = CustomSpanProcessor(custom_exporter)
195+
metrics_processor = MetricsSpanProcessor(metrics_exporter)
196+
197+
# Initialize with multiple processors
198+
processors = [default_processor, custom_processor, metrics_processor]
199+
200+
Traceloop.init(
201+
app_name="test_multiple_processors",
202+
api_endpoint="http://localhost:4318", # Use local endpoint to avoid API key requirement
203+
processor=processors,
204+
disable_batch=True,
205+
)
206+
207+
# Return all exporters so we can verify each processor worked
208+
yield {
209+
"default": default_exporter,
210+
"custom": custom_exporter,
211+
"metrics": metrics_exporter,
212+
"processor": processors
213+
}
214+
215+
# Restore singleton if any
216+
if _trace_wrapper_instance:
217+
TracerWrapper.instance = _trace_wrapper_instance

packages/traceloop-sdk/tests/test_sdk_initialization.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,78 @@ def run_workflow():
6868
spans = exporter_with_no_metrics.get_finished_spans()
6969
workflow_span = spans[0]
7070
assert workflow_span
71+
72+
73+
def test_multiple_span_processors(exporters_with_multiple_span_processors):
74+
"""Test that multiple span processors work correctly together."""
75+
from traceloop.sdk.decorators import workflow, task
76+
77+
@task(name="test_task")
78+
def test_task():
79+
return "task_result"
80+
81+
@workflow(name="test_workflow")
82+
def test_workflow():
83+
return test_task()
84+
85+
# Run the workflow to generate spans
86+
result = test_workflow()
87+
assert result == "task_result"
88+
89+
exporters = exporters_with_multiple_span_processors
90+
91+
# Check that all processors received spans
92+
default_spans = exporters["default"].get_finished_spans()
93+
custom_spans = exporters["custom"].get_finished_spans()
94+
metrics_spans = exporters["metrics"].get_finished_spans()
95+
96+
# All processors should have received the spans
97+
assert len(default_spans) == 2, "Default processor should have received spans"
98+
assert len(custom_spans) == 2, "Custom processor should have received spans"
99+
assert len(metrics_spans) == 2, "Metrics processor should have received spans"
100+
101+
# Verify that the default processor (Traceloop) added its attributes
102+
default_span = default_spans[0]
103+
# The default processor should have Traceloop-specific attributes
104+
assert hasattr(default_span, 'attributes')
105+
106+
# Verify that custom processor added its attributes
107+
custom_span = custom_spans[0]
108+
assert custom_span.attributes.get("custom_processor") == "enabled"
109+
assert custom_span.attributes.get("processor_type") == "custom"
110+
111+
# Verify that metrics processor added its attributes
112+
# Now that we fixed the double-call bug, the span_count should be correct
113+
workflow_spans = [s for s in metrics_spans if "workflow" in s.name]
114+
task_spans = [s for s in metrics_spans if "task" in s.name]
115+
assert len(workflow_spans) == 1
116+
assert len(task_spans) == 1
117+
118+
# The workflow span should be processed first (span_count=1)
119+
# The task span should be processed second (span_count=2)
120+
workflow_span = workflow_spans[0]
121+
task_span = task_spans[0]
122+
123+
assert workflow_span.attributes.get("metrics_processor") == "enabled"
124+
assert workflow_span.attributes.get("span_count") == 1
125+
126+
assert task_span.attributes.get("metrics_processor") == "enabled"
127+
assert task_span.attributes.get("span_count") == 2
128+
129+
130+
def test_get_default_span_processor():
131+
"""Test that get_default_span_processor returns a valid processor."""
132+
from traceloop.sdk import Traceloop
133+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, BatchSpanProcessor
134+
135+
# Test with batch disabled
136+
processor = Traceloop.get_default_span_processor(disable_batch=True)
137+
assert isinstance(processor, SimpleSpanProcessor)
138+
assert hasattr(processor, "_traceloop_processor")
139+
assert getattr(processor, "_traceloop_processor") is True
140+
141+
# Test with batch enabled
142+
processor = Traceloop.get_default_span_processor(disable_batch=False)
143+
assert isinstance(processor, BatchSpanProcessor)
144+
assert hasattr(processor, "_traceloop_processor")
145+
assert getattr(processor, "_traceloop_processor") is True

packages/traceloop-sdk/tests/test_workflows.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def joke_workflow():
4949
assert open_ai_span.attributes.get("traceloop.prompt.template") == "Tell me a {what} about {subject}"
5050
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.what") == "joke"
5151
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.subject") == "OpenTelemetry"
52-
assert open_ai_span.attributes.get("traceloop.prompt.version") == 5
52+
assert open_ai_span.attributes.get("traceloop.prompt.version") == "5"
5353

5454
workflow_span = next(span for span in spans if span.name == "pirate_joke_generator.workflow")
5555
task_span = next(span for span in spans if span.name == "something_creator.task")
@@ -95,7 +95,7 @@ async def joke_workflow():
9595
assert open_ai_span.attributes.get("traceloop.prompt.template") == "Tell me a {what} about {subject}"
9696
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.what") == "joke"
9797
assert open_ai_span.attributes.get("traceloop.prompt.template_variables.subject") == "OpenTelemetry"
98-
assert open_ai_span.attributes.get("traceloop.prompt.version") == 5
98+
assert open_ai_span.attributes.get("traceloop.prompt.version") == "5"
9999

100100
workflow_span = next(span for span in spans if span.name == "pirate_joke_generator.workflow")
101101
task_span = next(span for span in spans if span.name == "something_creator.task")

packages/traceloop-sdk/traceloop/sdk/__init__.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import sys
33
from pathlib import Path
44

5-
from typing import Callable, Optional, Set
5+
from typing import Callable, List, Optional, Set, Union
66
from colorama import Fore
77
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
88
from opentelemetry.sdk.trace.sampling import Sampler
@@ -59,7 +59,7 @@ def init(
5959
metrics_headers: Dict[str, str] = None,
6060
logging_exporter: LogExporter = None,
6161
logging_headers: Dict[str, str] = None,
62-
processor: Optional[SpanProcessor] = None,
62+
processor: Optional[Union[SpanProcessor, List[SpanProcessor]]] = None,
6363
propagator: TextMapPropagator = None,
6464
sampler: Optional[Sampler] = None,
6565
traceloop_sync_enabled: bool = False,
@@ -207,6 +207,49 @@ def set_association_properties(properties: dict) -> None:
207207
def set_prompt(template: str, variables: dict, version: int):
208208
set_external_prompt_tracing_context(template, variables, version)
209209

210+
@staticmethod
211+
def get_default_span_processor(
212+
disable_batch: bool = False,
213+
api_endpoint: Optional[str] = None,
214+
api_key: Optional[str] = None,
215+
headers: Optional[Dict[str, str]] = None,
216+
exporter: Optional[SpanExporter] = None
217+
) -> SpanProcessor:
218+
"""
219+
Creates and returns the default Traceloop span processor.
220+
221+
This function allows users to get the default Traceloop span processor
222+
to combine it with their custom processors when using the processors parameter.
223+
224+
Args:
225+
disable_batch: If True, uses SimpleSpanProcessor, otherwise BatchSpanProcessor
226+
api_endpoint: The endpoint URL for the exporter (uses current config if None)
227+
headers: Headers for the exporter (uses current config if None)
228+
exporter: Custom exporter to use (creates default if None)
229+
230+
Returns:
231+
SpanProcessor: The default Traceloop span processor
232+
233+
Example:
234+
# Get the default processor and combine with custom one
235+
default_processor = Traceloop.get_default_span_processor()
236+
custom_processor = MyCustomSpanProcessor()
237+
238+
Traceloop.init(
239+
processors=[default_processor, custom_processor]
240+
)
241+
"""
242+
from traceloop.sdk.tracing.tracing import get_default_span_processor
243+
if headers is None:
244+
if api_key is None:
245+
api_key = os.getenv("TRACELOOP_API_KEY")
246+
headers = {
247+
"Authorization": f"Bearer {api_key}",
248+
}
249+
if api_endpoint is None:
250+
api_endpoint = os.getenv("TRACELOOP_BASE_URL")
251+
return get_default_span_processor(disable_batch, api_endpoint, headers, exporter)
252+
210253
@staticmethod
211254
def get():
212255
"""

0 commit comments

Comments
 (0)