Skip to content

Commit a9609e0

Browse files
committed
Reduce physrep-sql against the metadb
Signed-off-by: Mark Hannum <mhannum@bloomberg.net>
1 parent 2948f1a commit a9609e0

File tree

13 files changed

+154
-206
lines changed

13 files changed

+154
-206
lines changed

bdb/file.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3663,8 +3663,6 @@ static void delete_log_files_int(bdb_state_type *bdb_state)
36633663
}
36643664
}
36653665

3666-
physrep_update_low_file_num(&lowfilenum, &local_lowfilenum);
3667-
36683666
/* debug: print filenums from other nodes */
36693667

36703668
/* if we have a maximum filenum defined in bdb attributes which is lower,

db/db_tunables.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,11 +517,13 @@ extern int gbl_physrep_hung_replicant_threshold;
517517
extern int gbl_physrep_revconn_check_interval;
518518
extern int gbl_physrep_fake_revconn_populate_error;
519519
extern int gbl_physrep_fake_revconn_populate_error_once;
520+
extern int gbl_physrep_fake_check_revconn_error;
520521
extern int gbl_debug_fake_rte_failure;
521522
extern int gbl_physrep_update_registry_interval;
522523
extern int gbl_physrep_i_am_metadb;
523524
extern int gbl_physrep_keepalive_v2;
524525
extern int gbl_physrep_keepalive_freq_sec;
526+
extern int gbl_physrep_slow_replicant_check_freq_sec;
525527
extern int gbl_physrep_max_candidates;
526528
extern int gbl_physrep_reconnect_penalty;
527529
extern int gbl_physrep_reconnect_interval;

db/db_tunables.h

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,19 +1821,15 @@ REGISTER_TUNABLE("tranlog_incoherent_timeout", "Timeout in seconds for incoheren
18211821
TUNABLE_INTEGER, &gbl_tranlog_incoherent_timeout, 0, NULL, NULL, NULL, NULL);
18221822
REGISTER_TUNABLE("tranlog_maxpoll", "Tranlog timeout in seconds for blocking poll. (Default: 60)", TUNABLE_INTEGER,
18231823
&gbl_tranlog_maxpoll, 0, NULL, NULL, NULL, NULL);
1824-
REGISTER_TUNABLE("physrep_check_minlog_freq_sec", "Check the minimum log number to keep this often. (Default: 10)",
1825-
TUNABLE_INTEGER, &gbl_physrep_check_minlog_freq_sec, 0, NULL, NULL, NULL, NULL);
18261824
REGISTER_TUNABLE("physrep_debug", "Print extended physrep trace. (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_debug,
18271825
0, NULL, NULL, NULL, NULL);
18281826
REGISTER_TUNABLE("physrep_exit_on_invalid_logstream", "Exit physreps on invalid logstream. (Default: off)",
18291827
TUNABLE_BOOLEAN, &gbl_physrep_exit_on_invalid_logstream, 0, NULL, NULL, NULL, NULL);
18301828
REGISTER_TUNABLE("physrep_fanout",
18311829
"Maximum number of physical replicants that a node can service (Default: 8)",
18321830
TUNABLE_INTEGER, &gbl_physrep_fanout, 0, NULL, NULL, NULL, NULL);
1833-
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec",
1834-
"Check for hung physical replicant this often. (Default: 10)",
1835-
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL,
1836-
NULL, NULL, NULL);
1831+
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec", "Check for hung physical replicant this often. (Default: 60)",
1832+
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
18371833
REGISTER_TUNABLE("physrep_hung_replicant_threshold",
18381834
"Report if the physical replicant has been inactive for this duration. (Default: 60)",
18391835
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_threshold, 0, NULL,
@@ -1849,20 +1845,24 @@ REGISTER_TUNABLE("physrep_fake_revconn_populate_error_once",
18491845
"Fake physrep error on first attempt only, allowing retry to succeed. (Default: off)",
18501846
TUNABLE_BOOLEAN, &gbl_physrep_fake_revconn_populate_error_once, EXPERIMENTAL | INTERNAL, NULL, NULL,
18511847
NULL, NULL);
1848+
REGISTER_TUNABLE("physrep_fake_check_revconn_error",
1849+
"Fake physrep error when asking if we should wait for a revconn. (Default: off)", TUNABLE_BOOLEAN,
1850+
&gbl_physrep_fake_check_revconn_error, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
18521851
REGISTER_TUNABLE("debug_fake_rte_failure", "Fake rte failures in connect-remote-db. (Default: off)", TUNABLE_BOOLEAN,
18531852
&gbl_debug_fake_rte_failure, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
18541853
REGISTER_TUNABLE("physrep_i_am_metadb", "I am physical replication metadb (Default: off)",
18551854
TUNABLE_BOOLEAN, &gbl_physrep_i_am_metadb, NOARG, NULL, NULL, NULL, NULL);
18561855
REGISTER_TUNABLE("physrep_keepalive_v2", "Use version 2 of keepalive which includes first lsn. (Default: off)",
18571856
TUNABLE_BOOLEAN, &gbl_physrep_keepalive_v2, 0, NULL, NULL, NULL, NULL);
18581857
REGISTER_TUNABLE("physrep_keepalive_freq_sec",
1859-
"Periodically send lsn to source node after this interval. (Default: 10)", TUNABLE_INTEGER,
1858+
"Periodically send lsn to source node after this interval. (Default: 60)", TUNABLE_INTEGER,
18601859
&gbl_physrep_keepalive_freq_sec, 0, NULL, NULL, NULL, NULL);
1860+
REGISTER_TUNABLE("physrep_slow_replicant_check_freq_sec", "Check for slow physical replicant this often. (Default: 60)",
1861+
TUNABLE_INTEGER, &gbl_physrep_slow_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
18611862
REGISTER_TUNABLE("physrep_max_candidates",
18621863
"Maximum number of candidates that should be returned to a "
18631864
"new physical replicant during registration. (Default: 6)",
1864-
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL,
1865-
NULL, NULL, NULL);
1865+
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL, NULL, NULL, NULL);
18661866
REGISTER_TUNABLE("physrep_metadb_host", "List of physical replication metadb cluster hosts.", TUNABLE_STRING,
18671867
&gbl_physrep_metadb_host, READONLY, NULL, NULL, NULL, NULL);
18681868
REGISTER_TUNABLE("physrep_metadb_name", "Physical replication metadb cluster name.",
@@ -1922,10 +1922,10 @@ REGISTER_TUNABLE("revsql_debug",
19221922
"Print extended reversql-sql trace. (Default: off)",
19231923
TUNABLE_BOOLEAN, &gbl_revsql_debug, EXPERIMENTAL | INTERNAL,
19241924
NULL, NULL, NULL, NULL);
1925-
REGISTER_TUNABLE("revsql_host_refresh_freq_sec", "The frequency at which the "
1926-
"reverse connection host list will be refreshed (Default: 5secs)",
1927-
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL,
1928-
NULL, NULL, NULL, NULL);
1925+
REGISTER_TUNABLE("revsql_host_refresh_freq_sec",
1926+
"The frequency at which the "
1927+
"reverse connection host list will be refreshed (Default: 60 secs)",
1928+
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
19291929
REGISTER_TUNABLE("admin_revsql", "Run revsql sessions as admin. (Default: Off)", TUNABLE_BOOLEAN, &gbl_admin_revsql, 0,
19301930
NULL, NULL, NULL, NULL);
19311931
REGISTER_TUNABLE("revconn_rdtimeout", "Initial ms timeout for revconn connections. (Default: 100)", TUNABLE_INTEGER,

db/phys_rep.c

Lines changed: 21 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,17 @@ typedef struct DB_Connection {
5858
} while (0)
5959

6060
int gbl_physrep_debug = 0;
61+
int gbl_physrep_fake_check_revconn_error = 0;
6162
int gbl_physrep_reconnect_interval = 3600; // force re-registration every hour
6263
int gbl_physrep_reconnect_penalty = 0;
6364
int gbl_blocking_physrep = 1;
6465
int gbl_physrep_fanout = 8;
6566
int gbl_physrep_max_candidates = 6;
6667
int gbl_physrep_max_pending_replicants = 10;
6768
int gbl_deferred_phys_flag = 0;
68-
int gbl_physrep_source_nodes_refresh_freq_sec = 10;
69-
int gbl_physrep_slow_replicant_check_freq_sec = 10;
70-
int gbl_physrep_keepalive_freq_sec = 10;
71-
int gbl_physrep_check_minlog_freq_sec = 10;
72-
int gbl_physrep_hung_replicant_check_freq_sec = 10;
69+
int gbl_physrep_slow_replicant_check_freq_sec = 60;
70+
int gbl_physrep_keepalive_freq_sec = 60;
71+
int gbl_physrep_hung_replicant_check_freq_sec = 60;
7372
int gbl_physrep_hung_replicant_threshold = 60;
7473
int gbl_physrep_revconn_check_interval = 60;
7574
int gbl_physrep_update_registry_interval = 60;
@@ -78,7 +77,6 @@ int gbl_physrep_i_am_metadb = 0;
7877
int gbl_physrep_filter_by_class = 1;
7978
int gbl_started_physrep_threads = 0;
8079

81-
unsigned int physrep_min_logfile;
8280
unsigned int gbl_deferred_phys_update;
8381

8482
char *gbl_physrep_source_dbname;
@@ -1173,17 +1171,18 @@ static int send_keepalive(void)
11731171
return 0;
11741172
}
11751173

1176-
unsigned int physrep_min_filenum() {
1177-
return physrep_min_logfile;
1178-
}
1179-
11801174
extern int gbl_reverse_hosts_v2;
11811175

11821176
static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
11831177
int rc;
11841178
char cmd[400];
11851179
int do_wait = 0;
11861180

1181+
if (gbl_physrep_fake_check_revconn_error) {
1182+
logmsg(LOGMSG_USER, "%s:%d: Fake error checking for reverse connection\n", __func__, __LINE__);
1183+
return -1;
1184+
}
1185+
11871186
if (!gbl_reverse_hosts_v2) {
11881187
rc = snprintf(cmd, sizeof(cmd), "exec procedure sys.physrep.should_wait_for_con('%s', '%s')", gbl_dbname,
11891188
(gbl_machine_class) ? gbl_machine_class : gbl_myhostname);
@@ -1215,38 +1214,6 @@ static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
12151214
return (rc == 0) ? do_wait : -1;
12161215
}
12171216

1218-
void physrep_update_low_file_num(int *lowfilenum, int *local_lowfilenum) {
1219-
unsigned int physrep_minfilenum;
1220-
if ((get_dbtable_by_name("comdb2_physreps")) == NULL) {
1221-
return;
1222-
}
1223-
1224-
physrep_minfilenum = physrep_min_filenum();
1225-
if (physrep_minfilenum <= 0) {
1226-
if (gbl_physrep_debug) {
1227-
physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum unchanged (physrep_minfilenum: %d)\n",
1228-
__func__, __LINE__, physrep_minfilenum);
1229-
}
1230-
} else {
1231-
if (physrep_minfilenum <= *lowfilenum) {
1232-
if (gbl_physrep_debug) {
1233-
physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum %d being changed "
1234-
"physical replicant(s) (physrep_minfilenum: %d)\n",
1235-
__func__, __LINE__, *lowfilenum, physrep_minfilenum);
1236-
}
1237-
*lowfilenum = physrep_minfilenum - 1;
1238-
}
1239-
if (physrep_minfilenum <= *local_lowfilenum) {
1240-
*local_lowfilenum = physrep_minfilenum - 1;
1241-
}
1242-
}
1243-
1244-
if (gbl_physrep_debug) {
1245-
physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum: %d (physrep_minfilenum: %d)\n",
1246-
__func__, __LINE__, *lowfilenum, physrep_minfilenum);
1247-
}
1248-
}
1249-
12501217
static int slow_replicants_count_int(cdb2_hndl_tp *metadb, unsigned int *count)
12511218
{
12521219
char query[400];
@@ -1306,93 +1273,6 @@ static int slow_replicants_count(unsigned int *count)
13061273
return badrc ? -1 : 0;
13071274
}
13081275

1309-
static int update_min_logfile_int(cdb2_hndl_tp *metadb)
1310-
{
1311-
char cmd[120+nodes_list_sz];
1312-
char *buf;
1313-
size_t buf_len;
1314-
int bytes_written;
1315-
int rc = 0;
1316-
1317-
if (gbl_ready == 0)
1318-
return 0;
1319-
1320-
bytes_written = 0;
1321-
buf = cmd;
1322-
buf_len = sizeof(cmd);
1323-
1324-
bytes_written +=
1325-
snprintf(buf+bytes_written, buf_len-bytes_written,
1326-
"WITH RECURSIVE replication_tree(dbname, host, file) AS "
1327-
" (SELECT dbname, host, file FROM comdb2_physreps "
1328-
" WHERE dbname='%s' AND host IN (",
1329-
gbl_dbname);
1330-
if (bytes_written >= buf_len) {
1331-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
1332-
return 1;
1333-
}
1334-
1335-
bytes_written += append_quoted_local_hosts(buf+bytes_written, buf_len-bytes_written, ",");
1336-
if (bytes_written >= buf_len) {
1337-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
1338-
return 1;
1339-
}
1340-
1341-
bytes_written += snprintf(buf + bytes_written, buf_len - bytes_written,
1342-
" ) "
1343-
" UNION "
1344-
" SELECT p.dbname, p.host, p.file FROM comdb2_physreps p, "
1345-
" comdb2_physrep_connections c, replication_tree t "
1346-
" WHERE p.state = 'Active' AND p.file <> 0 AND "
1347-
" t.dbname = c.source_dbname AND c.dbname = p.dbname) "
1348-
" SELECT file FROM replication_tree WHERE file IS NOT NULL ORDER BY file LIMIT 1");
1349-
if (bytes_written >= buf_len) {
1350-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
1351-
return 1;
1352-
}
1353-
1354-
if (gbl_physrep_debug) {
1355-
physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd);
1356-
}
1357-
1358-
rc = cdb2_run_statement(metadb, cmd);
1359-
if (rc == CDB2_OK) {
1360-
while ((rc = cdb2_next_record(metadb)) == CDB2_OK) {
1361-
int64_t *minfile = (int64_t *)cdb2_column_value(metadb, 0);
1362-
physrep_min_logfile = minfile ? (unsigned int)*minfile : 0;
1363-
}
1364-
if (rc == CDB2_OK_DONE)
1365-
rc = 0;
1366-
} else {
1367-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to execute (rc: %d)\n", __func__, __LINE__, rc);
1368-
}
1369-
1370-
return rc;
1371-
}
1372-
1373-
static int update_min_logfile(void)
1374-
{
1375-
cdb2_hndl_tp *metadb;
1376-
int rc, altcnt = gbl_altmetadb_count;
1377-
1378-
if ((rc = physrep_get_metadb_or_local_hndl(&metadb)) != 0) {
1379-
logmsg(LOGMSG_ERROR, "%s: failed to get metadb handle rc=%d\n", __func__, rc);
1380-
} else {
1381-
update_min_logfile_int(metadb);
1382-
cdb2_close(metadb);
1383-
}
1384-
1385-
for (int i = 0; i < altcnt; i++) {
1386-
if ((rc = get_alt_metadb_hndl(&metadb, i)) != 0) {
1387-
logmsg(LOGMSG_ERROR, "%s: failed to get alt metadb handle %d rc=%d\n", __func__, i, rc);
1388-
continue;
1389-
}
1390-
update_min_logfile_int(metadb);
1391-
cdb2_close(metadb);
1392-
}
1393-
return 0;
1394-
}
1395-
13961276
/*
13971277
Check whether we need to wait for a connection from one of the nodes
13981278
in the source db.
@@ -1501,6 +1381,7 @@ static void *physrep_worker(void *args)
15011381
comdb2_name_thread(__func__);
15021382

15031383
volatile int64_t gen, highest_gen = 0;
1384+
int do_revconn;
15041385
int64_t first_lcgen = -1;
15051386
size_t sql_cmd_len = 150;
15061387
char sql_cmd[sql_cmd_len];
@@ -1557,7 +1438,7 @@ static void *physrep_worker(void *args)
15571438
physrep_logmsg(LOGMSG_USER, "%s:%d Re-checking for reverse connection\n", __func__, __LINE__);
15581439
}
15591440
if ((rc = physrep_get_metadb_or_local_hndl(&repl_metadb)) == 0) {
1560-
int do_revconn = do_wait_for_reverse_conn(repl_metadb);
1441+
do_revconn = do_wait_for_reverse_conn(repl_metadb);
15611442
cdb2_close(repl_metadb);
15621443
if (gbl_physrep_debug) {
15631444
physrep_logmsg(LOGMSG_USER, "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n",
@@ -1586,8 +1467,10 @@ static void *physrep_worker(void *args)
15861467
goto sleep_and_retry;
15871468
}
15881469

1589-
last_revconn_check = comdb2_time_epoch();
1590-
if (do_wait_for_reverse_conn(repl_metadb) == 1) {
1470+
do_revconn = do_wait_for_reverse_conn(repl_metadb);
1471+
1472+
if (do_revconn == 1) {
1473+
last_revconn_check = comdb2_time_epoch();
15911474
is_revconn = 1;
15921475
int wait_timeout_sec = 60;
15931476

@@ -1632,6 +1515,12 @@ static void *physrep_worker(void *args)
16321515

16331516
set_repl_db_connected(rev_conn_hndl->remote_dbname, rev_conn_hndl->remote_host);
16341517
} else {
1518+
/* Intentionally try to register if we fail to determine if reverse connection is needed */
1519+
if (do_revconn == 0) {
1520+
last_revconn_check = comdb2_time_epoch();
1521+
} else {
1522+
physrep_logmsg(LOGMSG_DEBUG, "%s:%d do_revconn is %d, registering replicant\n", __func__, __LINE__, do_revconn);
1523+
}
16351524
is_revconn = 0;
16361525
int notfound = 0;
16371526
while (stop_physrep_worker == 0) {
@@ -1957,7 +1846,6 @@ static void *physrep_watcher(void *args) {
19571846
static int physrep_slow_replicant_last_checked;
19581847
static int physrep_keepalive_last_sent;
19591848
static int physrep_hung_replicant_last_checked;
1960-
static int physrep_minlog_last_checked;
19611849

19621850
while (!gbl_exit && stop_physrep_watcher == 0) {
19631851
sleep(1);
@@ -1986,20 +1874,12 @@ static void *physrep_watcher(void *args) {
19861874
}
19871875
// Common:
19881876
// 1) Send keepalives
1989-
// 2) Update maximum log file number upto which it is safe to delete
19901877

19911878
// Periodically send keepalive to report its LSN.
19921879
if ((now - physrep_keepalive_last_sent) >= gbl_physrep_keepalive_freq_sec) {
19931880
send_keepalive();
19941881
physrep_keepalive_last_sent = now;
19951882
}
1996-
1997-
// Update the 'minimum log file' marker upto which it is safe to
1998-
// delete log files.
1999-
if ((now - physrep_minlog_last_checked) >= gbl_physrep_check_minlog_freq_sec) {
2000-
update_min_logfile();
2001-
physrep_minlog_last_checked = now;
2002-
}
20031883
}
20041884
return NULL;
20051885
}

db/reverse_conn.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ int gbl_revsql_allow_command_exec;
6868
int gbl_revsql_debug = 0;
6969
int gbl_revsql_cdb2_debug;
7070
// 'reverse-connection host' list refresh frequency
71-
int gbl_revsql_host_refresh_freq_sec = 5;
71+
int gbl_revsql_host_refresh_freq_sec = 60;
7272
// 'reverse-connection' worker's new connection attempt frequency
7373
int gbl_revsql_connect_freq_sec = 5;
7474

lua/lib/physrep_get_reverse_hosts.lua

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
local function main(dbname, hostname)
66
db:begin()
77

8-
-- Check whether 'comdb2_physrep_sources' table exists
9-
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
10-
local row = rs:fetch()
11-
if row.cnt == 0 then
12-
db:commit()
13-
return
14-
end
15-
168
local rs, rc = db:exec("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
179
"source_dbname = '" .. dbname .. "' AND source_host = '" .. hostname .. "'")
1810
local row = rs:fetch()

lua/lib/physrep_get_reverse_hosts_v2.lua

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
local function main(dbname, hostname)
66
db:begin()
77

8-
-- Check whether 'comdb2_physrep_sources' table exists
9-
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
10-
local row = rs:fetch()
11-
if row.cnt == 0 then
12-
db:commit()
13-
return
14-
end
15-
168
local rs, rc = db:exec("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
179
"source_dbname = '" .. dbname .. "' AND source_host = '" .. hostname .. "'")
1810
local row = rs:fetch()

0 commit comments

Comments
 (0)