Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
"env": {},
"args": ["postgresql://scheduler@localhost:5432/timetable",
"--clientname=worker001",
"--log-level=debug",
"--timeout=-1"]
// "--log-level=debug",
// "--timeout=-1"
]
}
]
}
43 changes: 43 additions & 0 deletions internal/pgengine/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package pgengine

import (
"context"
"errors"
"os"
"os/exec"
)

// CopyToFile copies data from database into local file using COPY format specified by sql
Expand Down Expand Up @@ -36,3 +38,44 @@ func (pge *PgEngine) CopyFromFile(ctx context.Context, filename string, sql stri
res, err := dbconn.Conn().PgConn().CopyFrom(ctx, f, sql)
return res.RowsAffected(), err
}

// CopyToProgram copies data from database to the standard input of the command using COPY format specified by sql
func (pge *PgEngine) CopyToProgram(ctx context.Context, sql string, cmd string, args ...string) (int64, error) {
dbconn, err := pge.ConfigDb.Acquire(ctx)
if err != nil {
return -1, err
}
defer dbconn.Release()
c := exec.CommandContext(ctx, cmd, args...)
inPipe, err := c.StdinPipe()
if err != nil {
return -1, err
}
if err := c.Start(); err != nil {
return -1, err
}
res, sqlErr := dbconn.Conn().PgConn().CopyTo(ctx, inPipe, sql)
_ = inPipe.Close()
cmdError := c.Wait()
return res.RowsAffected(), errors.Join(sqlErr, cmdError)
}

// CopyFromProgram copies data from the standard output of the command into database using COPY format specified by sql
func (pge *PgEngine) CopyFromProgram(ctx context.Context, sql string, cmd string, args ...string) (int64, error) {
dbconn, err := pge.ConfigDb.Acquire(ctx)
if err != nil {
return -1, err
}
defer dbconn.Release()
c := exec.CommandContext(ctx, cmd, args...)
outPipe, err := c.StdoutPipe()
if err != nil {
return -1, err
}
if err := c.Start(); err != nil {
return -1, err
}
res, err := dbconn.Conn().PgConn().CopyFrom(ctx, outPipe, sql)
waitErr := c.Wait()
return res.RowsAffected(), errors.Join(waitErr, err)
}
5 changes: 3 additions & 2 deletions internal/scheduler/chain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"cmp"
"context"
"fmt"
"strings"
Expand Down Expand Up @@ -166,8 +167,8 @@ func (sch *Scheduler) chainWorker(ctx context.Context, chains <-chan Chain) {
}
}

func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, context.CancelFunc) {
timeout := max(t1, t2)
func getTimeoutContext(ctx context.Context, globalTimeout int, customTimeout int) (context.Context, context.CancelFunc) {
timeout := cmp.Or(customTimeout, globalTimeout)
if timeout > 0 {
return context.WithTimeout(ctx, time.Millisecond*time.Duration(timeout))
}
Expand Down
52 changes: 44 additions & 8 deletions internal/scheduler/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ import (

// BuiltinTasks maps builtin task names with event handlers
var BuiltinTasks = map[string](func(context.Context, *Scheduler, string) (string, error)){
"NoOp": taskNoOp,
"Sleep": taskSleep,
"Log": taskLog,
"SendMail": taskSendMail,
"Download": taskDownload,
"CopyFromFile": taskCopyFromFile,
"CopyToFile": taskCopyToFile,
"Shutdown": taskShutdown}
"NoOp": taskNoOp,
"Sleep": taskSleep,
"Log": taskLog,
"SendMail": taskSendMail,
"Download": taskDownload,
"CopyFromFile": taskCopyFromFile,
"CopyToFile": taskCopyToFile,
"CopyToProgram": taskCopyToProgram,
"CopyFromProgram": taskCopyFromProgram,
"Shutdown": taskShutdown}

func (sch *Scheduler) executeBuiltinTask(ctx context.Context, name string, paramValues []string) (stdout string, err error) {
var s string
Expand Down Expand Up @@ -106,6 +108,40 @@ func taskCopyToFile(ctx context.Context, sch *Scheduler, val string) (stdout str
return stdout, err
}

func taskCopyToProgram(ctx context.Context, sch *Scheduler, val string) (stdout string, err error) {
type copyToProgram struct {
SQL string `json:"sql"`
Cmd string `json:"cmd"`
Args []string `json:"args"`
}
var ctp copyToProgram
if err := json.Unmarshal([]byte(val), &ctp); err != nil {
return "", err
}
count, err := sch.pgengine.CopyToProgram(ctx, ctp.SQL, ctp.Cmd, ctp.Args...)
if err == nil {
stdout = fmt.Sprintf("%d rows copied to program %s", count, ctp.Cmd)
}
return stdout, err
}

func taskCopyFromProgram(ctx context.Context, sch *Scheduler, val string) (stdout string, err error) {
type copyFromProgram struct {
SQL string `json:"sql"`
Cmd string `json:"cmd"`
Args []string `json:"args"`
}
var cfp copyFromProgram
if err := json.Unmarshal([]byte(val), &cfp); err != nil {
return "", err
}
count, err := sch.pgengine.CopyFromProgram(ctx, cfp.SQL, cfp.Cmd, cfp.Args...)
if err == nil {
stdout = fmt.Sprintf("%d rows copied from program %s", count, cfp.Cmd)
}
return stdout, err
}

func taskDownload(ctx context.Context, _ *Scheduler, paramValues string) (stdout string, err error) {
type downloadOpts struct {
WorkersNum int `json:"workersnum"`
Expand Down
82 changes: 82 additions & 0 deletions samples/CopyToFromProgram.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
-- This script demonstrates how to use pg_timetable to interact with etcd.
-- It includes tasks to write keys to etcd, read them back, and parse the results
-- into a structured format. The difference from the Etcd.sql example is that
-- it runs etcdctl commands locally. That allows to run pg_timetable on a remote
-- server without etcd installed on a PostgreSQL server.

-- Setup: A table to store the keys and values from etcd
CREATE TABLE IF NOT EXISTS etcd_test_data (
key TEXT,
value TEXT
);
TRUNCATE public.etcd_test_data;

-- An enhanced example consisting of three tasks:
-- 1. Write key-value pairs to etcd using CopyToProgram BUILT-IN task
-- 2. Read all keys under a specific prefix from etcd using CopyFromProgram BUILT-IN task
-- 3. Delete all keys under the prefix using a single parameterized task
DO $CHAIN$
DECLARE
v_task_id bigint;
v_chain_id bigint;
BEGIN
-- Create the chain with default values executed every minute (NULL == '* * * * *' :: timetable.cron)
INSERT INTO timetable.chain (chain_name, live, self_destruct)
VALUES ('Sync etcd with PostgreSQL', TRUE, TRUE)
RETURNING chain_id INTO v_chain_id;

-- Step 1. Write key-value pairs to etcd
-- Create the task to write keys to etcd
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
VALUES (v_chain_id, 1, 'BUILTIN', 'CopyToProgram', FALSE)
RETURNING task_id INTO v_task_id;

-- Create the parameters for the task
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, jsonb_build_object(
'sql', $$COPY (
SELECT encode('timetable/' || name::bytea, 'base64'), encode(setting::bytea, 'base64') FROM pg_settings
) TO STDOUT$$,
'cmd', 'sh',
'args', jsonb_build_array(
'-c',
$$while IFS=$'\t' read key_b64 value_b64; do
[ -z "$key_b64" ] && echo "Skipping empty key" && continue
[ -z "$value_b64" ] && echo "Skipping empty value" && continue
key=$(printf '%s' "$key_b64" | base64 -di)
value=$(printf '%s' "$value_b64" | base64 -di)
etcdctl put "$key" "$value"
done$$)
)
);


-- Step 2. Read all keys under the prefix from etcd
-- Create the task to read keys from etcd
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
VALUES (v_chain_id, 2, 'BUILTIN', 'CopyFromProgram', FALSE)
RETURNING task_id INTO v_task_id;

-- Create the parameters for the task
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, jsonb_build_object(
'sql', 'COPY etcd_test_data FROM STDIN',
'cmd', 'sh',
'args', jsonb_build_array(
'-c',
$$etcdctl get --prefix timetable/ | awk 'NR%2==1{key=$0} NR%2==0{print key "\t" $0}'$$
)
)
);

-- Step 3. Delete all keys under the prefix from etcd
-- Create the task to delete keys from etcd
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error)
VALUES (v_chain_id, 3, 'PROGRAM', 'etcdctl', FALSE)
RETURNING task_id INTO v_task_id;

-- Create the parameters for the task
INSERT INTO timetable.parameter (task_id, order_id, value)
VALUES (v_task_id, 1, jsonb_build_array('del', '--prefix', 'timetable/'));

END; $CHAIN$
Loading