Skip to content

Commit 7391dab

Browse files
committed
feat: add task and task log migration
Adds a migration to copy logs from their old location to the new keyspace that includes the task scope. This change is non-destructive to allow the possibility of a rollback. Tasks are updated throughout their lifetime, so this migration will not overwrite tasks that have already been migrated. PLAT-347
1 parent 83b0e81 commit 7391dab

File tree

2 files changed

+148
-4
lines changed

2 files changed

+148
-4
lines changed
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package migrate
22

3+
import "github.com/pgEdge/control-plane/server/internal/migrate/migrations"
4+
35
// allMigrations returns the ordered list of migrations.
46
// Order matters - migrations are executed in slice order.
57
// Add new migrations to this list in chronological order.
68
func allMigrations() []Migration {
79
return []Migration{
8-
// Add migrations here in chronological order
9-
// Example:
10-
// &AddHostMetadataField{},
11-
// &RenameDatabaseStatus{},
10+
&migrations.AddTaskScope{},
1211
}
1312
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package migrations
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/google/uuid"
10+
"github.com/pgEdge/control-plane/server/internal/config"
11+
"github.com/pgEdge/control-plane/server/internal/storage"
12+
"github.com/pgEdge/control-plane/server/internal/task"
13+
"github.com/rs/zerolog"
14+
"github.com/samber/do"
15+
clientv3 "go.etcd.io/etcd/client/v3"
16+
)
17+
18+
type AddTaskScope struct{}
19+
20+
func (a *AddTaskScope) Identifier() string {
21+
return "add_task_scope"
22+
}
23+
24+
func (a *AddTaskScope) Run(ctx context.Context, i *do.Injector) error {
25+
cfg, err := do.Invoke[config.Config](i)
26+
if err != nil {
27+
return fmt.Errorf("failed to initialize config: %w", err)
28+
}
29+
logger, err := do.Invoke[zerolog.Logger](i)
30+
if err != nil {
31+
return fmt.Errorf("failed to initialize logger: %w", err)
32+
}
33+
client, err := do.Invoke[*clientv3.Client](i)
34+
if err != nil {
35+
return fmt.Errorf("failed to initialize client: %w", err)
36+
}
37+
taskStore, err := do.Invoke[*task.Store](i)
38+
if err != nil {
39+
return fmt.Errorf("failed to initialize task store: %w", err)
40+
}
41+
42+
logger = logger.With().
43+
Str("component", "migration").
44+
Str("identifier", a.Identifier()).
45+
Logger()
46+
47+
oldTasksPrefix := storage.Prefix("/", cfg.EtcdKeyRoot, "tasks")
48+
oldTaskRangeOp := storage.NewGetPrefixOp[*oldStoredTask](client, oldTasksPrefix)
49+
oldTasks, err := oldTaskRangeOp.Exec(ctx)
50+
if err != nil {
51+
return fmt.Errorf("failed to query for old tasks: %w", err)
52+
}
53+
54+
for _, oldTask := range oldTasks {
55+
err := taskStore.Task.Create(oldTask.convert()).Exec(ctx)
56+
switch {
57+
case errors.Is(err, storage.ErrAlreadyExists):
58+
logger.Info().
59+
Stringer("task_id", oldTask.Task.TaskID).
60+
Msg("task has already been migrated, skipping")
61+
case err != nil:
62+
return fmt.Errorf("failed to migrate task %s: %w", oldTask.Task.TaskID, err)
63+
}
64+
}
65+
66+
oldTaskLogsPrefix := storage.Prefix("/", cfg.EtcdKeyRoot, "task_log_messages")
67+
oldTaskLogsRangeOp := storage.NewGetPrefixOp[*oldStoredTaskLogEntry](client, oldTaskLogsPrefix)
68+
oldTaskLogs, err := oldTaskLogsRangeOp.Exec(ctx)
69+
if err != nil {
70+
return fmt.Errorf("failed to query for old tasks: %w", err)
71+
}
72+
73+
for _, oldTaskLog := range oldTaskLogs {
74+
err := taskStore.TaskLogMessage.Put(oldTaskLog.convert()).Exec(ctx)
75+
if err != nil {
76+
return fmt.Errorf("failed to migrate task log entry %s for task %s: %w", oldTaskLog.EntryID, oldTaskLog.TaskID, err)
77+
}
78+
}
79+
80+
return nil
81+
}
82+
83+
type oldStoredTask struct {
84+
storage.StoredValue
85+
Task struct {
86+
ParentID uuid.UUID `json:"parent_id"`
87+
DatabaseID string `json:"database_id"`
88+
NodeName string `json:"node_name"`
89+
InstanceID string `json:"instance_id"`
90+
HostID string `json:"host_id"`
91+
TaskID uuid.UUID `json:"task_id"`
92+
CreatedAt time.Time `json:"created_at"`
93+
CompletedAt time.Time `json:"completed_at"`
94+
Type task.Type `json:"type"`
95+
WorkflowInstanceID string `json:"workflow_id"`
96+
WorkflowExecutionID string `json:"workflow_execution_id"`
97+
Status task.Status `json:"status"`
98+
Error string `json:"error"`
99+
} `json:"task"`
100+
}
101+
102+
func (o *oldStoredTask) convert() *task.StoredTask {
103+
return &task.StoredTask{
104+
Task: &task.Task{
105+
Scope: task.ScopeDatabase,
106+
EntityID: o.Task.DatabaseID,
107+
TaskID: o.Task.TaskID,
108+
Type: o.Task.Type,
109+
ParentID: o.Task.ParentID,
110+
Status: o.Task.Status,
111+
Error: o.Task.Error,
112+
HostID: o.Task.HostID,
113+
DatabaseID: o.Task.DatabaseID,
114+
NodeName: o.Task.NodeName,
115+
InstanceID: o.Task.InstanceID,
116+
CreatedAt: o.Task.CreatedAt,
117+
CompletedAt: o.Task.CompletedAt,
118+
WorkflowInstanceID: o.Task.WorkflowInstanceID,
119+
WorkflowExecutionID: o.Task.WorkflowExecutionID,
120+
},
121+
}
122+
}
123+
124+
type oldStoredTaskLogEntry struct {
125+
storage.StoredValue
126+
DatabaseID string `json:"database_id"`
127+
TaskID uuid.UUID `json:"task_id"`
128+
EntryID uuid.UUID `json:"entry_id"`
129+
Timestamp time.Time `json:"timestamp"`
130+
Message string `json:"message"`
131+
Fields map[string]any `json:"fields"`
132+
}
133+
134+
func (o *oldStoredTaskLogEntry) convert() *task.StoredTaskLogEntry {
135+
return &task.StoredTaskLogEntry{
136+
Scope: task.ScopeDatabase,
137+
EntityID: o.DatabaseID,
138+
DatabaseID: o.DatabaseID,
139+
TaskID: o.TaskID,
140+
EntryID: o.EntryID,
141+
Timestamp: o.Timestamp,
142+
Message: o.Message,
143+
Fields: o.Fields,
144+
}
145+
}

0 commit comments

Comments
 (0)