Skip to content

Commit fab3bb8

Browse files
authored
Support for signals
1 parent 13f56b1 commit fab3bb8

File tree

4 files changed

+37
-13
lines changed

4 files changed

+37
-13
lines changed

backend/redis/instance.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,18 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
3030
return err
3131
}
3232

33-
cmd := rb.rdb.XAdd(ctx, &redis.XAddArgs{
33+
_, err = rb.rdb.XAdd(ctx, &redis.XAddArgs{
3434
Stream: pendingEventsKey(event.WorkflowInstance.GetInstanceID()),
3535
ID: "*",
3636
Values: map[string]interface{}{
3737
"event": string(eventData),
3838
},
39-
})
40-
_, err = cmd.Result()
39+
}).Result()
4140
if err != nil {
4241
return errors.Wrap(err, "could not create event stream")
4342
}
4443

45-
// Add instance to pending instances set
44+
// Queue workflow instance task
4645
if _, err := rb.workflowQueue.Enqueue(ctx, event.WorkflowInstance.GetInstanceID(), nil); err != nil {
4746
if err != taskqueue.ErrTaskAlreadyInQueue {
4847
return errors.Wrap(err, "could not queue workflow task")
@@ -53,7 +52,22 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event histor
5352
}
5453

5554
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance core.WorkflowInstance) ([]history.Event, error) {
56-
panic("unimplemented")
55+
msgs, err := rb.rdb.XRange(ctx, historyKey(instance.GetInstanceID()), "-", "+").Result()
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
var events []history.Event
61+
for _, msg := range msgs {
62+
var event history.Event
63+
if err := json.Unmarshal([]byte(msg.Values["event"].(string)), &event); err != nil {
64+
return nil, errors.Wrap(err, "could not unmarshal event")
65+
}
66+
67+
events = append(events, event)
68+
}
69+
70+
return events, nil
5771
}
5872

5973
func (rb *redisBackend) GetWorkflowInstanceState(ctx context.Context, instance core.WorkflowInstance) (backend.WorkflowState, error) {

backend/redis/signal.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,21 @@ package redis
33
import (
44
"context"
55

6+
"github.com/cschleiden/go-workflows/backend/redis/taskqueue"
67
"github.com/cschleiden/go-workflows/internal/history"
8+
"github.com/pkg/errors"
79
)
810

911
func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
10-
// TODO: Store signal event
11-
// TODO: Queue workflow task
12+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instanceID), &event); err != nil {
13+
return errors.Wrap(err, "could not add event to stream")
14+
}
1215

13-
panic("unimplemented")
16+
if _, err := rb.workflowQueue.Enqueue(ctx, instanceID, nil); err != nil {
17+
if err != taskqueue.ErrTaskAlreadyInQueue {
18+
return errors.Wrap(err, "could not queue workflow task")
19+
}
20+
}
21+
22+
return nil
1423
}

backend/redis/taskqueue/queue.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package taskqueue
33
import (
44
"context"
55
"encoding/json"
6-
"log"
76
"time"
87

98
"github.com/go-redis/redis/v8"
@@ -170,8 +169,6 @@ func (q *taskQueue[T]) Complete(ctx context.Context, taskID string) error {
170169
return errors.New("could find task to complete")
171170
}
172171

173-
log.Println("Completing activity task", taskID)
174-
175172
return nil
176173
}
177174

backend/redis/workflow.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,12 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
153153
}
154154

155155
// If there are pending events, enqueue the instance again
156-
// TODO: Check for pending events
157-
if state != backend.WorkflowStateFinished {
156+
pendingCount, err := rb.rdb.XLen(ctx, pendingEventsKey(instance.GetInstanceID())).Result()
157+
if err != nil {
158+
return errors.Wrap(err, "could not read event stream")
159+
}
160+
161+
if state != backend.WorkflowStateFinished && pendingCount > 0 {
158162
if _, err := rb.workflowQueue.Enqueue(ctx, instance.GetInstanceID(), nil); err != nil {
159163
if err != taskqueue.ErrTaskAlreadyInQueue {
160164
return errors.Wrap(err, "could not queue workflow")

0 commit comments

Comments
 (0)