Skip to content

Commit be0b332

Browse files
Make sure we don't send the awakeable completion twice (#1973)
Fix #1972
1 parent c17f82b commit be0b332

File tree

2 files changed: +95 -72 lines changed

crates/worker/src/partition/state_machine/mod.rs

Lines changed: 75 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ use std::marker::PhantomData;
8989
use std::ops::RangeInclusive;
9090
use std::time::Duration;
9191
use std::time::Instant;
92+
use tracing::error;
9293
use utils::SpanExt;
9394

9495
pub struct StateMachine<Codec> {
@@ -1847,7 +1848,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
18471848
.unwrap_or(CompletionResult::Empty);
18481849
Codec::write_completion(&mut journal_entry, completion_result.clone())?;
18491850

1850-
Self::do_forward_completion(
1851+
Self::forward_completion(
18511852
ctx,
18521853
invocation_id,
18531854
Completion::new(entry_index, completion_result),
@@ -1857,7 +1858,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
18571858
"Trying to process entry {} for a target that has no state",
18581859
journal_entry.header().as_entry_type()
18591860
);
1860-
Self::do_forward_completion(
1861+
Self::forward_completion(
18611862
ctx,
18621863
invocation_id,
18631864
Completion::new(entry_index, CompletionResult::Empty),
@@ -1976,7 +1977,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
19761977
Codec::write_completion(&mut journal_entry, completion_result.clone())?;
19771978

19781979
// We can already forward the completion
1979-
Self::do_forward_completion(
1980+
Self::forward_completion(
19801981
ctx,
19811982
invocation_id,
19821983
Completion::new(entry_index, completion_result),
@@ -2008,7 +2009,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
20082009
)?;
20092010

20102011
// Forward completion
2011-
Self::do_forward_completion(
2012+
Self::forward_completion(
20122013
ctx,
20132014
invocation_id,
20142015
Completion::new(entry_index, completion_result),
@@ -2047,7 +2048,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
20472048
"Trying to process entry {} for a target that has no promises",
20482049
journal_entry.header().as_entry_type()
20492050
);
2050-
Self::do_forward_completion(
2051+
Self::forward_completion(
20512052
ctx,
20522053
invocation_id,
20532054
Completion::new(entry_index, CompletionResult::Success(Bytes::new())),
@@ -2078,7 +2079,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
20782079
Codec::write_completion(&mut journal_entry, completion_result.clone())?;
20792080

20802081
// Forward completion
2081-
Self::do_forward_completion(
2082+
Self::forward_completion(
20822083
ctx,
20832084
invocation_id,
20842085
Completion::new(entry_index, completion_result),
@@ -2088,7 +2089,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
20882089
"Trying to process entry {} for a target that has no promises",
20892090
journal_entry.header().as_entry_type()
20902091
);
2091-
Self::do_forward_completion(
2092+
Self::forward_completion(
20922093
ctx,
20932094
invocation_id,
20942095
Completion::new(entry_index, CompletionResult::Empty),
@@ -2165,7 +2166,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
21652166
Codec::write_completion(&mut journal_entry, completion_result.clone())?;
21662167

21672168
// Forward completion
2168-
Self::do_forward_completion(
2169+
Self::forward_completion(
21692170
ctx,
21702171
invocation_id,
21712172
Completion::new(entry_index, completion_result),
@@ -2175,7 +2176,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
21752176
"Trying to process entry {} for a target that has no promises",
21762177
journal_entry.header().as_entry_type()
21772178
);
2178-
Self::do_forward_completion(
2179+
Self::forward_completion(
21792180
ctx,
21802181
invocation_id,
21812182
Completion::new(entry_index, CompletionResult::Empty),
@@ -2337,7 +2338,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
23372338
{
23382339
Codec::write_completion(&mut journal_entry, completion_result.clone())?;
23392340

2340-
Self::do_forward_completion(
2341+
Self::forward_completion(
23412342
ctx,
23422343
invocation_id,
23432344
Completion::new(entry_index, completion_result),
@@ -2433,7 +2434,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
24332434
waiting_for_completed_entries: &HashSet<EntryIndex>,
24342435
) -> Result<bool, Error> {
24352436
let resume_invocation = waiting_for_completed_entries.contains(&completion.entry_index);
2436-
Self::do_store_completion(ctx, invocation_id, completion).await?;
2437+
Self::store_completion(ctx, invocation_id, completion).await?;
24372438

24382439
Ok(resume_invocation)
24392440
}
@@ -2443,8 +2444,9 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
24432444
invocation_id: InvocationId,
24442445
completion: Completion,
24452446
) -> Result<(), Error> {
2446-
Self::do_store_completion(ctx, invocation_id, completion.clone()).await?;
2447-
Self::do_forward_completion(ctx, invocation_id, completion);
2447+
if Self::store_completion(ctx, invocation_id, completion.clone()).await? {
2448+
Self::forward_completion(ctx, invocation_id, completion);
2449+
}
24482450
Ok(())
24492451
}
24502452

@@ -3162,35 +3164,88 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
31623164
Ok(())
31633165
}
31643166

3165-
async fn do_store_completion<State: JournalTable>(
3167+
/// Returns `true` if the completion should be forwarded.
3168+
async fn store_completion<State: JournalTable>(
31663169
ctx: &mut StateMachineApplyContext<'_, State>,
31673170
invocation_id: InvocationId,
31683171
Completion {
31693172
entry_index,
31703173
result,
31713174
}: Completion,
3172-
) -> Result<(), Error> {
3175+
) -> Result<bool, Error> {
31733176
debug_if_leader!(
31743177
ctx.is_leader,
31753178
restate.journal.index = entry_index,
3176-
"Effect: Store completion {}",
3179+
"Store completion {}",
31773180
CompletionResultFmt(&result)
31783181
);
31793182

3180-
Self::store_completion(ctx.storage, &invocation_id, entry_index, result).await?;
3183+
if let Some(mut journal_entry) = ctx
3184+
.storage
3185+
.get_journal_entry(&invocation_id, entry_index)
3186+
.await?
3187+
.and_then(|journal_entry| match journal_entry {
3188+
JournalEntry::Entry(entry) => Some(entry),
3189+
JournalEntry::Completion(_) => None,
3190+
})
3191+
{
3192+
if journal_entry.ty() == EntryType::Awakeable
3193+
&& journal_entry.header().is_completed() == Some(true)
3194+
{
3195+
// We can ignore when we get an awakeable completion twice as they might be a result of
3196+
// some request being retried from the ingress to complete the awakeable.
3197+
// We'll use only the first completion, because changing the awakeable result
3198+
// after it has been completed for the first time can cause non-deterministic execution.
3199+
warn!(
3200+
restate.invocation.id = %invocation_id,
3201+
restate.journal.index = entry_index,
3202+
"Trying to complete an awakeable already completed. Ignoring this completion");
3203+
debug!("Discarded awakeable completion: {:?}", result);
3204+
return Ok(false);
3205+
}
3206+
if journal_entry.header().is_completed() == Some(true) {
3207+
// We use error level here as this can happen only in case there is some bug
3208+
// in the Partition Processor/Invoker.
3209+
error!(
3210+
restate.invocation.id = %invocation_id,
3211+
restate.journal.index = entry_index,
3212+
"Trying to complete the entry {:?}, but it's already completed. This is a bug.",
3213+
journal_entry.ty());
3214+
return Ok(false);
3215+
}
31813216

3182-
Ok(())
3217+
Codec::write_completion(&mut journal_entry, result)?;
3218+
ctx.storage
3219+
.put_journal_entry(
3220+
&invocation_id,
3221+
entry_index,
3222+
&JournalEntry::Entry(journal_entry),
3223+
)
3224+
.await;
3225+
Ok(true)
3226+
} else {
3227+
// In case we don't have the journal entry (only awakeables case),
3228+
// we'll send the completion afterward once we receive the entry.
3229+
ctx.storage
3230+
.put_journal_entry(
3231+
&invocation_id,
3232+
entry_index,
3233+
&JournalEntry::Completion(result),
3234+
)
3235+
.await;
3236+
Ok(false)
3237+
}
31833238
}
31843239

3185-
fn do_forward_completion<State>(
3240+
fn forward_completion<State>(
31863241
ctx: &mut StateMachineApplyContext<'_, State>,
31873242
invocation_id: InvocationId,
31883243
completion: Completion,
31893244
) {
31903245
debug_if_leader!(
31913246
ctx.is_leader,
31923247
restate.journal.index = completion.entry_index,
3193-
"Effect: Forward completion {} to deployment",
3248+
"Forward completion {} to deployment",
31943249
CompletionResultFmt(&completion.result)
31953250
);
31963251

@@ -3357,58 +3412,6 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
33573412

33583413
Ok(())
33593414
}
3360-
3361-
/// Stores the given completion. Returns `true` if an [`RawEntry`] was completed.
3362-
async fn store_completion<State: JournalTable>(
3363-
state_storage: &mut State,
3364-
invocation_id: &InvocationId,
3365-
entry_index: EntryIndex,
3366-
completion_result: CompletionResult,
3367-
) -> Result<bool, Error> {
3368-
if let Some(mut journal_entry) = state_storage
3369-
.get_journal_entry(invocation_id, entry_index)
3370-
.await?
3371-
.and_then(|journal_entry| match journal_entry {
3372-
JournalEntry::Entry(entry) => Some(entry),
3373-
JournalEntry::Completion(_) => None,
3374-
})
3375-
{
3376-
if journal_entry.ty() == EntryType::Awakeable
3377-
&& journal_entry.header().is_completed() == Some(true)
3378-
{
3379-
// We can ignore when we get an awakeable completion twice as they might be a result of
3380-
// some request being retried from the ingress to complete the awakeable.
3381-
// We'll use only the first completion, because changing the awakeable result
3382-
// after it has been completed for the first time can cause non-deterministic execution.
3383-
warn!(
3384-
restate.invocation.id = %invocation_id,
3385-
restate.journal.index = entry_index,
3386-
"Trying to complete an awakeable already completed. Ignoring this completion");
3387-
debug!("Discarded awakeable completion: {:?}", completion_result);
3388-
return Ok(false);
3389-
}
3390-
Codec::write_completion(&mut journal_entry, completion_result)?;
3391-
state_storage
3392-
.put_journal_entry(
3393-
invocation_id,
3394-
entry_index,
3395-
&JournalEntry::Entry(journal_entry),
3396-
)
3397-
.await;
3398-
Ok(true)
3399-
} else {
3400-
// In case we don't have the journal entry (only awakeables case),
3401-
// we'll send the completion afterward once we receive the entry.
3402-
state_storage
3403-
.put_journal_entry(
3404-
invocation_id,
3405-
entry_index,
3406-
&JournalEntry::Completion(completion_result),
3407-
)
3408-
.await;
3409-
Ok(false)
3410-
}
3411-
}
34123415
}
34133416

34143417
// To write completions in the effects log

crates/worker/src/partition/state_machine/tests/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,26 @@ async fn awakeable_completion_received_before_entry() -> TestResult {
292292
)))
293293
);
294294

295+
// If we try to send the completion again, it should not be forwarded!
296+
297+
let actions = test_env
298+
.apply(Command::InvocationResponse(InvocationResponse {
299+
id: invocation_id,
300+
entry_index: 1,
301+
result: ResponseResult::Success(Bytes::default()),
302+
}))
303+
.await;
304+
assert_that!(
305+
actions,
306+
not(contains(pat!(Action::ForwardCompletion {
307+
invocation_id: eq(invocation_id),
308+
completion: eq(Completion::new(
309+
1,
310+
CompletionResult::Success(Bytes::default())
311+
))
312+
})))
313+
);
314+
295315
let actions = test_env
296316
.apply(Command::InvokerEffect(InvokerEffect {
297317
invocation_id,

0 commit comments

Comments
 (0)