Skip to content

Commit 88908db

Browse files
committed
[!] add CopyToProgram and CopyFromProgram built-in tasks
1 parent 287819d commit 88908db

File tree

5 files changed

+175
-12
lines changed

5 files changed

+175
-12
lines changed

.vscode/launch.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
"env": {},
1414
"args": ["postgresql://scheduler@localhost:5432/timetable",
1515
"--clientname=worker001",
16-
"--log-level=debug",
17-
"--timeout=-1"]
16+
// "--log-level=debug",
17+
// "--timeout=-1"
18+
]
1819
}
1920
]
2021
}

internal/pgengine/copy.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package pgengine
22

33
import (
44
"context"
5+
"errors"
56
"os"
7+
"os/exec"
68
)
79

810
// CopyToFile copies data from database into local file using COPY format specified by sql
@@ -36,3 +38,44 @@ func (pge *PgEngine) CopyFromFile(ctx context.Context, filename string, sql stri
3638
res, err := dbconn.Conn().PgConn().CopyFrom(ctx, f, sql)
3739
return res.RowsAffected(), err
3840
}
41+
42+
// CopyToProgram copies data from database to the standard input of the command using COPY format specified by sql
43+
func (pge *PgEngine) CopyToProgram(ctx context.Context, sql string, cmd string, args ...string) (int64, error) {
44+
dbconn, err := pge.ConfigDb.Acquire(ctx)
45+
if err != nil {
46+
return -1, err
47+
}
48+
defer dbconn.Release()
49+
c := exec.CommandContext(ctx, cmd, args...)
50+
inPipe, err := c.StdinPipe()
51+
if err != nil {
52+
return -1, err
53+
}
54+
if err := c.Start(); err != nil {
55+
return -1, err
56+
}
57+
res, sqlErr := dbconn.Conn().PgConn().CopyTo(ctx, inPipe, sql)
58+
_ = inPipe.Close()
59+
cmdError := c.Wait()
60+
return res.RowsAffected(), errors.Join(sqlErr, cmdError)
61+
}
62+
63+
// CopyFromProgram copies data from the standard output of the command into database using COPY format specified by sql
64+
func (pge *PgEngine) CopyFromProgram(ctx context.Context, sql string, cmd string, args ...string) (int64, error) {
65+
dbconn, err := pge.ConfigDb.Acquire(ctx)
66+
if err != nil {
67+
return -1, err
68+
}
69+
defer dbconn.Release()
70+
c := exec.CommandContext(ctx, cmd, args...)
71+
outPipe, err := c.StdoutPipe()
72+
if err != nil {
73+
return -1, err
74+
}
75+
if err := c.Start(); err != nil {
76+
return -1, err
77+
}
78+
res, err := dbconn.Conn().PgConn().CopyFrom(ctx, outPipe, sql)
79+
waitErr := c.Wait()
80+
return res.RowsAffected(), errors.Join(waitErr, err)
81+
}

internal/scheduler/chain.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package scheduler
22

33
import (
4+
"cmp"
45
"context"
56
"fmt"
67
"strings"
@@ -166,8 +167,8 @@ func (sch *Scheduler) chainWorker(ctx context.Context, chains <-chan Chain) {
166167
}
167168
}
168169

169-
func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, context.CancelFunc) {
170-
timeout := max(t1, t2)
170+
func getTimeoutContext(ctx context.Context, globalTimeout int, customTimeout int) (context.Context, context.CancelFunc) {
171+
timeout := cmp.Or(customTimeout, globalTimeout)
171172
if timeout > 0 {
172173
return context.WithTimeout(ctx, time.Millisecond*time.Duration(timeout))
173174
}

internal/scheduler/tasks.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ import (
1414

1515
// BuiltinTasks maps builtin task names with event handlers
1616
var BuiltinTasks = map[string](func(context.Context, *Scheduler, string) (string, error)){
17-
"NoOp": taskNoOp,
18-
"Sleep": taskSleep,
19-
"Log": taskLog,
20-
"SendMail": taskSendMail,
21-
"Download": taskDownload,
22-
"CopyFromFile": taskCopyFromFile,
23-
"CopyToFile": taskCopyToFile,
24-
"Shutdown": taskShutdown}
17+
"NoOp": taskNoOp,
18+
"Sleep": taskSleep,
19+
"Log": taskLog,
20+
"SendMail": taskSendMail,
21+
"Download": taskDownload,
22+
"CopyFromFile": taskCopyFromFile,
23+
"CopyToFile": taskCopyToFile,
24+
"CopyToProgram": taskCopyToProgram,
25+
"CopyFromProgram": taskCopyFromProgram,
26+
"Shutdown": taskShutdown}
2527

2628
func (sch *Scheduler) executeBuiltinTask(ctx context.Context, name string, paramValues []string) (stdout string, err error) {
2729
var s string
@@ -106,6 +108,40 @@ func taskCopyToFile(ctx context.Context, sch *Scheduler, val string) (stdout str
106108
return stdout, err
107109
}
108110

111+
func taskCopyToProgram(ctx context.Context, sch *Scheduler, val string) (stdout string, err error) {
112+
type copyToProgram struct {
113+
SQL string `json:"sql"`
114+
Cmd string `json:"cmd"`
115+
Args []string `json:"args"`
116+
}
117+
var ctp copyToProgram
118+
if err := json.Unmarshal([]byte(val), &ctp); err != nil {
119+
return "", err
120+
}
121+
count, err := sch.pgengine.CopyToProgram(ctx, ctp.SQL, ctp.Cmd, ctp.Args...)
122+
if err == nil {
123+
stdout = fmt.Sprintf("%d rows copied to program %s", count, ctp.Cmd)
124+
}
125+
return stdout, err
126+
}
127+
128+
func taskCopyFromProgram(ctx context.Context, sch *Scheduler, val string) (stdout string, err error) {
129+
type copyFromProgram struct {
130+
SQL string `json:"sql"`
131+
Cmd string `json:"cmd"`
132+
Args []string `json:"args"`
133+
}
134+
var cfp copyFromProgram
135+
if err := json.Unmarshal([]byte(val), &cfp); err != nil {
136+
return "", err
137+
}
138+
count, err := sch.pgengine.CopyFromProgram(ctx, cfp.SQL, cfp.Cmd, cfp.Args...)
139+
if err == nil {
140+
stdout = fmt.Sprintf("%d rows copied from program %s", count, cfp.Cmd)
141+
}
142+
return stdout, err
143+
}
144+
109145
func taskDownload(ctx context.Context, _ *Scheduler, paramValues string) (stdout string, err error) {
110146
type downloadOpts struct {
111147
WorkersNum int `json:"workersnum"`

samples/CopyToFromProgram.sql

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
-- This script demonstrates how to use pg_timetable to interact with etcd.
-- It includes tasks to write keys to etcd, read them back, and parse the results
-- into a structured format. The difference from the Etcd.sql example is that
-- the etcdctl commands are started locally by pg_timetable. This allows running
-- pg_timetable on a remote host without etcd being installed on the PostgreSQL server.

-- Setup: A table to store the keys and values from etcd
CREATE TABLE IF NOT EXISTS etcd_test_data (
    key TEXT,
    value TEXT
);
-- Use the same unqualified name as CREATE TABLE and COPY so all three resolve to one table
TRUNCATE etcd_test_data;

-- An enhanced example consisting of three tasks:
-- 1. Write key-value pairs to etcd using CopyToProgram BUILT-IN task
-- 2. Read all keys under a specific prefix from etcd using CopyFromProgram BUILT-IN task
-- 3. Delete all keys under the prefix using a single parameterized task
DO $CHAIN$
DECLARE
    v_task_id bigint;
    v_chain_id bigint;
BEGIN
    -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
    INSERT INTO timetable.chain (chain_name, live, self_destruct)
    VALUES ('Sync etcd with PostgreSQL', TRUE, TRUE)
    RETURNING chain_id INTO v_chain_id;

    -- Step 1. Write key-value pairs to etcd
    -- Create the task to write keys to etcd
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 1, 'BUILTIN', 'CopyToProgram', FALSE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the task.
    -- encode(..., 'base64') breaks its output into 76-character lines; the line feeds are
    -- removed so that every key/value stays a single tab-separated field in the COPY output.
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, jsonb_build_object(
            'sql', $$COPY (
                SELECT
                    replace(encode('timetable/' || name::bytea, 'base64'), chr(10), ''),
                    replace(encode(setting::bytea, 'base64'), chr(10), '')
                FROM pg_settings
            ) TO STDOUT$$,
            'cmd', 'sh',
            'args', jsonb_build_array(
                '-c',
                $$while IFS=$'\t' read -r key_b64 value_b64; do
                    [ -z "$key_b64" ] && echo "Skipping empty key" && continue
                    [ -z "$value_b64" ] && echo "Skipping empty value" && continue
                    key=$(printf '%s' "$key_b64" | base64 -di)
                    value=$(printf '%s' "$value_b64" | base64 -di)
                    etcdctl put "$key" "$value"
                done$$)
            )
    );

    -- Step 2. Read all keys under the prefix from etcd
    -- Create the task to read keys from etcd
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 2, 'BUILTIN', 'CopyFromProgram', FALSE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the task.
    -- awk pairs every odd output line (key) with the following even line (value)
    -- and prints them as one tab-separated row.
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, jsonb_build_object(
            'sql', 'COPY etcd_test_data FROM STDIN',
            'cmd', 'sh',
            'args', jsonb_build_array(
                '-c',
                $$etcdctl get --prefix timetable/ | awk 'NR%2==1{key=$0} NR%2==0{print key "\t" $0}'$$
                )
            )
    );

    -- Step 3. Delete all keys under the prefix from etcd
    -- Create the task to delete keys from etcd
    INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
    VALUES (v_chain_id, 3, 'PROGRAM', 'etcdctl', FALSE)
    RETURNING task_id INTO v_task_id;

    -- Create the parameters for the task
    INSERT INTO timetable.parameter (task_id, order_id, value)
    VALUES (v_task_id, 1, jsonb_build_array('del', '--prefix', 'timetable/'));

END; $CHAIN$;

0 commit comments

Comments
 (0)