Skip to content

Commit 3a6e5ff

Browse files
authored
Enable slot supplier for nexus pollers (#994)
* Support Nexus SlotSupplier * Add failing test
1 parent 155ed46 commit 3a6e5ff

File tree

1 file changed

+159
-0
lines changed

1 file changed

+159
-0
lines changed

core/src/worker/tuner.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,24 @@ impl TunerHolderOptions {
108108
}
109109
None => {}
110110
}
111+
match self.nexus_slot_options {
112+
Some(SlotSupplierOptions::FixedSize { slots }) => {
113+
builder.nexus_slot_supplier(Arc::new(FixedSizeSlotSupplier::new(slots)));
114+
}
115+
Some(SlotSupplierOptions::ResourceBased(rso)) => {
116+
builder.nexus_slot_supplier(
117+
rb_tuner
118+
.as_mut()
119+
.unwrap()
120+
.with_nexus_slots_options(rso)
121+
.nexus_task_slot_supplier(),
122+
);
123+
}
124+
Some(SlotSupplierOptions::Custom(ss)) => {
125+
builder.nexus_slot_supplier(ss);
126+
}
127+
None => {}
128+
}
111129
Ok(builder.build())
112130
}
113131
}
@@ -144,6 +162,9 @@ impl TunerHolderOptionsBuilder {
144162
) || matches!(
145163
self.local_activity_slot_options,
146164
Some(Some(SlotSupplierOptions::ResourceBased(_)))
165+
) || matches!(
166+
self.nexus_slot_options,
167+
Some(Some(SlotSupplierOptions::ResourceBased(_)))
147168
);
148169
if any_is_resource_based && matches!(self.resource_based_options, None | Some(None)) {
149170
return Err(
@@ -270,3 +291,141 @@ impl WorkerTuner for TunerHolder {
270291
self.nexus_supplier.clone()
271292
}
272293
}
294+
295+
#[cfg(test)]
296+
mod tests {
297+
use super::*;
298+
use std::time::Duration;
299+
use temporal_sdk_core_api::worker::{
300+
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplierPermit,
301+
};
302+
303+
struct TestSlotSupplier;
304+
#[async_trait::async_trait]
305+
impl SlotSupplier for TestSlotSupplier {
306+
type SlotKind = NexusSlotKind;
307+
async fn reserve_slot(&self, _: &dyn SlotReservationContext) -> SlotSupplierPermit {
308+
SlotSupplierPermit::default()
309+
}
310+
fn try_reserve_slot(&self, _: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
311+
Some(SlotSupplierPermit::default())
312+
}
313+
fn mark_slot_used(&self, _: &dyn SlotMarkUsedContext<SlotKind = Self::SlotKind>) {}
314+
fn release_slot(&self, _: &dyn SlotReleaseContext<SlotKind = Self::SlotKind>) {}
315+
}
316+
317+
#[test]
318+
fn tuner_holder_options_nexus_fixed_size() {
319+
let options = TunerHolderOptions {
320+
workflow_slot_options: None,
321+
activity_slot_options: None,
322+
local_activity_slot_options: None,
323+
nexus_slot_options: Some(SlotSupplierOptions::FixedSize { slots: 50 }),
324+
resource_based_options: None,
325+
};
326+
327+
let tuner = options.build_tuner_holder().unwrap();
328+
// The tuner is built successfully with fixed size nexus slots
329+
let _ = tuner.nexus_task_slot_supplier();
330+
}
331+
332+
#[test]
333+
fn tuner_holder_options_nexus_resource_based() {
334+
let resource_opts = ResourceBasedSlotsOptionsBuilder::default()
335+
.target_mem_usage(0.8)
336+
.target_cpu_usage(0.9)
337+
.build()
338+
.unwrap();
339+
340+
let options = TunerHolderOptions {
341+
workflow_slot_options: None,
342+
activity_slot_options: None,
343+
local_activity_slot_options: None,
344+
nexus_slot_options: Some(SlotSupplierOptions::ResourceBased(
345+
ResourceSlotOptions::new(5, 100, Duration::from_millis(100)),
346+
)),
347+
resource_based_options: Some(resource_opts),
348+
};
349+
350+
let tuner = options.build_tuner_holder().unwrap();
351+
// The tuner is built successfully with resource-based nexus slots
352+
let _ = tuner.nexus_task_slot_supplier();
353+
}
354+
355+
#[test]
356+
fn tuner_holder_options_nexus_custom() {
357+
let custom_supplier: Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync> =
358+
Arc::new(TestSlotSupplier);
359+
360+
let options = TunerHolderOptions {
361+
workflow_slot_options: None,
362+
activity_slot_options: None,
363+
local_activity_slot_options: None,
364+
nexus_slot_options: Some(SlotSupplierOptions::Custom(custom_supplier.clone())),
365+
resource_based_options: None,
366+
};
367+
368+
let tuner = options.build_tuner_holder().unwrap();
369+
// The tuner is built successfully with custom nexus slots
370+
let _ = tuner.nexus_task_slot_supplier();
371+
}
372+
373+
#[test]
374+
fn tuner_builder_with_nexus_slot_supplier() {
375+
let mut builder = TunerBuilder::default();
376+
let custom_supplier: Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync> =
377+
Arc::new(FixedSizeSlotSupplier::new(25));
378+
379+
builder.nexus_slot_supplier(custom_supplier.clone());
380+
let tuner = builder.build();
381+
382+
// The tuner is built successfully with the custom nexus slot supplier
383+
let _ = tuner.nexus_task_slot_supplier();
384+
}
385+
386+
#[test]
387+
fn tuner_holder_options_builder_validates_resource_based_requirements() {
388+
// Should fail when nexus uses ResourceBased but resource_based_options is not set
389+
let result = TunerHolderOptionsBuilder::default()
390+
.nexus_slot_options(SlotSupplierOptions::ResourceBased(
391+
ResourceSlotOptions::new(5, 100, Duration::from_millis(100)),
392+
))
393+
.build();
394+
395+
assert!(result.is_err());
396+
assert!(
397+
result
398+
.unwrap_err()
399+
.to_string()
400+
.contains("resource_based_options")
401+
);
402+
}
403+
404+
#[test]
405+
fn tuner_holder_options_all_slot_types() {
406+
let resource_opts = ResourceBasedSlotsOptionsBuilder::default()
407+
.target_mem_usage(0.8)
408+
.target_cpu_usage(0.9)
409+
.build()
410+
.unwrap();
411+
412+
let options = TunerHolderOptions {
413+
workflow_slot_options: Some(SlotSupplierOptions::FixedSize { slots: 10 }),
414+
activity_slot_options: Some(SlotSupplierOptions::FixedSize { slots: 20 }),
415+
local_activity_slot_options: Some(SlotSupplierOptions::ResourceBased(
416+
ResourceSlotOptions::new(2, 50, Duration::from_millis(100)),
417+
)),
418+
nexus_slot_options: Some(SlotSupplierOptions::ResourceBased(
419+
ResourceSlotOptions::new(5, 100, Duration::from_millis(100)),
420+
)),
421+
resource_based_options: Some(resource_opts),
422+
};
423+
424+
let tuner = options.build_tuner_holder().unwrap();
425+
// All suppliers should be successfully configured
426+
let _ = tuner.workflow_task_slot_supplier();
427+
let _ = tuner.activity_task_slot_supplier();
428+
let _ = tuner.local_activity_slot_supplier();
429+
let _ = tuner.nexus_task_slot_supplier();
430+
}
431+
}

0 commit comments

Comments
 (0)