Skip to content

Commit 5787158

Browse files
vr009cschleiden
authored andcommitted
feat: add postgres backend
This change introduces full postgres support and aligns SQL, migrations, and diagnostics with postgres conventions. The implementation is based on the existing mysql backend, with small fixes and adjustments required by the postgres connector and Postgres semantics. It also adds integration tests and updates tooling and samples to run against Postgres.
1 parent ed82bcb commit 5787158

18 files changed

+1857
-2
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
DROP TABLE IF EXISTS instances;
2+
DROP TABLE IF EXISTS pending_events;
3+
DROP TABLE IF EXISTS history;
4+
DROP TABLE IF EXISTS activities;
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
DROP TABLE IF EXISTS instances;
2+
3+
CREATE TABLE instances (
4+
id bigserial NOT NULL PRIMARY KEY,
5+
instance_id varchar(128) NOT NULL,
6+
execution_id varchar(128) NOT NULL,
7+
parent_instance_id varchar(128) NULL,
8+
parent_execution_id varchar(128) NULL,
9+
parent_schedule_event_id numeric NULL,
10+
metadata bytea NULL,
11+
state int NOT NULL,
12+
created_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
13+
completed_at timestamptz NULL,
14+
locked_until timestamptz NULL,
15+
sticky_until timestamptz NULL,
16+
worker varchar(64) NULL
17+
);
18+
19+
CREATE UNIQUE INDEX idx_instances_instance_id_execution_id on instances (instance_id, execution_id);
20+
CREATE INDEX idx_instances_locked_until_completed_at on instances (completed_at, locked_until, sticky_until, worker);
21+
CREATE INDEX idx_instances_parent_instance_id_parent_execution_id ON instances (parent_instance_id, parent_execution_id);
22+
23+
DROP TABLE IF EXISTS pending_events;
24+
CREATE TABLE pending_events (
25+
id bigserial NOT NULL PRIMARY KEY,
26+
event_id varchar(128) NOT NULL,
27+
sequence_id bigserial NOT NULL, -- Not used, but keep for now for query compat
28+
instance_id varchar(128) NOT NULL,
29+
execution_id varchar(128) NOT NULL,
30+
event_type int NOT NULL,
31+
timestamp timestamptz NOT NULL,
32+
schedule_event_id bigserial NOT NULL,
33+
attributes bytea NOT NULL,
34+
visible_at timestamptz NULL
35+
);
36+
37+
CREATE INDEX idx_pending_events_inid_exid ON pending_events (instance_id, execution_id);
38+
CREATE INDEX idx_pending_events_inid_exid_visible_at_schedule_event_id ON pending_events (instance_id, execution_id, visible_at, schedule_event_id);
39+
40+
DROP TABLE IF EXISTS history;
41+
CREATE TABLE IF NOT EXISTS history (
42+
id bigserial NOT NULL PRIMARY KEY,
43+
event_id varchar(128) NOT NULL,
44+
sequence_id bigserial NOT NULL,
45+
instance_id varchar(128) NOT NULL,
46+
execution_id varchar(128) NOT NULL,
47+
event_type int NOT NULL,
48+
timestamp timestamptz NOT NULL,
49+
schedule_event_id bigserial NOT NULL,
50+
attributes bytea NOT NULL,
51+
visible_at timestamptz NULL
52+
);
53+
54+
CREATE INDEX idx_history_instance_id_execution_id ON history (instance_id, execution_id);
55+
CREATE INDEX idx_history_instance_id_execution_id_sequence_id ON history (instance_id, execution_id, sequence_id);
56+
57+
DROP TABLE IF EXISTS activities;
58+
CREATE TABLE IF NOT EXISTS activities (
59+
id bigserial NOT NULL PRIMARY KEY,
60+
activity_id varchar(128) NOT NULL,
61+
instance_id varchar(128) NOT NULL,
62+
execution_id varchar(128) NOT NULL,
63+
event_type int NOT NULL,
64+
timestamp timestamptz NOT NULL,
65+
schedule_event_id bigserial NOT NULL,
66+
attributes bytea NOT NULL,
67+
visible_at timestamptz NULL,
68+
locked_until timestamptz NULL,
69+
worker VARCHAR(64) NULL
70+
);
71+
72+
CREATE UNIQUE INDEX idx_activities_instance_id_execution_id_activity_id_worker ON activities (instance_id, execution_id, activity_id, worker);
73+
CREATE INDEX idx_activities_locked_until on activities (locked_until);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ALTER TABLE activities ADD COLUMN attributes BYTEA;
2+
UPDATE activities SET attributes = attributes.data FROM attributes WHERE activities.event_id = attributes.event_id AND activities.instance_id = attributes.instance_id AND activities.execution_id = attributes.execution_id;
3+
ALTER TABLE activities ALTER COLUMN attributes SET NOT NULL;
4+
5+
ALTER TABLE history ADD COLUMN attributes BYTEA;
6+
UPDATE history SET attributes = attributes.data FROM attributes WHERE history.event_id = attributes.event_id AND history.instance_id = attributes.instance_id AND history.execution_id = attributes.execution_id;
7+
ALTER TABLE history ALTER COLUMN attributes SET NOT NULL;
8+
9+
ALTER TABLE pending_events ADD COLUMN attributes BYTEA;
10+
UPDATE pending_events SET attributes = attributes.data FROM attributes WHERE pending_events.event_id = attributes.event_id AND pending_events.instance_id = attributes.instance_id AND pending_events.execution_id = attributes.execution_id;
11+
ALTER TABLE pending_events ALTER COLUMN attributes SET NOT NULL;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
DROP TABLE IF EXISTS attributes;
2+
3+
CREATE TABLE attributes (
4+
id BIGSERIAL NOT NULL PRIMARY KEY,
5+
event_id varchar(128) NOT NULL,
6+
instance_id varchar(128) NOT NULL,
7+
execution_id varchar(128) NOT NULL,
8+
data bytea NOT NULL
9+
);
10+
11+
CREATE UNIQUE INDEX idx_attributes_instance_id_execution_id_event_id on attributes (instance_id, execution_id, event_id);
12+
CREATE INDEX idx_attributes_event_id on attributes (event_id);
13+
14+
-- Move activity attributes to attributes table
15+
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT activity_id, instance_id, execution_id, attributes FROM activities ON CONFLICT DO NOTHING;
16+
ALTER TABLE activities DROP COLUMN attributes;
17+
18+
-- Move history attributes to attributes table
19+
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM history ON CONFLICT DO NOTHING;
20+
ALTER TABLE history DROP COLUMN attributes;
21+
22+
-- Move pending_events attributes to attributes table
23+
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM pending_events ON CONFLICT DO NOTHING;
24+
ALTER TABLE pending_events DROP COLUMN attributes;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Remove queue column from instances
2+
ALTER TABLE instances DROP COLUMN queue;
3+
4+
-- Update index
5+
DROP INDEX idx_instances_locked_until_completed_at_queue;
6+
CREATE INDEX idx_instances_locked_until_completed_at ON instances (completed_at, locked_until, sticky_until, worker);
7+
8+
-- Update index
9+
DROP INDEX idx_activities_locked_until_queue;
10+
11+
-- Remove queue column from activities
12+
ALTER TABLE activities DROP COLUMN queue;
13+
14+
CREATE INDEX idx_activities_locked_until ON activities (locked_until);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ALTER TABLE instances ADD COLUMN queue varchar(128) DEFAULT '' NOT NULL;
2+
3+
-- Update index
4+
DROP INDEX idx_instances_locked_until_completed_at;
5+
CREATE INDEX idx_instances_locked_until_completed_at_queue ON instances (completed_at, locked_until, sticky_until, worker, queue);
6+
7+
ALTER TABLE activities ADD COLUMN queue varchar(128) DEFAULT '' NOT NULL;
8+
9+
-- Update index
10+
DROP INDEX idx_activities_locked_until;
11+
CREATE INDEX idx_activities_locked_until_queue ON activities (locked_until, queue);

backend/postgres/diagnostics.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package postgresbackend
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/core"
9+
"github.com/cschleiden/go-workflows/diag"
10+
)
11+
12+
var _ diag.Backend = (*postgresBackend)(nil)
13+
14+
func (mb *postgresBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
15+
var err error
16+
tx, err := mb.db.BeginTx(ctx, nil)
17+
if err != nil {
18+
return nil, err
19+
}
20+
defer tx.Rollback()
21+
22+
var rows *sql.Rows
23+
if afterInstanceID != "" {
24+
rows, err = tx.QueryContext(
25+
ctx,
26+
`SELECT i.instance_id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.created_at, i.completed_at, i.queue
27+
FROM instances i
28+
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE instance_id = $1 AND execution_id = $2) ii
29+
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
30+
ORDER BY i.created_at DESC, i.instance_id DESC
31+
LIMIT $3`,
32+
afterInstanceID,
33+
afterExecutionID,
34+
count,
35+
)
36+
} else {
37+
rows, err = tx.QueryContext(
38+
ctx,
39+
`SELECT i.instance_id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.created_at, i.completed_at, i.queue
40+
FROM instances i
41+
ORDER BY i.created_at DESC, i.instance_id DESC
42+
LIMIT $1`,
43+
count,
44+
)
45+
}
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
defer rows.Close()
51+
52+
var instances []*diag.WorkflowInstanceRef
53+
54+
for rows.Next() {
55+
var id, executionID, queue string
56+
var parentID, parentExecutionID *string
57+
var parentScheduleEventID *int64
58+
var createdAt time.Time
59+
var completedAt *time.Time
60+
err = rows.Scan(&id, &executionID, &parentID, &parentExecutionID, &parentScheduleEventID, &createdAt, &completedAt, &queue)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
var state core.WorkflowInstanceState
66+
if completedAt != nil {
67+
state = core.WorkflowInstanceStateFinished
68+
}
69+
70+
var instance *core.WorkflowInstance
71+
if parentID != nil {
72+
parentInstance := core.NewWorkflowInstance(*parentID, *parentExecutionID)
73+
instance = core.NewSubWorkflowInstance(id, executionID, parentInstance, *parentScheduleEventID)
74+
} else {
75+
instance = core.NewWorkflowInstance(id, executionID)
76+
}
77+
78+
instances = append(instances, &diag.WorkflowInstanceRef{
79+
Instance: instance,
80+
CreatedAt: createdAt,
81+
CompletedAt: completedAt,
82+
State: state,
83+
Queue: queue,
84+
})
85+
}
86+
87+
if rows.Err() != nil {
88+
return nil, rows.Err()
89+
}
90+
91+
return instances, nil
92+
}
93+
94+
func (mb *postgresBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
95+
tx, err := mb.db.BeginTx(ctx, nil)
96+
if err != nil {
97+
return nil, err
98+
}
99+
defer tx.Rollback()
100+
101+
res := tx.QueryRowContext(
102+
ctx,
103+
`SELECT instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, created_at, completed_at, queue
104+
FROM instances
105+
WHERE instance_id = $1 AND execution_id = $2`, instance.InstanceID, instance.ExecutionID)
106+
107+
var id, executionID, queue string
108+
var parentID, parentExecutionID *string
109+
var parentScheduleEventID *int64
110+
var createdAt time.Time
111+
var completedAt *time.Time
112+
113+
err = res.Scan(&id, &executionID, &parentID, &parentExecutionID, &parentScheduleEventID, &createdAt, &completedAt, &queue)
114+
if err != nil {
115+
if err == sql.ErrNoRows {
116+
return nil, nil
117+
}
118+
119+
return nil, err
120+
}
121+
122+
var state core.WorkflowInstanceState
123+
if completedAt != nil {
124+
state = core.WorkflowInstanceStateFinished
125+
}
126+
127+
if parentID != nil {
128+
parentInstance := core.NewWorkflowInstance(*parentID, *parentExecutionID)
129+
instance = core.NewSubWorkflowInstance(id, executionID, parentInstance, *parentScheduleEventID)
130+
} else {
131+
instance = core.NewWorkflowInstance(id, executionID)
132+
}
133+
134+
return &diag.WorkflowInstanceRef{
135+
Instance: instance,
136+
CreatedAt: createdAt,
137+
CompletedAt: completedAt,
138+
State: state,
139+
Queue: queue,
140+
}, nil
141+
}
142+
143+
func (mb *postgresBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
144+
itb := diag.NewInstanceTreeBuilder(mb)
145+
return itb.BuildWorkflowInstanceTree(ctx, instance)
146+
}

0 commit comments

Comments
 (0)