Skip to content

Commit 56cd7b4

Browse files
committed
Logging wrapping / error testing
1 parent 490dc21 commit 56cd7b4

File tree

4 files changed

+179
-31
lines changed

4 files changed

+179
-31
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::Context;
2+
use log::error;
23
use prost::Message;
34
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
45
use pyo3::prelude::*;
@@ -12,7 +13,7 @@ use temporal_sdk_core::api::errors::{PollActivityError, PollWfError};
1213
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
1314
use temporal_sdk_core_api::errors::WorkflowErrorType;
1415
use temporal_sdk_core_api::worker::{
15-
SlotInfo, SlotInfoTrait, SlotKind, SlotMarkUsedContext, SlotReleaseContext,
16+
SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext,
1617
SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit,
1718
};
1819
use temporal_sdk_core_api::Worker;
@@ -87,7 +88,7 @@ pub struct ResourceBasedSlotSupplier {
8788
#[pyclass]
8889
pub struct SlotReserveCtx {
8990
#[pyo3(get)]
90-
pub slot_type: String, // TODO: Real type
91+
pub slot_type: String,
9192
#[pyo3(get)]
9293
pub task_queue: String,
9394
#[pyo3(get)]
@@ -99,9 +100,13 @@ pub struct SlotReserveCtx {
99100
}
100101

101102
impl SlotReserveCtx {
102-
fn from_ctx(slot_type: String, ctx: &dyn SlotReservationContext) -> Self {
103+
fn from_ctx(slot_type: SlotKindType, ctx: &dyn SlotReservationContext) -> Self {
103104
SlotReserveCtx {
104-
slot_type,
105+
slot_type: match slot_type {
106+
SlotKindType::Workflow => "workflow".to_string(),
107+
SlotKindType::Activity => "activity".to_string(),
108+
SlotKindType::LocalActivity => "local-activity".to_string(),
109+
},
105110
task_queue: ctx.task_queue().to_string(),
106111
worker_identity: ctx.worker_identity().to_string(),
107112
worker_build_id: ctx.worker_build_id().to_string(),
@@ -191,26 +196,32 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
191196

192197
async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit {
193198
loop {
194-
let pypermit = Python::with_gil(|py| {
199+
let pypermit = match Python::with_gil(|py| {
195200
let py_obj = self.inner.as_ref(py);
196201
let called = py_obj.call_method1(
197202
"reserve_slot",
198-
(SlotReserveCtx::from_ctx(
199-
Self::SlotKind::kind().to_string(),
200-
ctx,
201-
),),
203+
(SlotReserveCtx::from_ctx(Self::SlotKind::kind(), ctx),),
202204
)?;
203205
runtime::THREAD_TASK_LOCAL
204206
.with(|tl| pyo3_asyncio::into_future_with_locals(tl.get().unwrap(), called))
205-
})
206-
.expect("TODO")
207+
}) {
208+
Ok(f) => f,
209+
Err(e) => {
210+
error!(
211+
"Unexpected error in custom slot supplier `reserve_slot`: {}",
212+
e
213+
);
214+
continue;
215+
}
216+
}
207217
.await;
208218
match pypermit {
209219
Ok(p) => {
210220
return SlotSupplierPermit::with_user_data(p);
211221
}
212-
Err(e) => {
213-
dbg!("Error in reserve_slot", e);
222+
Err(_) => {
223+
// This is a user thrown error, re-raised by the logging wrapper so we can
224+
// loop, so do that.
214225
}
215226
}
216227
}
@@ -221,22 +232,25 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
221232
let py_obj = self.inner.as_ref(py);
222233
let pa = py_obj.call_method1(
223234
"try_reserve_slot",
224-
(SlotReserveCtx::from_ctx(
225-
Self::SlotKind::kind().to_string(),
226-
ctx,
227-
),),
235+
(SlotReserveCtx::from_ctx(Self::SlotKind::kind(), ctx),),
228236
)?;
229237

230238
if pa.is_none() {
231239
return Ok(None);
232240
}
233241
PyResult::Ok(Some(SlotSupplierPermit::with_user_data(pa.into_py(py))))
234242
})
235-
.expect("TODO")
243+
.unwrap_or_else(|e| {
244+
error!(
245+
"Uncaught error in custom slot supplier `try_reserve_slot`: {}",
246+
e
247+
);
248+
None
249+
})
236250
}
237251

238252
fn mark_slot_used(&self, ctx: &dyn SlotMarkUsedContext<SlotKind = Self::SlotKind>) {
239-
Python::with_gil(|py| {
253+
if let Err(e) = Python::with_gil(|py| {
240254
let permit = ctx
241255
.permit()
242256
.user_data::<PyObject>()
@@ -251,12 +265,16 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
251265
},),
252266
)?;
253267
PyResult::Ok(())
254-
})
255-
.expect("TODO");
268+
}) {
269+
error!(
270+
"Uncaught error in custom slot supplier `mark_slot_used`: {}",
271+
e
272+
);
273+
}
256274
}
257275

258276
fn release_slot(&self, ctx: &dyn SlotReleaseContext<SlotKind = Self::SlotKind>) {
259-
Python::with_gil(|py| {
277+
if let Err(e) = Python::with_gil(|py| {
260278
let permit = ctx
261279
.permit()
262280
.user_data::<PyObject>()
@@ -271,8 +289,12 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
271289
},),
272290
)?;
273291
PyResult::Ok(())
274-
})
275-
.expect("TODO");
292+
}) {
293+
error!(
294+
"Uncaught error in custom slot supplier `release_slot`: {}",
295+
e
296+
);
297+
}
276298
}
277299

278300
fn available_slots(&self) -> Option<usize> {

temporalio/bridge/worker.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
Awaitable,
1313
Callable,
1414
List,
15+
Literal,
1516
Optional,
1617
Protocol,
1718
Sequence,
@@ -34,6 +35,7 @@
3435
import temporalio.bridge.temporal_sdk_bridge
3536
import temporalio.converter
3637
import temporalio.exceptions
38+
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError
3739

3840

3941
@dataclass
@@ -99,8 +101,8 @@ class SlotPermit:
99101
class SlotReserveContext(Protocol):
100102
"""Context for reserving a slot from a :py:class:`CustomSlotSupplier`."""
101103

102-
slot_type: str # TODO real type
103-
"""The type of slot trying to be reserved."""
104+
slot_type: Literal["workflow", "activity", "local-activity"]
105+
"""The type of slot trying to be reserved. Always one of "workflow", "activity", or "local-activity"."""
104106
task_queue: str
105107
"""The name of the task queue for which this reservation request is associated."""
106108
worker_identity: str
@@ -159,14 +161,13 @@ class SlotReleaseContext:
159161
class CustomSlotSupplier(ABC):
160162
"""This class can be implemented to provide custom slot supplier behavior."""
161163

162-
# TODO: AbortError equivalent
163164
@abstractmethod
164165
async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
165166
"""This function is called before polling for new tasks. Your implementation must block until a
166167
slot is available then return a permit to use that slot.
167168
168-
The only acceptable exception to throw is AbortError, any other exceptions thrown will be
169-
logged and ignored.
169+
The only acceptable exception to throw is :py:class:`asyncio.CancelledError`, as invocations of this method may
170+
be cancelled. Any other exceptions thrown will be logged and ignored.
170171
171172
Args:
172173
ctx: The context for slot reservation.

temporalio/worker/_tuning.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
import logging
13
from abc import ABC, abstractmethod
24
from dataclasses import dataclass
35
from datetime import timedelta
@@ -21,6 +23,8 @@
2123

2224
_DEFAULT_RESOURCE_ACTIVITY_MAX = 500
2325

26+
logger = logging.getLogger(__name__)
27+
2428

2529
@dataclass(frozen=True)
2630
class FixedSizeSlotSupplier:
@@ -87,6 +91,48 @@ class ResourceBasedSlotSupplier:
8791
]
8892

8993

94+
class _ErrorLoggingSlotSupplier(CustomSlotSupplier):
    """Delegating :py:class:`CustomSlotSupplier` that logs errors raised by the wrapped supplier.

    Every call is forwarded to the wrapped supplier. Exceptions raised by it are logged as
    warnings (with traceback). Only ``reserve_slot`` lets the exception propagate after
    logging; the synchronous methods swallow it.
    """

    def __init__(self, supplier: CustomSlotSupplier):
        """Store the supplier that all calls are delegated to.

        Args:
            supplier: The user-provided slot supplier to wrap.
        """
        self._supplier = supplier

    async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
        """Await the wrapped ``reserve_slot``, logging and re-raising failures.

        :py:class:`asyncio.CancelledError` is propagated without being logged.
        """
        try:
            permit = await self._supplier.reserve_slot(ctx)
        except Exception as err:
            # Cancellation is not an error condition, so it is passed through silently.
            if not isinstance(err, asyncio.CancelledError):
                logger.warning(
                    "Error in custom slot supplier `reserve_slot`", exc_info=True
                )
            # Error needs to be re-thrown here so the rust code will loop
            raise
        return permit

    def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
        """Call the wrapped ``try_reserve_slot``; on error, log it and return ``None``."""
        permit: Optional[SlotPermit] = None
        try:
            permit = self._supplier.try_reserve_slot(ctx)
        except Exception:
            logger.warning(
                "Error in custom slot supplier `try_reserve_slot`", exc_info=True
            )
        return permit

    def release_slot(self, ctx: SlotReleaseContext) -> None:
        """Call the wrapped ``release_slot``; errors are logged and suppressed."""
        inner = self._supplier
        try:
            inner.release_slot(ctx)
        except Exception:
            logger.warning(
                "Error in custom slot supplier `release_slot`", exc_info=True
            )

    def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
        """Call the wrapped ``mark_slot_used``; errors are logged and suppressed."""
        inner = self._supplier
        try:
            inner.mark_slot_used(ctx)
        except Exception:
            logger.warning(
                "Error in custom slot supplier `mark_slot_used`", exc_info=True
            )
134+
135+
90136
def _to_bridge_slot_supplier(
91137
slot_supplier: SlotSupplier, kind: Literal["workflow", "activity", "local_activity"]
92138
) -> temporalio.bridge.worker.SlotSupplier:
@@ -114,7 +160,9 @@ def _to_bridge_slot_supplier(
114160
),
115161
)
116162
elif isinstance(slot_supplier, CustomSlotSupplier):
117-
return temporalio.bridge.temporal_sdk_bridge.CustomSlotSupplier(slot_supplier)
163+
return temporalio.bridge.temporal_sdk_bridge.CustomSlotSupplier(
164+
_ErrorLoggingSlotSupplier(slot_supplier)
165+
)
118166
else:
119167
raise TypeError(f"Unknown slot supplier type: {slot_supplier}")
120168

tests/worker/test_worker.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,12 +426,89 @@ def reserve_asserts(self, ctx):
426426
# Two workflow tasks, one activity
427427
assert ss.used == 3
428428
assert ss.seen_sticky_kinds == {True, False}
429-
assert ss.seen_slot_kinds == {"Workflow", "Activity", "LocalActivity"}
429+
assert ss.seen_slot_kinds == {"workflow", "activity", "local-activity"}
430430
assert ss.seen_used_slot_kinds == {"wf", "a"}
431431
assert ss.seen_release_info_empty
432432
assert ss.seen_release_info_nonempty
433433

434434

435+
@workflow.defn
class SimpleWorkflow:
    """Minimal workflow for the slot supplier tests; it returns a fixed string."""

    @workflow.run
    async def run(self) -> str:
        """Return the string ``"hi"`` without doing any other work."""
        result = "hi"
        return result
440+
441+
442+
async def test_throwing_slot_supplier(client: Client, env: WorkflowEnvironment):
    """A slot supplier that raises from nearly every method must not stop a workflow from completing."""

    class ThrowingSlotSupplier(CustomSlotSupplier):
        # NOTE(review): nothing visible in this class ever sets this to True
        # (mark_slot_used raises instead), so workflow slots appear to be handed
        # out for the whole test - confirm that is the intent.
        marked_used = False

        async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
            # Only workflow slots are granted, and only while nothing is marked used.
            wants_workflow = ctx.slot_type == "workflow"
            if self.marked_used or not wants_workflow:
                raise ValueError("I always throw")
            return SlotPermit()

        def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
            raise ValueError("I always throw")

        def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
            raise ValueError("I always throw")

        def release_slot(self, ctx: SlotReleaseContext) -> None:
            raise ValueError("I always throw")

    # The same supplier instance backs all three slot kinds.
    throwing_supplier = ThrowingSlotSupplier()
    tuner = WorkerTuner.create_composite(
        workflow_supplier=throwing_supplier,
        activity_supplier=throwing_supplier,
        local_activity_supplier=throwing_supplier,
    )

    async with new_worker(
        client,
        SimpleWorkflow,
        activities=[say_hello],
        tuner=tuner,
    ) as worker:
        handle = await client.start_workflow(
            SimpleWorkflow.run,
            id=f"throwing-slot-supplier-{uuid.uuid4()}",
            task_queue=worker.task_queue,
        )
        # The test passes as long as the workflow result can be awaited.
        await handle.result()
480+
481+
482+
async def test_blocking_slot_supplier(client: Client, env: WorkflowEnvironment):
    """A slot supplier whose ``reserve_slot`` never returns must not break the worker.

    The worker is entered with a supplier that blocks forever on reservation, left
    alone for one second, and then the ``async with`` block is exited. The test has
    no explicit assertions; presumably it passes when leaving the worker context
    neither hangs nor raises (confirm against ``new_worker``).
    """

    class BlockingSlotSupplier(CustomSlotSupplier):
        async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
            # Await a future that nothing ever resolves. We are inside a
            # coroutine, so the running loop is the one to create it on.
            await asyncio.get_running_loop().create_future()
            raise ValueError("Should be unreachable")

        def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
            return None

        def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
            return None

        def release_slot(self, ctx: SlotReleaseContext) -> None:
            return None

    ss = BlockingSlotSupplier()

    tuner = WorkerTuner.create_composite(
        workflow_supplier=ss, activity_supplier=ss, local_activity_supplier=ss
    )
    async with new_worker(
        client,
        SimpleWorkflow,
        activities=[say_hello],
        tuner=tuner,
    ) as _w:
        # Give the worker time to start calling reserve_slot before it is shut down.
        await asyncio.sleep(1)
511+
435512
def create_worker(
436513
client: Client,
437514
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,

0 commit comments

Comments
 (0)