Skip to content

Commit 5e4f8f3

Browse files
spkane31bergundy
andauthored
chasm(callback): migrate state machine transitions (#8520)
## What changed? Migrating the `component/callbacks/statemachine{_test}.go` to `chasm/lib/callback` using the `CHASM` framework ## Why? Part 2 of X for migrating callback to CHASM. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [X] added new unit test(s) - [ ] added new functional test(s) ## Potential risks None --------- Co-authored-by: Roey Berman <[email protected]>
1 parent 6f4ed06 commit 5e4f8f3

File tree

4 files changed

+262
-7
lines changed

4 files changed

+262
-7
lines changed

chasm/lib/callback/component.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package callback
22

33
import (
4+
"time"
5+
46
"go.temporal.io/server/chasm"
57
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
68
"google.golang.org/protobuf/types/known/timestamppb"
@@ -15,8 +17,6 @@ type Callback struct {
1517

1618
// Persisted internal state
1719
*callbackspb.CallbackState
18-
19-
status *callbackspb.CallbackStatus
2020
}
2121

2222
func NewCallback(
@@ -39,10 +39,15 @@ func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState {
3939
return chasm.LifecycleStateRunning
4040
}
4141

42-
func (c *Callback) State() *callbackspb.CallbackStatus {
43-
return c.status
42+
func (c *Callback) State() callbackspb.CallbackStatus {
43+
return c.Status
44+
}
45+
46+
func (c *Callback) SetState(status callbackspb.CallbackStatus) {
47+
c.Status = status
4448
}
4549

46-
func (c *Callback) SetState(status *callbackspb.CallbackStatus) {
47-
c.status = status
50+
func (c *Callback) recordAttempt(ts time.Time) {
51+
c.Attempt++
52+
c.LastAttemptCompleteTime = timestamppb.New(ts)
4853
}

chasm/lib/callback/proto/v1/message.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ enum CallbackStatus {
5252
CALLBACK_STATUS_SUCCEEDED = 5;
5353
}
5454

55-
5655
message Callback {
5756
message Nexus {
5857
// Callback URL.

chasm/lib/callback/statemachine.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package callback
2+
3+
import (
4+
"time"
5+
6+
failurepb "go.temporal.io/api/failure/v1"
7+
"go.temporal.io/server/chasm"
8+
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
9+
"go.temporal.io/server/common/backoff"
10+
"google.golang.org/protobuf/types/known/timestamppb"
11+
)
12+
13+
// EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger
14+
// condition is met.
15+
type EventScheduled struct{}
16+
17+
var TransitionScheduled = chasm.NewTransition(
18+
[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_STANDBY},
19+
callbackspb.CALLBACK_STATUS_SCHEDULED,
20+
func(ctx chasm.MutableContext, cb *Callback, event EventScheduled) error {
21+
ctx.AddTask(cb, chasm.TaskAttributes{}, &callbackspb.InvocationTask{})
22+
return nil
23+
},
24+
)
25+
26+
// EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.
27+
type EventRescheduled struct{}
28+
29+
var TransitionRescheduled = chasm.NewTransition(
30+
[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_BACKING_OFF},
31+
callbackspb.CALLBACK_STATUS_SCHEDULED,
32+
func(mctx chasm.MutableContext, cb *Callback, event EventRescheduled) error {
33+
cb.NextAttemptScheduleTime = nil
34+
mctx.AddTask(cb, chasm.TaskAttributes{ScheduledTime: time.Time{}}, &callbackspb.InvocationTask{})
35+
return nil
36+
},
37+
)
38+
39+
// EventAttemptFailed is triggered when an attempt is failed with a retryable error.
40+
type EventAttemptFailed struct {
41+
Time time.Time
42+
Err error
43+
RetryPolicy backoff.RetryPolicy
44+
}
45+
46+
var TransitionAttemptFailed = chasm.NewTransition(
47+
[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED},
48+
callbackspb.CALLBACK_STATUS_BACKING_OFF,
49+
func(mctx chasm.MutableContext, cb *Callback, event EventAttemptFailed) error {
50+
cb.recordAttempt(event.Time)
51+
// Use 0 for elapsed time as we don't limit the retry by time (for now).
52+
nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(cb.Attempt), event.Err)
53+
nextAttemptScheduleTime := event.Time.Add(nextDelay)
54+
cb.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime)
55+
cb.LastAttemptFailure = &failurepb.Failure{
56+
Message: event.Err.Error(),
57+
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
58+
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
59+
NonRetryable: false,
60+
},
61+
},
62+
}
63+
mctx.AddTask(cb, chasm.TaskAttributes{ScheduledTime: time.Time{}}, &callbackspb.InvocationTask{})
64+
return nil
65+
},
66+
)
67+
68+
// EventFailed is triggered when an attempt is failed with a non retryable error.
69+
type EventFailed struct {
70+
Time time.Time
71+
Err error
72+
}
73+
74+
var TransitionFailed = chasm.NewTransition(
75+
[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED},
76+
callbackspb.CALLBACK_STATUS_FAILED,
77+
func(mctx chasm.MutableContext, cb *Callback, event EventFailed) error {
78+
cb.recordAttempt(event.Time)
79+
cb.LastAttemptFailure = &failurepb.Failure{
80+
Message: event.Err.Error(),
81+
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
82+
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
83+
NonRetryable: true,
84+
},
85+
},
86+
}
87+
mctx.AddTask(cb, chasm.TaskAttributes{ScheduledTime: time.Time{}}, &callbackspb.InvocationTask{})
88+
return nil
89+
},
90+
)
91+
92+
// EventSucceeded is triggered when an attempt succeeds.
93+
type EventSucceeded struct {
94+
Time time.Time
95+
}
96+
97+
var TransitionSucceeded = chasm.NewTransition(
98+
[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED},
99+
callbackspb.CALLBACK_STATUS_SUCCEEDED,
100+
func(mctx chasm.MutableContext, cb *Callback, event EventSucceeded) error {
101+
cb.recordAttempt(event.Time)
102+
cb.LastAttemptFailure = nil
103+
mctx.AddTask(cb, chasm.TaskAttributes{}, &callbackspb.InvocationTask{})
104+
return nil
105+
},
106+
)
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package callback
2+
3+
import (
4+
"errors"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
"go.temporal.io/server/chasm"
10+
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
11+
"go.temporal.io/server/common/backoff"
12+
"go.uber.org/mock/gomock"
13+
"google.golang.org/protobuf/proto"
14+
)
15+
16+
// testMutableContext is a minimal test helper for capturing tasks
17+
type testMutableContext struct {
18+
chasm.MutableContext
19+
tasks []testTask
20+
}
21+
22+
func (c *testMutableContext) AddTask(component chasm.Component, attributes chasm.TaskAttributes, payload any) {
23+
c.tasks = append(c.tasks, testTask{component, attributes, payload})
24+
}
25+
26+
func (c *testMutableContext) Now(_ chasm.Component) time.Time {
27+
return time.Now()
28+
}
29+
30+
func (c *testMutableContext) Ref(_ chasm.Component) ([]byte, error) {
31+
return nil, nil
32+
}
33+
34+
type testTask struct {
35+
component chasm.Component
36+
attributes chasm.TaskAttributes
37+
payload any
38+
}
39+
40+
func newTestMutableContext(t *testing.T) *testMutableContext {
41+
return &testMutableContext{
42+
MutableContext: chasm.NewMockMutableContext(
43+
gomock.NewController(t),
44+
),
45+
}
46+
}
47+
48+
func TestValidTransitions(t *testing.T) {
49+
// Setup
50+
currentTime := time.Now().UTC()
51+
callback := &Callback{
52+
CallbackState: &callbackspb.CallbackState{
53+
Callback: &callbackspb.Callback{
54+
Variant: &callbackspb.Callback_Nexus_{
55+
Nexus: &callbackspb.Callback_Nexus{
56+
Url: "http://address:666/path/to/callback?query=string",
57+
},
58+
},
59+
},
60+
},
61+
}
62+
callback.SetState(callbackspb.CALLBACK_STATUS_SCHEDULED)
63+
64+
// AttemptFailed
65+
mctx := newTestMutableContext(t)
66+
err := TransitionAttemptFailed.Apply(mctx, callback, EventAttemptFailed{
67+
Time: currentTime,
68+
Err: errors.New("test"),
69+
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Second),
70+
})
71+
require.NoError(t, err)
72+
73+
// Assert info object is updated
74+
require.Equal(t, callbackspb.CALLBACK_STATUS_BACKING_OFF, callback.State())
75+
require.Equal(t, int32(1), callback.Attempt)
76+
require.Equal(t, "test", callback.LastAttemptFailure.Message)
77+
require.False(t, callback.LastAttemptFailure.GetApplicationFailureInfo().NonRetryable)
78+
require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
79+
dt := currentTime.Add(time.Second).Sub(callback.NextAttemptScheduleTime.AsTime())
80+
require.Less(t, dt, time.Millisecond*200)
81+
82+
// Assert backoff task is generated
83+
require.Len(t, mctx.tasks, 1)
84+
require.IsType(t, &callbackspb.InvocationTask{}, mctx.tasks[0].payload)
85+
86+
// Rescheduled
87+
mctx = newTestMutableContext(t)
88+
err = TransitionRescheduled.Apply(mctx, callback, EventRescheduled{})
89+
require.NoError(t, err)
90+
91+
// Assert info object is updated only where needed
92+
require.Equal(t, callbackspb.CALLBACK_STATUS_SCHEDULED, callback.State())
93+
require.Equal(t, int32(1), callback.Attempt)
94+
require.Equal(t, "test", callback.LastAttemptFailure.Message)
95+
// Remains unmodified
96+
require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
97+
require.Nil(t, callback.NextAttemptScheduleTime)
98+
99+
// Assert callback task is generated
100+
require.Len(t, mctx.tasks, 1)
101+
require.IsType(t, &callbackspb.InvocationTask{}, mctx.tasks[0].payload)
102+
103+
// Store the pre-succeeded state to test Failed later
104+
dup := &Callback{
105+
CallbackState: proto.Clone(callback.CallbackState).(*callbackspb.CallbackState),
106+
}
107+
dup.Status = callback.State()
108+
109+
// Succeeded
110+
currentTime = currentTime.Add(time.Second)
111+
mctx = newTestMutableContext(t)
112+
err = TransitionSucceeded.Apply(mctx, callback, EventSucceeded{Time: currentTime})
113+
require.NoError(t, err)
114+
115+
// Assert info object is updated only where needed
116+
require.Equal(t, callbackspb.CALLBACK_STATUS_SUCCEEDED, callback.State())
117+
require.Equal(t, int32(2), callback.Attempt)
118+
require.Nil(t, callback.LastAttemptFailure)
119+
require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
120+
require.Nil(t, callback.NextAttemptScheduleTime)
121+
122+
// Assert task is generated (success transitions also add tasks in chasm)
123+
require.Len(t, mctx.tasks, 1)
124+
125+
// Reset back to scheduled
126+
callback = dup
127+
// Increment the time to ensure it's updated in the transition
128+
currentTime = currentTime.Add(time.Second)
129+
130+
// failed
131+
mctx = newTestMutableContext(t)
132+
err = TransitionFailed.Apply(mctx, callback, EventFailed{Time: currentTime, Err: errors.New("failed")})
133+
require.NoError(t, err)
134+
135+
// Assert info object is updated only where needed
136+
require.Equal(t, callbackspb.CALLBACK_STATUS_FAILED, callback.State())
137+
require.Equal(t, int32(2), callback.Attempt)
138+
require.Equal(t, "failed", callback.LastAttemptFailure.Message)
139+
require.True(t, callback.LastAttemptFailure.GetApplicationFailureInfo().NonRetryable)
140+
require.Equal(t, currentTime, callback.LastAttemptCompleteTime.AsTime())
141+
require.Nil(t, callback.NextAttemptScheduleTime)
142+
143+
// Assert task is generated (failed transitions also add tasks in chasm)
144+
require.Len(t, mctx.tasks, 1)
145+
}

0 commit comments

Comments
 (0)