Skip to content

Commit 0f1249d

Browse files
ItalyPaleAlemukundansundardapr-bot
authored
Allow canceling recurring reminders after execution for non-internal actors (dapr#7337)
* Allow canceling recurring reminders after execution for non-internal actors Internal actors have the ability to cancel a recurring reminder by responding to an execution with the special error `ErrReminderCanceled` after a reminder is executed. However, the capability was never exposed to "external" actors. This PR adds the ability for an actor to stop recurring reminders (and timers) after they are executed also for "external" actors. This is possible by including the header `X-DaprReminderCancel` with a truthy value in the response when a reminder or timer is executed. This will require support in SDKs, but it allows something that is particularly helpful (and today can be buggy, see dapr#6666) Signed-off-by: ItalyPaleAle <[email protected]> * Lint Signed-off-by: ItalyPaleAle <[email protected]> * Lint Signed-off-by: ItalyPaleAle <[email protected]> --------- Signed-off-by: ItalyPaleAle <[email protected]> Co-authored-by: Mukundan Sundararajan <[email protected]> Co-authored-by: Dapr Bot <[email protected]>
1 parent 6ef8029 commit 0f1249d

File tree

2 files changed

+78
-35
lines changed

2 files changed

+78
-35
lines changed

pkg/actors/actors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import (
5656
eventqueue "github.com/dapr/kit/events/queue"
5757
"github.com/dapr/kit/logger"
5858
"github.com/dapr/kit/ptr"
59+
"github.com/dapr/kit/utils"
5960
)
6061

6162
const (
@@ -676,6 +677,11 @@ func (a *actorsRuntime) callLocalActor(ctx context.Context, req *internalv1pb.In
676677
return res, actorerrors.NewActorError(res)
677678
}
678679

680+
// Allow stopping a recurring reminder or timer
681+
if v := res.GetHeaders()["X-Daprremindercancel"]; v != nil && len(v.GetValues()) > 0 && utils.IsTruthy(v.GetValues()[0]) {
682+
return res, ErrReminderCanceled
683+
}
684+
679685
return res, nil
680686
}
681687

tests/integration/suite/actors/reminders/basic.go

Lines changed: 72 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ type basic struct {
4545
daprd *daprd.Daprd
4646
place *placement.Placement
4747

48-
methodcalled atomic.Int64
48+
reminderCalled atomic.Int64
49+
stopReminderCalled atomic.Int64
4950
}
5051

5152
func (b *basic) Setup(t *testing.T) []framework.Option {
@@ -57,7 +58,12 @@ func (b *basic) Setup(t *testing.T) []framework.Option {
5758
w.WriteHeader(http.StatusOK)
5859
})
5960
handler.HandleFunc("/actors/myactortype/myactorid/method/remind/remindermethod", func(w http.ResponseWriter, r *http.Request) {
60-
b.methodcalled.Add(1)
61+
b.reminderCalled.Add(1)
62+
})
63+
handler.HandleFunc("/actors/myactortype/myactorid/method/remind/stopreminder", func(w http.ResponseWriter, r *http.Request) {
64+
b.stopReminderCalled.Add(1)
65+
w.Header().Set("X-DaprReminderCancel", "true")
66+
w.WriteHeader(http.StatusOK)
6167
})
6268
handler.HandleFunc("/actors/myactortype/myactorid/method/foo", func(w http.ResponseWriter, r *http.Request) {})
6369

@@ -82,30 +88,19 @@ func (b *basic) Run(t *testing.T, ctx context.Context) {
8288

8389
daprdURL := "http://localhost:" + strconv.Itoa(b.daprd.HTTPPort()) + "/v1.0/actors/myactortype/myactorid"
8490

85-
req, err := http.NewRequestWithContext(ctx, http.MethodPost, daprdURL+"/method/foo", nil)
86-
require.NoError(t, err)
87-
88-
require.EventuallyWithT(t, func(c *assert.CollectT) {
89-
resp, rErr := client.Do(req)
90-
//nolint:testifylint
91-
if assert.NoError(c, rErr) {
92-
assert.NoError(c, resp.Body.Close())
93-
assert.Equal(c, http.StatusOK, resp.StatusCode)
94-
}
95-
}, time.Second*10, time.Millisecond*10, "actor not ready in time")
96-
97-
body := `{"dueTime": "0ms"}`
98-
req, err = http.NewRequestWithContext(ctx, http.MethodPost, daprdURL+"/reminders/remindermethod", strings.NewReader(body))
99-
require.NoError(t, err)
100-
101-
resp, err := client.Do(req)
102-
require.NoError(t, err)
103-
require.NoError(t, resp.Body.Close())
104-
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
105-
106-
assert.Eventually(t, func() bool {
107-
return b.methodcalled.Load() == 1
108-
}, time.Second*3, time.Millisecond*10)
91+
t.Run("actor ready", func(t *testing.T) {
92+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, daprdURL+"/method/foo", nil)
93+
require.NoError(t, err)
94+
95+
require.EventuallyWithT(t, func(c *assert.CollectT) {
96+
resp, rErr := client.Do(req)
97+
//nolint:testifylint
98+
if assert.NoError(c, rErr) {
99+
assert.NoError(c, resp.Body.Close())
100+
assert.Equal(c, http.StatusOK, resp.StatusCode)
101+
}
102+
}, 10*time.Second, 10*time.Millisecond, "actor not ready in time")
103+
})
109104

110105
conn, err := grpc.DialContext(ctx, b.daprd.GRPCAddress(),
111106
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
@@ -114,15 +109,57 @@ func (b *basic) Run(t *testing.T, ctx context.Context) {
114109
t.Cleanup(func() { require.NoError(t, conn.Close()) })
115110
gclient := rtv1.NewDaprClient(conn)
116111

117-
_, err = gclient.RegisterActorReminder(ctx, &rtv1.RegisterActorReminderRequest{
118-
ActorType: "myactortype",
119-
ActorId: "myactorid",
120-
Name: "remindermethod",
121-
DueTime: "0ms",
112+
t.Run("schedule reminder via HTTP", func(t *testing.T) {
113+
const body = `{"dueTime": "0ms"}`
114+
var (
115+
req *http.Request
116+
resp *http.Response
117+
)
118+
req, err = http.NewRequestWithContext(ctx, http.MethodPost, daprdURL+"/reminders/remindermethod", strings.NewReader(body))
119+
require.NoError(t, err)
120+
121+
resp, err = client.Do(req)
122+
require.NoError(t, err)
123+
require.NoError(t, resp.Body.Close())
124+
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
125+
126+
assert.Eventually(t, func() bool {
127+
return b.reminderCalled.Load() == 1
128+
}, 3*time.Second, 10*time.Millisecond)
122129
})
123-
require.NoError(t, err)
124130

125-
assert.Eventually(t, func() bool {
126-
return b.methodcalled.Load() == 2
127-
}, time.Second*3, time.Millisecond*10)
131+
t.Run("schedule reminder via gRPC", func(t *testing.T) {
132+
_, err = gclient.RegisterActorReminder(ctx, &rtv1.RegisterActorReminderRequest{
133+
ActorType: "myactortype",
134+
ActorId: "myactorid",
135+
Name: "remindermethod",
136+
DueTime: "0ms",
137+
})
138+
require.NoError(t, err)
139+
140+
assert.Eventually(t, func() bool {
141+
return b.reminderCalled.Load() == 2
142+
}, 3*time.Second, 10*time.Millisecond)
143+
})
144+
145+
t.Run("cancel recurring reminder", func(t *testing.T) {
146+
// Register a reminder that repeats every second
147+
_, err = gclient.RegisterActorReminder(ctx, &rtv1.RegisterActorReminderRequest{
148+
ActorType: "myactortype",
149+
ActorId: "myactorid",
150+
Name: "stopreminder",
151+
DueTime: "0s",
152+
Period: "1s",
153+
})
154+
require.NoError(t, err)
155+
156+
// Should be invoked once
157+
assert.Eventually(t, func() bool {
158+
return b.stopReminderCalled.Load() == 1
159+
}, 3*time.Second, 10*time.Millisecond)
160+
161+
// After 2s, should not have been invoked more
162+
time.Sleep(2 * time.Second)
163+
assert.Equal(t, int64(1), b.stopReminderCalled.Load())
164+
})
128165
}

0 commit comments

Comments
 (0)