Skip to content

Commit 1648905

Browse files
Add TTFT to ollama instrumentation (#2949)
Co-authored-by: Nir Gazit <[email protected]>
1 parent 5e38812 commit 1648905

File tree

16 files changed

+332
-266
lines changed

16 files changed

+332
-266
lines changed

packages/opentelemetry-instrumentation-langchain/poetry.lock

Lines changed: 29 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/opentelemetry-instrumentation-llamaindex/poetry.lock

Lines changed: 11 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/opentelemetry-instrumentation-ollama/opentelemetry/instrumentation/ollama/__init__.py

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from opentelemetry.trace import get_tracer, SpanKind, Tracer
1414
from opentelemetry.trace.status import Status, StatusCode
1515
from opentelemetry.metrics import Histogram, Meter, get_meter
16+
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics as GenAIMetrics
1617

1718
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
1819
from opentelemetry.instrumentation.utils import (
@@ -253,13 +254,27 @@ def _set_response_attributes(span, token_histogram, llm_request_type, response):
253254
)
254255

255256

256-
def _accumulate_streaming_response(span, token_histogram, llm_request_type, response):
257+
def _accumulate_streaming_response(
258+
span,
259+
token_histogram,
260+
llm_request_type,
261+
response,
262+
streaming_time_to_first_token=None,
263+
start_time=None
264+
):
257265
if llm_request_type == LLMRequestTypeValues.CHAT:
258266
accumulated_response = {"message": {"content": "", "role": ""}}
259267
elif llm_request_type == LLMRequestTypeValues.COMPLETION:
260268
accumulated_response = {"response": ""}
261269

270+
first_token = True
262271
for res in response:
272+
if first_token and streaming_time_to_first_token and start_time is not None:
273+
streaming_time_to_first_token.record(
274+
time.perf_counter() - start_time,
275+
attributes={SpanAttributes.LLM_SYSTEM: "Ollama"},
276+
)
277+
first_token = False
263278
yield res
264279

265280
if llm_request_type == LLMRequestTypeValues.CHAT:
@@ -274,13 +289,28 @@ def _accumulate_streaming_response(span, token_histogram, llm_request_type, resp
274289
span.end()
275290

276291

277-
async def _aaccumulate_streaming_response(span, token_histogram, llm_request_type, response):
292+
async def _aaccumulate_streaming_response(
293+
span,
294+
token_histogram,
295+
llm_request_type,
296+
response,
297+
streaming_time_to_first_token=None,
298+
start_time=None,
299+
):
278300
if llm_request_type == LLMRequestTypeValues.CHAT:
279301
accumulated_response = {"message": {"content": "", "role": ""}}
280302
elif llm_request_type == LLMRequestTypeValues.COMPLETION:
281303
accumulated_response = {"response": ""}
282304

305+
first_token = True
306+
283307
async for res in response:
308+
if first_token and streaming_time_to_first_token and start_time is not None:
309+
streaming_time_to_first_token.record(
310+
time.perf_counter() - start_time,
311+
attributes={SpanAttributes.LLM_SYSTEM: "Ollama"},
312+
)
313+
first_token = False
284314
yield res
285315

286316
if llm_request_type == LLMRequestTypeValues.CHAT:
@@ -298,12 +328,13 @@ async def _aaccumulate_streaming_response(span, token_histogram, llm_request_typ
298328
def _with_tracer_wrapper(func):
299329
"""Helper for providing tracer for wrapper functions."""
300330

301-
def _with_tracer(tracer, token_histogram, duration_histogram, to_wrap):
331+
def _with_tracer(tracer, token_histogram, duration_histogram, streaming_time_to_first_token, to_wrap):
302332
def wrapper(wrapped, instance, args, kwargs):
303333
return func(
304334
tracer,
305335
token_histogram,
306336
duration_histogram,
337+
streaming_time_to_first_token,
307338
to_wrap,
308339
wrapped,
309340
instance,
@@ -332,6 +363,7 @@ def _wrap(
332363
tracer: Tracer,
333364
token_histogram: Histogram,
334365
duration_histogram: Histogram,
366+
streaming_time_to_first_token: Histogram,
335367
to_wrap,
336368
wrapped,
337369
instance,
@@ -372,7 +404,14 @@ def _wrap(
372404

373405
if span.is_recording():
374406
if kwargs.get("stream"):
375-
return _accumulate_streaming_response(span, token_histogram, llm_request_type, response)
407+
return _accumulate_streaming_response(
408+
span,
409+
token_histogram,
410+
llm_request_type,
411+
response,
412+
streaming_time_to_first_token,
413+
start_time,
414+
)
376415

377416
_set_response_attributes(span, token_histogram, llm_request_type, response)
378417
span.set_status(Status(StatusCode.OK))
@@ -386,6 +425,7 @@ async def _awrap(
386425
tracer: Tracer,
387426
token_histogram: Histogram,
388427
duration_histogram: Histogram,
428+
streaming_time_to_first_token: Histogram,
389429
to_wrap,
390430
wrapped,
391431
instance,
@@ -426,7 +466,14 @@ async def _awrap(
426466

427467
if span.is_recording():
428468
if kwargs.get("stream"):
429-
return _aaccumulate_streaming_response(span, token_histogram, llm_request_type, response)
469+
return _aaccumulate_streaming_response(
470+
span,
471+
token_histogram,
472+
llm_request_type,
473+
response,
474+
streaming_time_to_first_token,
475+
start_time,
476+
)
430477

431478
_set_response_attributes(span, token_histogram, llm_request_type, response)
432479
span.set_status(Status(StatusCode.OK))
@@ -448,7 +495,13 @@ def _build_metrics(meter: Meter):
448495
description="GenAI operation duration",
449496
)
450497

451-
return token_histogram, duration_histogram
498+
streaming_time_to_first_token = meter.create_histogram(
499+
name=GenAIMetrics.GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
500+
unit="s",
501+
description="Time to first token in streaming chat completions",
502+
)
503+
504+
return token_histogram, duration_histogram, streaming_time_to_first_token
452505

453506

454507
def is_metrics_collection_enabled() -> bool:
@@ -476,12 +529,14 @@ def _instrument(self, **kwargs):
476529
(
477530
token_histogram,
478531
duration_histogram,
532+
streaming_time_to_first_token,
479533
) = _build_metrics(meter)
480534
else:
481535
(
482536
token_histogram,
483537
duration_histogram,
484-
) = (None, None)
538+
streaming_time_to_first_token,
539+
) = (None, None, None)
485540

486541
# Patch _copy_messages to sanitize tool_calls arguments before Pydantic validation
487542
wrap_function_wrapper(
@@ -493,12 +548,12 @@ def _instrument(self, **kwargs):
493548
wrap_function_wrapper(
494549
"ollama._client",
495550
"Client._request",
496-
_dispatch_wrap(tracer, token_histogram, duration_histogram),
551+
_dispatch_wrap(tracer, token_histogram, duration_histogram, streaming_time_to_first_token),
497552
)
498553
wrap_function_wrapper(
499554
"ollama._client",
500555
"AsyncClient._request",
501-
_dispatch_awrap(tracer, token_histogram, duration_histogram),
556+
_dispatch_awrap(tracer, token_histogram, duration_histogram, streaming_time_to_first_token),
502557
)
503558

504559
def _uninstrument(self, **kwargs):
@@ -517,30 +572,30 @@ def _uninstrument(self, **kwargs):
517572
)
518573

519574

520-
def _dispatch_wrap(tracer, token_histogram, duration_histogram):
575+
def _dispatch_wrap(tracer, token_histogram, duration_histogram, streaming_time_to_first_token):
521576
def wrapper(wrapped, instance, args, kwargs):
522577
to_wrap = None
523578
if len(args) > 2 and isinstance(args[2], str):
524579
path = args[2]
525580
op = path.rstrip('/').split('/')[-1]
526581
to_wrap = next((m for m in WRAPPED_METHODS if m.get("method") == op), None)
527582
if to_wrap:
528-
return _wrap(tracer, token_histogram, duration_histogram, to_wrap)(
583+
return _wrap(tracer, token_histogram, duration_histogram, streaming_time_to_first_token, to_wrap)(
529584
wrapped, instance, args, kwargs
530585
)
531586
return wrapped(*args, **kwargs)
532587
return wrapper
533588

534589

535-
def _dispatch_awrap(tracer, token_histogram, duration_histogram):
590+
def _dispatch_awrap(tracer, token_histogram, duration_histogram, streaming_time_to_first_token):
536591
async def wrapper(wrapped, instance, args, kwargs):
537592
to_wrap = None
538593
if len(args) > 2 and isinstance(args[2], str):
539594
path = args[2]
540595
op = path.rstrip('/').split('/')[-1]
541596
to_wrap = next((m for m in WRAPPED_METHODS if m.get("method") == op), None)
542597
if to_wrap:
543-
return await _awrap(tracer, token_histogram, duration_histogram, to_wrap)(
598+
return await _awrap(tracer, token_histogram, duration_histogram, streaming_time_to_first_token, to_wrap)(
544599
wrapped, instance, args, kwargs
545600
)
546601
return await wrapped(*args, **kwargs)

packages/opentelemetry-instrumentation-ollama/poetry.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)