5 changes: 5 additions & 0 deletions scripts/integration-tests.sh
@@ -284,10 +284,15 @@ if [[ "$STACK_CONFIG" == *"docker:"* && "$COLLECT_ONLY" == false ]]; then
docker stop "$container_name" 2>/dev/null || true
docker rm "$container_name" 2>/dev/null || true

# Configure telemetry collector port shared between host and container
Contributor: do you not need to run the docker with this additional port mapped or something?

Contributor: ah I guess we run it with --network host -- which of course doesn't quite work on a mac :)

COLLECTOR_PORT=4317
export LLAMA_STACK_TEST_COLLECTOR_PORT="${COLLECTOR_PORT}"

# Build environment variables for docker run
DOCKER_ENV_VARS=""
DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e LLAMA_STACK_TEST_INFERENCE_MODE=$INFERENCE_MODE"
DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e LLAMA_STACK_TEST_STACK_CONFIG_TYPE=server"
DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:${COLLECTOR_PORT}"

# Pass through API keys if they exist
[ -n "${TOGETHER_API_KEY:-}" ] && DOCKER_ENV_VARS="$DOCKER_ENV_VARS -e TOGETHER_API_KEY=$TOGETHER_API_KEY"
33 changes: 33 additions & 0 deletions tests/integration/fixtures/common.py
@@ -88,6 +88,35 @@ def wait_for_server_ready(base_url: str, timeout: int = 30, process: subprocess.
return False


def stop_server_on_port(port: int, timeout: float = 10.0) -> None:
"""Terminate any server processes bound to the given port."""

try:
output = subprocess.check_output(["lsof", "-ti", f":{port}"], text=True)
except (subprocess.CalledProcessError, FileNotFoundError):
return

pids = {int(line) for line in output.splitlines() if line.strip()}
if not pids:
return

deadline = time.time() + timeout
for sig in (signal.SIGTERM, signal.SIGKILL):
for pid in list(pids):
try:
os.kill(pid, sig)
except ProcessLookupError:
pids.discard(pid)

while not is_port_available(port) and time.time() < deadline:
time.sleep(0.1)

if is_port_available(port):
return

raise RuntimeError(f"Unable to free port {port} for test server restart")


def get_provider_data():
# TODO: this needs to be generalized so each provider can have a sample provider data just
# like sample run config on which we can do replace_env_vars()
@@ -199,6 +228,10 @@ def instantiate_llama_stack_client(session):
port = int(parts[2]) if len(parts) > 2 else int(os.environ.get("LLAMA_STACK_PORT", DEFAULT_PORT))
base_url = f"http://localhost:{port}"

force_restart = os.environ.get("LLAMA_STACK_TEST_FORCE_SERVER_RESTART") == "1"
if force_restart:
stop_server_on_port(port)

# Check if port is available
if is_port_available(port):
print(f"Starting llama stack server with config '{config_name}' on port {port}...")
19 changes: 19 additions & 0 deletions tests/integration/telemetry/collectors/__init__.py
@@ -0,0 +1,19 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

"""Telemetry collector helpers for integration tests."""

from .base import BaseTelemetryCollector, SpanStub
from .in_memory import InMemoryTelemetryCollector, InMemoryTelemetryManager
from .otlp import OtlpHttpTestCollector

__all__ = [
"BaseTelemetryCollector",
"SpanStub",
"InMemoryTelemetryCollector",
"InMemoryTelemetryManager",
"OtlpHttpTestCollector",
]
110 changes: 110 additions & 0 deletions tests/integration/telemetry/collectors/base.py
@@ -0,0 +1,110 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

"""Shared helpers for telemetry test collectors."""

from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any


@dataclass
class SpanStub:
name: str
attributes: dict[str, Any]
resource_attributes: dict[str, Any] | None = None
events: list[dict[str, Any]] | None = None
trace_id: str | None = None
span_id: str | None = None


def _value_to_python(value: Any) -> Any:
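# Unwrap an OTLP protobuf AnyValue (a oneof over scalar, array, and kvlist
# variants) into the equivalent plain Python value.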
kind = value.WhichOneof("value")
if kind == "string_value":
return value.string_value
if kind == "int_value":
return value.int_value
if kind == "double_value":
return value.double_value
if kind == "bool_value":
return value.bool_value
if kind == "bytes_value":
return value.bytes_value
if kind == "array_value":
return [_value_to_python(item) for item in value.array_value.values]
if kind == "kvlist_value":
return {kv.key: _value_to_python(kv.value) for kv in value.kvlist_value.values}
return None


def attributes_to_dict(key_values: Iterable[Any]) -> dict[str, Any]:
return {key_value.key: _value_to_python(key_value.value) for key_value in key_values}


def events_to_list(events: Iterable[Any]) -> list[dict[str, Any]]:
return [
{
"name": event.name,
"timestamp": event.time_unix_nano,
"attributes": attributes_to_dict(event.attributes),
}
for event in events
]


class BaseTelemetryCollector:
def get_spans(
self,
expected_count: int | None = None,
timeout: float = 5.0,
poll_interval: float = 0.05,
) -> tuple[Any, ...]:
import time

deadline = time.time() + timeout
min_count = expected_count if expected_count is not None else 1
last_len: int | None = None
stable_iterations = 0

while True:
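# Snapshot on every iteration: return as soon as an explicit expected_count is
# reached; otherwise treat the spans as settled once their count has been stable
# for two consecutive polls, and give up with whatever we have at the deadline.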
spans = tuple(self._snapshot_spans())

if len(spans) >= min_count:
if expected_count is not None and len(spans) >= expected_count:
return spans

if last_len == len(spans):
stable_iterations += 1
if stable_iterations >= 2:
return spans
else:
stable_iterations = 1
else:
stable_iterations = 0

if time.time() >= deadline:
return spans

last_len = len(spans)
time.sleep(poll_interval)

def get_metrics(self) -> Any | None:
return self._snapshot_metrics()

def clear(self) -> None:
self._clear_impl()

def _snapshot_spans(self) -> tuple[Any, ...]: # pragma: no cover - interface hook
raise NotImplementedError

def _snapshot_metrics(self) -> Any | None: # pragma: no cover - interface hook
raise NotImplementedError

def _clear_impl(self) -> None: # pragma: no cover - interface hook
raise NotImplementedError

def shutdown(self) -> None:
"""Optional hook for subclasses with background workers."""
93 changes: 93 additions & 0 deletions tests/integration/telemetry/collectors/in_memory.py
@@ -0,0 +1,93 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

"""In-memory telemetry collector for library-client tests."""

from typing import Any

import opentelemetry.metrics as otel_metrics
import opentelemetry.trace as otel_trace
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

import llama_stack.core.telemetry.telemetry as telemetry_module

from .base import BaseTelemetryCollector, SpanStub


class InMemoryTelemetryCollector(BaseTelemetryCollector):
def __init__(self, span_exporter: InMemorySpanExporter, metric_reader: InMemoryMetricReader) -> None:
self._span_exporter = span_exporter
self._metric_reader = metric_reader

def _snapshot_spans(self) -> tuple[Any, ...]:
spans = []
for span in self._span_exporter.get_finished_spans():
trace_id = None
span_id = None
context = getattr(span, "context", None)
if context:
trace_id = f"{context.trace_id:032x}"
span_id = f"{context.span_id:016x}"
else:
trace_id = getattr(span, "trace_id", None)
span_id = getattr(span, "span_id", None)

stub = SpanStub(
span.name,
span.attributes,
getattr(span, "resource", None),
getattr(span, "events", None),
trace_id,
span_id,
)
spans.append(stub)

return tuple(spans)

def _snapshot_metrics(self) -> Any | None:
data = self._metric_reader.get_metrics_data()
if data and data.resource_metrics:
resource_metric = data.resource_metrics[0]
if resource_metric.scope_metrics:
return resource_metric.scope_metrics[0].metrics
return None

def _clear_impl(self) -> None:
self._span_exporter.clear()
self._metric_reader.get_metrics_data()


class InMemoryTelemetryManager:
def __init__(self) -> None:
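# OpenTelemetry allows the global tracer/meter providers to be set only once per
# process; reset the private "set once" flags so repeated test setups in the same
# process can install fresh in-memory providers.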
if hasattr(otel_trace, "_TRACER_PROVIDER_SET_ONCE"):
otel_trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]
if hasattr(otel_metrics, "_METER_PROVIDER_SET_ONCE"):
otel_metrics._METER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]

span_exporter = InMemorySpanExporter()
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter))
trace.set_tracer_provider(tracer_provider)

metric_reader = InMemoryMetricReader()
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

telemetry_module._TRACER_PROVIDER = tracer_provider

self.collector = InMemoryTelemetryCollector(span_exporter, metric_reader)
self._tracer_provider = tracer_provider
self._meter_provider = meter_provider

def shutdown(self) -> None:
telemetry_module._TRACER_PROVIDER = None
self._tracer_provider.shutdown()
self._meter_provider.shutdown()
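
A sketch of how InMemoryTelemetryManager might be wired into a pytest fixture; the fixture name and session scope are assumptions rather than something this diff defines.

import pytest

from tests.integration.telemetry.collectors import InMemoryTelemetryManager


@pytest.fixture(scope="session")
def telemetry_collector():
    # Install the in-memory OTel providers for the whole session and tear them down afterwards.
    manager = InMemoryTelemetryManager()
    try:
        yield manager.collector
    finally:
        manager.shutdown()

A test could then call telemetry_collector.get_spans(expected_count=1), assert on the returned SpanStub names and attributes, and call clear() between cases.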