Skip to content

Commit 6f22c72

Browse files
authored
Contextual Request ID for flyte requests (#531)
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com> Co-authored-by: Ketan Umare <kumare3@users.noreply.github.com>
1 parent eb431d5 commit 6f22c72

File tree

7 files changed

+367
-12
lines changed

7 files changed

+367
-12
lines changed

src/flyte/_internal/controllers/remote/_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ async def _bg_launch(self, action: Action):
382382
e.code().name, f"Precondition failed: {e.details()}"
383383
) from e
384384
# For all other errors, we will retry with backoff
385-
logger.exception(
385+
logger.error(
386386
f"Failed to launch action: {action.name}, Code: {e.code()}, "
387387
f"Details {e.details()} backing off..."
388388
)

src/flyte/_internal/controllers/remote/_informer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,11 +248,11 @@ async def watch(self):
248248
logger.info(f"Watch cancelled: {self.name}")
249249
return
250250
except asyncio.TimeoutError as e:
251-
logger.error(f"Watch timeout: {self.name}", exc_info=e)
251+
logger.error(f"Watch timeout: {self.name}, {e}", exc_info=False)
252252
last_exc = e
253253
retries += 1
254254
except grpc.aio.AioRpcError as e:
255-
logger.exception(f"RPC error: {self.name}", exc_info=e)
255+
logger.error(f"RPC error: {self.name}, {e}", exc_info=False)
256256
last_exc = e
257257
retries += 1
258258
except Exception as e:

src/flyte/models.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,20 @@ def new_sub_action_from(self, task_call_seq: int, task_hash: str, input_hash: st
7777
new_name = base36_encode(bytes_digest)
7878
return self.new_sub_action(new_name)
7979

80+
def unique_id_str(self, salt: str | None = None) -> str:
81+
"""
82+
Generate a unique ID string for this action in the format:
83+
{project}-{domain}-{run_name}-{action_name}
84+
85+
This is optimized for performance assuming all fields are available.
86+
87+
:return: A unique ID string
88+
"""
89+
v = f"{self.project}-{self.domain}-{self.run_name}-{self.name}"
90+
if salt is not None:
91+
return f"{v}-{salt}"
92+
return v
93+
8094

8195
@rich.repr.auto
8296
@dataclass

src/flyte/remote/_client/auth/_channel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ async def create_channel(
159159
DefaultMetadataUnaryUnaryInterceptor,
160160
)
161161

162-
# Add all types of default metadata interceptors
162+
# Add all types of default metadata interceptors (includes x-request-id)
163163
interceptors: typing.List[grpc.aio.ClientInterceptor] = [
164164
DefaultMetadataUnaryUnaryInterceptor(),
165165
DefaultMetadataUnaryStreamInterceptor(),

src/flyte/remote/_client/auth/_grpc_utils/default_metadata_interceptor.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,36 @@
1+
import random
2+
import string
13
import typing
4+
from uuid import uuid4
25

36
import grpc.aio
47
from grpc.aio import ClientCallDetails, Metadata
58

9+
import flyte
10+
611
_default_metadata = Metadata(("accept", "application/grpc"))
12+
ALPHABET = string.ascii_lowercase + string.digits
13+
14+
15+
def fast_short_id():
16+
return "".join(random.choices(ALPHABET, k=4))
17+
18+
19+
def _generate_request_id() -> str:
20+
"""
21+
Generate a request ID based on the current Flyte context.
22+
23+
If running within a Flyte task context, creates a request ID using the action's unique_id_str method.
24+
Otherwise, falls back to a UUID4.
25+
26+
:return: The generated request ID string
27+
"""
28+
ctx = flyte.ctx()
29+
if ctx and ctx.action:
30+
return ctx.action.unique_id_str(salt=fast_short_id())
31+
32+
# Fall back to UUID4 if context is not available
33+
return str(uuid4())
734

835

936
def with_metadata(call_details: ClientCallDetails, new_metadata: Metadata) -> ClientCallDetails:
@@ -27,20 +54,27 @@ def with_metadata(call_details: ClientCallDetails, new_metadata: Metadata) -> Cl
2754
class _BaseDefaultMetadataInterceptor:
2855
"""
2956
Base class for all default metadata interceptors that provides common functionality.
57+
Injects both default metadata (accept header) and x-request-id.
3058
"""
3159

3260
async def _inject_default_metadata(self, call_details: grpc.aio.ClientCallDetails):
3361
"""
34-
Injects default metadata into the client call details.
62+
Injects default metadata and request ID into the client call details.
3563
36-
This method adds all key-value pairs from the default metadata dictionary to the
37-
client call details metadata. If the client call details don't have metadata,
38-
a new Metadata object is created.
64+
This method adds:
65+
- Default metadata (accept: application/grpc)
66+
- x-request-id header with context-based or UUID4 value
3967
4068
:param call_details: The client call details to inject metadata into
4169
:return: A new ClientCallDetails object with the injected metadata
4270
"""
43-
return with_metadata(call_details, _default_metadata)
71+
# Generate request ID and combine with default metadata
72+
request_id = _generate_request_id()
73+
combined_metadata = Metadata(
74+
("accept", "application/grpc"),
75+
("x-request-id", request_id),
76+
)
77+
return with_metadata(call_details, combined_metadata)
4478

4579

4680
class DefaultMetadataUnaryUnaryInterceptor(_BaseDefaultMetadataInterceptor, grpc.aio.UnaryUnaryClientInterceptor):

0 commit comments

Comments
 (0)