Skip to content

Commit ee6d7bc

Browse files
committed
simplify LoadYamlChains
1 parent 8220135 commit ee6d7bc

File tree

7 files changed

+995
-57
lines changed

7 files changed

+995
-57
lines changed

internal/log/log_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ func TestPgxLog(*testing.T) {
3535
pgxl := log.NewPgxLogger(log.Init(config.LoggingOpts{LogLevel: "trace"}))
3636
var level tracelog.LogLevel
3737
for level = tracelog.LogLevelNone; level <= tracelog.LogLevelTrace; level++ {
38-
pgxl.Log(context.Background(), level, "foo", map[string]interface{}{"func": "TestPgxLog"})
38+
pgxl.Log(context.Background(), level, "foo", map[string]any{"func": "TestPgxLog"})
3939
}
4040
}

internal/pgengine/bootstrap.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ var backoff = retry.WithCappedDuration(maxWaitTime, retry.NewExponential(WaitTim
3232
// PgxIface is common interface for every pgx class
3333
type PgxIface interface {
3434
Begin(ctx context.Context) (pgx.Tx, error)
35-
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
36-
QueryRow(context.Context, string, ...interface{}) pgx.Row
37-
Query(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error)
35+
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
36+
QueryRow(context.Context, string, ...any) pgx.Row
37+
Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)
3838
Ping(ctx context.Context) error
3939
CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
4040
}
@@ -194,7 +194,7 @@ func (pge *PgEngine) AddLogHook(ctx context.Context) {
194194

195195
// QueryRowIface specifies interface to use QueryRow method
196196
type QueryRowIface interface {
197-
QueryRow(context.Context, string, ...interface{}) pgx.Row
197+
QueryRow(context.Context, string, ...any) pgx.Row
198198
}
199199

200200
// TryLockClientName obtains lock on the server to prevent another client with the same name

internal/pgengine/bootstrap_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ func TestFinalizeConnection(t *testing.T) {
7878
}
7979

8080
type mockpgrow struct {
81-
results []interface{}
81+
results []any
8282
}
8383

84-
func (r *mockpgrow) Scan(dest ...interface{}) error {
84+
func (r *mockpgrow) Scan(dest ...any) error {
8585
if len(r.results) > 0 {
8686
if err, ok := r.results[0].(error); ok {
8787
r.results = r.results[1:]
@@ -103,7 +103,7 @@ type mockpgconn struct {
103103
r pgx.Row
104104
}
105105

106-
func (m mockpgconn) QueryRow(context.Context, string, ...interface{}) pgx.Row {
106+
func (m mockpgconn) QueryRow(context.Context, string, ...any) pgx.Row {
107107
return m.r
108108
}
109109

@@ -117,15 +117,15 @@ func TestTryLockClientName(t *testing.T) {
117117
})
118118

119119
t.Run("no schema yet", func(t *testing.T) {
120-
r := &mockpgrow{results: []interface{}{
120+
r := &mockpgrow{results: []any{
121121
0, //procoid
122122
}}
123123
m := mockpgconn{r}
124124
assert.NoError(t, pge.TryLockClientName(context.Background(), m))
125125
})
126126

127127
t.Run("locking error", func(t *testing.T) {
128-
r := &mockpgrow{results: []interface{}{
128+
r := &mockpgrow{results: []any{
129129
1, //procoid
130130
errors.New("locking error"), //error
131131
}}
@@ -134,7 +134,7 @@ func TestTryLockClientName(t *testing.T) {
134134
})
135135

136136
t.Run("locking successful", func(t *testing.T) {
137-
r := &mockpgrow{results: []interface{}{
137+
r := &mockpgrow{results: []any{
138138
1, //procoid
139139
true, //locked
140140
}}

internal/pgengine/log_hook.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ func (hook *LogHook) send(cache []logrus.Entry) {
140140
pgx.Identifier{"timetable", "log"},
141141
[]string{"ts", "client_name", "pid", "log_level", "message", "message_data"},
142142
pgx.CopyFromSlice(len(cache),
143-
func(i int) ([]interface{}, error) {
143+
func(i int) ([]any, error) {
144144
jsonData, err := json.Marshal(cache[i].Data)
145145
if err != nil {
146146
return nil, err
147147
}
148-
return []interface{}{cache[i].Time,
148+
return []any{cache[i].Time,
149149
hook.client,
150150
hook.pid,
151151
adaptEntryLevel(cache[i].Level),

internal/pgengine/transaction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask,
119119
// ExecuteSQLCommand executes chain command with parameters inside transaction
120120
func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, command string, paramValues []string) (out string, err error) {
121121
var ct pgconn.CommandTag
122-
var params []interface{}
122+
var params []any
123123
if strings.TrimSpace(command) == "" {
124124
return "", errors.New("SQL command cannot be empty")
125125
}

internal/pgengine/yaml.go

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -58,42 +58,12 @@ func (pge *PgEngine) LoadYamlChains(ctx context.Context, filePath string, replac
5858
return fmt.Errorf("chain '%s' already exists (use --replace flag to overwrite)", yamlChain.ChainName)
5959
}
6060

61-
// Use existing add_job function for single-task chains
62-
if len(yamlChain.Tasks) == 1 {
63-
task := yamlChain.Tasks[0]
64-
params, _ := task.ToSQLParameters()
65-
var paramsValue interface{}
66-
if params == "" {
67-
paramsValue = nil
68-
} else {
69-
paramsValue = params
70-
}
71-
72-
_, err := pge.ConfigDb.Exec(ctx, `
73-
SELECT timetable.add_job($1, $2, $3, $4::jsonb, $5::timetable.command_kind, $6, $7, $8, $9, $10, $11, $12)`,
74-
yamlChain.ChainName,
75-
yamlChain.Schedule,
76-
task.Command,
77-
paramsValue,
78-
task.Kind,
79-
nullString(yamlChain.ClientName),
80-
yamlChain.MaxInstances,
81-
yamlChain.Live,
82-
yamlChain.SelfDestruct,
83-
task.IgnoreError,
84-
yamlChain.ExclusiveExecution,
85-
nullString(yamlChain.OnError))
86-
if err != nil {
87-
return fmt.Errorf("failed to create chain %s: %w", yamlChain.ChainName, err)
88-
}
89-
} else {
90-
// Multi-task chain - use direct SQL
91-
chainID, err := pge.createChainFromYaml(ctx, &yamlChain)
92-
if err != nil {
93-
return fmt.Errorf("failed to create multi-task chain %s: %w", yamlChain.ChainName, err)
94-
}
95-
pge.l.WithField("chain", yamlChain.ChainName).WithField("chain_id", chainID).Info("Created multi-task chain")
61+
// Multi-task chain - use direct SQL
62+
chainID, err := pge.createChainFromYaml(ctx, &yamlChain)
63+
if err != nil {
64+
return fmt.Errorf("failed to create multi-task chain %s: %w", yamlChain.ChainName, err)
9665
}
66+
pge.l.WithField("chain", yamlChain.ChainName).WithField("chain_id", chainID).Info("Created multi-task chain")
9767
}
9868

9969
pge.l.WithField("chains", len(yamlConfig.Chains)).WithField("file", filePath).Info("Successfully imported YAML chains")
@@ -167,7 +137,7 @@ func (pge *PgEngine) createChainFromYaml(ctx context.Context, yamlChain *YamlCha
167137
}
168138

169139
// nullString returns nil for empty strings, otherwise returns the string
170-
func nullString(s string) interface{} {
140+
func nullString(s string) any {
171141
if s == "" {
172142
return nil
173143
}
@@ -185,10 +155,16 @@ func (c *YamlChain) ValidateChain() error {
185155
}
186156

187157
// Validate cron format
188-
switch c.Schedule {
189-
case "", "@reboot", "@after", "@every":
190-
// Valid special schedules
191-
default:
158+
specialSchedules := []string{"@reboot", "@after", "@every"}
159+
isSpecial := false
160+
for _, s := range specialSchedules {
161+
if strings.HasPrefix(c.Schedule, s) {
162+
isSpecial = true
163+
break
164+
}
165+
}
166+
167+
if !isSpecial {
192168
fields := strings.Fields(c.Schedule)
193169
if len(fields) != 5 {
194170
return fmt.Errorf("invalid cron format: %s (expected 5 fields)", c.Schedule)

0 commit comments

Comments
 (0)