Skip to content

Commit 7f03877

Browse files
committed
Add error handling for duplicate sub workflow instance
in SQLite backend
1 parent 639328e commit 7f03877

File tree

1 file changed

+29
-27
lines changed

1 file changed

+29
-27
lines changed

backend/sqlite/sqlite.go

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cschleiden/go-workflows/backend/metrics"
2020
"github.com/cschleiden/go-workflows/core"
2121
"github.com/cschleiden/go-workflows/internal/metrickeys"
22+
"github.com/cschleiden/go-workflows/internal/workflowerrors"
2223
"github.com/cschleiden/go-workflows/workflow"
2324
"github.com/google/uuid"
2425
"go.opentelemetry.io/otel/trace"
@@ -168,13 +169,8 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w
168169
}
169170
defer tx.Rollback()
170171

171-
// Check for existing instance
172-
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", instance.InstanceID).Scan(new(int)); err != sql.ErrNoRows {
173-
return backend.ErrInstanceAlreadyExists
174-
}
175-
176172
// Create workflow instance
177-
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
173+
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
178174
return err
179175
}
180176

@@ -189,7 +185,12 @@ func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *w
189185
return nil
190186
}
191187

192-
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
188+
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
189+
// Check for existing instance
190+
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE id = ? LIMIT 1", wfi.InstanceID).Scan(new(int)); err != sql.ErrNoRows {
191+
return backend.ErrInstanceAlreadyExists
192+
}
193+
193194
var parentInstanceID, parentExecutionID *string
194195
var parentEventID *int64
195196
if wfi.SubWorkflow() {
@@ -203,9 +204,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
203204
return fmt.Errorf("marshaling metadata: %w", err)
204205
}
205206

206-
res, err := tx.ExecContext(
207+
_, err = tx.ExecContext(
207208
ctx,
208-
"INSERT OR IGNORE INTO `instances` (id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
209+
"INSERT INTO `instances` (id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
209210
wfi.InstanceID,
210211
wfi.ExecutionID,
211212
parentInstanceID,
@@ -218,17 +219,6 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
218219
return fmt.Errorf("inserting workflow instance: %w", err)
219220
}
220221

221-
if !ignoreDuplicate {
222-
rows, err := res.RowsAffected()
223-
if err != nil {
224-
return err
225-
}
226-
227-
if rows != 1 {
228-
return backend.ErrInstanceAlreadyExists
229-
}
230-
}
231-
232222
return nil
233223
}
234224

@@ -547,16 +537,28 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
547537
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
548538

549539
for targetInstance, events := range groupedEvents {
550-
for _, m := range events {
551-
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
552-
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
553-
// Create new instance
554-
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
555-
return err
540+
// Are we creating a new sub-workflow instance?
541+
m := events[0]
542+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
543+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
544+
// Create new instance
545+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil {
546+
if err == backend.ErrInstanceAlreadyExists {
547+
if err := insertPendingEvents(ctx, tx, instance, []*history.Event{
548+
history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{
549+
Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists),
550+
}, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)),
551+
}); err != nil {
552+
return fmt.Errorf("inserting sub-workflow failed event: %w", err)
553+
}
554+
555+
continue
556556
}
557557

558-
break
558+
return fmt.Errorf("creating sub-workflow instance: %w", err)
559559
}
560+
561+
continue
560562
}
561563

562564
// Insert pending events for target instance

0 commit comments

Comments
 (0)