|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
| 3 | +import asyncio |
3 | 4 | import dataclasses |
4 | 5 | import inspect |
5 | 6 | import uuid |
@@ -52,18 +53,23 @@ class TraceData: |
52 | 53 |
|
53 | 54 | @activity.defn |
54 | 55 | async def passthrough_activity(input: TraceData) -> TraceData: |
| 56 | + activity.heartbeat(input) |
| 57 | + # Wait for the heartbeat to be processed so that it modifies the data before the activity returns |
| 58 | + await asyncio.sleep(0.2) |
55 | 59 | return input |
56 | 60 |
|
57 | 61 |
|
58 | 62 | @workflow.defn(sandboxed=False) # we want to use isinstance |
59 | 63 | class SerializationContextTestWorkflow: |
60 | 64 | @workflow.run |
61 | | - async def run(self, input: TraceData) -> TraceData: |
62 | | - return await workflow.execute_activity( |
| 65 | + async def run(self, data: TraceData) -> TraceData: |
| 66 | + data = await workflow.execute_activity( |
63 | 67 | passthrough_activity, |
64 | | - input, |
| 68 | + data, |
65 | 69 | start_to_close_timeout=timedelta(seconds=10), |
| 70 | + heartbeat_timeout=timedelta(seconds=2), |
66 | 71 | ) |
| 72 | + return data |
67 | 73 |
|
68 | 74 |
|
69 | 75 | class SerializationContextTestEncodingPayloadConverter( |
@@ -207,49 +213,55 @@ async def test_workflow_payload_conversion_can_be_given_access_to_serialization_ |
207 | 213 | context_type="workflow", |
208 | 214 | in_workflow=False, |
209 | 215 | method="to_payload", |
210 | | - context=workflow_context, |
| 216 | + context=workflow_context, # Outbound workflow input |
211 | 217 | ), |
212 | 218 | TraceItem( |
213 | 219 | context_type="workflow", |
214 | 220 | in_workflow=False, |
215 | 221 | method="from_payload", |
216 | | - context=workflow_context, |
| 222 | + context=workflow_context, # Inbound workflow input |
217 | 223 | ), |
218 | 224 | TraceItem( |
219 | 225 | context_type="activity", |
220 | 226 | in_workflow=True, |
221 | 227 | method="to_payload", |
222 | | - context=activity_context, |
| 228 | + context=activity_context, # Outbound activity input |
223 | 229 | ), |
224 | 230 | TraceItem( |
225 | 231 | context_type="activity", |
226 | 232 | in_workflow=False, |
227 | 233 | method="from_payload", |
228 | | - context=activity_context, |
| 234 | + context=activity_context, # Inbound activity input |
229 | 235 | ), |
230 | 236 | TraceItem( |
231 | 237 | context_type="activity", |
232 | 238 | in_workflow=False, |
233 | 239 | method="to_payload", |
234 | | - context=activity_context, |
| 240 | + context=activity_context, # Outbound heartbeat |
| 241 | + ), |
| 242 | + TraceItem( |
| 243 | + context_type="activity", |
| 244 | + in_workflow=False, |
| 245 | + method="to_payload", |
| 246 | + context=activity_context, # Outbound activity result |
235 | 247 | ), |
236 | 248 | TraceItem( |
237 | 249 | context_type="activity", |
238 | 250 | in_workflow=False, |
239 | 251 | method="from_payload", |
240 | | - context=activity_context, |
| 252 | + context=activity_context, # Inbound activity result |
241 | 253 | ), |
242 | 254 | TraceItem( |
243 | 255 | context_type="workflow", |
244 | 256 | in_workflow=True, |
245 | 257 | method="to_payload", |
246 | | - context=workflow_context, |
| 258 | + context=workflow_context, # Outbound workflow result |
247 | 259 | ), |
248 | 260 | TraceItem( |
249 | 261 | context_type="workflow", |
250 | 262 | in_workflow=False, |
251 | 263 | method="from_payload", |
252 | | - context=workflow_context, |
| 264 | + context=workflow_context, # Inbound workflow result |
253 | 265 | ), |
254 | 266 | ], |
255 | 267 | ) |
|
0 commit comments