Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions backend/turso/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Sqlite backend

## Adding a migration

1. Install [golang-migrate/migrate](https://www.github.com/golang-migrate/migrate)
1. ```bash
migrate create -ext sql -dir ./db/migrations -seq <name>
```
31 changes: 31 additions & 0 deletions backend/turso/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package turso

import (
"context"
"database/sql"
"fmt"

"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
)

func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event *history.Event) error {
// Attributes are already persisted via the history, we do not need to add them again.

if _, err := tx.ExecContext(
ctx,
`INSERT INTO activities
(id, instance_id, execution_id, event_type, timestamp, schedule_event_id, visible_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
event.ID,
instance.InstanceID,
instance.ExecutionID,
event.Type,
event.Timestamp,
event.ScheduleEventID,
event.VisibleAt,
); err != nil {
return fmt.Errorf("inserting events: %w", err)
}

return nil
}
4 changes: 4 additions & 0 deletions backend/turso/db/migrations/000001_initial.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS `instances`;
DROP TABLE IF EXISTS `pending_events`;
DROP TABLE IF EXISTS `history`;
DROP TABLE IF EXISTS `activities`;
67 changes: 67 additions & 0 deletions backend/turso/db/migrations/000001_initial.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
CREATE TABLE IF NOT EXISTS `instances` (
`id` TEXT NOT NULL,
`execution_id` TEXT NOT NULL,
`parent_instance_id` TEXT NULL,
`parent_execution_id` TEXT NULL,
`parent_schedule_event_id` INTEGER NULL,
`metadata` TEXT NULL,
`state` INTEGER NOT NULL,
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`completed_at` DATETIME NULL,
`locked_until` DATETIME NULL,
`sticky_until` DATETIME NULL,
`worker` TEXT NULL,
PRIMARY KEY(`id`, `execution_id`)
);

CREATE INDEX IF NOT EXISTS `idx_instances_id_execution_id` ON `instances` (`id`, `execution_id`);
CREATE INDEX IF NOT EXISTS `idx_instances_locked_until_completed_at` ON `instances` (`locked_until`, `sticky_until`, `completed_at`, `worker`);
CREATE INDEX IF NOT EXISTS `idx_instances_parent_instance_id_parent_execution_id` ON `instances` (`parent_instance_id`, `parent_execution_id`);

CREATE TABLE IF NOT EXISTS `pending_events` (
`id` TEXT,
`sequence_id` INTEGER NOT NULL, -- not used but keep for now for query compat
`instance_id` TEXT NOT NULL,
`execution_id` TEXT NOT NULL,
`event_type` INTEGER NOT NULL,
`timestamp` DATETIME NOT NULL,
`schedule_event_id` INT NOT NULL,
`attributes` BLOB NOT NULL,
`visible_at` DATETIME NULL,
PRIMARY KEY(`id`, `instance_id`)
);

CREATE INDEX IF NOT EXISTS `idx_pending_events_instance_id_execution_id_visible_at_schedule_event_id` ON `pending_events` (`instance_id`, `execution_id`, `visible_at`, `schedule_event_id`);

CREATE TABLE IF NOT EXISTS `history` (
`id` TEXT,
`sequence_id` INTEGER NOT NULL,
`instance_id` TEXT NOT NULL,
`execution_id` TEXT NOT NULL,
`event_type` INTEGER NOT NULL,
`timestamp` DATETIME NOT NULL,
`schedule_event_id` INT NOT NULL,
`attributes` BLOB NOT NULL,
`visible_at` DATETIME NULL,
PRIMARY KEY(`id`, `instance_id`)
);

CREATE INDEX IF NOT EXISTS `idx_history_instance_sequence_id` ON `history` (`instance_id`, `execution_id`, `sequence_id`);

CREATE TABLE IF NOT EXISTS `activities` (
`id` TEXT PRIMARY KEY,
`instance_id` TEXT NOT NULL,
`execution_id` TEXT NOT NULL,
`event_type` INTEGER NOT NULL,
`timestamp` DATETIME NOT NULL,
`schedule_event_id` INT NOT NULL,
`attributes` BLOB NOT NULL,
`visible_at` DATETIME NULL,
`locked_until` DATETIME NULL,
`worker` TEXT NULL
);


CREATE INDEX IF NOT EXISTS `idx_activities_id_worker` ON `activities` (`id`, `worker`);
CREATE INDEX IF NOT EXISTS `idx_activities_locked_until` ON `activities` (`locked_until`);
CREATE INDEX IF NOT EXISTS `idx_activities_instance_id_execution_id_worker` ON `activities` (`instance_id`, `execution_id`, `worker`);
11 changes: 11 additions & 0 deletions backend/turso/db/migrations/000002_add_attributes_table.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE `activities` ADD COLUMN `attributes` BLOB NULL;
UPDATE `activities` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `activities`.`id` = `attributes`.`id` AND `activities`.`instance_id` = `attributes`.`instance_id` AND `activities`.`execution_id` = `attributes`.`execution_id`;

ALTER TABLE `history` ADD COLUMN `attributes` BLOB NULL;
UPDATE `history` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `history`.`id` = `attributes`.`id` AND `history`.`instance_id` = `attributes`.`instance_id` AND `history`.`execution_id` = `attributes`.`execution_id`;

ALTER TABLE `pending_events` ADD COLUMN `attributes` BLOB NULL;
UPDATE `pending_events` SET `attributes` = `attributes`.`data` FROM `attributes` WHERE `pending_events`.`id` = `attributes`.`id` AND `pending_events`.`instance_id` = `attributes`.`instance_id` AND `pending_events`.`execution_id` = `attributes`.`execution_id`;

-- Drop attributes table
DROP TABLE `attributes`;
19 changes: 19 additions & 0 deletions backend/turso/db/migrations/000002_add_attributes_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE TABLE IF NOT EXISTS `attributes` (
`id` TEXT NOT NULL,
`instance_id` TEXT NOT NULL,
`execution_id` TEXT NOT NULL,
`data` BLOB NOT NULL,
PRIMARY KEY(`id`, `instance_id`, `execution_id`)
);

-- Move activity attributes to attributes table
INSERT OR IGNORE INTO `attributes` (`id`, `instance_id`, `execution_id`, `data`) SELECT `id`, `instance_id`, `execution_id`, `attributes` FROM `activities`;
ALTER TABLE `activities` DROP COLUMN `attributes`;

-- Move history attributes to attributes table
INSERT OR IGNORE INTO `attributes` (`id`, `instance_id`, `execution_id`, `data`) SELECT `id`, `instance_id`, `execution_id`, `attributes` FROM `history`;
ALTER TABLE `history` DROP COLUMN `attributes`;

-- Move pending_events attributes to attributes table
INSERT OR IGNORE INTO `attributes` (`id`, `instance_id`, `execution_id`, `data`) SELECT `id`, `instance_id`, `execution_id`, `attributes` FROM `pending_events`;
ALTER TABLE `pending_events` DROP COLUMN `attributes`;
119 changes: 119 additions & 0 deletions backend/turso/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package turso

import (
"context"
"database/sql"
"time"

"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/diag"
)

var _ diag.Backend = (*tursoBackend)(nil)

func (sb *tursoBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
var err error
tx, err := sb.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()

var rows *sql.Rows
if afterInstanceID != "" {
rows, err = tx.QueryContext(
ctx,
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
FROM instances i
INNER JOIN (SELECT id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.id < ii.id)
ORDER BY i.created_at DESC, i.id DESC
LIMIT ?`,
afterInstanceID,
afterExecutionID,
count,
)
} else {
rows, err = tx.QueryContext(
ctx,
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
FROM instances i
ORDER BY i.created_at DESC, i.id DESC
LIMIT ?`,
count,
)
}
if err != nil {
return nil, err
}

defer rows.Close()

var instances []*diag.WorkflowInstanceRef

for rows.Next() {
var id, executionID string
var createdAt time.Time
var completedAt *time.Time
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
if err != nil {
return nil, err
}

var state core.WorkflowInstanceState
if completedAt != nil {
state = core.WorkflowInstanceStateFinished
}

instances = append(instances, &diag.WorkflowInstanceRef{
Instance: core.NewWorkflowInstance(id, executionID),
CreatedAt: createdAt,
CompletedAt: completedAt,
State: state,
})
}

tx.Commit()

return instances, nil
}

func (sb *tursoBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
tx, err := sb.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()

res := tx.QueryRowContext(ctx, "SELECT id, execution_id, created_at, completed_at FROM instances WHERE id = ? AND execution_id = ?", instance.InstanceID, instance.ExecutionID)

var id, executionID string
var createdAt time.Time
var completedAt *time.Time

err = res.Scan(&id, &executionID, &createdAt, &completedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}

return nil, err
}

var state core.WorkflowInstanceState
if completedAt != nil {
state = core.WorkflowInstanceStateFinished
}

return &diag.WorkflowInstanceRef{
Instance: core.NewWorkflowInstance(id, executionID),
CreatedAt: createdAt,
CompletedAt: completedAt,
State: state,
}, nil
}

func (sb *tursoBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
itb := diag.NewInstanceTreeBuilder(sb)
return itb.BuildWorkflowInstanceTree(ctx, instance)
}
Loading