Skip to content

Commit f328d26

Browse files
Support not using backends on a per task level (#36)
1 parent a9e0f09 commit f328d26

File tree

5 files changed

+46
-4
lines changed

5 files changed

+46
-4
lines changed

v1/backends/redis/redis.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ import (
77
"sync"
88
"time"
99

10-
"github.com/go-redsync/redsync/v4"
11-
redsyncredis "github.com/go-redsync/redsync/v4/redis/redigo"
12-
"github.com/gomodule/redigo/redis"
13-
1410
"github.com/RichardKnop/machinery/v1/backends/iface"
1511
"github.com/RichardKnop/machinery/v1/common"
1612
"github.com/RichardKnop/machinery/v1/config"
1713
"github.com/RichardKnop/machinery/v1/log"
1814
"github.com/RichardKnop/machinery/v1/tasks"
15+
"github.com/go-redsync/redsync/v4"
16+
redsyncredis "github.com/go-redsync/redsync/v4/redis/redigo"
17+
"github.com/gomodule/redigo/redis"
1918
)
2019

2120
// Backend represents a Redis result backend
@@ -158,6 +157,10 @@ func (b *Backend) mergeNewTaskState(conn redis.Conn, newState *tasks.TaskState)
158157

159158
// SetStatePending updates task state to PENDING
160159
func (b *Backend) SetStatePending(signature *tasks.Signature) error {
160+
if signature.NoBackend {
161+
return nil
162+
}
163+
161164
conn := b.open()
162165
defer conn.Close()
163166

@@ -167,6 +170,10 @@ func (b *Backend) SetStatePending(signature *tasks.Signature) error {
167170

168171
// SetStateReceived updates task state to RECEIVED
169172
func (b *Backend) SetStateReceived(signature *tasks.Signature) error {
173+
if signature.NoBackend {
174+
return nil
175+
}
176+
170177
conn := b.open()
171178
defer conn.Close()
172179

@@ -177,6 +184,10 @@ func (b *Backend) SetStateReceived(signature *tasks.Signature) error {
177184

178185
// SetStateStarted updates task state to STARTED
179186
func (b *Backend) SetStateStarted(signature *tasks.Signature) error {
187+
if signature.NoBackend {
188+
return nil
189+
}
190+
180191
conn := b.open()
181192
defer conn.Close()
182193

@@ -187,6 +198,10 @@ func (b *Backend) SetStateStarted(signature *tasks.Signature) error {
187198

188199
// SetStateRetry updates task state to RETRY
189200
func (b *Backend) SetStateRetry(signature *tasks.Signature) error {
201+
if signature.NoBackend {
202+
return nil
203+
}
204+
190205
conn := b.open()
191206
defer conn.Close()
192207

@@ -197,6 +212,10 @@ func (b *Backend) SetStateRetry(signature *tasks.Signature) error {
197212

198213
// SetStateSuccess updates task state to SUCCESS
199214
func (b *Backend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
215+
if signature.NoBackend {
216+
return nil
217+
}
218+
200219
conn := b.open()
201220
defer conn.Close()
202221

@@ -207,6 +226,10 @@ func (b *Backend) SetStateSuccess(signature *tasks.Signature, results []*tasks.T
207226

208227
// SetStateFailure updates task state to FAILURE
209228
func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error {
229+
if signature.NoBackend {
230+
return nil
231+
}
232+
210233
conn := b.open()
211234
defer conn.Close()
212235

v1/brokers/azure/storage_queue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,12 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces
224224
sig.ReceivedAt = time.Now()
225225
sig.AzureMessageContent = *msg.MessageText
226226

227+
if sig.UUID == "" {
228+
// No UUID means the "task" was injected outside of machinery and we can't track state since there's no ID
229+
// to track against.
230+
sig.NoBackend = true
231+
}
232+
227233
if msg.PopReceipt != nil {
228234
sig.AzurePopReceipt = *msg.PopReceipt
229235
}

v1/brokers/sqs/sqs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor
261261

262262
sig.ReceivedAt = time.Now()
263263

264+
if sig.UUID == "" {
265+
// No UUID means the "task" was injected outside of machinery and we can't track state since there's no ID
266+
// to track against.
267+
sig.NoBackend = true
268+
}
269+
264270
if delivery.Messages[0].ReceiptHandle != nil {
265271
sig.SQSReceiptHandle = *delivery.Messages[0].ReceiptHandle
266272
}

v1/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.
229229
return nil, fmt.Errorf("Publish message error: %s", err)
230230
}
231231

232+
if signature.NoBackend {
233+
return nil, nil
234+
}
235+
232236
return result.NewAsyncResult(signature, server.backend), nil
233237
}
234238

v1/tasks/signature.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ type Signature struct {
7474
AzurePopReceipt string
7575
AzureMessageContent string
7676

77+
// NoBackend: skip all operations related to machineries backend concept. Just leverage the broker abstraction.
78+
NoBackend bool
79+
7780
// StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq,
7881
// and don't want machinery to delete from source queue
7982
StopTaskDeletionOnError bool

0 commit comments

Comments
 (0)