Skip to content

Commit a1f8211

Browse files
Implement Reset invocation feature
1 parent 3695fb2 commit a1f8211

File tree

26 files changed

+1470
-134
lines changed

26 files changed

+1470
-134
lines changed

crates/admin/src/rest_api/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,20 @@ impl_meta_api_error!(RestartInvocationMissingInputError: GONE "The invocation ca
195195
pub(crate) struct RestartInvocationNotStartedError(pub(crate) String);
196196
impl_meta_api_error!(RestartInvocationNotStartedError: TOO_EARLY "The invocation cannot be restarted because it's not running yet, meaning it might have been scheduled or inboxed.");
197197

198+
#[derive(Debug, thiserror::Error)]
199+
#[error(
200+
"Resetting the invocation '{0}' is not supported, because it was started using the old service protocol."
201+
)]
202+
pub(crate) struct ResetInvocationUnsupportedError(pub(crate) String);
203+
impl_meta_api_error!(ResetInvocationUnsupportedError: UNPROCESSABLE_ENTITY "Resetting the invocation is not supported, because it was started using the old service protocol.");
204+
205+
#[derive(Debug, thiserror::Error)]
206+
#[error(
207+
"The invocation '{0}' cannot be reset because it's not running. For completed invocations, use restart instead."
208+
)]
209+
pub(crate) struct ResetInvocationNotRunningError(pub(crate) String);
210+
impl_meta_api_error!(ResetInvocationNotRunningError: TOO_EARLY "The invocation cannot be reset because it's not running. For completed invocations, use restart instead.");
211+
198212
// --- Old Meta API errors. Please don't use these anymore.
199213

200214
/// This error is used by handlers to propagate API errors,

crates/admin/src/rest_api/invocations.rs

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ use restate_types::invocation::client::{
2121
self, CancelInvocationResponse, InvocationClient, KillInvocationResponse,
2222
PurgeInvocationResponse,
2323
};
24+
use restate_types::invocation::reset::TruncateFrom;
2425
use restate_types::invocation::{
25-
InvocationEpoch, InvocationTermination, PurgeInvocationRequest, TerminationFlavor, restart,
26+
InvocationEpoch, InvocationTermination, PurgeInvocationRequest, TerminationFlavor, reset,
27+
restart,
2628
};
2729
use restate_wal_protocol::{Command, Envelope};
2830
use serde::{Deserialize, Serialize};
@@ -514,3 +516,171 @@ where
514516
}
515517
}
516518
}
519+
520+
#[derive(Default, Debug, Deserialize, JsonSchema)]
521+
#[serde(rename_all = "snake_case")]
522+
pub enum ResetInvocationApplyToChildInvocations {
523+
Nothing,
524+
/// Kill all the child invocations that have been created after the truncation point
525+
#[default]
526+
Kill,
527+
/// Cancel all the child invocations that have been created after the truncation point
528+
Cancel,
529+
}
530+
531+
impl From<ResetInvocationApplyToChildInvocations> for reset::ApplyToChildInvocations {
532+
fn from(value: ResetInvocationApplyToChildInvocations) -> Self {
533+
match value {
534+
ResetInvocationApplyToChildInvocations::Kill => reset::ApplyToChildInvocations::Kill,
535+
ResetInvocationApplyToChildInvocations::Nothing => {
536+
reset::ApplyToChildInvocations::Nothing
537+
}
538+
ResetInvocationApplyToChildInvocations::Cancel => {
539+
reset::ApplyToChildInvocations::Cancel
540+
}
541+
}
542+
}
543+
}
544+
545+
#[derive(Default, Debug, Deserialize, JsonSchema)]
546+
#[serde(rename_all = "snake_case")]
547+
pub enum ResetInvocationApplyToPinnedDeployment {
548+
#[default]
549+
Keep,
550+
/// Clear the pinned deployment.
551+
///
552+
/// NOTE: If the newly picked-up deployment doesn't support the current service protocol version, the invocation will remain stuck in a retry loop. Use with caution!
553+
Clear,
554+
}
555+
556+
impl From<ResetInvocationApplyToPinnedDeployment> for reset::ApplyToPinnedDeployment {
557+
fn from(value: ResetInvocationApplyToPinnedDeployment) -> Self {
558+
match value {
559+
ResetInvocationApplyToPinnedDeployment::Keep => reset::ApplyToPinnedDeployment::Keep,
560+
ResetInvocationApplyToPinnedDeployment::Clear => reset::ApplyToPinnedDeployment::Clear,
561+
}
562+
}
563+
}
564+
565+
#[derive(Debug, Default, Deserialize, JsonSchema)]
566+
pub struct ResetInvocationParams {
567+
pub truncate_from: Option<u32>,
568+
#[serde(
569+
default,
570+
with = "serde_with::As::<Option<restate_serde_util::DurationString>>"
571+
)]
572+
#[schemars(with = "Option<String>")]
573+
pub previous_attempt_retention: Option<Duration>,
574+
pub apply_to_child_calls: Option<ResetInvocationApplyToChildInvocations>,
575+
pub apply_to_pinned_deployment: Option<ResetInvocationApplyToPinnedDeployment>,
576+
}
577+
578+
#[derive(Debug, Serialize, JsonSchema)]
579+
pub struct ResetInvocationResponse {
580+
/// The new invocation epoch of the invocation.
581+
pub new_invocation_epoch: InvocationEpoch,
582+
}
583+
584+
generate_meta_api_error!(ResetInvocationError: [
585+
InvocationNotFoundError,
586+
InvocationClientError,
587+
InvalidFieldError,
588+
ResetInvocationUnsupportedError,
589+
ResetInvocationNotRunningError
590+
]);
591+
592+
/// Reset an invocation
593+
#[openapi(
594+
summary = "Reset an invocation",
595+
description = "Reset the given invocation, truncating the progress from the given journal entry index onward and resuming afterward.",
596+
operation_id = "reset_invocation",
597+
tags = "invocation",
598+
parameters(
599+
path(
600+
name = "invocation_id",
601+
description = "Invocation identifier.",
602+
schema = "std::string::String"
603+
),
604+
query(
605+
name = "truncate_from",
606+
description = "Journal entry index to truncate from, inclusive. The index MUST correspond to a command entry or to a signal notification, and it cannot be zero, otherwise this operation will fail. If not provided, it defaults to 1 (after the first entry).",
607+
required = false,
608+
style = "simple",
609+
allow_empty_value = false,
610+
schema = "u32",
611+
),
612+
query(
613+
name = "previous_attempt_retention",
614+
description = "If set, it will override the configured completion_retention/journal_retention when the invocation was executed the first time. If none of the completion_retention/journal_retention are configured, and neither this previous_attempt_retention, then the previous attempt won't be retained at all. Can be configured using humantime format or ISO8601.",
615+
required = false,
616+
style = "simple",
617+
allow_empty_value = false,
618+
schema = String,
619+
),
620+
query(
621+
name = "apply_to_child_calls",
622+
description = "What to do with children calls that have been created after the truncation point. By default, kills all the children calls. This doesn't apply to sends.",
623+
required = false,
624+
style = "simple",
625+
allow_empty_value = false,
626+
schema = ResetInvocationApplyToChildInvocations,
627+
),
628+
query(
629+
name = "apply_to_pinned_deployment",
630+
description = "What to do with pinned deployment. By default, the current pinned deployment will be kept.",
631+
required = false,
632+
style = "simple",
633+
allow_empty_value = false,
634+
schema = ResetInvocationApplyToPinnedDeployment,
635+
)
636+
)
637+
)]
638+
pub async fn reset_invocation<V, IC>(
639+
State(state): State<AdminServiceState<V, IC>>,
640+
Path(invocation_id): Path<String>,
641+
Query(ResetInvocationParams {
642+
truncate_from,
643+
previous_attempt_retention,
644+
apply_to_child_calls,
645+
apply_to_pinned_deployment,
646+
}): Query<ResetInvocationParams>,
647+
) -> Result<Json<ResetInvocationResponse>, ResetInvocationError>
648+
where
649+
IC: InvocationClient,
650+
{
651+
let invocation_id = invocation_id
652+
.parse::<InvocationId>()
653+
.map_err(|e| InvalidFieldError("invocation_id", e.to_string()))?;
654+
655+
match state
656+
.invocation_client
657+
.reset_invocation(
658+
PartitionProcessorRpcRequestId::new(),
659+
invocation_id,
660+
TruncateFrom::EntryIndex { entry_index: truncate_from.unwrap_or(1) },
661+
previous_attempt_retention,
662+
apply_to_child_calls.unwrap_or_default().into(),
663+
apply_to_pinned_deployment.unwrap_or_default().into(),
664+
)
665+
.await
666+
.map_err(InvocationClientError)?
667+
{
668+
client::ResetInvocationResponse::Ok { new_epoch } => Ok(ResetInvocationResponse {
669+
new_invocation_epoch: new_epoch,
670+
}
671+
.into()),
672+
673+
client::ResetInvocationResponse::NotFound => {
674+
Err(InvocationNotFoundError(invocation_id.to_string()))?
675+
}
676+
client::ResetInvocationResponse::Unsupported => {
677+
Err(ResetInvocationUnsupportedError(invocation_id.to_string()))?
678+
}
679+
client::ResetInvocationResponse::NotRunning => {
680+
Err( ResetInvocationNotRunningError(invocation_id.to_string()))?
681+
}
682+
client::ResetInvocationResponse::BadIndex => {
683+
Err(InvalidFieldError("truncate_from", "The index MUST correspond to a command entry or to a signal notification, and it cannot be zero.".to_owned()))?
684+
}
685+
}
686+
}

crates/admin/src/rest_api/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ where
106106
"/invocations/:invocation_id/restart",
107107
patch(openapi_handler!(invocations::restart_invocation)),
108108
)
109+
.route(
110+
"/invocations/:invocation_id/reset",
111+
patch(openapi_handler!(invocations::reset_invocation)),
112+
)
109113
.route(
110114
"/subscriptions",
111115
post(openapi_handler!(subscriptions::create_subscription)),

crates/core/src/worker_api/partition_processor_rpc_client.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ use restate_types::identifiers::{
2121
use restate_types::invocation::client::{
2222
AttachInvocationResponse, CancelInvocationResponse, GetInvocationOutputResponse,
2323
InvocationClient, InvocationClientError, InvocationOutput, KillInvocationResponse,
24-
PurgeInvocationResponse, RestartInvocationResponse, SubmittedInvocationNotification,
24+
PurgeInvocationResponse, ResetInvocationResponse, RestartInvocationResponse,
25+
SubmittedInvocationNotification,
26+
};
27+
use restate_types::invocation::reset::{
28+
ApplyToChildInvocations, ApplyToPinnedDeployment, TruncateFrom,
2529
};
2630
use restate_types::invocation::restart::{ApplyToWorkflowRun, IfRunning};
2731
use restate_types::invocation::{
@@ -531,4 +535,36 @@ where
531535
}
532536
})
533537
}
538+
539+
async fn reset_invocation(
540+
&self,
541+
request_id: PartitionProcessorRpcRequestId,
542+
invocation_id: InvocationId,
543+
truncate_from: TruncateFrom,
544+
previous_attempt_retention: Option<Duration>,
545+
apply_to_child_invocations: ApplyToChildInvocations,
546+
apply_to_pinned_deployment: ApplyToPinnedDeployment,
547+
) -> Result<ResetInvocationResponse, InvocationClientError> {
548+
let response = self
549+
.resolve_partition_id_and_send(
550+
request_id,
551+
PartitionProcessorRpcRequestInner::ResetInvocation {
552+
invocation_id,
553+
previous_attempt_retention,
554+
apply_to_child_calls: apply_to_child_invocations,
555+
apply_to_pinned_deployment,
556+
truncate_from,
557+
},
558+
)
559+
.await?;
560+
561+
Ok(match response {
562+
PartitionProcessorRpcResponse::ResetInvocation(reset_invocation_response) => {
563+
reset_invocation_response.into()
564+
}
565+
_ => {
566+
panic!("Expecting ResetInvocation rpc response")
567+
}
568+
})
569+
}
534570
}

crates/invoker-impl/src/input_command.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub(crate) struct InvokeCommand {
3333
pub(crate) enum InputCommand<SR> {
3434
Invoke(InvokeCommand),
3535
// TODO remove this when we remove journal v1
36-
// Journal V1 doesn't support epochs nor trim and restart
36+
// Journal V1 supports neither epochs nor invocation reset
3737
Completion {
3838
partition: PartitionLeaderEpoch,
3939
invocation_id: InvocationId,

crates/partition-store/src/protobuf_types.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ pub mod v1 {
430430
deployment_id,
431431
service_protocol_version,
432432
current_invocation_epoch,
433-
trim_points,
433+
trim_points: truncation_points,
434434
waiting_for_completions,
435435
waiting_for_signal_indexes,
436436
waiting_for_signal_names,
@@ -550,8 +550,8 @@ pub mod v1 {
550550
idempotency_key: idempotency_key.map(ByteString::from),
551551
hotfix_apply_cancellation_after_deployment_is_pinned,
552552
current_invocation_epoch,
553-
completion_range_epoch_map: CompletionRangeEpochMap::from_trim_points(
554-
trim_points.into_iter().map(|trim_point|(trim_point.completion_id, trim_point.invocation_epoch))
553+
completion_range_epoch_map: CompletionRangeEpochMap::from_truncation_points(
554+
truncation_points.into_iter().map(|trim_point|(trim_point.completion_id, trim_point.invocation_epoch))
555555
),
556556
},
557557
))
@@ -584,8 +584,8 @@ pub mod v1 {
584584
idempotency_key: idempotency_key.map(ByteString::from),
585585
hotfix_apply_cancellation_after_deployment_is_pinned,
586586
current_invocation_epoch,
587-
completion_range_epoch_map: CompletionRangeEpochMap::from_trim_points(
588-
trim_points.into_iter().map(|trim_point|(trim_point.completion_id, trim_point.invocation_epoch))
587+
completion_range_epoch_map: CompletionRangeEpochMap::from_truncation_points(
588+
truncation_points.into_iter().map(|trim_point|(trim_point.completion_id, trim_point.invocation_epoch))
589589
),
590590
},
591591
waiting_for_notifications: waiting_for_completions
@@ -633,8 +633,8 @@ pub mod v1 {
633633
deployment_id,
634634
service_protocol_version,
635635
)?,
636-
completion_range_epoch_map: CompletionRangeEpochMap::from_trim_points(
637-
trim_points.into_iter().map(|trim_point|(trim_point.completion_id, trim_point.invocation_epoch))
636+
completion_range_epoch_map: CompletionRangeEpochMap::from_truncation_points(
637+
truncation_points.into_iter().map(|trim_point|(trim_point.completion_id, trim_point.invocation_epoch))
638638
),
639639
},
640640
))
@@ -842,7 +842,7 @@ pub mod v1 {
842842
result: None,
843843
hotfix_apply_cancellation_after_deployment_is_pinned,
844844
current_invocation_epoch,
845-
trim_points: completion_range_epoch_map.into_trim_points_iter().into_iter().map(|(completion_id, invocation_epoch)| JournalTrimPoint {
845+
trim_points: completion_range_epoch_map.into_truncation_points_iter().into_iter().map(|(completion_id, invocation_epoch)| JournalTrimPoint {
846846
completion_id,
847847
invocation_epoch,
848848
}).collect(),
@@ -938,7 +938,7 @@ pub mod v1 {
938938
result: None,
939939
hotfix_apply_cancellation_after_deployment_is_pinned,
940940
current_invocation_epoch,
941-
trim_points: completion_range_epoch_map.into_trim_points_iter().into_iter().map(|(completion_id, invocation_epoch)| JournalTrimPoint {
941+
trim_points: completion_range_epoch_map.into_truncation_points_iter().into_iter().map(|(completion_id, invocation_epoch)| JournalTrimPoint {
942942
completion_id,
943943
invocation_epoch,
944944
}).collect(),

crates/partition-store/src/tests/invocation_status_table_test/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus {
9999
idempotency_key: None,
100100
hotfix_apply_cancellation_after_deployment_is_pinned: false,
101101
current_invocation_epoch: 1,
102-
completion_range_epoch_map: CompletionRangeEpochMap::from_trim_points([(5, 1)]),
102+
completion_range_epoch_map: CompletionRangeEpochMap::from_truncation_points([(5, 1)]),
103103
})
104104
}
105105

@@ -126,7 +126,7 @@ fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus {
126126
idempotency_key: None,
127127
hotfix_apply_cancellation_after_deployment_is_pinned: false,
128128
current_invocation_epoch: 1,
129-
completion_range_epoch_map: CompletionRangeEpochMap::from_trim_points([(5, 1)]),
129+
completion_range_epoch_map: CompletionRangeEpochMap::from_truncation_points([(5, 1)]),
130130
},
131131
waiting_for_notifications: HashSet::default(),
132132
}

0 commit comments

Comments
 (0)