Skip to content

Commit 6899691

Browse files
authored
Support for user metadata in local activity (#974)
1 parent 199880d commit 6899691

File tree

5 files changed

+76
-8
lines changed

5 files changed

+76
-8
lines changed

core/src/protosext/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use temporal_sdk_core_protos::{
3737
failure::v1::Failure,
3838
history::v1::{History, HistoryEvent, MarkerRecordedEventAttributes, history_event},
3939
query::v1::WorkflowQuery,
40+
sdk::v1::UserMetadata,
4041
workflowservice::v1::PollWorkflowTaskQueueResponse,
4142
},
4243
utilities::TryIntoOrNone,
@@ -320,6 +321,7 @@ pub(crate) struct ValidScheduleLA {
320321
pub(crate) retry_policy: RetryPolicy,
321322
pub(crate) local_retry_threshold: Duration,
322323
pub(crate) cancellation_type: ActivityCancellationType,
324+
pub(crate) user_metadata: Option<UserMetadata>,
323325
}
324326

325327
#[derive(Debug, Clone, Copy)]
@@ -349,7 +351,10 @@ impl Default for LACloseTimeouts {
349351
}
350352

351353
impl ValidScheduleLA {
352-
pub(crate) fn from_schedule_la(v: ScheduleLocalActivity) -> Result<Self, anyhow::Error> {
354+
pub(crate) fn from_schedule_la(
355+
v: ScheduleLocalActivity,
356+
user_metadata: Option<UserMetadata>,
357+
) -> Result<Self, anyhow::Error> {
353358
let original_schedule_time = v
354359
.original_schedule_time
355360
.map(|x| {
@@ -423,6 +428,7 @@ impl ValidScheduleLA {
423428
retry_policy,
424429
local_retry_threshold,
425430
cancellation_type,
431+
user_metadata,
426432
})
427433
}
428434
}

core/src/worker/workflow/machines/local_activity_state_machine.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use temporal_sdk_core_protos::{
3131
workflow_commands::ActivityCancellationType,
3232
},
3333
temporal::api::{
34-
command::v1::{RecordMarkerCommandAttributes, command},
34+
command::v1::{Command as ProtoCommand, RecordMarkerCommandAttributes, command},
3535
enums::v1::{CommandType, EventType, RetryState},
3636
failure::v1::{Failure, failure::FailureInfo},
3737
},
@@ -772,9 +772,11 @@ impl WFMachinesAdapter for LocalActivityMachine {
772772
header: None,
773773
failure: maybe_failure,
774774
};
775-
responses.push(MachineResponse::IssueNewCommand(
776-
command::Attributes::RecordMarkerCommandAttributes(marker_data).into(),
777-
));
775+
let command = ProtoCommand {
776+
user_metadata: self.shared_state.attrs.user_metadata.clone(),
777+
..command::Attributes::RecordMarkerCommandAttributes(marker_data).into()
778+
};
779+
responses.push(MachineResponse::IssueNewCommand(command));
778780
}
779781
Ok(responses)
780782
}

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1352,7 +1352,7 @@ impl WorkflowMachines {
13521352
WFCommandVariant::AddLocalActivity(attrs) => {
13531353
let seq = attrs.seq;
13541354
let attrs: ValidScheduleLA =
1355-
ValidScheduleLA::from_schedule_la(attrs).map_err(|e| {
1355+
ValidScheduleLA::from_schedule_la(attrs, cmd.metadata).map_err(|e| {
13561356
WFMachinesError::Fatal(format!(
13571357
"Invalid schedule local activity request (seq {seq}): {e}"
13581358
))

sdk/src/workflow_context/options.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{collections::HashMap, time::Duration};
33
use temporal_client::{Priority, WorkflowOptions};
44
use temporal_sdk_core_protos::{
55
coresdk::{
6+
AsJsonPayloadExt,
67
child_workflow::ChildWorkflowCancellationType,
78
nexus::NexusOperationCancellationType,
89
workflow_commands::{
@@ -155,6 +156,8 @@ pub struct LocalActivityOptions {
155156
/// specified. If set, this must be <= `schedule_to_close_timeout`, if not, it will be clamped
156157
/// down.
157158
pub start_to_close_timeout: Option<Duration>,
159+
/// Single-line summary for this activity that will appear in UI/CLI.
160+
pub summary: Option<String>,
158161
}
159162

160163
impl IntoWorkflowCommand for LocalActivityOptions {
@@ -194,7 +197,13 @@ impl IntoWorkflowCommand for LocalActivityOptions {
194197
}
195198
.into(),
196199
),
197-
user_metadata: None,
200+
user_metadata: self
201+
.summary
202+
.and_then(|summary| summary.as_json_payload().ok())
203+
.map(|summary| UserMetadata {
204+
summary: Some(summary),
205+
details: None,
206+
}),
198207
}
199208
}
200209
}

tests/integ_tests/workflow_tests/local_activities.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ use temporal_sdk_core::replay::HistoryForReplay;
1919
use temporal_sdk_core_protos::{
2020
TestHistoryBuilder,
2121
coresdk::{
22-
AsJsonPayloadExt, IntoPayloadsExt,
22+
AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt,
2323
workflow_commands::{ActivityCancellationType, workflow_command::Variant},
2424
workflow_completion,
2525
workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion},
2626
},
2727
temporal::api::{
2828
common::v1::RetryPolicy,
2929
enums::v1::{TimeoutType, UpdateWorkflowExecutionLifecycleStage},
30+
history::v1::history_event::Attributes::MarkerRecordedEventAttributes,
3031
update::v1::WaitPolicy,
3132
},
3233
};
@@ -906,3 +907,53 @@ async fn local_activity_with_heartbeat_only_causes_one_wakeup() {
906907
.unwrap();
907908
assert_eq!(res[0], replay_res.unwrap());
908909
}
910+
911+
pub(crate) async fn local_activity_with_summary_wf(ctx: WfContext) -> WorkflowResult<()> {
912+
ctx.local_activity(LocalActivityOptions {
913+
activity_type: "echo_activity".to_string(),
914+
input: "hi!".as_json_payload().expect("serializes fine"),
915+
summary: Some("Echo summary".to_string()),
916+
..Default::default()
917+
})
918+
.await;
919+
Ok(().into())
920+
}
921+
922+
#[tokio::test]
923+
async fn local_activity_with_summary() {
924+
let wf_name = "local_activity_with_summary";
925+
let mut starter = CoreWfStarter::new(wf_name);
926+
let mut worker = starter.worker().await;
927+
worker.register_wf(wf_name.to_owned(), local_activity_with_summary_wf);
928+
worker.register_activity("echo_activity", echo);
929+
930+
let handle = starter.start_with_worker(wf_name, &mut worker).await;
931+
worker.run_until_done().await.unwrap();
932+
handle
933+
.fetch_history_and_replay(worker.inner_mut())
934+
.await
935+
.unwrap();
936+
937+
let la_events = starter
938+
.get_history()
939+
.await
940+
.events
941+
.into_iter()
942+
.filter(|e| match e.attributes {
943+
Some(MarkerRecordedEventAttributes(ref a)) => a.marker_name == "core_local_activity",
944+
_ => false,
945+
})
946+
.collect::<Vec<_>>();
947+
assert_eq!(la_events.len(), 1);
948+
let summary = la_events[0]
949+
.user_metadata
950+
.as_ref()
951+
.expect("metadata missing from local activity marker")
952+
.summary
953+
.as_ref()
954+
.expect("summary missing from local activity marker");
955+
assert_eq!(
956+
"Echo summary",
957+
String::from_json_payload(summary).expect("failed to parse summary")
958+
);
959+
}

0 commit comments

Comments
 (0)