diff --git a/python/src/etos_api/__init__.py b/python/src/etos_api/__init__.py index 4946bba..522eb78 100644 --- a/python/src/etos_api/__init__.py +++ b/python/src/etos_api/__init__.py @@ -19,21 +19,9 @@ from importlib.metadata import PackageNotFoundError, version from etos_lib.logging.logger import setup_logging -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.sdk.resources import ( - SERVICE_NAME, - SERVICE_VERSION, - OTELResourceDetector, - ProcessResourceDetector, - Resource, - get_aggregated_resources, -) -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor from etos_api.library.context_logging import ContextLogging +from etos_api.library.opentelemetry import setup_opentelemetry from .library.providers.register import RegisterProviders from .main import APP @@ -50,25 +38,8 @@ DEV = os.getenv("DEV", "false").lower() == "true" if os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"): - OTEL_RESOURCE = Resource.create( - { - SERVICE_NAME: "etos-api", - SERVICE_VERSION: VERSION, - }, - ) - - OTEL_RESOURCE = get_aggregated_resources( - [OTELResourceDetector(), ProcessResourceDetector()], - ).merge(OTEL_RESOURCE) - - PROVIDER = TracerProvider(resource=OTEL_RESOURCE) - EXPORTER = OTLPSpanExporter() - PROCESSOR = BatchSpanProcessor(EXPORTER) - PROVIDER.add_span_processor(PROCESSOR) - trace.set_tracer_provider(PROVIDER) + OTEL_RESOURCE = setup_opentelemetry(APP, VERSION) setup_logging("ETOS API", VERSION, otel_resource=OTEL_RESOURCE) - - FastAPIInstrumentor().instrument_app(APP, tracer_provider=PROVIDER, excluded_urls=".*/ping") else: setup_logging("ETOS API", VERSION) diff --git a/python/src/etos_api/library/opentelemetry.py b/python/src/etos_api/library/opentelemetry.py new file mode 100644 index 0000000..66adb4d --- /dev/null +++ b/python/src/etos_api/library/opentelemetry.py @@ -0,0 +1,85 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS opentelemetry helpers.""" + +import logging +from typing import Annotated + +from fastapi import FastAPI, Header +from opentelemetry import context as otel_context +from opentelemetry import trace +from opentelemetry.baggage.propagation import W3CBaggagePropagator +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.propagate import extract, set_global_textmap +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.sdk.resources import ( + SERVICE_NAME, + SERVICE_VERSION, + OTELResourceDetector, + ProcessResourceDetector, + Resource, + get_aggregated_resources, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from pydantic import BaseModel + +LOGGER = logging.getLogger(__name__) + + +def setup_opentelemetry(app: FastAPI, version: str) -> Resource: + """Set up OpenTelemetry for ETOS API.""" + otel_resource = Resource.create( + { + SERVICE_NAME: "etos-api", + SERVICE_VERSION: version, + }, + ) + + otel_resource = get_aggregated_resources( + [OTELResourceDetector(), ProcessResourceDetector()], + ).merge(otel_resource) + + provider = TracerProvider(resource=otel_resource) + exporter = OTLPSpanExporter() + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + propagator = CompositePropagator( + [ + TraceContextTextMapPropagator(), + W3CBaggagePropagator(), + ] + ) + set_global_textmap(propagator) + FastAPIInstrumentor().instrument_app(app, tracer_provider=provider, excluded_urls=".*/ping") + return otel_resource + + +class OTELHeaders(BaseModel): + """Headers model.""" + + baggage: str | None = None + traceparent: str | None = None + + +def context(headers: Annotated[OTELHeaders, Header()]) -> otel_context.Context: + """Extract OpenTelemetry context from headers.""" + ctx = extract(headers.model_dump(), context=otel_context.get_current()) + LOGGER.debug("Extracted context from headers: %r", ctx) + return ctx diff --git a/python/src/etos_api/main.py b/python/src/etos_api/main.py index 11337fe..a54c364 100644 --- a/python/src/etos_api/main.py +++ b/python/src/etos_api/main.py @@ -16,12 +16,13 @@ """ETOS API.""" from fastapi import FastAPI -from etos_api.routers.v0 import ETOSv0 -from etos_api.routers.v1alpha import ETOSv1Alpha -DEFAULT_VERSION = ETOSv0 +from etos_api.routers.v0 import ETOSV0 +from etos_api.routers.v1alpha import ETOSV1ALPHA + +DEFAULT_VERSION = ETOSV0 APP = FastAPI() -APP.mount("/api/v1alpha", ETOSv1Alpha, "ETOS V1 Alpha") -APP.mount("/api/v0", ETOSv0, "ETOS V0") +APP.mount("/api/v1alpha", ETOSV1ALPHA, "ETOS V1 Alpha") +APP.mount("/api/v0", ETOSV0, "ETOS V0") APP.mount("/api", DEFAULT_VERSION, "ETOS V0") diff --git a/python/src/etos_api/routers/v0/__init__.py b/python/src/etos_api/routers/v0/__init__.py index 7235df4..2e6d885 100644 --- a/python/src/etos_api/routers/v0/__init__.py +++ b/python/src/etos_api/routers/v0/__init__.py @@ -15,5 +15,5 @@ # limitations under the License. """ETOS API etos module.""" -from .router import ETOSv0 from . import schemas +from .router import ETOSV0 diff --git a/python/src/etos_api/routers/v0/router.py b/python/src/etos_api/routers/v0/router.py index b80c62e..c95c42d 100644 --- a/python/src/etos_api/routers/v0/router.py +++ b/python/src/etos_api/routers/v0/router.py @@ -17,61 +17,74 @@ import logging import os +from typing import Annotated from uuid import uuid4 from eiffellib.events import EiffelTestExecutionRecipeCollectionCreatedEvent from etos_lib import ETOS from etos_lib.kubernetes import Kubernetes -from fastapi import FastAPI, HTTPException -from starlette.responses import RedirectResponse, Response +from fastapi import Depends, FastAPI, HTTPException from kubernetes import client +from opentelemetry import baggage as otel_baggage +from opentelemetry import context as otel_context from opentelemetry import trace from opentelemetry.trace import Span +from starlette.responses import RedirectResponse, Response from etos_api.library.environment import Configuration, configure_testrun +from etos_api.library.opentelemetry import context from etos_api.library.utilities import sync_to_async from .schemas import AbortEtosResponse, StartEtosRequest, StartEtosResponse -from .utilities import wait_for_artifact_created, validate_suite +from .utilities import validate_suite, wait_for_artifact_created -ETOSv0 = FastAPI( +ETOSV0 = FastAPI( title="ETOS", version="v0", summary="API endpoints for ETOS v0 - I.e. the version before versions", root_path_in_servers=False, + dependencies=[Depends(context)], ) TRACER = trace.get_tracer("etos_api.routers.etos.router") LOGGER = logging.getLogger(__name__) logging.getLogger("pika").setLevel(logging.WARNING) +# pylint:disable=too-many-locals,too-many-statements -@ETOSv0.post("/etos", tags=["etos"], response_model=StartEtosResponse) -async def start_etos(etos: StartEtosRequest): +@ETOSV0.post("/etos", tags=["etos"], response_model=StartEtosResponse) +async def start_etos( + etos: StartEtosRequest, + ctx: Annotated[otel_context.Context, Depends(context)], +) -> dict: """Start ETOS execution on post. :param etos: ETOS pydantic model. :type etos: :obj:`etos_api.routers.etos.schemas.StartEtosRequest` + :param ctx: OpenTelemetry context with extracted headers. + :type ctx: :obj:`opentelemetry.context.Context` :return: JSON dictionary with response. :rtype: dict """ - with TRACER.start_as_current_span("start-etos") as span: - return await _start(etos, span) + with TRACER.start_as_current_span("start-etos", context=ctx) as span: + return await _start(etos, span, otel_context.get_current()) -@ETOSv0.delete("/etos/{suite_id}", tags=["etos"], response_model=AbortEtosResponse) -async def abort_etos(suite_id: str): +@ETOSV0.delete("/etos/{suite_id}", tags=["etos"], response_model=AbortEtosResponse) +async def abort_etos(suite_id: str, ctx: Annotated[otel_context.Context, Depends(context)]) -> dict: """Abort ETOS execution on delete. :param suite_id: ETOS suite id :type suite_id: str + :param ctx: OpenTelemetry context with extracted headers. + :type ctx: :obj:`opentelemetry.context.Context` :return: JSON dictionary with response. :rtype: dict """ - with TRACER.start_as_current_span("abort-etos"): + with TRACER.start_as_current_span("abort-etos", context=ctx): return await _abort(suite_id) -@ETOSv0.get("/ping", tags=["etos"], status_code=204) +@ETOSV0.get("/ping", tags=["etos"], status_code=204) async def ping(): """Ping the ETOS service in order to check if it is up and running. @@ -81,7 +94,7 @@ async def ping(): return Response(status_code=204) -@ETOSv0.get("/selftest/ping") +@ETOSV0.get("/selftest/ping") async def oldping(): """Ping the ETOS service in order to check if it is up and running. @@ -93,16 +106,20 @@ async def oldping(): return RedirectResponse("/api/ping") -async def _start(etos: StartEtosRequest, span: Span) -> dict: # pylint:disable=too-many-statements +async def _start(etos: StartEtosRequest, span: Span, ctx: otel_context.Context) -> dict: """Start ETOS execution. :param etos: ETOS pydantic model. :param span: An opentelemetry span for tracing. + :param ctx: OpenTelemetry context with extracted headers. :return: JSON dictionary with response. """ tercc = EiffelTestExecutionRecipeCollectionCreatedEvent() LOGGER.identifier.set(tercc.meta.event_id) span.set_attribute("etos.id", tercc.meta.event_id) + span.set_attribute( + "parent_activity", str(etos.parent_activity) if etos.parent_activity else "None" + ) LOGGER.info("Validating test suite.") span.set_attribute("etos.test_suite.uri", etos.test_suite_url) @@ -175,6 +192,9 @@ async def _start(etos: StartEtosRequest, span: Span) -> dict: # pylint:disable= ) from exception LOGGER.info("Environment provider configured.") + ctx = otel_baggage.set_baggage("testrun_id", tercc.meta.event_id, context=ctx) + ctx = otel_baggage.set_baggage("artifact_id", artifact_id, context=ctx) + LOGGER.info("Start event publisher.") await sync_to_async(etos_library.start_publisher) if not etos_library.debug.disable_sending_events: @@ -182,7 +202,7 @@ async def _start(etos: StartEtosRequest, span: Span) -> dict: # pylint:disable= LOGGER.info("Event published started successfully.") LOGGER.info("Publish TERCC event.") try: - event = etos_library.events.send(tercc, links, data) + event = etos_library.events.send(tercc, links, data, ctx=ctx) await sync_to_async(etos_library.publisher.wait_for_unpublished_events) finally: if not etos_library.debug.disable_sending_events: diff --git a/python/src/etos_api/routers/v1alpha/__init__.py b/python/src/etos_api/routers/v1alpha/__init__.py index e7fde71..d4daea7 100644 --- a/python/src/etos_api/routers/v1alpha/__init__.py +++ b/python/src/etos_api/routers/v1alpha/__init__.py @@ -15,5 +15,5 @@ # limitations under the License. """ETOS API testrun module.""" -from .router import ETOSv1Alpha from . import schemas +from .router import ETOSV1ALPHA diff --git a/python/src/etos_api/routers/v1alpha/router.py b/python/src/etos_api/routers/v1alpha/router.py index 9644fba..cda55ce 100644 --- a/python/src/etos_api/routers/v1alpha/router.py +++ b/python/src/etos_api/routers/v1alpha/router.py @@ -17,73 +17,81 @@ import logging import os +from typing import Annotated from uuid import uuid4 from etos_lib import ETOS -from etos_lib.kubernetes.schemas.testrun import ( - TestRun as TestRunSchema, - TestRunSpec, - Providers, - Image, - Metadata, - Retention, - TestRunner, -) -from etos_lib.kubernetes import TestRun, Environment, Kubernetes -from fastapi import FastAPI, HTTPException -from starlette.responses import Response -from opentelemetry import trace, context +from etos_lib.kubernetes import Environment, Kubernetes, TestRun +from etos_lib.kubernetes.schemas.testrun import Image, Metadata, Providers, Retention +from etos_lib.kubernetes.schemas.testrun import TestRun as TestRunSchema +from etos_lib.kubernetes.schemas.testrun import TestRunner, TestRunSpec +from fastapi import Depends, FastAPI, HTTPException +from opentelemetry import baggage as otel_baggage +from opentelemetry import context as otel_context +from opentelemetry import trace from opentelemetry.propagate import inject from opentelemetry.trace import Span +from starlette.responses import Response +from etos_api.library.opentelemetry import context from .schemas import AbortTestrunResponse, StartTestrunRequest, StartTestrunResponse from .utilities import ( - wait_for_artifact_created, - download_suite, - validate_suite, convert_to_rfc1123, + download_suite, recipes_from_tests, + validate_suite, + wait_for_artifact_created, ) -ETOSv1Alpha = FastAPI( +ETOSV1ALPHA = FastAPI( title="ETOS", version="v1alpha", summary="API endpoints for ETOS v1 Alpha", root_path_in_servers=False, + dependencies=[Depends(context)], ) TRACER = trace.get_tracer("etos_api.routers.testrun.router") LOGGER = logging.getLogger(__name__) logging.getLogger("pika").setLevel(logging.WARNING) +# pylint:disable=too-many-locals,too-many-statements -@ETOSv1Alpha.post("/testrun", tags=["etos"], response_model=StartTestrunResponse) -async def start_testrun(etos: StartTestrunRequest): +@ETOSV1ALPHA.post("/testrun", tags=["etos"], response_model=StartTestrunResponse) +async def start_testrun( + etos: StartTestrunRequest, ctx: Annotated[otel_context.Context, Depends(context)] +) -> dict: """Start ETOS testrun on post. :param etos: ETOS pydantic model. :type etos: :obj:`etos_api.routers.etos.schemas.StartTestrunRequest` + :param ctx: OpenTelemetry context with extracted headers. + :type ctx: :obj:`opentelemetry.context.Context` :return: JSON dictionary with response. :rtype: dict """ - with TRACER.start_as_current_span("start-etos") as span: - return await _create_testrun(etos, span) + with TRACER.start_as_current_span("start-etos", context=ctx) as span: + return await _create_testrun(etos, span, otel_context.get_current()) -@ETOSv1Alpha.delete("/testrun/{suite_id}", tags=["etos"], response_model=AbortTestrunResponse) -async def abort_testrun(suite_id: str): +@ETOSV1ALPHA.delete("/testrun/{suite_id}", tags=["etos"], response_model=AbortTestrunResponse) +async def abort_testrun( + suite_id: str, ctx: Annotated[otel_context.Context, Depends(context)] +) -> dict: """Abort ETOS testrun on delete. :param suite_id: ETOS suite id :type suite_id: str + :param ctx: OpenTelemetry context with extracted headers. + :type ctx: :obj:`opentelemetry.context.Context` :return: JSON dictionary with response. :rtype: dict """ - with TRACER.start_as_current_span("abort-etos"): + with TRACER.start_as_current_span("abort-etos", context=ctx): return await _abort(suite_id) -@ETOSv1Alpha.get("/testrun/{sub_suite_id}", tags=["etos"]) +@ETOSV1ALPHA.get("/testrun/{sub_suite_id}", tags=["etos"]) async def get_subsuite(sub_suite_id: str) -> dict: """Get sub suite returns the sub suite definition for the ETOS test runner. @@ -100,29 +108,18 @@ async def get_subsuite(sub_suite_id: str) -> dict: return environment_spec -@ETOSv1Alpha.get("/ping", tags=["etos"], status_code=204) +@ETOSV1ALPHA.get("/ping", tags=["etos"], status_code=204) async def health_check(): """Check the status of the API and verify the client version.""" return Response(status_code=204) -def get_current_context() -> str: - """Get the current OpenTelemetry context.""" - ctx = context.get_current() - LOGGER.info("Current OpenTelemetry context: %s", ctx) - carrier = {} - # inject() creates a dict with context reference, - # e. g. {'traceparent': '00-0be6c260d9cbe9772298eaf19cb90a5b-371353ee8fbd3ced-01'} - inject(carrier) - env = ",".join(f"{k}={v}" for k, v in carrier.items()) - return env - - -async def _create_testrun(etos: StartTestrunRequest, span: Span) -> dict: +async def _create_testrun(etos: StartTestrunRequest, span: Span, ctx: otel_context.Context) -> dict: """Create a testrun for ETOS to execute. :param etos: Testrun pydantic model. :param span: An opentelemetry span for tracing. + :param ctx: OpenTelemetry context with extracted headers. :return: JSON dictionary with response. """ testrun_id = str(uuid4()) @@ -202,6 +199,22 @@ async def _create_testrun(etos: StartTestrunRequest, span: Span) -> dict: success=os.getenv("TESTRUN_SUCCESS_RETENTION"), ) + ctx = otel_baggage.set_baggage("testrun_id", testrun_id, context=ctx) + ctx = otel_baggage.set_baggage("artifact_id", artifact_id, context=ctx) + ctx = otel_baggage.set_baggage( + "etos_cluster", os.getenv("ETOS_CLUSTER", "Unknown"), context=ctx + ) + carrier = {} + # inject() creates a dict with context reference, + # e. g. {'traceparent': '00-0be6c260d9cbe9772298eaf19cb90a5b-371353ee8fbd3ced-01'} + inject(carrier, context=ctx) + + annotations = {} + if carrier.get("traceparent"): + annotations["etos.eiffel-community.github.io/traceparent"] = carrier["traceparent"] + if carrier.get("baggage"): + annotations["etos.eiffel-community.github.io/baggage"] = carrier["baggage"] + kubernetes = Kubernetes() testrun_spec = TestRunSchema( metadata=Metadata( @@ -211,9 +224,7 @@ async def _create_testrun(etos: StartTestrunRequest, span: Span) -> dict: "etos.eiffel-community.github.io/id": testrun_id, "etos.eiffel-community.github.io/cluster": os.getenv("ETOS_CLUSTER", "Unknown"), }, - annotations={ - "etos.eiffel-community.github.io/traceparent": get_current_context(), - }, + annotations=annotations, ), spec=TestRunSpec( cluster=os.getenv("ETOS_CLUSTER", "Unknown"),