diff --git a/airbyte_cdk/manifest_server/api_models/__init__.py b/airbyte_cdk/manifest_server/api_models/__init__.py index 3cd942dc4..3469fd7e3 100644 --- a/airbyte_cdk/manifest_server/api_models/__init__.py +++ b/airbyte_cdk/manifest_server/api_models/__init__.py @@ -12,6 +12,7 @@ DiscoverResponse, FullResolveRequest, ManifestResponse, + RequestContext, ResolveRequest, StreamTestReadRequest, ) @@ -30,6 +31,7 @@ "ConnectorConfig", "Manifest", # Manifest request/response models + "RequestContext", "FullResolveRequest", "ManifestResponse", "StreamTestReadRequest", diff --git a/airbyte_cdk/manifest_server/api_models/manifest.py b/airbyte_cdk/manifest_server/api_models/manifest.py index a13189763..a17ac9c63 100644 --- a/airbyte_cdk/manifest_server/api_models/manifest.py +++ b/airbyte_cdk/manifest_server/api_models/manifest.py @@ -13,6 +13,13 @@ from .dicts import ConnectorConfig, Manifest +class RequestContext(BaseModel): + """Optional context information for tracing and observability.""" + + workspace_id: Optional[str] = None + project_id: Optional[str] = None + + class StreamTestReadRequest(BaseModel): """Request to test read from a specific stream.""" @@ -24,6 +31,7 @@ class StreamTestReadRequest(BaseModel): record_limit: int = Field(default=100, ge=1, le=5000) page_limit: int = Field(default=5, ge=1, le=20) slice_limit: int = Field(default=5, ge=1, le=20) + context: Optional[RequestContext] = None class CheckRequest(BaseModel): @@ -31,6 +39,7 @@ class CheckRequest(BaseModel): manifest: Manifest config: ConnectorConfig + context: Optional[RequestContext] = None class CheckResponse(BaseModel): @@ -45,6 +54,7 @@ class DiscoverRequest(BaseModel): manifest: Manifest config: ConnectorConfig + context: Optional[RequestContext] = None class DiscoverResponse(BaseModel): @@ -57,6 +67,7 @@ class ResolveRequest(BaseModel): """Request to resolve a manifest.""" manifest: Manifest + context: Optional[RequestContext] = None class ManifestResponse(BaseModel): @@ -71,3 +82,4 @@ class FullResolveRequest(BaseModel): manifest: Manifest config: ConnectorConfig stream_limit: int = Field(default=100, ge=1, le=100) + context: Optional[RequestContext] = None diff --git a/airbyte_cdk/manifest_server/helpers/__init__.py b/airbyte_cdk/manifest_server/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airbyte_cdk/manifest_server/auth.py b/airbyte_cdk/manifest_server/helpers/auth.py similarity index 100% rename from airbyte_cdk/manifest_server/auth.py rename to airbyte_cdk/manifest_server/helpers/auth.py diff --git a/airbyte_cdk/manifest_server/helpers/tracing.py b/airbyte_cdk/manifest_server/helpers/tracing.py new file mode 100644 index 000000000..ef36f2f1c --- /dev/null +++ b/airbyte_cdk/manifest_server/helpers/tracing.py @@ -0,0 +1,36 @@ +import logging +from typing import Optional + +import ddtrace + +logger = logging.getLogger(__name__) + + +def apply_trace_tags_from_context( + workspace_id: Optional[str] = None, + project_id: Optional[str] = None, +) -> None: + """Apply trace tags from context to the current span.""" + if not workspace_id and not project_id: + return + + # Log the trace IDs for observability + log_parts = [] + if workspace_id: + log_parts.append(f"workspace_id={workspace_id}") + if project_id: + log_parts.append(f"project_id={project_id}") + + if log_parts: + logger.info(f"Processing request with trace tags: {', '.join(log_parts)}") + + try: + span = ddtrace.tracer.current_span() + if span: + if workspace_id: + span.set_tag("workspace_id", workspace_id) + if project_id: + span.set_tag("project_id", project_id) + except Exception: + # Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run) + pass diff --git a/airbyte_cdk/manifest_server/routers/manifest.py b/airbyte_cdk/manifest_server/routers/manifest.py index 035058ec1..4fefb2129 100644 --- a/airbyte_cdk/manifest_server/routers/manifest.py +++ b/airbyte_cdk/manifest_server/routers/manifest.py @@ -27,9 +27,10 @@ StreamReadResponse, StreamTestReadRequest, ) -from ..auth import verify_jwt_token from ..command_processor.processor import ManifestCommandProcessor from ..command_processor.utils import build_catalog, build_source +from ..helpers.auth import verify_jwt_token +from ..helpers.tracing import apply_trace_tags_from_context def safe_build_source( @@ -68,6 +69,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: """ Test reading from a specific stream in the manifest. """ + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + config_dict = request.config.model_dump() catalog = build_catalog(request.stream_name) @@ -104,6 +112,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse: @router.post("/check", operation_id="check") def check(request: CheckRequest) -> CheckResponse: """Check configuration against a manifest""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) runner = ManifestCommandProcessor(source) success, message = runner.check_connection(request.config.model_dump()) @@ -113,6 +128,13 @@ def check(request: CheckRequest) -> CheckResponse: @router.post("/discover", operation_id="discover") def discover(request: DiscoverRequest) -> DiscoverResponse: """Discover streams from a manifest""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) runner = ManifestCommandProcessor(source) catalog = runner.discover(request.config.model_dump()) @@ -124,6 +146,13 @@ def discover(request: DiscoverRequest) -> DiscoverResponse: @router.post("/resolve", operation_id="resolve") def resolve(request: ResolveRequest) -> ManifestResponse: """Resolve a manifest to its final configuration.""" + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), {}) return ManifestResponse(manifest=Manifest(**source.resolved_manifest)) @@ -135,6 +164,13 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse: This is a similar operation to resolve, but has an extra step which generates streams from dynamic stream templates if the manifest contains any. This is used when a user clicks the generate streams button on a stream template in the Builder UI """ + # Apply trace tags from context if provided + if request.context: + apply_trace_tags_from_context( + workspace_id=request.context.workspace_id, + project_id=request.context.project_id, + ) + source = safe_build_source(request.manifest.model_dump(), request.config.model_dump()) manifest = {**source.resolved_manifest} streams = manifest.get("streams", []) diff --git a/unit_tests/manifest_server/helpers/__init__.py b/unit_tests/manifest_server/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/unit_tests/manifest_server/test_auth.py b/unit_tests/manifest_server/helpers/test_auth.py similarity index 98% rename from unit_tests/manifest_server/test_auth.py rename to unit_tests/manifest_server/helpers/test_auth.py index d9a439522..4d48a070f 100644 --- a/unit_tests/manifest_server/test_auth.py +++ b/unit_tests/manifest_server/helpers/test_auth.py @@ -7,7 +7,7 @@ from fastapi import HTTPException from fastapi.security import HTTPAuthorizationCredentials -from airbyte_cdk.manifest_server.auth import verify_jwt_token +from airbyte_cdk.manifest_server.helpers.auth import verify_jwt_token class TestVerifyJwtToken: