|
| 1 | +-- This sample demonstrates how to read multiple key-value pairs from an etcd cluster |
| 2 | +-- based on a prefix and reflect them in a database table. |
| 3 | +-- |
| 4 | +-- It assumes that: |
| 5 | +-- 1. etcdctl is installed and available in the system's PATH. |
| 6 | +-- 2. The etcd cluster is accessible. The default endpoint is http://127.0.0.1:2379. |
| 7 | +-- 3. The user running pg_timetable has sufficient permissions to execute the etcdctl command. |
| 8 | + |
| 9 | +-- The chain will perform the following steps: |
| 10 | +-- 1. Define one task to write to etcd and provide multiple parameter rows to it. |
| 11 | +-- The task will be executed for each parameter row. |
| 12 | +-- 2. Read all key-values under that prefix from etcd as a single JSON object. |
| 13 | +-- 3. Parse the JSON, decode the base64 keys and values, and store them in a table. |
| 14 | +-- 4. Log the number of keys retrieved. |
| 15 | +-- 5. Delete all keys under the prefix from etcd using a single parameterized task. |
| 16 | + |
| 17 | +-- Setup: A table to store the keys and values from etcd |
| 18 | +CREATE TABLE IF NOT EXISTS etcd_test_data ( |
| 19 | + key TEXT, |
| 20 | + value TEXT |
| 21 | +); |
| 22 | +TRUNCATE public.etcd_test_data; |
| 23 | + |
| 24 | +-- Setup: A temporary table to store raw JSON output from etcd |
| 25 | +CREATE TABLE IF NOT EXISTS etcd_json_raw ( |
| 26 | + data JSONB |
| 27 | +); |
| 28 | +TRUNCATE etcd_json_raw; |
| 29 | + |
| 30 | +-- The chain definition |
| 31 | +-- It runs once and then removes itself (self_destruct). |
| 32 | +INSERT INTO timetable.chain ( |
| 33 | + chain_name, |
| 34 | + run_at, |
| 35 | + max_instances, |
| 36 | + live, |
| 37 | + self_destruct |
| 38 | +) |
| 39 | +VALUES ( |
| 40 | + 'ETCD Prefix-Read-Clean', |
| 41 | + '* * * * *', -- Run every minute, but self_destruct will make it run once |
| 42 | + 1, |
| 43 | + TRUE, |
| 44 | + TRUE |
| 45 | +) |
| 46 | +RETURNING chain_id; |
| 47 | + |
| 48 | +-- Task 1: A single task to write key-value pairs to etcd. |
| 49 | +-- This task will be executed 3 times, once for each parameter row below. |
| 50 | +WITH task AS ( |
| 51 | + INSERT INTO timetable.task (chain_id, task_order, task_name, kind, command, ignore_error) |
| 52 | + SELECT currval('timetable.chain_chain_id_seq'), 10, 'Write keys to etcd', 'PROGRAM', 'etcdctl', FALSE |
| 53 | + RETURNING task_id |
| 54 | +) |
| 55 | +INSERT INTO timetable.parameter (task_id, order_id, value) |
| 56 | +SELECT task_id, 1, '["--endpoints=http://127.0.0.1:2379", "put", "/pg_timetable/multi/key1", "value1"]'::jsonb FROM task UNION ALL |
| 57 | +SELECT task_id, 2, '["--endpoints=http://127.0.0.1:2379", "put", "/pg_timetable/multi/key2", "value2"]'::jsonb FROM task UNION ALL |
| 58 | +SELECT task_id, 3, '["--endpoints=http://127.0.0.1:2379", "put", "/pg_timetable/multi/key3", "value3"]'::jsonb FROM task; |
| 59 | + |
| 60 | +-- Task 2: Read all keys under the prefix from etcd as JSON |
| 61 | +INSERT INTO timetable.task ( |
| 62 | + chain_id, |
| 63 | + task_order, |
| 64 | + task_name, |
| 65 | + kind, |
| 66 | + command, |
| 67 | + ignore_error |
| 68 | +) |
| 69 | +SELECT |
| 70 | + currval('timetable.chain_chain_id_seq'), |
| 71 | + 20, |
| 72 | + 'Read from etcd with prefix', |
| 73 | + 'SQL', |
| 74 | + $$COPY etcd_json_raw (data) FROM PROGRAM 'etcdctl --endpoints=http://127.0.0.1:2379 get --prefix /pg_timetable/multi/ --write-out=json'$$, |
| 75 | + FALSE; |
| 76 | + |
| 77 | +-- Task 3: Parse JSON array and store the key-value pairs |
| 78 | +INSERT INTO timetable.task ( |
| 79 | + chain_id, |
| 80 | + task_order, |
| 81 | + task_name, |
| 82 | + kind, |
| 83 | + command, |
| 84 | + ignore_error |
| 85 | +) |
| 86 | +SELECT |
| 87 | + currval('timetable.chain_chain_id_seq'), |
| 88 | + 30, |
| 89 | + 'Parse etcd output and store', |
| 90 | + 'SQL', |
| 91 | + $$INSERT INTO public.etcd_test_data (key, value) |
| 92 | + SELECT |
| 93 | + convert_from(decode(kv->>'key', 'base64'), 'UTF8'), |
| 94 | + convert_from(decode(kv->>'value', 'base64'), 'UTF8') |
| 95 | + FROM |
| 96 | + etcd_json_raw, |
| 97 | + jsonb_array_elements(data->'kvs') AS kv$$, |
| 98 | + FALSE; |
| 99 | + |
| 100 | +-- Task 4: Log the result |
| 101 | +INSERT INTO timetable.task ( |
| 102 | + chain_id, |
| 103 | + task_order, |
| 104 | + task_name, |
| 105 | + kind, |
| 106 | + command, |
| 107 | + ignore_error |
| 108 | +) |
| 109 | +SELECT |
| 110 | + currval('timetable.chain_chain_id_seq'), |
| 111 | + 40, |
| 112 | + 'Log etcd output', |
| 113 | + 'SQL', |
| 114 | + $task$ |
| 115 | + DO |
| 116 | + $$ |
| 117 | + DECLARE |
| 118 | + msg integer; |
| 119 | + BEGIN |
| 120 | + SELECT count(*) FROM public.etcd_test_data INTO msg; |
| 121 | + RAISE notice 'Loaded keys from etcd: %', msg; |
| 122 | + END; |
| 123 | + $$ |
| 124 | + $task$, |
| 125 | + FALSE; |
| 126 | + |
| 127 | +-- Task 5: Clean up the keys in etcd |
| 128 | +WITH task AS ( |
| 129 | + INSERT INTO timetable.task (chain_id, task_order, task_name, kind, command, ignore_error) |
| 130 | + SELECT currval('timetable.chain_chain_id_seq'), 50, 'Clean up etcd', 'PROGRAM', 'etcdctl', FALSE |
| 131 | + RETURNING task_id |
| 132 | +) |
| 133 | +INSERT INTO timetable.parameter (task_id, order_id, value) |
| 134 | +SELECT task_id, 1, '["--endpoints=http://127.0.0.1:2379", "del", "--prefix", "/pg_timetable/multi/"]'::jsonb FROM task; |
0 commit comments