diff --git a/.vscode/launch.json b/.vscode/launch.json index b92f595c..f19a545a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,8 +13,9 @@ "env": {}, "args": ["postgresql://scheduler@localhost:5432/timetable", "--clientname=worker001", - "--log-level=debug", - "--timeout=-1"] + // "--log-level=debug", + // "--timeout=-1" + ] } ] } \ No newline at end of file diff --git a/internal/pgengine/copy.go b/internal/pgengine/copy.go index e61ff5e4..dc80bc67 100644 --- a/internal/pgengine/copy.go +++ b/internal/pgengine/copy.go @@ -2,7 +2,9 @@ package pgengine import ( "context" + "errors" "os" + "os/exec" ) // CopyToFile copies data from database into local file using COPY format specified by sql @@ -36,3 +38,44 @@ func (pge *PgEngine) CopyFromFile(ctx context.Context, filename string, sql stri res, err := dbconn.Conn().PgConn().CopyFrom(ctx, f, sql) return res.RowsAffected(), err } + +// CopyToProgram copies data from database to the standard input of the command using COPY format specified by sql +func (pge *PgEngine) CopyToProgram(ctx context.Context, sql string, cmd string, args ...string) (int64, error) { + dbconn, err := pge.ConfigDb.Acquire(ctx) + if err != nil { + return -1, err + } + defer dbconn.Release() + c := exec.CommandContext(ctx, cmd, args...) + inPipe, err := c.StdinPipe() + if err != nil { + return -1, err + } + if err := c.Start(); err != nil { + return -1, err + } + res, sqlErr := dbconn.Conn().PgConn().CopyTo(ctx, inPipe, sql) + _ = inPipe.Close() + cmdError := c.Wait() + return res.RowsAffected(), errors.Join(sqlErr, cmdError) +} + +// CopyFromProgram copies data from the standard output of the command into database using COPY format specified by sql +func (pge *PgEngine) CopyFromProgram(ctx context.Context, sql string, cmd string, args ...string) (int64, error) { + dbconn, err := pge.ConfigDb.Acquire(ctx) + if err != nil { + return -1, err + } + defer dbconn.Release() + c := exec.CommandContext(ctx, cmd, args...) + outPipe, err := c.StdoutPipe() + if err != nil { + return -1, err + } + if err := c.Start(); err != nil { + return -1, err + } + res, err := dbconn.Conn().PgConn().CopyFrom(ctx, outPipe, sql) + waitErr := c.Wait() + return res.RowsAffected(), errors.Join(waitErr, err) +} diff --git a/internal/scheduler/chain.go b/internal/scheduler/chain.go index a71ae401..da82c45d 100644 --- a/internal/scheduler/chain.go +++ b/internal/scheduler/chain.go @@ -1,6 +1,7 @@ package scheduler import ( + "cmp" "context" "fmt" "strings" @@ -166,8 +167,8 @@ func (sch *Scheduler) chainWorker(ctx context.Context, chains <-chan Chain) { } } -func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, context.CancelFunc) { - timeout := max(t1, t2) +func getTimeoutContext(ctx context.Context, globalTimeout int, customTimeout int) (context.Context, context.CancelFunc) { + timeout := cmp.Or(customTimeout, globalTimeout) if timeout > 0 { return context.WithTimeout(ctx, time.Millisecond*time.Duration(timeout)) } diff --git a/internal/scheduler/tasks.go b/internal/scheduler/tasks.go index 19ed5c77..883ac808 100644 --- a/internal/scheduler/tasks.go +++ b/internal/scheduler/tasks.go @@ -14,14 +14,16 @@ import ( // BuiltinTasks maps builtin task names with event handlers var BuiltinTasks = map[string](func(context.Context, *Scheduler, string) (string, error)){ - "NoOp": taskNoOp, - "Sleep": taskSleep, - "Log": taskLog, - "SendMail": taskSendMail, - "Download": taskDownload, - "CopyFromFile": taskCopyFromFile, - "CopyToFile": taskCopyToFile, - "Shutdown": taskShutdown} + "NoOp": taskNoOp, + "Sleep": taskSleep, + "Log": taskLog, + "SendMail": taskSendMail, + "Download": taskDownload, + "CopyFromFile": taskCopyFromFile, + "CopyToFile": taskCopyToFile, + "CopyToProgram": taskCopyToProgram, + "CopyFromProgram": taskCopyFromProgram, + "Shutdown": taskShutdown} func (sch *Scheduler) executeBuiltinTask(ctx context.Context, name string, paramValues []string) (stdout string, err error) { var s string @@ -106,6 +108,40 @@ func taskCopyToFile(ctx context.Context, sch *Scheduler, val string) (stdout str return stdout, err } +func taskCopyToProgram(ctx context.Context, sch *Scheduler, val string) (stdout string, err error) { + type copyToProgram struct { + SQL string `json:"sql"` + Cmd string `json:"cmd"` + Args []string `json:"args"` + } + var ctp copyToProgram + if err := json.Unmarshal([]byte(val), &ctp); err != nil { + return "", err + } + count, err := sch.pgengine.CopyToProgram(ctx, ctp.SQL, ctp.Cmd, ctp.Args...) + if err == nil { + stdout = fmt.Sprintf("%d rows copied to program %s", count, ctp.Cmd) + } + return stdout, err +} + +func taskCopyFromProgram(ctx context.Context, sch *Scheduler, val string) (stdout string, err error) { + type copyFromProgram struct { + SQL string `json:"sql"` + Cmd string `json:"cmd"` + Args []string `json:"args"` + } + var cfp copyFromProgram + if err := json.Unmarshal([]byte(val), &cfp); err != nil { + return "", err + } + count, err := sch.pgengine.CopyFromProgram(ctx, cfp.SQL, cfp.Cmd, cfp.Args...) + if err == nil { + stdout = fmt.Sprintf("%d rows copied from program %s", count, cfp.Cmd) + } + return stdout, err +} + func taskDownload(ctx context.Context, _ *Scheduler, paramValues string) (stdout string, err error) { type downloadOpts struct { WorkersNum int `json:"workersnum"` diff --git a/samples/CopyToFromProgram.sql b/samples/CopyToFromProgram.sql new file mode 100644 index 00000000..418f7622 --- /dev/null +++ b/samples/CopyToFromProgram.sql @@ -0,0 +1,82 @@ +-- This script demonstrates how to use pg_timetable to interact with etcd. +-- It includes tasks to write keys to etcd, read them back, and parse the results +-- into a structured format. The difference from the Etcd.sql example is that +-- it runs etcdctl commands locally. That allows to run pg_timetable on a remote +-- server without etcd installed on a PostgreSQL server. + +-- Setup: A table to store the keys and values from etcd +CREATE TABLE IF NOT EXISTS etcd_test_data ( + key TEXT, + value TEXT +); +TRUNCATE public.etcd_test_data; + +-- An enhanced example consisting of three tasks: +-- 1. Write key-value pairs to etcd using CopyToProgram BUILT-IN task +-- 2. Read all keys under a specific prefix from etcd using CopyFromProgram BUILT-IN task +-- 3. Delete all keys under the prefix using a single parameterized task +DO $CHAIN$ +DECLARE + v_task_id bigint; + v_chain_id bigint; +BEGIN + -- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron) + INSERT INTO timetable.chain (chain_name, live, self_destruct) + VALUES ('Sync etcd with PostgreSQL', TRUE, TRUE) + RETURNING chain_id INTO v_chain_id; + + -- Step 1. Write key-value pairs to etcd + -- Create the task to write keys to etcd + INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error) + VALUES (v_chain_id, 1, 'BUILTIN', 'CopyToProgram', FALSE) + RETURNING task_id INTO v_task_id; + + -- Create the parameters for the task + INSERT INTO timetable.parameter (task_id, order_id, value) + VALUES (v_task_id, 1, jsonb_build_object( + 'sql', $$COPY ( + SELECT encode('timetable/' || name::bytea, 'base64'), encode(setting::bytea, 'base64') FROM pg_settings + ) TO STDOUT$$, + 'cmd', 'sh', + 'args', jsonb_build_array( + '-c', + $$while IFS=$'\t' read key_b64 value_b64; do + [ -z "$key_b64" ] && echo "Skipping empty key" && continue + [ -z "$value_b64" ] && echo "Skipping empty value" && continue + key=$(printf '%s' "$key_b64" | base64 -di) + value=$(printf '%s' "$value_b64" | base64 -di) + etcdctl put "$key" "$value" + done$$) + ) + ); + + + -- Step 2. Read all keys under the prefix from etcd + -- Create the task to read keys from etcd + INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error) + VALUES (v_chain_id, 2, 'BUILTIN', 'CopyFromProgram', FALSE) + RETURNING task_id INTO v_task_id; + + -- Create the parameters for the task + INSERT INTO timetable.parameter (task_id, order_id, value) + VALUES (v_task_id, 1, jsonb_build_object( + 'sql', 'COPY etcd_test_data FROM STDIN', + 'cmd', 'sh', + 'args', jsonb_build_array( + '-c', + $$etcdctl get --prefix timetable/ | awk 'NR%2==1{key=$0} NR%2==0{print key "\t" $0}'$$ + ) + ) + ); + + -- Step 3. Delete all keys under the prefix from etcd + -- Create the task to delete keys from etcd + INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error) + VALUES (v_chain_id, 3, 'PROGRAM', 'etcdctl', FALSE) + RETURNING task_id INTO v_task_id; + + -- Create the parameters for the task + INSERT INTO timetable.parameter (task_id, order_id, value) + VALUES (v_task_id, 1, jsonb_build_array('del', '--prefix', 'timetable/')); + +END; $CHAIN$ \ No newline at end of file