-
Notifications
You must be signed in to change notification settings - Fork 61
postgres support #360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
erma07
wants to merge
5
commits into
cschleiden:main
Choose a base branch
from
erma07:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
postgres support #360
Changes from 1 commit
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
0d61dad
postgres support
bddad94
Merge branch 'cschleiden:main' into main
erma07 f710b8f
Update 000001_initial.up.sql change create table if exists to create …
erma07 b87e41c
Update 000003_add_attributes_table.up.sql change create table if exis…
erma07 c2d1920
Update postgres.go pgx instead of lib/pq
erma07 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
DROP TABLE IF EXISTS instances; | ||
DROP TABLE IF EXISTS pending_events; | ||
DROP TABLE IF EXISTS history; | ||
DROP TABLE IF EXISTS activities; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
CREATE TABLE IF NOT EXISTS instances ( | ||
id BIGSERIAL NOT NULL PRIMARY KEY, | ||
instance_id UUID NOT NULL, | ||
execution_id UUID NOT NULL, | ||
parent_instance_id UUID NULL, | ||
parent_execution_id UUID NULL, | ||
parent_schedule_event_id NUMERIC NULL, | ||
metadata BYTEA NULL, | ||
state INT NOT NULL, | ||
created_at timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
completed_at timestamptz NULL, | ||
locked_until timestamptz NULL, | ||
sticky_until timestamptz NULL, | ||
worker VARCHAR(64) NULL | ||
); | ||
|
||
CREATE UNIQUE INDEX idx_instances_instance_id_execution_id on instances (instance_id, execution_id); | ||
CREATE INDEX idx_instances_locked_until_completed_at on instances (completed_at, locked_until, sticky_until, worker); | ||
CREATE INDEX idx_instances_parent_instance_id_parent_execution_id ON instances (parent_instance_id, parent_execution_id); | ||
|
||
|
||
CREATE TABLE IF NOT EXISTS pending_events ( | ||
id BIGSERIAL NOT NULL PRIMARY KEY, | ||
event_id UUID NOT NULL, | ||
sequence_id BIGSERIAL NOT NULL, -- Not used, but keep for now for query compat | ||
instance_id UUID NOT NULL, | ||
execution_id UUID NOT NULL, | ||
event_type INT NOT NULL, | ||
timestamp timestamptz NOT NULL, | ||
schedule_event_id BIGSERIAL NOT NULL, | ||
attributes BYTEA NOT NULL, | ||
visible_at timestamptz NULL | ||
); | ||
|
||
CREATE INDEX idx_pending_events_inid_exid ON pending_events (instance_id, execution_id); | ||
CREATE INDEX idx_pending_events_inid_exid_visible_at_schedule_event_id ON pending_events (instance_id, execution_id, visible_at, schedule_event_id); | ||
|
||
|
||
CREATE TABLE IF NOT EXISTS history ( | ||
id BIGSERIAL NOT NULL PRIMARY KEY, | ||
event_id UUID NOT NULL, | ||
sequence_id BIGSERIAL NOT NULL, | ||
instance_id UUID NOT NULL, | ||
execution_id UUID NOT NULL, | ||
event_type INT NOT NULL, | ||
timestamp timestamptz NOT NULL, | ||
schedule_event_id BIGSERIAL NOT NULL, | ||
attributes BYTEA NOT NULL, | ||
visible_at timestamptz NULL | ||
); | ||
|
||
CREATE INDEX idx_history_instance_id_execution_id ON history (instance_id, execution_id); | ||
CREATE INDEX idx_history_instance_id_execution_id_sequence_id ON history (instance_id, execution_id, sequence_id); | ||
|
||
CREATE TABLE IF NOT EXISTS activities ( | ||
id BIGSERIAL NOT NULL PRIMARY KEY, | ||
activity_id UUID NOT NULL, | ||
instance_id UUID NOT NULL, | ||
execution_id UUID NOT NULL, | ||
event_type INT NOT NULL, | ||
timestamp timestamptz NOT NULL, | ||
schedule_event_id BIGSERIAL NOT NULL, | ||
attributes BYTEA NOT NULL, | ||
visible_at timestamptz NULL, | ||
locked_until timestamptz NULL, | ||
worker VARCHAR(64) NULL | ||
); | ||
|
||
CREATE UNIQUE INDEX idx_activities_instance_id_execution_id_activity_id_worker ON activities (instance_id, execution_id, activity_id, worker); | ||
CREATE INDEX idx_activities_locked_until on activities (locked_until); |
3 changes: 3 additions & 0 deletions
3
backend/postgres/db/migrations/000002_attributes_blob.down.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; |
15 changes: 15 additions & 0 deletions
15
backend/postgres/db/migrations/000003_add_attributes_table.down.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
ALTER TABLE activities ADD COLUMN attributes BYTEA, ALTER COLUMN attributes SET NULL; | ||
UPDATE activities SET attributes = attributes.data FROM attributes WHERE activities.event_id = attributes.event_id AND activities.instance_id = attributes.instance_id AND activities.execution_id = attributes.execution_id; | ||
ALTER TABLE activities ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
|
||
ALTER TABLE history ADD COLUMN attributes BYTEA, ALTER COLUMN attributes SET NULL; | ||
UPDATE history SET attributes = attributes.data FROM attributes WHERE history.event_id = attributes.event_id AND history.instance_id = attributes.instance_id AND history.execution_id = attributes.execution_id; | ||
ALTER TABLE history ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
|
||
ALTER TABLE pending_events ADD COLUMN attributes BYTEA, ALTER COLUMN attributes SET NULL; | ||
UPDATE pending_events SET attributes = attributes.data FROM attributes WHERE pending_events.event_id = attributes.event_id AND pending_events.instance_id = attributes.instance_id AND pending_events.execution_id = attributes.execution_id; | ||
ALTER TABLE pending_events ALTER COLUMN attributes TYPE BYTEA, ALTER COLUMN attributes SET NOT NULL; | ||
|
||
|
||
-- Drop attributes table | ||
DROP TABLE attributes; |
22 changes: 22 additions & 0 deletions
22
backend/postgres/db/migrations/000003_add_attributes_table.up.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
CREATE TABLE IF NOT EXISTS attributes ( | ||
id BIGSERIAL NOT NULL PRIMARY KEY, | ||
event_id UUID NOT NULL, | ||
instance_id UUID NOT NULL, | ||
execution_id UUID NOT NULL, | ||
data BYTEA NOT NULL | ||
); | ||
|
||
CREATE UNIQUE INDEX idx_attributes_instance_id_execution_id_event_id on attributes (instance_id, execution_id, event_id); | ||
CREATE INDEX idx_attributes_event_id on attributes (event_id); | ||
|
||
-- Move activity attributes to attributes table | ||
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT activity_id, instance_id, execution_id, attributes FROM activities ON CONFLICT DO NOTHING; | ||
ALTER TABLE activities DROP COLUMN attributes; | ||
|
||
-- Move history attributes to attributes table | ||
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM history ON CONFLICT DO NOTHING; | ||
ALTER TABLE history DROP COLUMN attributes; | ||
|
||
-- Move pending_events attributes to attributes table | ||
INSERT INTO attributes (event_id, instance_id, execution_id, data) SELECT event_id, instance_id, execution_id, attributes FROM pending_events ON CONFLICT DO NOTHING; | ||
ALTER TABLE pending_events DROP COLUMN attributes; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package postgresbackend | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"time" | ||
|
||
"github.com/cschleiden/go-workflows/core" | ||
"github.com/cschleiden/go-workflows/diag" | ||
) | ||
|
||
var _ diag.Backend = (*postgresBackend)(nil) | ||
|
||
func (mb *postgresBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) { | ||
var err error | ||
tx, err := mb.db.BeginTx(ctx, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer tx.Rollback() | ||
|
||
var rows *sql.Rows | ||
if afterInstanceID != "" { | ||
rows, err = tx.QueryContext( | ||
ctx, | ||
SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at | ||
FROM instances i | ||
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii | ||
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id) | ||
ORDER BY i.created_at DESC, i.instance_id DESC | ||
LIMIT ?`), | ||
afterInstanceID, | ||
afterExecutionID, | ||
count, | ||
) | ||
} else { | ||
rows, err = tx.QueryContext( | ||
ctx, | ||
SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at | ||
FROM instances i | ||
ORDER BY i.created_at DESC, i.instance_id DESC | ||
LIMIT ?`), | ||
count, | ||
) | ||
} | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
defer rows.Close() | ||
|
||
var instances []*diag.WorkflowInstanceRef | ||
|
||
for rows.Next() { | ||
var id, executionID string | ||
var createdAt time.Time | ||
var completedAt *time.Time | ||
err = rows.Scan(&id, &executionID, &createdAt, &completedAt) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var state core.WorkflowInstanceState | ||
if completedAt != nil { | ||
state = core.WorkflowInstanceStateFinished | ||
} | ||
|
||
instances = append(instances, &diag.WorkflowInstanceRef{ | ||
Instance: core.NewWorkflowInstance(id, executionID), | ||
CreatedAt: createdAt, | ||
CompletedAt: completedAt, | ||
State: state, | ||
}) | ||
} | ||
|
||
return instances, nil | ||
} | ||
|
||
func (mb *postgresBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) { | ||
tx, err := mb.db.BeginTx(ctx, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer tx.Rollback() | ||
|
||
res := tx.QueryRowContext( | ||
ctx, | ||
SQLReplacer("SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID) | ||
|
||
var id, executionID string | ||
var createdAt time.Time | ||
var completedAt *time.Time | ||
|
||
err = res.Scan(&id, &executionID, &createdAt, &completedAt) | ||
if err != nil { | ||
if err == sql.ErrNoRows { | ||
return nil, nil | ||
} | ||
|
||
return nil, err | ||
} | ||
|
||
var state core.WorkflowInstanceState | ||
if completedAt != nil { | ||
state = core.WorkflowInstanceStateFinished | ||
} | ||
|
||
return &diag.WorkflowInstanceRef{ | ||
Instance: core.NewWorkflowInstance(id, executionID), | ||
CreatedAt: createdAt, | ||
CompletedAt: completedAt, | ||
State: state, | ||
}, nil | ||
} | ||
|
||
func (mb *postgresBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) { | ||
itb := diag.NewInstanceTreeBuilder(mb) | ||
return itb.BuildWorkflowInstanceTree(ctx, instance) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package postgresbackend | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/cschleiden/go-workflows/backend/history" | ||
"github.com/cschleiden/go-workflows/core" | ||
) | ||
|
||
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, newEvents []*history.Event) error { | ||
return insertEvents(ctx, tx, "pending_events", instance, newEvents) | ||
} | ||
|
||
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, historyEvents []*history.Event) error { | ||
return insertEvents(ctx, tx, "history", instance, historyEvents) | ||
} | ||
|
||
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instance *core.WorkflowInstance, events []*history.Event) error { | ||
const batchSize = 20 | ||
for batchStart := 0; batchStart < len(events); batchStart += batchSize { | ||
batchEnd := batchStart + batchSize | ||
if batchEnd > len(events) { | ||
batchEnd = len(events) | ||
} | ||
batchEvents := events[batchStart:batchEnd] | ||
|
||
aquery := "INSERT INTO attributes (event_id, instance_id, execution_id, data) VALUES (?, ?, ?, ?)" + strings.Repeat(", (?, ?, ?, ?)", len(batchEvents)-1) + " ON CONFLICT DO NOTHING" | ||
aargs := make([]interface{}, 0, len(batchEvents)*4) | ||
|
||
query := "INSERT INTO " + tableName + | ||
" (event_id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + | ||
strings.Repeat(", (?, ?, ?, ?, ?, ?, ?, ?)", len(batchEvents)-1) | ||
|
||
args := make([]interface{}, 0, len(batchEvents)*8) | ||
|
||
for _, newEvent := range batchEvents { | ||
a, err := history.SerializeAttributes(newEvent.Attributes) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
aargs = append(aargs, newEvent.ID, instance.InstanceID, instance.ExecutionID, a) | ||
|
||
args = append( | ||
args, | ||
newEvent.ID, newEvent.SequenceID, instance.InstanceID, instance.ExecutionID, newEvent.Type, newEvent.Timestamp, newEvent.ScheduleEventID, newEvent.VisibleAt) | ||
} | ||
|
||
if _, err := tx.ExecContext( | ||
ctx, | ||
SQLReplacer(aquery), | ||
aargs..., | ||
); err != nil { | ||
return fmt.Errorf("inserting attributes: %w", err) | ||
} | ||
|
||
_, err := tx.ExecContext( | ||
ctx, | ||
SQLReplacer(query), | ||
args..., | ||
) | ||
if err != nil { | ||
return fmt.Errorf("inserting events: %w", err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func removeFutureEvent(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, scheduleEventID int64) error { | ||
_, err := tx.ExecContext( | ||
ctx, | ||
SQLReplacer( | ||
"DELETE pending_events, attributes FROM pending_events INNER JOIN attributes ON pending_events.event_id = attributes.event_id WHERE pending_events.instance_id = ? AND pending_events.execution_id = ? AND pending_events.schedule_event_id = ? AND pending_events.visible_at IS NOT NULL"), | ||
instance.InstanceID, | ||
instance.ExecutionID, | ||
scheduleEventID, | ||
) | ||
|
||
return err | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package postgresbackend | ||
|
||
import ( | ||
"database/sql" | ||
|
||
"github.com/cschleiden/go-workflows/backend" | ||
) | ||
|
||
type options struct { | ||
backend.Options | ||
|
||
PostgreSQLOptions func(db *sql.DB) | ||
|
||
// ApplyMigrations automatically applies database migrations on startup. | ||
ApplyMigrations bool | ||
} | ||
|
||
type option func(*options) | ||
|
||
// WithApplyMigrations automatically applies database migrations on startup. | ||
func WithApplyMigrations(applyMigrations bool) option { | ||
return func(o *options) { | ||
o.ApplyMigrations = applyMigrations | ||
} | ||
} | ||
|
||
func WithPostgreSQLOptions(f func(db *sql.DB)) option { | ||
return func(o *options) { | ||
o.PostgreSQLOptions = f | ||
} | ||
} | ||
|
||
// WithBackendOptions allows to pass generic backend options. | ||
func WithBackendOptions(opts ...backend.BackendOption) option { | ||
return func(o *options) { | ||
for _, opt := range opts { | ||
opt(&o.Options) | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason
NOT EXISTS
is used here? In cases like migration (especially creation) being critical path, this may skip creating a table when one already exists with different schema. Reentrancy here has too much of an edge case.