Skip to content

Commit 5096b37

Browse files
authored
Merge pull request #290 from cschleiden/cleanup
Fix some linting errors
2 parents 4277a8a + 58ddf62 commit 5096b37

File tree

13 files changed

+27
-98
lines changed

13 files changed

+27
-98
lines changed

backend/monoprocess/monoprocess.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ type monoprocessBackend struct {
1717

1818
workflowSignal chan struct{}
1919
activitySignal chan struct{}
20-
signalTimeout time.Duration
2120

2221
logger *slog.Logger
2322
}
@@ -106,7 +105,9 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
106105
// Note that the worker will be notified even if the timer event gets
107106
// cancelled. This is ok, because the poller will simply find no task
108107
// and continue.
109-
time.AfterFunc(attr.At.Sub(time.Now()), func() { b.notifyWorkflowWorker(context.Background()) })
108+
time.AfterFunc(time.Until(attr.At), func() {
109+
b.notifyWorkflowWorker(ctx)
110+
})
110111
}
111112

112113
b.notifyWorkflowWorker(ctx)

backend/redis/events_future.go

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,71 +2,13 @@ package redis
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76
"strconv"
87
"time"
98

10-
"github.com/cschleiden/go-workflows/backend/history"
11-
"github.com/cschleiden/go-workflows/core"
129
redis "github.com/redis/go-redis/v9"
1310
)
1411

15-
// Adds an event to be delivered in the future. Not cluster-safe.
16-
// KEYS[1] - future event zset key
17-
// KEYS[2] - future event key
18-
// KEYS[3] - instance payload key
19-
// ARGV[1] - timestamp/score for set
20-
// ARGV[2] - Instance segment
21-
// ARGV[3] - event id
22-
// ARGV[4] - event data
23-
// ARGV[5] - event payload
24-
var addFutureEventCmd = redis.NewScript(`
25-
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
26-
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
27-
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
28-
return 0
29-
`)
30-
31-
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
32-
eventData, err := marshalEventWithoutAttributes(event)
33-
if err != nil {
34-
return err
35-
}
36-
37-
payloadEventData, err := json.Marshal(event.Attributes)
38-
if err != nil {
39-
return err
40-
}
41-
42-
return addFutureEventCmd.Run(
43-
ctx, p,
44-
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)},
45-
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
46-
instanceSegment(instance),
47-
event.ID,
48-
string(eventData),
49-
string(payloadEventData),
50-
).Err()
51-
}
52-
53-
// Remove a scheduled future event. Not cluster-safe.
54-
// KEYS[1] - future event zset key
55-
// KEYS[2] - future event key
56-
// KEYS[3] - instance payload key
57-
var removeFutureEventCmd = redis.NewScript(`
58-
redis.call("ZREM", KEYS[1], KEYS[2])
59-
local eventID = redis.call("HGET", KEYS[2], "id")
60-
redis.call("HDEL", KEYS[3], eventID)
61-
return redis.call("DEL", KEYS[2])
62-
`)
63-
64-
// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
65-
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
66-
key := futureEventKey(instance, event.ScheduleEventID)
67-
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)})
68-
}
69-
7012
// Find all due future events. For each event:
7113
// - Look up event data
7214
// - Add to pending event stream for workflow instance

backend/redis/instance.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
163163
p.SetNX(ctx, key, string(b), 0)
164164

165165
// The newly created instance is going to be the active execution
166-
setActiveInstanceExecutionP(ctx, p, instance)
166+
if err := setActiveInstanceExecutionP(ctx, p, instance); err != nil {
167+
return fmt.Errorf("setting active instance execution: %w", err)
168+
}
167169

168170
p.ZAdd(ctx, instancesByCreation(), redis.Z{
169171
Member: instanceSegment(instance),

backend/redis/redis.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,10 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
5959
// them, loads them. This doesn't work when using (transactional) pipelines, so eagerly load them on startup.
6060
ctx := context.Background()
6161
cmds := map[string]*redis.StringCmd{
62-
"addFutureEventCmd": addFutureEventCmd.Load(ctx, rb.rdb),
63-
"futureEventsCmd": futureEventsCmd.Load(ctx, rb.rdb),
64-
"removeFutureEventCmd": removeFutureEventCmd.Load(ctx, rb.rdb),
65-
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
66-
"expireInstanceCmd": expireCmd.Load(ctx, rb.rdb),
67-
"addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb),
62+
"futureEventsCmd": futureEventsCmd.Load(ctx, rb.rdb),
63+
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
64+
"expireInstanceCmd": expireCmd.Load(ctx, rb.rdb),
65+
"addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb),
6866
}
6967
for name, cmd := range cmds {
7068
// fmt.Println(name, cmd.Val())

client/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
6767
defer span.End()
6868

6969
for _, propagator := range c.backend.ContextPropagators() {
70-
propagator.Inject(ctx, metadata)
70+
if err := propagator.Inject(ctx, metadata); err != nil {
71+
return nil, fmt.Errorf("injecting context to propagate: %w", err)
72+
}
7173
}
7274

7375
startedEvent := history.NewPendingEvent(

internal/args/args_test.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,14 @@ func TestInputsToArgs(t *testing.T) {
6767
inputs = append(inputs, p)
6868
}
6969

70-
args, addContext, err := InputsToArgs(converter.DefaultConverter, reflect.ValueOf(tt.args.fn), inputs)
70+
_, addContext, err := InputsToArgs(converter.DefaultConverter, reflect.ValueOf(tt.args.fn), inputs)
7171
if (err != nil) != tt.wantErr {
7272
t.Errorf("InputsToArgs() error = %v, wantErr %v", err, tt.wantErr)
7373
return
7474
}
7575
if tt.wantErr {
7676
require.EqualError(t, err, tt.err)
7777
require.Equal(t, tt.addContext, addContext)
78-
} else {
79-
if addContext {
80-
// Skip the first argument, it will be filled with the context later
81-
args = args[1:]
82-
}
83-
84-
argValues := make([]interface{}, 0)
85-
for _, arg := range args {
86-
argValues = append(argValues, arg.Interface())
87-
}
8878
}
8979
})
9080
}

internal/workflow/cache/cache_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func Test_Cache_StoreAndGet(t *testing.T) {
2424
c := NewWorkflowExecutorLRUCache(metrics.NewNoopMetricsClient(), 1, time.Second*10)
2525

2626
r := wf.NewRegistry()
27-
r.RegisterWorkflow(workflowWithActivity)
27+
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
2828

2929
i := core.NewWorkflowInstance("instanceID", "executionID")
3030
e, err := wf.NewExecutor(
@@ -52,7 +52,7 @@ func Test_Cache_StoreAndGet(t *testing.T) {
5252
err = c.Store(context.Background(), i2, e2)
5353
require.NoError(t, err)
5454

55-
re, ok, err = c.Get(context.Background(), i)
55+
_, ok, err = c.Get(context.Background(), i)
5656
require.NoError(t, err)
5757
require.False(t, ok)
5858
}
@@ -66,7 +66,7 @@ func Test_Cache_AutoEviction(t *testing.T) {
6666

6767
i := core.NewWorkflowInstance("instanceID", "executionID")
6868
r := wf.NewRegistry()
69-
r.RegisterWorkflow(workflowWithActivity)
69+
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
7070
e, err := wf.NewExecutor(
7171
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
7272
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
@@ -96,7 +96,7 @@ func Test_Cache_Evict(t *testing.T) {
9696

9797
i := core.NewWorkflowInstance("instanceID", "executionID")
9898
r := wf.NewRegistry()
99-
r.RegisterWorkflow(workflowWithActivity)
99+
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
100100
e, err := wf.NewExecutor(
101101
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
102102
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,

internal/workflow/executor_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,6 @@ func Test_Executor(t *testing.T) {
235235
require.False(t, e.workflow.Completed())
236236
require.Len(t, e.workflowState.Commands(), 1)
237237

238-
h := []*history.Event{}
239-
h = append(h, oldTask.NewEvents...)
240-
h = append(h, taskResult.Executed...)
241-
242238
newTask := &backend.WorkflowTask{
243239
ID: "taskID",
244240
WorkflowInstance: oldTask.WorkflowInstance,
@@ -399,7 +395,7 @@ func Test_Executor(t *testing.T) {
399395
history.NewPendingEvent(time.Now(), history.EventType_ActivityCompleted, &history.ActivityCompletedAttributes{}, history.ScheduleEventID(2)),
400396
}, result.Executed[len(result.Executed)-1].SequenceID)
401397

402-
result, err = e.ExecuteTask(context.Background(), task2)
398+
_, err = e.ExecuteTask(context.Background(), task2)
403399
require.NoError(t, err)
404400
require.Nil(t, e.workflow.err)
405401
},
@@ -573,7 +569,7 @@ func Test_Executor(t *testing.T) {
573569
// Complete subworkflow
574570
swr, _ := converter.DefaultConverter.To(nil)
575571
hp.history = append(hp.history, result.Executed...)
576-
result, err = e.ExecuteTask(context.Background(), continueTask("instanceID", []*history.Event{
572+
_, err = e.ExecuteTask(context.Background(), continueTask("instanceID", []*history.Event{
577573
history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowCompleted, &history.SubWorkflowCompletedAttributes{
578574
Result: swr,
579575
}, history.ScheduleEventID(1)),

internal/workflow/registry_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func Test_ActivityRegistration(t *testing.T) {
144144
err = r.RegisterActivity(reg_activity, WithName("CustomName"))
145145
require.NoError(t, err)
146146

147-
x, err = r.GetActivity("CustomName")
147+
_, err = r.GetActivity("CustomName")
148148
require.NoError(t, err)
149149
}
150150

internal/workflowerrors/stack.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,5 @@ func stack(skip int) string {
3030
buf.WriteString(frame.String())
3131
}
3232

33-
return string(buf.Bytes())
33+
return buf.String()
3434
}

0 commit comments

Comments
 (0)