Skip to content

Commit 6b1c5f7

Browse files
[Scheduled Actions] Nexus-based workflow completions (watcher) (#8424)
- Adds the Nexus-based workflow watcher to Scheduler V2. - Scheduler V2 now maintains a mapping of request IDs -> workflow IDs, recorded within the same transaction as the buffered start, that can be used to identify workflow completion results. - Workflow ID generation is moved from within `startWorkflow` to within `SpecProcessor` (earlier, but identical algorithm/results), so that it can be recorded at the same time as the BufferedStart. - `LastCompletionState` is added as a CHASM field (separate document) to keep track of payload results. - A Nexus completion callback is attached to workflows started via Invoker. --------- Co-authored-by: Roey Berman <[email protected]>
1 parent 051fb08 commit 6b1c5f7

21 files changed

+1211
-102
lines changed

api/schedule/v1/message.pb.go

Lines changed: 21 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/scheduler/backfiller_tasks.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ func (b *BackfillerTaskExecutor) processBackfill(
149149
startTime,
150150
endTime,
151151
request.GetOverlapPolicy(),
152+
scheduler.WorkflowID(),
152153
backfiller.GetBackfillId(),
153154
true,
154155
&limit,

chasm/lib/scheduler/backfiller_tasks_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"go.temporal.io/server/chasm/lib/scheduler"
1313
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
1414
"go.temporal.io/server/common/metrics"
15-
"go.temporal.io/server/service/history/tasks"
1615
"google.golang.org/protobuf/types/known/timestamppb"
1716
)
1817

@@ -241,7 +240,6 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
241240
// Either type of request will spawn a Backfiller and schedule an immediate pure task.
242241
_, err = s.node.CloseTransaction()
243242
s.NoError(err)
244-
s.True(s.hasTask(&tasks.ChasmTaskPure{}, chasm.TaskScheduledTimeImmediate))
245243

246244
// Run a backfill task.
247245
err = s.executor.Execute(ctx, backfiller, chasm.TaskAttributes{}, &schedulerpb.BackfillerTask{})
@@ -268,6 +266,11 @@ func (s *backfillerTasksSuite) runTestCase(c *backfillTestCase) {
268266
// Validate BufferedStarts. More detailed validation must be done in the callbacks.
269267
s.Equal(c.ExpectedBufferedStarts, len(invoker.GetBufferedStarts()))
270268

269+
// Validate RequestId -> WorkflowId mapping
270+
for _, start := range invoker.GetBufferedStarts() {
271+
s.Equal(start.WorkflowId, invoker.WorkflowID(start.RequestId))
272+
}
273+
271274
// Callbacks.
272275
if c.ValidateInvoker != nil {
273276
c.ValidateInvoker(invoker)

chasm/lib/scheduler/export_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package scheduler
2+
3+
import (
4+
"time"
5+
6+
enumspb "go.temporal.io/api/enums/v1"
7+
"go.temporal.io/server/chasm"
8+
)
9+
10+
// Export unexported methods for testing.
11+
12+
func (s *Scheduler) RecordCompletedAction(
13+
ctx chasm.MutableContext,
14+
scheduleTime time.Time,
15+
workflowID string,
16+
workflowStatus enumspb.WorkflowExecutionStatus,
17+
) {
18+
s.recordCompletedAction(ctx, scheduleTime, workflowID, workflowStatus)
19+
}

chasm/lib/scheduler/gen/schedulerpb/v1/message.go-helpers.pb.go

Lines changed: 37 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)