Skip to content

Commit f2db228

Browse files
committed
Simplify check
1 parent 90737c7 commit f2db228

File tree

9 files changed

+19
-30
lines changed

9 files changed

+19
-30
lines changed

backend/mysql/mysql.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
312312

313313
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
314314
// Check for existing instance
315-
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows {
315+
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
316+
Scan(new(int)); err != sql.ErrNoRows {
316317
return backend.ErrInstanceAlreadyExists
317318
}
318319

backend/redis/delete.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,9 @@ import (
1414
// KEYS[4] - payload key
1515
// KEYS[5] - active-instance-execution key
1616
// KEYS[6] - instances-by-creation key
17-
// KEYS[7] - instances
1817
// ARGV[1] - instance segment
19-
// ARGV[2] - instance id
2018
var deleteCmd = redis.NewScript(
2119
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5])
22-
redis.call("HDEL", KEYS[7], ARGV[1])
2320
return redis.call("ZREM", KEYS[6], ARGV[1])`)
2421

2522
// deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending
@@ -34,8 +31,7 @@ func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *co
3431
payloadKey(instance),
3532
activeInstanceExecutionKey(instance.InstanceID),
3633
instancesByCreation(),
37-
instanceIDs(),
38-
}, instanceSegment(instance), instance.InstanceID).Err(); err != nil {
34+
}, instanceSegment(instance)).Err(); err != nil {
3935
return fmt.Errorf("failed to delete instance: %w", err)
4036
}
4137

backend/redis/expire.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,17 @@ import (
1818
// KEYS[4] - pending events key
1919
// KEYS[5] - history key
2020
// KEYS[6] - payload key
21-
// KEYS[7] - instances key
2221
// ARGV[1] - current timestamp
2322
// ARGV[2] - expiration time in seconds
2423
// ARGV[3] - expiration timestamp in unix milliseconds
2524
// ARGV[4] - instance segment
26-
// ARGV[5] - instance id
2725
var expireCmd = redis.NewScript(
2826
`-- Find instances which have already expired and remove from the index set
2927
local expiredInstances = redis.call("ZRANGE", KEYS[2], "-inf", ARGV[1], "BYSCORE")
3028
for i = 1, #expiredInstances do
3129
local instanceSegment = expiredInstances[i]
3230
redis.call("ZREM", KEYS[1], instanceSegment) -- index set
3331
redis.call("ZREM", KEYS[2], instanceSegment) -- expiration set
34-
redis.call("HDEL", KEYS[7], ARGV[5])
3532
end
3633
3734
-- Add expiration time for future cleanup
@@ -60,12 +57,10 @@ func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClien
6057
pendingEventsKey(instance),
6158
historyKey(instance),
6259
payloadKey(instance),
63-
instanceIDs(),
6460
},
6561
nowStr,
6662
expiration.Seconds(),
6763
expStr,
6864
instanceSegment(instance),
69-
instance.InstanceID,
7065
).Err()
7166
}

backend/redis/instance.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
165165
// The newly created instance is going to be the active execution
166166
setActiveInstanceExecutionP(ctx, p, instance)
167167

168-
// Record instance id
169-
p.HSet(ctx, instanceIDs(), instance.InstanceID, 1)
170-
171168
p.ZAdd(ctx, instancesByCreation(), redis.Z{
172169
Member: instanceSegment(instance),
173170
Score: float64(createdAt.UnixMilli()),

backend/redis/keys.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ func instancesExpiring() string {
3737
return "instances-expiring"
3838
}
3939

40-
func instanceIDs() string {
41-
return "instances"
42-
}
43-
4440
func pendingEventsKey(instance *core.WorkflowInstance) string {
4541
return fmt.Sprintf("pending-events:%v", instanceSegment(instance))
4642
}

backend/redis/scripts/complete_workflow_task.lua

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ local pendingEventsKey = getKey()
2121
local payloadHashKey = getKey()
2222
local futureEventZSetKey = getKey()
2323
local activeInstancesKey = getKey()
24-
local instancesByIdKey = getKey()
2524

2625
local workflowSetKey = getKey()
2726
local workflowStreamKey = getKey()
@@ -103,7 +102,7 @@ for i = 1, otherWorkflowInstances do
103102
local conflictEventPayloadData = getArgv()
104103

105104
-- Does the instance exist already?
106-
local instanceExists = redis.call("HEXISTS", instancesByIdKey, targetInstanceId)
105+
local instanceExists = redis.call("EXISTS", targetActiveInstanceExecutionState)
107106
if instanceExists == 1 then
108107
redis.call("XADD", pendingEventsKey, "*", "event", conflictEventData)
109108
storePayload(conflictEventId, conflictEventPayloadData)
@@ -119,8 +118,6 @@ for i = 1, otherWorkflowInstances do
119118

120119
-- Track active instance
121120
redis.call("SADD", activeInstancesKey, targetInstanceSegment)
122-
123-
redis.call("HSET", instancesByIdKey, targetInstanceId, 1)
124121
end
125122
end
126123

backend/redis/workflow.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ func (rb *redisBackend) CompleteWorkflowTask(
106106
payloadKey(instance),
107107
futureEventsKey(),
108108
instancesActive(),
109-
instanceIDs(),
110109
queueKeys.SetKey,
111110
queueKeys.StreamKey,
112111
)

backend/sqlite/sqlite.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w
187187

188188
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
189189
// Check for existing instance
190-
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows {
190+
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
191+
Scan(new(int)); err != sql.ErrNoRows {
191192
return backend.ErrInstanceAlreadyExists
192193
}
193194

backend/test/e2e.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,18 +206,25 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
206206
name: "SubWorkflow_DuplicateInstanceID",
207207
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
208208
swf := func(ctx workflow.Context, i int) (int, error) {
209+
workflow.NewSignalChannel[any](ctx, "signal").Receive(ctx)
210+
209211
return i * 2, nil
210212
}
211213
wf := func(ctx workflow.Context) (int, error) {
212-
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
214+
swf1 := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
213215
InstanceID: "subworkflow",
214-
}, swf, 1).Get(ctx)
215-
if err != nil {
216-
return 0, err
217-
}
216+
}, swf, 1)
217+
218+
defer func() {
219+
rctx := workflow.NewDisconnectedContext(ctx)
220+
221+
// Unblock waiting sub workflow
222+
workflow.SignalWorkflow[any](rctx, "subworkflow", "signal", 1).Get(rctx)
223+
swf1.Get(rctx)
224+
}()
218225

219226
// Run another subworkflow with the same ID
220-
r, err = workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
227+
r, err := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{
221228
InstanceID: "subworkflow",
222229
}, swf, 1).Get(ctx)
223230
if err != nil {

0 commit comments

Comments
 (0)