Skip to content

Commit 7a021b5

Browse files
committed
Fail sending a signal when workflow instance not running
1 parent f6ec13e commit 7a021b5

File tree

6 files changed

+36
-11
lines changed

6 files changed

+36
-11
lines changed

backend/backend.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package backend
22

33
import (
44
"context"
5+
"errors"
56

67
core "github.com/cschleiden/go-workflows/internal/core"
78
"github.com/cschleiden/go-workflows/internal/history"
89
"github.com/cschleiden/go-workflows/internal/task"
910
"github.com/cschleiden/go-workflows/workflow"
1011
)
1112

13+
var ErrInstanceNotFound = errors.New("workflow instance not found")
14+
1215
type WorkflowState int
1316

1417
const (

backend/mysql/mysql.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,12 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
240240
}
241241
defer tx.Rollback()
242242

243+
// TODO: Combine this with the event insertion
244+
res := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", instanceID)
245+
if err := res.Scan(nil); err == sql.ErrNoRows {
246+
return backend.ErrInstanceNotFound
247+
}
248+
243249
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
244250
return errors.Wrap(err, "could not insert signal event")
245251
}

backend/redis/instance.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,18 @@ func updateInstance(ctx context.Context, rdb redis.UniversalClient, instanceID s
160160

161161
func readInstance(ctx context.Context, rdb redis.UniversalClient, instanceID string) (*instanceState, error) {
162162
key := instanceKey(instanceID)
163-
cmd := rdb.Get(ctx, key)
164163

165-
if err := cmd.Err(); err != nil {
164+
val, err := rdb.Get(ctx, key).Result()
165+
if err != nil {
166+
if err == redis.Nil {
167+
return nil, backend.ErrInstanceNotFound
168+
}
169+
166170
return nil, errors.Wrap(err, "could not read instance")
167171
}
168172

169173
var state instanceState
170-
if err := json.Unmarshal([]byte(cmd.Val()), &state); err != nil {
174+
if err := json.Unmarshal([]byte(val), &state); err != nil {
171175
return nil, errors.Wrap(err, "could not unmarshal instance state")
172176
}
173177

backend/redis/signal.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ import (
99
)
1010

1111
func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
12+
_, err := readInstance(ctx, rb.rdb, instanceID)
13+
if err != nil {
14+
return err
15+
}
16+
1217
msgID, err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instanceID), &event)
1318
if err != nil {
1419
return errors.Wrap(err, "could not add event to stream")

backend/sqlite/sqlite.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,12 @@ func (sb *sqliteBackend) SignalWorkflow(ctx context.Context, instanceID string,
202202
}
203203
defer tx.Rollback()
204204

205+
// TODO: Combine this with the event insertion
206+
res := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", instanceID)
207+
if err := res.Scan(nil); err == sql.ErrNoRows {
208+
return backend.ErrInstanceNotFound
209+
}
210+
205211
if err := insertNewEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
206212
return errors.Wrap(err, "could not insert signal event")
207213
}

backend/test/backend.go renamed to backend/test/backendtest.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
2020
f func(t *testing.T, ctx context.Context, b backend.Backend)
2121
}{
2222
{
23-
name: "Test_GetWorkflowTask_ReturnsNilWhenTimeout",
23+
name: "GetWorkflowTask_ReturnsNilWhenTimeout",
2424
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
2525
ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
2626
defer cancel()
@@ -30,7 +30,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
3030
},
3131
},
3232
{
33-
name: "Test_GetActivityTask_ReturnsNilWhenTimeout",
33+
name: "GetActivityTask_ReturnsNilWhenTimeout",
3434
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
3535
ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
3636
defer cancel()
@@ -40,7 +40,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
4040
},
4141
},
4242
{
43-
name: "Test_CreateWorkflowInstance_DoesNotError",
43+
name: "CreateWorkflowInstance_DoesNotError",
4444
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
4545
instanceID := uuid.NewString()
4646

@@ -52,7 +52,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
5252
},
5353
},
5454
{
55-
name: "Test_CreateWorkflowInstance_SameInstanceIDErrors",
55+
name: "CreateWorkflowInstance_SameInstanceIDErrors",
5656
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
5757
instanceID := uuid.NewString()
5858
executionID := uuid.NewString()
@@ -71,7 +71,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
7171
},
7272
},
7373
{
74-
name: "Test_GetWorkflowTask_ReturnsTask",
74+
name: "GetWorkflowTask_ReturnsTask",
7575
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
7676
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
7777
err := b.CreateWorkflowInstance(ctx, history.WorkflowEvent{
@@ -88,7 +88,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
8888
},
8989
},
9090
{
91-
name: "Test_GetWorkflowTask_LocksTask",
91+
name: "GetWorkflowTask_LocksTask",
9292
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
9393
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
9494
err := b.CreateWorkflowInstance(ctx, history.WorkflowEvent{
@@ -113,7 +113,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
113113
},
114114
},
115115
{
116-
name: "Test_CompleteWorkflowTask_ReturnsErrorIfNotLocked",
116+
name: "CompleteWorkflowTask_ReturnsErrorIfNotLocked",
117117
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
118118
wfi := core.NewWorkflowInstance(uuid.NewString(), uuid.NewString())
119119
err := b.CreateWorkflowInstance(ctx, history.WorkflowEvent{
@@ -128,7 +128,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
128128
},
129129
},
130130
{
131-
name: "Test_CompleteWorkflowTask_AddsNewEventsToHistory",
131+
name: "CompleteWorkflowTask_AddsNewEventsToHistory",
132132
f: func(t *testing.T, ctx context.Context, b backend.Backend) {
133133
startedEvent := history.NewHistoryEvent(time.Now(), history.EventType_WorkflowExecutionStarted, &history.ExecutionStartedAttributes{})
134134
activityScheduledEvent := history.NewHistoryEvent(time.Now(), history.EventType_ActivityScheduled, &history.ActivityScheduledAttributes{}, history.ScheduleEventID(1))
@@ -188,6 +188,7 @@ func BackendTest(t *testing.T, setup func() backend.Backend, teardown func(b bac
188188
c := client.New(b)
189189
err := c.SignalWorkflow(ctx, "does-not-exist", "signal", "value")
190190
require.Error(t, err)
191+
require.Equal(t, backend.ErrInstanceNotFound, err)
191192
},
192193
},
193194
}

0 commit comments

Comments
 (0)