Skip to content

Commit 76be36e

Browse files
committed
Remove unused queue error
1 parent 4312a4b commit 76be36e

File tree

4 files changed

+13
-22
lines changed

4 files changed

+13
-22
lines changed

backend/redis/queue.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package redis
33
import (
44
"context"
55
"encoding/json"
6-
"errors"
76
"fmt"
7+
"log"
88
"time"
99

1010
"github.com/go-redis/redis/v8"
@@ -30,8 +30,6 @@ type TaskItem[T any] struct {
3030
Data T
3131
}
3232

33-
var errTaskAlreadyInQueue = errors.New("task already in queue")
34-
3533
type KeyInfo struct {
3634
StreamKey string
3735
SetKey string
@@ -80,7 +78,7 @@ func (q *taskQueue[T]) Keys() KeyInfo {
8078
}
8179
}
8280

83-
// KEYS[1] = stream
81+
// KEYS[1] = set
8482
// KEYS[2] = stream
8583
// ARGV[1] = caller provided id of the task
8684
// ARGV[2] = additional data to store with the task

backend/redis/queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func Test_TaskQueue(t *testing.T) {
7373
_, err = client.Pipelined(ctx, func(p redis.Pipeliner) error {
7474
return q.Enqueue(ctx, p, "t1", nil)
7575
})
76-
require.Error(t, errTaskAlreadyInQueue, err)
76+
require.NoError(t, err)
7777

7878
task, err := q.Dequeue(ctx, client, lockTimeout, blockTimeout)
7979
require.NoError(t, err)

backend/redis/signal.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,11 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
2525
))
2626
defer span.End()
2727

28-
if _, err = rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
29-
if err := addEventToStreamP(ctx, p, pendingEventsKey(instanceID), &event); err != nil {
28+
if _, err = rb.rdb.TxPipelined(ctx, func(p redis.Pipeliner) error {
29+
if err := rb.addWorkflowInstanceEventP(ctx, p, instanceState.Instance, &event); err != nil {
3030
return fmt.Errorf("adding event to stream: %w", err)
3131
}
3232

33-
if err := rb.workflowQueue.Enqueue(ctx, p, instanceID, nil); err != nil {
34-
if err != errTaskAlreadyInQueue {
35-
return fmt.Errorf("queueing workflow task: %w", err)
36-
}
37-
}
38-
3933
return nil
4034
}); err != nil {
4135
return err

backend/redis/workflow.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
104104

105105
if instanceState.State == backend.WorkflowStateFinished {
106106
l := rb.Logger().With(
107-
"task_id", instanceTask.ID,
107+
"task_id", instanceTask.TaskID,
108+
"id", instanceTask.ID,
108109
"instance_id", instanceState.Instance.InstanceID)
109110

110111
// This should never happen. For now, log information and then panic.
@@ -153,8 +154,8 @@ var removePendingEventsCmd = redis.NewScript(`
153154
var requeueInstanceCmd = redis.NewScript(`
154155
local pending_events = redis.call("XLEN", KEYS[1])
155156
if pending_events > 0 then
156-
local already_queued = redis.call("SADD", KEYS[3], ARGV[1])
157-
if already_queued ~= 0 then
157+
local added = redis.call("SADD", KEYS[3], ARGV[1])
158+
if added == 1 then
158159
redis.call("XADD", KEYS[2], "*", "id", ARGV[1], "data", "")
159160
end
160161
end
@@ -181,7 +182,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
181182
p := rb.rdb.TxPipeline()
182183

183184
// Add executed events to the history
184-
if err := addEventsToStreamP(ctx, p, historyKey(instance.InstanceID), executedEvents); err != nil {
185+
if err := addEventsToHistoryStreamP(ctx, p, historyKey(instance.InstanceID), executedEvents); err != nil {
185186
return fmt.Errorf("serializing : %w", err)
186187
}
187188

@@ -192,7 +193,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
192193
}
193194
}
194195

195-
// Scheduler timers
196+
// Schedule timers
196197
for _, timerEvent := range timerEvents {
197198
if err := addFutureEventP(ctx, p, instance, &timerEvent); err != nil {
198199
return err
@@ -220,12 +221,10 @@ func (rb *redisBackend) CompleteWorkflowTask(
220221
}
221222
}
222223

223-
// If any pending message was added, try to queue workflow task
224+
// Try to queue workflow task
224225
if targetInstance != instance {
225226
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
226-
if err != errTaskAlreadyInQueue {
227-
return fmt.Errorf("adding instance to locked instances set: %w", err)
228-
}
227+
return fmt.Errorf("enqueuing workflow task: %w", err)
229228
}
230229
}
231230
}

0 commit comments

Comments
 (0)