Skip to content

Commit 4015670

Browse files
committed
CI debug testing
1 parent 18264f2 commit 4015670

File tree

3 files changed

+41
-22
lines changed

3 files changed

+41
-22
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,18 @@ pub struct TunerHolder {
6464
local_activity_slot_supplier: SlotSupplier,
6565
}
6666

67+
// pub fn set_task_locals_on_tuner<'a>(py: Python<'a>, tuner: &TunerHolder) -> PyResult<()> {
68+
// // TODO: All suppliers
69+
// if let SlotSupplier::Custom(ref cs) = tuner.workflow_slot_supplier {
70+
// Python::with_gil(|py| {
71+
// let py_obj = cs.inner.as_ref(py);
72+
// py_obj.call_method0("set_task_locals")?;
73+
// Ok(())
74+
// })?;
75+
// };
76+
// Ok(())
77+
// }
78+
6779
#[derive(FromPyObject)]
6880
pub enum SlotSupplier {
6981
FixedSize(FixedSizeSlotSupplier),
@@ -198,10 +210,8 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
198210
loop {
199211
let pypermit = match Python::with_gil(|py| {
200212
let py_obj = self.inner.as_ref(py);
201-
let called = py_obj.call_method1(
202-
"reserve_slot",
203-
(SlotReserveCtx::from_ctx(Self::SlotKind::kind(), ctx),),
204-
)?;
213+
let called = py_obj
214+
.call_method1("reserve_slot", (SlotReserveCtx::from_ctx(SK::kind(), ctx),))?;
205215
runtime::THREAD_TASK_LOCAL
206216
.with(|tl| pyo3_asyncio::into_future_with_locals(tl.get().unwrap(), called))
207217
}) {
@@ -217,11 +227,13 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
217227
.await;
218228
match pypermit {
219229
Ok(p) => {
230+
dbg!("reserve done", SK::kind());
220231
return SlotSupplierPermit::with_user_data(p);
221232
}
222-
Err(_) => {
233+
Err(e) => {
223234
// This is a user thrown error, re-raised by the logging wrapper so we can
224235
// loop, so do that.
236+
dbg!(e);
225237
}
226238
}
227239
}
@@ -232,7 +244,7 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
232244
let py_obj = self.inner.as_ref(py);
233245
let pa = py_obj.call_method1(
234246
"try_reserve_slot",
235-
(SlotReserveCtx::from_ctx(Self::SlotKind::kind(), ctx),),
247+
(SlotReserveCtx::from_ctx(SK::kind(), ctx),),
236248
)?;
237249

238250
if pa.is_none() {
@@ -274,6 +286,7 @@ impl<SK: SlotKind + Send + Sync> SlotSupplierTrait for CustomSlotSupplierOfType<
274286
}
275287

276288
fn release_slot(&self, ctx: &dyn SlotReleaseContext<SlotKind = Self::SlotKind>) {
289+
dbg!("release", SK::kind());
277290
if let Err(e) = Python::with_gil(|py| {
278291
let permit = ctx
279292
.permit()
@@ -362,6 +375,8 @@ pub fn new_replay_worker<'a>(
362375
impl WorkerRef {
363376
fn validate<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
364377
let worker = self.worker.as_ref().unwrap().clone();
378+
// Set custom slot supplier task locals so they can run futures
379+
// match worker.get_config().tuner {}
365380
self.runtime.future_into_py(py, async move {
366381
worker
367382
.validate()

temporalio/worker/_tuning.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
9898
try:
9999
return await self._supplier.reserve_slot(ctx)
100100
except asyncio.CancelledError:
101+
logger.exception("saw cancelled error")
101102
raise
102103
except Exception:
103104
logger.warning(

tests/worker/test_worker.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import uuid
66
from datetime import timedelta
77
from typing import Any, Awaitable, Callable, Optional
8+
import threading
89

910
import pytest
1011

@@ -351,6 +352,7 @@ def __init__(self, pnum: int):
351352
self.pnum = pnum
352353

353354
class MySlotSupplier(CustomSlotSupplier):
355+
lock = threading.Lock()
354356
reserves = 0
355357
releases = 0
356358
used = 0
@@ -364,7 +366,9 @@ async def reserve_slot(self, ctx: SlotReserveContext) -> SlotPermit:
364366
self.reserve_asserts(ctx)
365367
# Verify an async call doesn't bungle things
366368
await asyncio.sleep(0.01)
367-
self.reserves += 1
369+
with self.lock:
370+
self.reserves += 1
371+
print("incremented reserve")
368372
return MyPermit(self.reserves)
369373

370374
def try_reserve_slot(self, ctx: SlotReserveContext) -> Optional[SlotPermit]:
@@ -376,24 +380,26 @@ def mark_slot_used(self, ctx: SlotMarkUsedContext) -> None:
376380
assert isinstance(ctx.permit, MyPermit)
377381
assert ctx.permit.pnum is not None
378382
assert ctx.slot_info is not None
379-
if isinstance(ctx.slot_info, WorkflowSlotInfo):
380-
self.seen_used_slot_kinds.add("wf")
381-
elif isinstance(ctx.slot_info, ActivitySlotInfo):
382-
self.seen_used_slot_kinds.add("a")
383-
elif isinstance(ctx.slot_info, LocalActivitySlotInfo):
384-
self.seen_used_slot_kinds.add("la")
385-
self.used += 1
383+
with self.lock:
384+
if isinstance(ctx.slot_info, WorkflowSlotInfo):
385+
self.seen_used_slot_kinds.add("wf")
386+
elif isinstance(ctx.slot_info, ActivitySlotInfo):
387+
self.seen_used_slot_kinds.add("a")
388+
elif isinstance(ctx.slot_info, LocalActivitySlotInfo):
389+
self.seen_used_slot_kinds.add("la")
390+
self.used += 1
386391

387392
def release_slot(self, ctx: SlotReleaseContext) -> None:
388393
assert ctx.permit is not None
389394
assert isinstance(ctx.permit, MyPermit)
390395
assert ctx.permit.pnum is not None
396+
with self.lock:
391397
# Info may be empty, and we should see both empty and not
392-
if ctx.slot_info is None:
393-
self.seen_release_info_empty = True
394-
else:
395-
self.seen_release_info_nonempty = True
396-
self.releases += 1
398+
if ctx.slot_info is None:
399+
self.seen_release_info_empty = True
400+
else:
401+
self.seen_release_info_nonempty = True
402+
self.releases += 1
397403

398404
def reserve_asserts(self, ctx):
399405
assert ctx.task_queue is not None
@@ -422,9 +428,6 @@ def reserve_asserts(self, ctx):
422428
await wf1.signal(WaitOnSignalWorkflow.my_signal, "finish")
423429
await wf1.result()
424430

425-
async def releases() -> int:
426-
return ss.releases
427-
428431
assert ss.reserves == ss.releases
429432
# Two workflow tasks, one activity
430433
assert ss.used == 3

0 commit comments

Comments
 (0)