Skip to content

Commit d34c1d6

Browse files
authored
Nexus Support in C Bridge (#987)
1 parent eb74c70 commit d34c1d6

File tree

2 files changed

+106
-7
lines changed

2 files changed

+106
-7
lines changed

core-c-bridge/include/temporal-sdk-core-c-bridge.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ typedef struct TemporalCoreTunerHolder {
631631
struct TemporalCoreSlotSupplier workflow_slot_supplier;
632632
struct TemporalCoreSlotSupplier activity_slot_supplier;
633633
struct TemporalCoreSlotSupplier local_activity_slot_supplier;
634+
struct TemporalCoreSlotSupplier nexus_task_slot_supplier;
634635
} TemporalCoreTunerHolder;
635636

636637
typedef struct TemporalCorePollerBehaviorSimpleMaximum {
@@ -670,6 +671,7 @@ typedef struct TemporalCoreWorkerOptions {
670671
struct TemporalCorePollerBehavior workflow_task_poller_behavior;
671672
float nonsticky_to_sticky_poll_ratio;
672673
struct TemporalCorePollerBehavior activity_task_poller_behavior;
674+
struct TemporalCorePollerBehavior nexus_task_poller_behavior;
673675
bool nondeterminism_as_workflow_fail;
674676
struct TemporalCoreByteArrayRefArray nondeterminism_as_workflow_fail_for_types;
675677
} TemporalCoreWorkerOptions;
@@ -877,6 +879,10 @@ void temporal_core_worker_poll_activity_task(struct TemporalCoreWorker *worker,
877879
void *user_data,
878880
TemporalCoreWorkerPollCallback callback);
879881

882+
void temporal_core_worker_poll_nexus_task(struct TemporalCoreWorker *worker,
883+
void *user_data,
884+
TemporalCoreWorkerPollCallback callback);
885+
880886
void temporal_core_worker_complete_workflow_activation(struct TemporalCoreWorker *worker,
881887
struct TemporalCoreByteArrayRef completion,
882888
void *user_data,
@@ -887,6 +893,11 @@ void temporal_core_worker_complete_activity_task(struct TemporalCoreWorker *work
887893
void *user_data,
888894
TemporalCoreWorkerCallback callback);
889895

896+
void temporal_core_worker_complete_nexus_task(struct TemporalCoreWorker *worker,
897+
struct TemporalCoreByteArrayRef completion,
898+
void *user_data,
899+
TemporalCoreWorkerCallback callback);
900+
890901
/**
891902
* Returns error if any. Must be freed if returned.
892903
*/

core-c-bridge/src/worker.rs

Lines changed: 95 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use temporal_sdk_core_api::{
2020
SlotSupplierPermit,
2121
},
2222
};
23+
use temporal_sdk_core_protos::coresdk::nexus::NexusTaskCompletion;
2324
use temporal_sdk_core_protos::{
2425
coresdk::{
2526
ActivityHeartbeat, ActivityTaskCompletion,
@@ -51,6 +52,7 @@ pub struct WorkerOptions {
5152
pub workflow_task_poller_behavior: PollerBehavior,
5253
pub nonsticky_to_sticky_poll_ratio: f32,
5354
pub activity_task_poller_behavior: PollerBehavior,
55+
pub nexus_task_poller_behavior: PollerBehavior,
5456
pub nondeterminism_as_workflow_fail: bool,
5557
pub nondeterminism_as_workflow_fail_for_types: ByteArrayRefArray,
5658
}
@@ -130,6 +132,7 @@ pub struct TunerHolder {
130132
pub workflow_slot_supplier: SlotSupplier,
131133
pub activity_slot_supplier: SlotSupplier,
132134
pub local_activity_slot_supplier: SlotSupplier,
135+
pub nexus_task_slot_supplier: SlotSupplier,
133136
}
134137

135138
#[repr(C)]
@@ -558,7 +561,7 @@ pub extern "C" fn temporal_core_worker_poll_workflow_activation(
558561
worker
559562
.runtime
560563
.clone()
561-
.alloc_utf8(&format!("Poll failure: {err}"))
564+
.alloc_utf8(&format!("Workflow polling failure: {err}"))
562565
.into_raw()
563566
.cast_const(),
564567
),
@@ -592,7 +595,41 @@ pub extern "C" fn temporal_core_worker_poll_activity_task(
592595
worker
593596
.runtime
594597
.clone()
595-
.alloc_utf8(&format!("Poll failure: {err}"))
598+
.alloc_utf8(&format!("Activity polling failure: {err}"))
599+
.into_raw()
600+
.cast_const(),
601+
),
602+
};
603+
unsafe {
604+
callback(user_data.into(), success, fail);
605+
}
606+
});
607+
}
608+
609+
#[unsafe(no_mangle)]
610+
pub extern "C" fn temporal_core_worker_poll_nexus_task(
611+
worker: *mut Worker,
612+
user_data: *mut libc::c_void,
613+
callback: WorkerPollCallback,
614+
) {
615+
let worker = unsafe { &*worker };
616+
let user_data = UserDataHandle(user_data);
617+
let core_worker = worker.worker.as_ref().unwrap().clone();
618+
worker.runtime.core.tokio_handle().spawn(async move {
619+
let (success, fail) = match core_worker.poll_nexus_task().await {
620+
Ok(task) => (
621+
ByteArray::from_vec(task.encode_to_vec())
622+
.into_raw()
623+
.cast_const(),
624+
std::ptr::null(),
625+
),
626+
Err(PollError::ShutDown) => (std::ptr::null(), std::ptr::null()),
627+
Err(err) => (
628+
std::ptr::null(),
629+
worker
630+
.runtime
631+
.clone()
632+
.alloc_utf8(&format!("Nexus polling failure: {err}"))
596633
.into_raw()
597634
.cast_const(),
598635
),
@@ -620,7 +657,7 @@ pub extern "C" fn temporal_core_worker_complete_workflow_activation(
620657
worker
621658
.runtime
622659
.clone()
623-
.alloc_utf8(&format!("Decode failure: {err}"))
660+
.alloc_utf8(&format!("Workflow task decode failure: {err}"))
624661
.into_raw(),
625662
);
626663
}
@@ -635,7 +672,7 @@ pub extern "C" fn temporal_core_worker_complete_workflow_activation(
635672
Err(err) => worker
636673
.runtime
637674
.clone()
638-
.alloc_utf8(&format!("Completion failure: {err}"))
675+
.alloc_utf8(&format!("Workflow completion failure: {err}"))
639676
.into_raw()
640677
.cast_const(),
641678
};
@@ -662,7 +699,7 @@ pub extern "C" fn temporal_core_worker_complete_activity_task(
662699
worker
663700
.runtime
664701
.clone()
665-
.alloc_utf8(&format!("Decode failure: {err}"))
702+
.alloc_utf8(&format!("Activity task decode failure: {err}"))
666703
.into_raw(),
667704
);
668705
}
@@ -677,7 +714,49 @@ pub extern "C" fn temporal_core_worker_complete_activity_task(
677714
Err(err) => worker
678715
.runtime
679716
.clone()
680-
.alloc_utf8(&format!("Completion failure: {err}"))
717+
.alloc_utf8(&format!("Activity completion failure: {err}"))
718+
.into_raw()
719+
.cast_const(),
720+
};
721+
unsafe {
722+
callback(user_data.into(), fail);
723+
}
724+
});
725+
}
726+
727+
#[unsafe(no_mangle)]
728+
pub extern "C" fn temporal_core_worker_complete_nexus_task(
729+
worker: *mut Worker,
730+
completion: ByteArrayRef,
731+
user_data: *mut libc::c_void,
732+
callback: WorkerCallback,
733+
) {
734+
let worker = unsafe { &*worker };
735+
let completion = match NexusTaskCompletion::decode(completion.to_slice()) {
736+
Ok(completion) => completion,
737+
Err(err) => {
738+
unsafe {
739+
callback(
740+
user_data,
741+
worker
742+
.runtime
743+
.clone()
744+
.alloc_utf8(&format!("Nexus task decode failure: {err}"))
745+
.into_raw(),
746+
);
747+
}
748+
return;
749+
}
750+
};
751+
let user_data = UserDataHandle(user_data);
752+
let core_worker = worker.worker.as_ref().unwrap().clone();
753+
worker.runtime.core.tokio_handle().spawn(async move {
754+
let fail = match core_worker.complete_nexus_task(completion).await {
755+
Ok(_) => std::ptr::null(),
756+
Err(err) => worker
757+
.runtime
758+
.clone()
759+
.alloc_utf8(&format!("Nexus completion failure: {err}"))
681760
.into_raw()
682761
.cast_const(),
683762
};
@@ -707,7 +786,7 @@ pub extern "C" fn temporal_core_worker_record_activity_heartbeat(
707786
Err(err) => worker
708787
.runtime
709788
.clone()
710-
.alloc_utf8(&format!("Decode failure: {err}"))
789+
.alloc_utf8(&format!("Activity heartbeat decode failure: {err}"))
711790
.into_raw(),
712791
}
713792
}
@@ -958,6 +1037,7 @@ impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
9581037
.workflow_task_poller_behavior(temporal_sdk_core_api::worker::PollerBehavior::try_from(&opt.workflow_task_poller_behavior)?)
9591038
.nonsticky_to_sticky_poll_ratio(opt.nonsticky_to_sticky_poll_ratio)
9601039
.activity_task_poller_behavior(temporal_sdk_core_api::worker::PollerBehavior::try_from(&opt.activity_task_poller_behavior)?)
1040+
.nexus_task_poller_behavior(temporal_sdk_core_api::worker::PollerBehavior::try_from(&opt.nexus_task_poller_behavior)?)
9611041
.workflow_failure_errors(if opt.nondeterminism_as_workflow_fail {
9621042
HashSet::from([WorkflowErrorType::Nondeterminism])
9631043
} else {
@@ -1003,10 +1083,17 @@ impl TryFrom<&TunerHolder> for temporal_sdk_core::TunerHolder {
10031083
} else {
10041084
None
10051085
};
1086+
let maybe_nexus_resource_opts =
1087+
if let SlotSupplier::ResourceBased(ref ss) = holder.nexus_task_slot_supplier {
1088+
Some(&ss.tuner_options)
1089+
} else {
1090+
None
1091+
};
10061092
let all_resource_opts = [
10071093
maybe_wf_resource_opts,
10081094
maybe_act_resource_opts,
10091095
maybe_local_act_resource_opts,
1096+
maybe_nexus_resource_opts,
10101097
];
10111098
let mut set_resource_opts = all_resource_opts.iter().flatten();
10121099
let first = set_resource_opts.next();
@@ -1033,6 +1120,7 @@ impl TryFrom<&TunerHolder> for temporal_sdk_core::TunerHolder {
10331120
.workflow_slot_options(holder.workflow_slot_supplier.try_into()?)
10341121
.activity_slot_options(holder.activity_slot_supplier.try_into()?)
10351122
.local_activity_slot_options(holder.local_activity_slot_supplier.try_into()?)
1123+
.nexus_slot_options(holder.nexus_task_slot_supplier.try_into()?)
10361124
.build()
10371125
.context("Invalid tuner holder options")?
10381126
.build_tuner_holder()

0 commit comments

Comments
 (0)