Skip to content

Commit 490b22d

Browse files
authored
feat: otel tracer use global tracer provider (#50)
* feat: update tracer init * feat: add ut * feat: format code
1 parent 33de53c commit 490b22d

File tree

2 files changed

+96
-17
lines changed

2 files changed

+96
-17
lines changed

tests/test_tracing.py

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,18 @@
2929
)
3030
from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer
3131

32+
from opentelemetry import trace as trace_api
33+
from opentelemetry.sdk import trace as trace_sdk
34+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
35+
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
36+
OTLPSpanExporter,
37+
)
38+
3239
APP_NAME = "app"
3340
USER_ID = "testuser"
3441
SESSION_ID = "testsession"
3542

36-
37-
@pytest.mark.asyncio
38-
async def test_tracing():
43+
def init_exporters():
3944
cozeloop_exporter = CozeloopExporter(
4045
config=CozeloopExporterConfig(
4146
endpoint="http://localhost:8000",
@@ -61,12 +66,52 @@ async def test_tracing():
6166
secret_key="test_secret_key",
6267
)
6368
)
69+
return [cozeloop_exporter, apmplus_exporter, tls_exporter]
6470

65-
exporters = [cozeloop_exporter, apmplus_exporter, tls_exporter]
71+
def gen_span_processor(endpoint: str):
72+
otlp_exporter = OTLPSpanExporter(
73+
endpoint=endpoint,
74+
)
75+
span_processor = BatchSpanProcessor(otlp_exporter)
76+
return span_processor
77+
78+
@pytest.mark.asyncio
79+
async def test_tracing():
80+
exporters = init_exporters()
6681
tracer = OpentelemetryTracer(exporters=exporters)
6782

6883
assert len(tracer.exporters) == 5 # with extra 2 built-in exporters
6984

7085
# TODO: Ensure the tracing provider is set correctly after loading SDK
7186
# TODO: Ensure the tracing provider is set correctly after loading SDK
7287
# TODO: Ensure the tracing provider is set correctly after loading SDK
88+
89+
@pytest.mark.asyncio
90+
async def test_tracing_with_global_provider():
91+
exporters = init_exporters()
92+
# set global tracer provider before init OpentelemetryTracer
93+
trace_api.set_tracer_provider(trace_sdk.TracerProvider())
94+
tracer_provider = trace_api.get_tracer_provider()
95+
tracer_provider.add_span_processor(gen_span_processor("http://localhost:8000"))
96+
trace_api.set_tracer_provider(tracer_provider)
97+
#
98+
tracer = OpentelemetryTracer(exporters=exporters)
99+
100+
assert len(tracer.exporters) == 5 # with extra 2 built-in exporters
101+
102+
103+
@pytest.mark.asyncio
104+
async def test_tracing_with_apmplus_global_provider():
105+
exporters = init_exporters()
106+
# add apmplus exporter to global tracer provider before init OpentelemetryTracer
107+
trace_api.set_tracer_provider(trace_sdk.TracerProvider())
108+
tracer_provider = trace_api.get_tracer_provider()
109+
tracer_provider.add_span_processor(gen_span_processor("http://apmplus-region.com"))
110+
111+
# init OpentelemetryTracer
112+
tracer = OpentelemetryTracer(exporters=exporters)
113+
114+
# apmplus exporter won't init again
115+
assert len(tracer.exporters) == 4 # with extra 2 built-in exporters
116+
117+

veadk/tracing/telemetry/opentelemetry_tracer.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020

2121
from openinference.instrumentation.google_adk import GoogleADKInstrumentor
2222
from opentelemetry.sdk import trace as trace_sdk
23+
from opentelemetry import trace as trace_api
2324
from opentelemetry.sdk.resources import Resource
2425
from opentelemetry.sdk.trace import TracerProvider
26+
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
27+
2528
from pydantic import BaseModel, ConfigDict, Field
2629
from typing_extensions import override
2730

2831
from veadk.tracing.base_tracer import BaseTracer
2932
from veadk.tracing.telemetry.exporters.apiserver_exporter import ApiServerExporter
33+
from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter
3034
from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter
3135
from veadk.tracing.telemetry.exporters.inmemory_exporter import InMemoryExporter
3236
from veadk.tracing.telemetry.metrics.opentelemetry_metrics import MeterUploader
@@ -60,7 +64,6 @@ def model_post_init(self, context: Any, /) -> None:
6064
self._processors = []
6165
self._inmemory_exporter: InMemoryExporter = None
6266
self._apiserver_exporter: ApiServerExporter = None
63-
6467
# Inmemory & APIServer are the default exporters
6568
have_inmemory_exporter = False
6669
have_apiserver_exporter = False
@@ -81,17 +84,6 @@ def model_post_init(self, context: Any, /) -> None:
8184
self.exporters.append(apiserver_exporter)
8285
self._apiserver_exporter = apiserver_exporter
8386

84-
tracer_provider = trace_sdk.TracerProvider()
85-
for exporter in self.exporters:
86-
processor, resource_attributes = exporter.get_processor()
87-
if resource_attributes is not None:
88-
update_resource_attributions(tracer_provider, resource_attributes)
89-
tracer_provider.add_span_processor(processor)
90-
91-
logger.debug(f"Add exporter `{exporter.__class__.__name__}` to tracing.")
92-
self._processors.append(processor)
93-
logger.debug(f"Init OpentelemetryTracer with {len(self.exporters)} exporters.")
94-
9587
self._meter_contexts = []
9688
self._meter_uploaders = []
9789
for exporter in self.exporters:
@@ -103,13 +95,55 @@ def model_post_init(self, context: Any, /) -> None:
10395
meter_uploader = MeterUploader(meter_context)
10496
self._meter_uploaders.append(meter_uploader)
10597

98+
# init tracer provider
99+
self._init_tracer_provider()
100+
106101
# just for debug
107102
self._trace_file_path = ""
108103

109104
# patch this before starting instrumentation
110105
# enable_veadk_tracing(self.dump)
111106

112-
GoogleADKInstrumentor().instrument(tracer_provider=tracer_provider)
107+
GoogleADKInstrumentor().instrument()
108+
109+
def _init_tracer_provider(self):
110+
# 1. get global trace provider
111+
global_tracer_provider = trace_api.get_tracer_provider()
112+
113+
if not isinstance(global_tracer_provider, TracerProvider):
114+
logger.info(
115+
f"Global tracer provider has not been set. Create tracer provider and set it now."
116+
)
117+
# 1.1 init tracer provider
118+
tracer_provider = trace_sdk.TracerProvider()
119+
trace_api.set_tracer_provider(tracer_provider)
120+
global_tracer_provider = trace_api.get_tracer_provider()
121+
122+
have_apmplus_exporter = False
123+
# 2. check if apmplus exporter is already exist
124+
for processor in global_tracer_provider._active_span_processor._span_processors:
125+
if isinstance(processor, (BatchSpanProcessor, SimpleSpanProcessor)):
126+
# check exporter endpoint
127+
if "apmplus" in processor.span_exporter._endpoint:
128+
have_apmplus_exporter = True
129+
130+
# 3. add exporters to global tracer_provider
131+
# range over a copy of exporters to avoid index issues
132+
for exporter in self.exporters[:]:
133+
if have_apmplus_exporter and isinstance(exporter, APMPlusExporter):
134+
# apmplus exporter has been int in global tracer provider, need to remove from exporters.
135+
self.exporters.remove(exporter)
136+
continue
137+
processor, resource_attributes = exporter.get_processor()
138+
if resource_attributes is not None:
139+
update_resource_attributions(
140+
global_tracer_provider, resource_attributes
141+
)
142+
global_tracer_provider.add_span_processor(processor)
143+
logger.debug(f"Add exporter `{exporter.__class__.__name__}` to tracing.")
144+
self._processors.append(processor)
145+
logger.debug(f"Init OpentelemetryTracer with {len(self.exporters)} exporters.")
146+
113147

114148
@override
115149
def dump(

0 commit comments

Comments
 (0)