Skip to content

Commit 0fbc386

Browse files
authored
Include transient and speculative WFT events in GetWorkflowExecutionHistoryResponse (#9325)
## What changed? Re-does #9138, which was accidentally merged. Includes transient and speculative WFT events in the `GetWorkflowExecutionHistoryResponse`, unless the UI or CLI made the request. * Adds `transient_or_speculative_tasks` back to `GetMutableStateResponse` * Reserves `transient_workflow_task` (field 7) in the `HistoryContinuation` token * Adds validation helpers * Adds query-compare-query for transient events at request start and end Re-implements #7732 ## Why? Fixes "premature end of stream" errors when workers request history after cache eviction with transient/speculative workflow tasks present. This adds transient and speculative WFT events to `GetWorkflowExecutionHistory` (they are already in `PollWorkflowTask`). Worker cache eviction with speculative workflow tasks causes the expected and actual event counts to differ. #7732 passed transient events through continuation tokens, which could become stale during pagination. This PR instead queries mutable state at both the start and the end of pagination and compares the transient event IDs to detect whether the WFT state changed during pagination; if it did, a retryable error is returned. ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks Same risks as #7732
1 parent 42a0024 commit 0fbc386

File tree

22 files changed

+981
-414
lines changed

22 files changed

+981
-414
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 253 additions & 241 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/token/v1/message.pb.go

Lines changed: 38 additions & 49 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/architecture/speculative-workflow-task.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@ a special [in-memory-queue](./in-memory-queue.md) is used for speculative Workfl
4949
> #### TODO
5050
> It is important to point out that the `WorkflowTaskScheduled` and `WorkflowTaskStarted` events
5151
> for transient and speculative Workflow Task are only added to the `PollWorkflowTask` response - and
52-
> not to the `GetWorkflowExecutionHistory` response. This has an unfortunate consequence: when the
52+
> not to the `GetWorkflowExecutionHistory` response. This has an unfortunate consequence: when the
5353
> worker receives a speculative Workflow Task on a sticky task queue, but the Workflow is already
54-
> evicted from its cache, it issues a `GetWorkflowExecutionHistory` request, which returns the
55-
> history *without* speculative events. This leads to a `premature end of stream` error on the
54+
> evicted from its cache, it issues a `GetWorkflowExecutionHistory` request, which returns the
55+
> history *without* speculative events. This leads to a `premature end of stream` error on the
5656
> worker side. The worker fails the Workflow Task, clears stickiness, and everything works fine
5757
> after that - but a failed Workflow Task appears in the history. Fortunately, it doesn't happen often.
58+
>
59+
> See PR #9325 for related work: it adds transient and speculative events to the `GetWorkflowExecutionHistory` response for workers, while ensuring they are not incorrectly returned to CLI/UI clients.
5860
5961
## Speculative Workflow Task & Workflow Update
6062
Speculative Workflow Task was introduced to make it possible for Workflow Update to have zero writes

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ message GetMutableStateResponse {
154154
string inherited_build_id = 23;
155155
repeated temporal.server.api.persistence.v1.VersionedTransition transition_history = 24;
156156
temporal.api.workflow.v1.WorkflowExecutionVersioningInfo versioning_info = 25;
157+
// Transient or speculative workflow task events which are not yet persisted in the history.
158+
// These events should be appended to the history when it is returned to the worker.
159+
temporal.server.api.history.v1.TransientWorkflowTaskInfo transient_or_speculative_tasks = 26;
157160
}
158161

159162
message PollMutableStateRequest {

proto/internal/temporal/server/api/token/v1/message.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ message HistoryContinuation {
1515
int64 next_event_id = 3;
1616
bool is_workflow_running = 5;
1717
bytes persistence_token = 6;
18-
temporal.server.api.history.v1.TransientWorkflowTaskInfo transient_workflow_task = 7;
18+
reserved 7; // Was: transient_workflow_task - no longer passed through continuation token
1919
bytes branch_token = 8;
2020
reserved 9;
2121
temporal.server.api.history.v1.VersionHistoryItem version_history_item = 10;

service/history/api/get_history_util.go

Lines changed: 78 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,24 +115,21 @@ func GetRawHistory(
115115
metrics.HistorySize.With(metricsHandler).Record(int64(size))
116116

117117
if len(nextToken) == 0 && transientWorkflowTaskInfo != nil {
118-
if err := validateTransientWorkflowTaskEvents(nextEventID, transientWorkflowTaskInfo); err != nil {
119-
logger := shardContext.GetLogger()
120-
metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.OperationTag(metrics.HistoryGetRawHistoryScope))
121-
metrics.ServiceErrIncompleteHistoryCounter.With(metricsHandler).Record(1)
122-
logger.Error("getHistory error",
123-
tag.WorkflowNamespaceID(namespaceID.String()),
124-
tag.WorkflowID(execution.GetWorkflowId()),
125-
tag.WorkflowRunID(execution.GetRunId()),
126-
tag.Error(err))
127-
return nil, nil, err
128-
}
129-
130-
if len(transientWorkflowTaskInfo.HistorySuffix) > 0 {
131-
blob, err := shardContext.GetPayloadSerializer().SerializeEvents(transientWorkflowTaskInfo.HistorySuffix)
132-
if err != nil {
133-
return nil, nil, err
118+
// Check if we should include transient/speculative events
119+
if shouldIncludeTransientOrSpeculativeTasks(ctx, transientWorkflowTaskInfo) {
120+
if err := ValidateTransientWorkflowTaskEvents(nextEventID, transientWorkflowTaskInfo); err != nil {
121+
logger := shardContext.GetLogger()
122+
metricsHandler := interceptor.GetMetricsHandlerFromContext(ctx, logger).WithTags(metrics.OperationTag(metrics.HistoryGetRawHistoryScope))
123+
metrics.ServiceErrIncompleteHistoryCounter.With(metricsHandler).Record(1)
124+
} else {
125+
if len(transientWorkflowTaskInfo.HistorySuffix) > 0 {
126+
blob, err := shardContext.GetPayloadSerializer().SerializeEvents(transientWorkflowTaskInfo.HistorySuffix)
127+
if err != nil {
128+
return nil, nil, err
129+
}
130+
rawHistory = append(rawHistory, blob)
131+
}
134132
}
135-
rawHistory = append(rawHistory, blob)
136133
}
137134
}
138135

@@ -235,16 +232,19 @@ func GetHistory(
235232
tag.Error(err))
236233
}
237234
if len(nextPageToken) == 0 && transientWorkflowTaskInfo != nil {
238-
if err := validateTransientWorkflowTaskEvents(nextEventID, transientWorkflowTaskInfo); err != nil {
239-
metrics.ServiceErrIncompleteHistoryCounter.With(metricsHandler).Record(1)
240-
logger.Error("getHistory error",
241-
tag.WorkflowNamespaceID(namespaceID.String()),
242-
tag.WorkflowID(execution.GetWorkflowId()),
243-
tag.WorkflowRunID(execution.GetRunId()),
244-
tag.Error(err))
235+
// Check if we should include transient/speculative events
236+
if shouldIncludeTransientOrSpeculativeTasks(ctx, transientWorkflowTaskInfo) {
237+
if err := ValidateTransientWorkflowTaskEvents(nextEventID, transientWorkflowTaskInfo); err != nil {
238+
metrics.ServiceErrIncompleteHistoryCounter.With(metricsHandler).Record(1)
239+
// Don't append events, but don't fail request
240+
} else {
241+
if len(transientWorkflowTaskInfo.HistorySuffix) > 0 {
242+
// Validation passed, append events
243+
// Append the transient workflow task events once we are done enumerating everything from the events table
244+
historyEvents = append(historyEvents, transientWorkflowTaskInfo.HistorySuffix...)
245+
}
246+
}
245247
}
246-
// Append the transient workflow task events once we are done enumerating everything from the events table
247-
historyEvents = append(historyEvents, transientWorkflowTaskInfo.HistorySuffix...)
248248
}
249249

250250
if err := ProcessOutgoingSearchAttributes(
@@ -386,7 +386,58 @@ func ProcessOutgoingSearchAttributes(
386386
return nil
387387
}
388388

389-
func validateTransientWorkflowTaskEvents(
389+
// shouldIncludeTransientOrSpeculativeTasks determines if transient/speculative events should be included.
390+
// This function is called only on the last page of history pagination (nextToken is empty).
391+
func shouldIncludeTransientOrSpeculativeTasks(
392+
ctx context.Context,
393+
tranOrSpecEvents *historyspb.TransientWorkflowTaskInfo,
394+
) bool {
395+
return len(tranOrSpecEvents.GetHistorySuffix()) > 0 &&
396+
clientSupportsTranOrSpecEvents(ctx) &&
397+
areValidTransientOrSpecEvents(tranOrSpecEvents)
398+
}
399+
400+
func areValidTransientOrSpecEvents(tranOrSpecEvents *historyspb.TransientWorkflowTaskInfo) bool {
401+
events := tranOrSpecEvents.GetHistorySuffix()
402+
if len(events) == 0 || len(events) > 2 {
403+
return false
404+
}
405+
406+
// First must be WFT_SCHEDULED
407+
if events[0].GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
408+
return false
409+
}
410+
411+
// If 2 events, second must be WFT_STARTED immediately after
412+
if len(events) == 2 {
413+
if events[1].GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED {
414+
return false
415+
}
416+
if events[1].GetEventId() != events[0].GetEventId()+1 {
417+
return false
418+
}
419+
}
420+
421+
return true
422+
}
423+
424+
// clientSupportsTranOrSpecEvents detects if client supports transient events
425+
// Default to include transient events for clients, only CLI and UI are
426+
// explicitly excluded for backward compatibility
427+
func clientSupportsTranOrSpecEvents(ctx context.Context) bool {
428+
clientName, _ := headers.GetClientNameAndVersion(ctx)
429+
430+
switch clientName {
431+
case headers.ClientNameCLI, headers.ClientNameUI:
432+
return false
433+
default:
434+
return true
435+
}
436+
}
437+
438+
// ValidateTransientWorkflowTaskEvents validates that transient workflow task events have sequential event IDs
439+
// starting from the given offset. Returns an error if any event ID doesn't match the expected sequence.
440+
func ValidateTransientWorkflowTaskEvents(
390441
eventIDOffset int64,
391442
transientWorkflowTaskInfo *historyspb.TransientWorkflowTaskInfo,
392443
) error {

0 commit comments

Comments
 (0)