Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fc2a2e6
ensure that the context used to detach the token is the same as what …
VegetarianOrc Oct 11, 2025
7a51529
Update test to use LogCapturer helper. Move shared test workflow and …
VegetarianOrc Oct 13, 2025
1b8d9e1
run formatter
VegetarianOrc Oct 13, 2025
b656e84
Fix up test with log capturer. Only log warnings if there are no hook…
VegetarianOrc Oct 13, 2025
5abc5a4
run formatter
VegetarianOrc Oct 13, 2025
d1bf879
Move to a class based context manager to manually manage context
VegetarianOrc Oct 14, 2025
6ca781c
apply context management to openai_agents tracing
VegetarianOrc Oct 16, 2025
6060b35
use 3.11 create_task when possible to avoid an extra context copy
VegetarianOrc Oct 16, 2025
13e99a3
run formatter
VegetarianOrc Oct 16, 2025
853f603
fix typing lint errors
VegetarianOrc Oct 16, 2025
7ae97d0
Fix a few more typing errors
VegetarianOrc Oct 16, 2025
aaccb00
Use a lambda to wrap task creation to appease the type linter
VegetarianOrc Oct 16, 2025
6deb5b1
Merge branch 'main' into otel/context-detach
VegetarianOrc Oct 16, 2025
fe8b0a5
revert manual context management explorations
VegetarianOrc Oct 17, 2025
2274ba4
Add comment explaining the check. Use to ensure that the context is …
VegetarianOrc Oct 17, 2025
460ed71
use original variable name
VegetarianOrc Oct 17, 2025
83ae1fb
Fix typo
VegetarianOrc Oct 17, 2025
86b6951
Merge branch 'main' into otel/context-detach
VegetarianOrc Oct 20, 2025
1146a8c
merge main
VegetarianOrc Oct 20, 2025
da45161
Merge branch 'main' into otel/context-detach
VegetarianOrc Oct 20, 2025
c053aa6
Add some logs to help debug test flaking with timeouts
VegetarianOrc Oct 21, 2025
6e23365
apply formatting
VegetarianOrc Oct 21, 2025
db3bb0e
Revert "apply formatting"
VegetarianOrc Oct 21, 2025
50cbd42
Revert "Add some logs to help debug test flaking with timeouts"
VegetarianOrc Oct 21, 2025
c806d6b
move safe detach test to a model that forces __exit__ on a different …
VegetarianOrc Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 45 additions & 27 deletions temporalio/contrib/openai_agents/_trace_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

from __future__ import annotations

import asyncio
import contextvars
import random
import sys
import uuid
from contextlib import contextmanager
from typing import Any, Mapping, Optional, Protocol, Type
Expand Down Expand Up @@ -400,48 +403,63 @@ async def signal_external_workflow(
def start_activity(
self, input: temporalio.worker.StartActivityInput
) -> temporalio.workflow.ActivityHandle:
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
if trace:
span = custom_span(
name="temporal:startActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)

set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_activity(input)
ctx = contextvars.copy_context()
span = ctx.run(
self._create_span,
name="temporal:startActivity",
data={"activity": input.activity},
input=input,
)
handle = ctx.run(self.next.start_activity, input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
handle.add_done_callback(lambda _: ctx.run(span.finish)) # type: ignore
return handle

async def start_child_workflow(
self, input: temporalio.worker.StartChildWorkflowInput
) -> temporalio.workflow.ChildWorkflowHandle:
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
if trace:
span = custom_span(
name="temporal:startChildWorkflow", data={"workflow": input.workflow}
ctx = contextvars.copy_context()
span = ctx.run(
self._create_span,
name="temporal:startChildWorkflow",
data={"workflow": input.workflow},
input=input,
)
if sys.version_info >= (3, 11):
handle: temporalio.workflow.ChildWorkflowHandle = await asyncio.create_task(
self.next.start_child_workflow(input), context=ctx
)
else:
handle: temporalio.workflow.ChildWorkflowHandle = await ctx.run(
lambda: asyncio.create_task(self.next.start_child_workflow(input))
)
span.start(mark_as_current=True)
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = await self.next.start_child_workflow(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
handle.add_done_callback(lambda _: ctx.run(span.finish)) # type: ignore
return handle

def start_local_activity(
self, input: temporalio.worker.StartLocalActivityInput
) -> temporalio.workflow.ActivityHandle:
ctx = contextvars.copy_context()
span = ctx.run(
self._create_span,
name="temporal:startLocalActivity",
data={"activity": input.activity},
input=input,
)
handle = ctx.run(self.next.start_local_activity, input)
if span:
handle.add_done_callback(lambda _: ctx.run(span.finish)) # type: ignore
return handle

def _create_span(
self, name: str, data: dict[str, Any], input: _InputWithHeaders
) -> Optional[Span]:
trace = get_trace_provider().get_current_trace()
span: Optional[Span] = None
if trace:
span = custom_span(
name="temporal:startLocalActivity", data={"activity": input.activity}
)
span = custom_span(name=name, data=data)
span.start(mark_as_current=True)

set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_local_activity(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
return span
Loading
Loading