Skip to content

Commit 727f297

Browse files
committed
use body attrs rather than headesr
1 parent ef774a6 commit 727f297

File tree

9 files changed

+100
-9
lines changed

9 files changed

+100
-9
lines changed

airbyte_cdk/manifest_server/api_models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
DiscoverResponse,
1313
FullResolveRequest,
1414
ManifestResponse,
15+
RequestContext,
1516
ResolveRequest,
1617
StreamTestReadRequest,
1718
)
@@ -30,6 +31,7 @@
3031
"ConnectorConfig",
3132
"Manifest",
3233
# Manifest request/response models
34+
"RequestContext",
3335
"FullResolveRequest",
3436
"ManifestResponse",
3537
"StreamTestReadRequest",

airbyte_cdk/manifest_server/api_models/manifest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
from .dicts import ConnectorConfig, Manifest
1414

1515

16+
class RequestContext(BaseModel):
17+
"""Optional context information for tracing and observability."""
18+
19+
workspace_id: Optional[str] = None
20+
project_id: Optional[str] = None
21+
22+
1623
class StreamTestReadRequest(BaseModel):
1724
"""Request to test read from a specific stream."""
1825

@@ -24,13 +31,15 @@ class StreamTestReadRequest(BaseModel):
2431
record_limit: int = Field(default=100, ge=1, le=5000)
2532
page_limit: int = Field(default=5, ge=1, le=20)
2633
slice_limit: int = Field(default=5, ge=1, le=20)
34+
context: Optional[RequestContext] = None
2735

2836

2937
class CheckRequest(BaseModel):
3038
"""Request to check a manifest."""
3139

3240
manifest: Manifest
3341
config: ConnectorConfig
42+
context: Optional[RequestContext] = None
3443

3544

3645
class CheckResponse(BaseModel):
@@ -45,6 +54,7 @@ class DiscoverRequest(BaseModel):
4554

4655
manifest: Manifest
4756
config: ConnectorConfig
57+
context: Optional[RequestContext] = None
4858

4959

5060
class DiscoverResponse(BaseModel):
@@ -57,6 +67,7 @@ class ResolveRequest(BaseModel):
5767
"""Request to resolve a manifest."""
5868

5969
manifest: Manifest
70+
context: Optional[RequestContext] = None
6071

6172

6273
class ManifestResponse(BaseModel):
@@ -71,3 +82,4 @@ class FullResolveRequest(BaseModel):
7182
manifest: Manifest
7283
config: ConnectorConfig
7384
stream_limit: int = Field(default=100, ge=1, le=100)
85+
context: Optional[RequestContext] = None

airbyte_cdk/manifest_server/dependencies/tracing.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,15 @@
99
from typing import Optional
1010

1111
import ddtrace
12-
from fastapi import Header
1312

1413
logger = logging.getLogger(__name__)
1514

1615

17-
def apply_trace_tags(
18-
workspace_id: Optional[str] = Header(None, alias="x-workspace-id"),
19-
project_id: Optional[str] = Header(None, alias="x-project-id"),
16+
def apply_trace_tags_from_context(
17+
workspace_id: Optional[str] = None,
18+
project_id: Optional[str] = None,
2019
) -> None:
21-
"""FastAPI dependency to apply trace tags from headers to the current span."""
20+
"""Apply trace tags from context to the current span."""
2221
if not workspace_id and not project_id:
2322
return
2423

File renamed without changes.
File renamed without changes.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""
2+
FastAPI dependencies for the Manifest Server.
3+
4+
This module contains reusable FastAPI dependencies that can be used across
5+
different routers in the manifest server.
6+
"""
7+
8+
import logging
9+
from typing import Optional
10+
11+
import ddtrace
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def apply_trace_tags_from_context(
17+
workspace_id: Optional[str] = None,
18+
project_id: Optional[str] = None,
19+
) -> None:
20+
"""Apply trace tags from context to the current span."""
21+
if not workspace_id and not project_id:
22+
return
23+
24+
# Log the trace IDs for observability
25+
log_parts = []
26+
if workspace_id:
27+
log_parts.append(f"workspace_id={workspace_id}")
28+
if project_id:
29+
log_parts.append(f"project_id={project_id}")
30+
31+
if log_parts:
32+
logger.info(f"Processing request with trace tags: {', '.join(log_parts)}")
33+
34+
try:
35+
span = ddtrace.tracer.current_span()
36+
if span:
37+
if workspace_id:
38+
span.set_tag("workspace_id", workspace_id)
39+
if project_id:
40+
span.set_tag("project_id", project_id)
41+
except Exception:
42+
# Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run)
43+
pass

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
)
3030
from ..command_processor.processor import ManifestCommandProcessor
3131
from ..command_processor.utils import build_catalog, build_source
32-
from ..dependencies.auth import verify_jwt_token
33-
from ..dependencies.tracing import apply_trace_tags
32+
from ..helpers.auth import verify_jwt_token
33+
from ..helpers.tracing import apply_trace_tags_from_context
3434

3535

3636
def safe_build_source(
@@ -60,7 +60,7 @@ def safe_build_source(
6060
router = APIRouter(
6161
prefix="/manifest",
6262
tags=["manifest"],
63-
dependencies=[Depends(verify_jwt_token), Depends(apply_trace_tags)],
63+
dependencies=[Depends(verify_jwt_token)],
6464
)
6565

6666

@@ -69,6 +69,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
6969
"""
7070
Test reading from a specific stream in the manifest.
7171
"""
72+
# Apply trace tags from context if provided
73+
if request.context:
74+
apply_trace_tags_from_context(
75+
workspace_id=request.context.workspace_id,
76+
project_id=request.context.project_id,
77+
)
78+
7279
config_dict = request.config.model_dump()
7380

7481
catalog = build_catalog(request.stream_name)
@@ -105,6 +112,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
105112
@router.post("/check", operation_id="check")
106113
def check(request: CheckRequest) -> CheckResponse:
107114
"""Check configuration against a manifest"""
115+
# Apply trace tags from context if provided
116+
if request.context:
117+
apply_trace_tags_from_context(
118+
workspace_id=request.context.workspace_id,
119+
project_id=request.context.project_id,
120+
)
121+
108122
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
109123
runner = ManifestCommandProcessor(source)
110124
success, message = runner.check_connection(request.config.model_dump())
@@ -114,6 +128,13 @@ def check(request: CheckRequest) -> CheckResponse:
114128
@router.post("/discover", operation_id="discover")
115129
def discover(request: DiscoverRequest) -> DiscoverResponse:
116130
"""Discover streams from a manifest"""
131+
# Apply trace tags from context if provided
132+
if request.context:
133+
apply_trace_tags_from_context(
134+
workspace_id=request.context.workspace_id,
135+
project_id=request.context.project_id,
136+
)
137+
117138
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
118139
runner = ManifestCommandProcessor(source)
119140
catalog = runner.discover(request.config.model_dump())
@@ -125,6 +146,13 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
125146
@router.post("/resolve", operation_id="resolve")
126147
def resolve(request: ResolveRequest) -> ManifestResponse:
127148
"""Resolve a manifest to its final configuration."""
149+
# Apply trace tags from context if provided
150+
if request.context:
151+
apply_trace_tags_from_context(
152+
workspace_id=request.context.workspace_id,
153+
project_id=request.context.project_id,
154+
)
155+
128156
source = safe_build_source(request.manifest.model_dump(), {})
129157
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
130158

@@ -136,6 +164,13 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
136164
137165
This is a similar operation to resolve, but has an extra step which generates streams from dynamic stream templates if the manifest contains any. This is used when a user clicks the generate streams button on a stream template in the Builder UI
138166
"""
167+
# Apply trace tags from context if provided
168+
if request.context:
169+
apply_trace_tags_from_context(
170+
workspace_id=request.context.workspace_id,
171+
project_id=request.context.project_id,
172+
)
173+
139174
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
140175
manifest = {**source.resolved_manifest}
141176
streams = manifest.get("streams", [])
File renamed without changes.

unit_tests/manifest_server/dependencies/test_auth.py renamed to unit_tests/manifest_server/helpers/test_auth.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from fastapi import HTTPException
88
from fastapi.security import HTTPAuthorizationCredentials
99

10-
from airbyte_cdk.manifest_server.dependencies.auth import verify_jwt_token
10+
from airbyte_cdk.manifest_server.helpers.auth import verify_jwt_token
1111

1212

1313
class TestVerifyJwtToken:

0 commit comments

Comments
 (0)