Skip to content

Commit 15b5bfb

Browse files
authored
Merge pull request pgEdge#77 from ibrarahmad/SPOC-62
[SPOC-62]: Fix slot_advance error and add-node stuckness
2 parents 8ce7e66 + 20e8e14 commit 15b5bfb

File tree

6 files changed

+79
-24
lines changed

6 files changed

+79
-24
lines changed

spock_functions.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,8 @@ Datum spock_create_subscription(PG_FUNCTION_ARGS)
573573
create_subscription(&sub);
574574

575575
/* Create progress entry to track commit ts per local/remote origin */
576-
create_progress_entry(localnode->node->id, originif.nodeid, 0);
576+
create_progress_entry(localnode->node->id, originif.nodeid, GetCurrentIntegerTimestamp());
577+
577578

578579
/* Create synchronization status for the subscription. */
579580
memset(&sync, 0, sizeof(SpockSyncStatus));

spockctrl/include/workflow.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef WORKFLOW_H
22
#define WORKFLOW_H
33

4+
#include <stdbool.h>
45
#include <jansson.h>
56

67
#define MAX_ARGS 15
@@ -23,6 +24,7 @@ typedef struct Step
2324
char *on_failure; /* JSON string for on_failure actions */
2425
int sleep; /* Sleep time after the step */
2526
int type; /* Type of the step (e.g., STEP_TYPE_SPOCK, STEP_TYPE_SQL, STEP_TYPE_SHELL) */
27+
bool ignore_errors; /* Ignore errors for this step */
2628
} Step;
2729

2830
typedef struct Workflow

spockctrl/src/sql.c

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <string.h>
44
#include <stdarg.h>
55
#include <getopt.h>
6+
#include <stdbool.h>
67
#include <libpq-fe.h>
78
#include <regex.h>
89
#include "dbconn.h"
@@ -29,6 +30,7 @@ int handle_sql_exec_command(int argc, char *argv[])
2930
{"node", required_argument, 0, 'n'},
3031
{"sql", required_argument, 0, 's'},
3132
{"help", no_argument, 0, 'h'},
33+
{"ignore-errors", no_argument, 0, 'i'},
3234
{0, 0, 0, 0}
3335
};
3436

@@ -45,9 +47,10 @@ int handle_sql_exec_command(int argc, char *argv[])
4547
int ncols;
4648
int row;
4749
int col;
50+
bool ignore_errors = false;
4851

4952
optind = 1;
50-
while ((c = getopt_long(argc, argv, "n:s:h", long_options, &option_index)) != -1) {
53+
while ((c = getopt_long(argc, argv, "n:s:hi", long_options, &option_index)) != -1) {
5154
switch (c) {
5255
case 'n':
5356
node_name = optarg;
@@ -58,12 +61,14 @@ int handle_sql_exec_command(int argc, char *argv[])
5861
case 'h':
5962
print_sql_help();
6063
return EXIT_SUCCESS;
64+
case 'i':
65+
ignore_errors = true;
66+
break;
6167
default:
6268
print_sql_help();
6369
return EXIT_FAILURE;
6470
}
6571
}
66-
6772
if (!sql_stmt)
6873
{
6974
log_error("SQL statement is required.");
@@ -80,24 +85,26 @@ int handle_sql_exec_command(int argc, char *argv[])
8085
{
8186
log_error("Failed to get connection info for node '%s'.", node_name ? node_name : "(default)");
8287
print_sql_help();
83-
return EXIT_FAILURE;
88+
return ignore_errors ? EXIT_SUCCESS : EXIT_FAILURE; // Handle ignore-errors
8489
}
8590

8691
conn = PQconnectdb(conninfo);
8792
if (!conn || PQstatus(conn) != CONNECTION_OK)
8893
{
8994
log_error("Connection to database failed: %s", conn ? PQerrorMessage(conn) : "NULL connection");
9095
if (conn) PQfinish(conn);
91-
return EXIT_FAILURE;
96+
return ignore_errors ? EXIT_SUCCESS : EXIT_FAILURE; // Handle ignore-errors
9297
}
98+
9399
res = PQexec(conn, sub_sql);
94100
if (res == NULL || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
95101
{
96-
log_error("Failed to execute SQL: %s", conn != NULL ? PQerrorMessage(conn) : "NULL connection");
102+
if (!ignore_errors)
103+
log_error("Failed to execute SQL: %s", conn != NULL ? PQerrorMessage(conn) : "NULL connection");
97104
if (res != NULL)
98105
PQclear(res);
99106
PQfinish(conn);
100-
return EXIT_FAILURE;
107+
return ignore_errors ? EXIT_SUCCESS : EXIT_FAILURE; // Handle ignore-errors
101108
}
102109

103110
/* Prepare output file name */
@@ -108,7 +115,7 @@ int handle_sql_exec_command(int argc, char *argv[])
108115
log_error("Could not open output file '%s' for writing.", out_filename);
109116
PQclear(res);
110117
PQfinish(conn);
111-
return EXIT_FAILURE;
118+
return ignore_errors ? EXIT_SUCCESS : EXIT_FAILURE; // Handle ignore-errors
112119
}
113120

114121
if (PQresultStatus(res) == PGRES_TUPLES_OK)
@@ -127,18 +134,18 @@ int handle_sql_exec_command(int argc, char *argv[])
127134
{
128135
for (col = 0; col < ncols; col++)
129136
{
130-
fprintf(stdout, "%s=%s%s", PQfname(res, col), PQgetvalue(res, row, col), (col < ncols - 1) ? "\t" : "\n");
137+
log_debug0("%s=%s%s", PQfname(res, col), PQgetvalue(res, row, col), (col < ncols - 1) ? "\t" : "\n");
131138
fprintf(outf, "%s=%s%s", PQfname(res, col), PQgetvalue(res, row, col), (col < ncols - 1) ? "\t" : "\n");
132139
}
133140
}
134141
}
135142
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
136143
{
137-
fprintf(outf, "result=%s\n", PQcmdStatus(res));
144+
log_debug0("result=%s\n", PQcmdStatus(res));
138145
}
139146
else
140147
{
141-
fprintf(outf, "result=success\n");
148+
log_debug0("result=success\n");
142149
}
143150

144151
fclose(outf);

spockctrl/src/sub.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ handle_sub_create_command(int argc, char *argv[])
302302
{
303303
for (col = 0; col < ncols; col++)
304304
{
305-
fprintf(stdout, "%s=%s%s", PQfname(res, col), PQgetvalue(res, row, col), (col < ncols - 1) ? "\t" : "\n");
305+
log_debug0("%s=%s%s", PQfname(res, col), PQgetvalue(res, row, col), (col < ncols - 1) ? "\t" : "\n");
306306
fprintf(outf, "%s=%s%s", PQfname(res, col), PQgetvalue(res, row, col), (col < ncols - 1) ? "\t" : "\n");
307307
}
308308
}

spockctrl/src/workflow.c

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,21 @@ static int prepare_arguments(Step *step, char *argv[], int max_args, const char
6464
}
6565
argv[argc++] = step->args[i];
6666
}
67+
/* Add the ignore_errors flag if set */
68+
if (step->ignore_errors)
69+
{
70+
if (argc >= max_args - 1)
71+
{
72+
log_error("Too many arguments in step: %s", step->name ? step->name : "(unknown)");
73+
return -1;
74+
}
75+
argv[argc++] = "--ignore-errors";
76+
}
6777

6878
argv[argc] = NULL; /* Null-terminate */
6979
return argc;
7080
}
7181

72-
7382
Workflow *
7483
load_workflow(const char *json_file_path)
7584
{
@@ -146,7 +155,7 @@ load_workflow(const char *json_file_path)
146155
static int
147156
parse_spock_step(json_t *spock, Step *step)
148157
{
149-
json_t *command, *description, *args, *node, *name, *sleep_val;
158+
json_t *command, *description, *args, *node, *name, *sleep_val, *ignore_errors;
150159
int j;
151160

152161
step->type = STEP_TYPE_SPOCK;
@@ -211,13 +220,24 @@ parse_spock_step(json_t *spock, Step *step)
211220
else
212221
step->sleep = 0;
213222

223+
/* Parse ignore_errors flag */
224+
ignore_errors = json_object_get(spock, "ignore_errors");
225+
if (ignore_errors && json_is_boolean(ignore_errors))
226+
{
227+
step->ignore_errors = json_boolean_value(ignore_errors);
228+
}
229+
else
230+
{
231+
step->ignore_errors = false; // Default to false
232+
}
233+
214234
return 0;
215235
}
216236

217237
static int
218238
parse_sql_step(json_t *sql, Step *step)
219239
{
220-
json_t *command, *description, *args, *node, *name, *sleep_val;
240+
json_t *command, *description, *args, *node, *name, *sleep_val, *ignore_errors;
221241
int j;
222242

223243
step->type = STEP_TYPE_SQL;
@@ -282,6 +302,16 @@ parse_sql_step(json_t *sql, Step *step)
282302
else
283303
step->sleep = 0;
284304

305+
ignore_errors = json_object_get(sql, "ignore_errors");
306+
if (ignore_errors && json_is_boolean(ignore_errors))
307+
{
308+
step->ignore_errors = json_boolean_value(ignore_errors);
309+
}
310+
else
311+
{
312+
step->ignore_errors = false; // Default to false
313+
}
314+
285315
return 0;
286316
}
287317

@@ -586,8 +616,7 @@ run_workflow(Workflow *workflow)
586616
return 0;
587617
}
588618

589-
int
590-
handle_sql_command(Step *step)
619+
int handle_sql_command(Step *step)
591620
{
592621
char *argv[MAX_ARGS + 1];
593622
int argc;
@@ -599,14 +628,23 @@ handle_sql_command(Step *step)
599628
return -1;
600629
}
601630

602-
603631
/* Debug: Print the prepared arguments */
604632
for (int i = 0; i < argc; i++)
605633
{
606634
log_debug0("Argument[%d]: %s", i, argv[i]);
607635
}
608-
handle_sql_exec_command(argc, argv);
609-
return 0;
636+
637+
/* Execute the SQL command */
638+
int result = handle_sql_exec_command(argc, argv);
639+
640+
/* Handle ignore_errors flag */
641+
if (result != 0)
642+
{
643+
log_warning("SQL command failed, but ignoring errors as per configuration.");
644+
return 0; // Treat as success
645+
}
646+
647+
return result;
610648
}
611649

612650
static int

spockctrl/workflows/add_node.json

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@
5656
"--force_text_transfer=false",
5757
"--enabled=true"
5858
],
59-
"ignore_errors": false,
6059
"on_success": {},
6160
"on_failure": {}
6261
}
6362
},
6463
{
6564
"sql": {
6665
"node": "n2",
66+
"ignore_errors": false,
6767
"command": "SQL",
6868
"description": "Wait for apply worker on n2 subscription",
6969
"sleep": 0,
@@ -115,6 +115,7 @@
115115
"command": "SQL",
116116
"description": "Trigger a sync event on (n2)",
117117
"sleep": 10,
118+
"ignore_errors": false,
118119
"args": [
119120
"--sql=SELECT spock.sync_event();"
120121
],
@@ -128,6 +129,7 @@
128129
"command": "SQL",
129130
"description": "Wait for a sync event on (n1) for n2-n1",
130131
"sleep": 0,
132+
"ignore_errors": false,
131133
"args": [
132134
"--sql=CALL spock.wait_for_sync_event(true, 'n2', '$n2.sync_event'::pg_lsn, 1200000);"
133135
],
@@ -161,7 +163,8 @@
161163
"node": "n1",
162164
"command": "SQL",
163165
"description": "Trigger a sync event on (n1)",
164-
"sleep": 5,
166+
"sleep": 0,
167+
"ignore_errors": false,
165168
"args": [
166169
"--sql=SELECT spock.sync_event();"
167170
],
@@ -174,7 +177,8 @@
174177
"node": "n3",
175178
"command": "SQL",
176179
"description": "Wait for a sync event on (n1) for n1-n3",
177-
"sleep": 10,
180+
"sleep": 0,
181+
"ignore_errors": false,
178182
"args": [
179183
"--sql=CALL spock.wait_for_sync_event(true, 'n1', '$n1.sync_event'::pg_lsn, 1200000);"
180184
],
@@ -187,7 +191,8 @@
187191
"node": "n3",
188192
"command": "SQL",
189193
"description": "Check commit timestamp for n3 lag",
190-
"sleep": 1,
194+
"sleep": 0,
195+
"ignore_errors": false,
191196
"args": [
192197
"--sql=SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = 'n2' AND receiver_name = 'n3'"
193198
],
@@ -201,6 +206,7 @@
201206
"command": "SQL",
202207
"description": "Advance the replication slot for n2->n3 based on a specific commit timestamp",
203208
"sleep": 0,
209+
"ignore_errors": true,
204210
"args": [
205211
"--sql=WITH lsn_cte AS (SELECT spock.get_lsn_from_commit_ts('spk_pgedge_n2_sub_n2_n3', '$n3.commit_timestamp'::timestamp) AS lsn) SELECT pg_replication_slot_advance('spk_pgedge_n2_sub_n2_n3', lsn) FROM lsn_cte;"
206212
],
@@ -227,6 +233,7 @@
227233
"command": "SQL",
228234
"description": "Check the replication lags between nodes.",
229235
"sleep": 0,
236+
"ignore_errors": false,
230237
"args": [
231238
"--sql=DO $$\nDECLARE\n lag_n1_n3 interval;\n lag_n2_n3 interval;\nBEGIN\n LOOP\n SELECT now() - commit_timestamp INTO lag_n1_n3\n FROM spock.lag_tracker\n WHERE origin_name = 'n1' AND receiver_name = 'n3';\n\n SELECT now() - commit_timestamp INTO lag_n2_n3\n FROM spock.lag_tracker\n WHERE origin_name = 'n2' AND receiver_name = 'n3';\n\n RAISE NOTICE 'n1 → n3 lag: %, n2 → n3 lag: %',\n COALESCE(lag_n1_n3::text, 'NULL'),\n COALESCE(lag_n2_n3::text, 'NULL');\n\n EXIT WHEN lag_n1_n3 IS NOT NULL AND lag_n2_n3 IS NOT NULL\n AND extract(epoch FROM lag_n1_n3) < 59\n AND extract(epoch FROM lag_n2_n3) < 59;\n\n PERFORM pg_sleep(1);\n END LOOP;\nEND\n$$;\n"
232239
],

0 commit comments

Comments
 (0)