Skip to content

Commit 4c680eb

Browse files
authored
Merge pull request #5 from mdrideout/2025_10_11_dependency-updates
Add telemetry flush support and update dependencies
2 parents 23dd27b + fb4b5b6 commit 4c680eb

File tree

4 files changed

+52
-8
lines changed

4 files changed

+52
-8
lines changed

examples/base/src/base/main.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ async def main():
1212
load_dotenv()
1313

1414
# Setup OpenTelemetry before anything else happens
15-
init_otel(service_name="Junjo Base Example")
15+
exporter = init_otel(service_name="Junjo Base Example")
1616

1717
# Subscribe to state changes
1818
def on_state_change(new_state: SampleWorkflowState):
@@ -27,6 +27,11 @@ def on_state_change(new_state: SampleWorkflowState):
2727
await unsubscribe()
2828

2929
print("Done executing the base example workflow.")
30+
31+
# Flush telemetry before exit
32+
if exporter is not None:
33+
print("Flushing telemetry...")
34+
exporter.flush()
3035
return
3136

3237
if __name__ == "__main__":

examples/base/src/base/otel_config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
from opentelemetry.sdk.trace import TracerProvider
99

1010

11-
def init_otel(service_name: str):
11+
def init_otel(service_name: str) -> JunjoServerOtelExporter | None:
1212
"""Configure OpenTelemetry for this application."""
1313

1414
# Load the JUNJO_SERVER_API_KEY from the environment variable
1515
JUNJO_SERVER_API_KEY = os.getenv("JUNJO_SERVER_API_KEY")
1616
if JUNJO_SERVER_API_KEY is None:
1717
print("JUNJO_SERVER_API_KEY environment variable is not set. "
1818
"Generate a new API key in the Junjo Server UI.")
19-
return
19+
return None
2020

2121
# Configure OpenTelemetry for this application
2222
# Create the OpenTelemetry Resource to identify this service
@@ -51,3 +51,5 @@ def init_otel(service_name: str):
5151
# Instrument OpenInference Libraries
5252
# Google genai
5353
GoogleGenAIInstrumentor().instrument(tracer_provider=tracer_provider)
54+
55+
return junjo_server_exporter

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "junjo"
7-
version = "0.60.2"
7+
version = "0.61.0"
88
description = "A graph workflow execution library for building agentic AI workflows."
99
readme = "README.md"
1010
requires-python = ">=3.11"
@@ -21,11 +21,11 @@ classifiers = [
2121
]
2222

2323
dependencies = [
24-
"grpcio>=1.70.0",
24+
"grpcio>=1.75.1",
2525
"jsonpatch>=1.33",
2626
"nanoid>=2.0.0",
27-
"opentelemetry-exporter-otlp-proto-grpc>=1.30.0",
28-
"opentelemetry-sdk>=1.30.0",
27+
"opentelemetry-exporter-otlp-proto-grpc>=1.37.0",
28+
"opentelemetry-sdk>=1.37.0",
2929
"pydantic>=2.10.6",
3030
]
3131

src/junjo/telemetry/junjo_server_otel_exporter.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ def __init__(
4949

5050
# Set OTLP Span Exporter for Junjo Server
5151
oltp_exporter = OTLPSpanExporter(
52-
endpoint=self._endpoint, insecure=self._insecure, headers=exporter_headers
52+
endpoint=self._endpoint,
53+
insecure=self._insecure,
54+
headers=exporter_headers,
55+
timeout=120
5356
)
5457
self._span_processor = BatchSpanProcessor(oltp_exporter)
5558

@@ -69,3 +72,37 @@ def span_processor(self):
6972
@property
7073
def metric_reader(self):
7174
return self._metric_reader
75+
76+
def flush(self, timeout_millis: float = 120000) -> bool:
77+
"""
78+
Flush all pending telemetry manually.
79+
80+
This method blocks until all telemetry is exported or the timeout is reached.
81+
It leverages the existing retry/timeout logic in the underlying gRPC exporters.
82+
It can be used to force a flush of all pending telemetry before the application exits.
83+
84+
Args:
85+
timeout_millis: Maximum time to wait for flush in milliseconds.
86+
Defaults to 120000ms (120 seconds) to match the
87+
exporter timeout and allow for retries.
88+
89+
Returns:
90+
True if all telemetry was flushed successfully, False otherwise.
91+
"""
92+
success = True
93+
94+
# Flush span processor
95+
try:
96+
if not self._span_processor.force_flush(int(timeout_millis)):
97+
success = False
98+
except Exception:
99+
success = False
100+
101+
# Flush metric reader
102+
try:
103+
if not self._metric_reader.force_flush(timeout_millis):
104+
success = False
105+
except Exception:
106+
success = False
107+
108+
return success

0 commit comments

Comments
 (0)