Skip to content

Commit e6df601

Browse files
committed
Reduce physrep-sql against the metadb
Signed-off-by: Mark Hannum <mhannum@bloomberg.net>
1 parent 2948f1a commit e6df601

File tree

12 files changed: 32 additions and 202 deletions.

bdb/file.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3663,8 +3663,6 @@ static void delete_log_files_int(bdb_state_type *bdb_state)
36633663
}
36643664
}
36653665

3666-
physrep_update_low_file_num(&lowfilenum, &local_lowfilenum);
3667-
36683666
/* debug: print filenums from other nodes */
36693667

36703668
/* if we have a maximum filenum defined in bdb attributes which is lower,

db/db_tunables.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,7 @@ extern int gbl_physrep_update_registry_interval;
522522
extern int gbl_physrep_i_am_metadb;
523523
extern int gbl_physrep_keepalive_v2;
524524
extern int gbl_physrep_keepalive_freq_sec;
525+
extern int gbl_physrep_slow_replicant_check_freq_sec;
525526
extern int gbl_physrep_max_candidates;
526527
extern int gbl_physrep_reconnect_penalty;
527528
extern int gbl_physrep_reconnect_interval;

db/db_tunables.h

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,19 +1821,15 @@ REGISTER_TUNABLE("tranlog_incoherent_timeout", "Timeout in seconds for incoheren
18211821
TUNABLE_INTEGER, &gbl_tranlog_incoherent_timeout, 0, NULL, NULL, NULL, NULL);
18221822
REGISTER_TUNABLE("tranlog_maxpoll", "Tranlog timeout in seconds for blocking poll. (Default: 60)", TUNABLE_INTEGER,
18231823
&gbl_tranlog_maxpoll, 0, NULL, NULL, NULL, NULL);
1824-
REGISTER_TUNABLE("physrep_check_minlog_freq_sec", "Check the minimum log number to keep this often. (Default: 10)",
1825-
TUNABLE_INTEGER, &gbl_physrep_check_minlog_freq_sec, 0, NULL, NULL, NULL, NULL);
18261824
REGISTER_TUNABLE("physrep_debug", "Print extended physrep trace. (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_debug,
18271825
0, NULL, NULL, NULL, NULL);
18281826
REGISTER_TUNABLE("physrep_exit_on_invalid_logstream", "Exit physreps on invalid logstream. (Default: off)",
18291827
TUNABLE_BOOLEAN, &gbl_physrep_exit_on_invalid_logstream, 0, NULL, NULL, NULL, NULL);
18301828
REGISTER_TUNABLE("physrep_fanout",
18311829
"Maximum number of physical replicants that a node can service (Default: 8)",
18321830
TUNABLE_INTEGER, &gbl_physrep_fanout, 0, NULL, NULL, NULL, NULL);
1833-
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec",
1834-
"Check for hung physical replicant this often. (Default: 10)",
1835-
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL,
1836-
NULL, NULL, NULL);
1831+
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec", "Check for hung physical replicant this often. (Default: 60)",
1832+
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
18371833
REGISTER_TUNABLE("physrep_hung_replicant_threshold",
18381834
"Report if the physical replicant has been inactive for this duration. (Default: 60)",
18391835
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_threshold, 0, NULL,
@@ -1856,13 +1852,14 @@ REGISTER_TUNABLE("physrep_i_am_metadb", "I am physical replication metadb (Defau
18561852
REGISTER_TUNABLE("physrep_keepalive_v2", "Use version 2 of keepalive which includes first lsn. (Default: off)",
18571853
TUNABLE_BOOLEAN, &gbl_physrep_keepalive_v2, 0, NULL, NULL, NULL, NULL);
18581854
REGISTER_TUNABLE("physrep_keepalive_freq_sec",
1859-
"Periodically send lsn to source node after this interval. (Default: 10)", TUNABLE_INTEGER,
1855+
"Periodically send lsn to source node after this interval. (Default: 60)", TUNABLE_INTEGER,
18601856
&gbl_physrep_keepalive_freq_sec, 0, NULL, NULL, NULL, NULL);
1857+
REGISTER_TUNABLE("physrep_slow_replicant_check_freq_sec", "Check for slow physical replicant this often. (Default: 60)",
1858+
TUNABLE_INTEGER, &gbl_physrep_slow_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
18611859
REGISTER_TUNABLE("physrep_max_candidates",
18621860
"Maximum number of candidates that should be returned to a "
18631861
"new physical replicant during registration. (Default: 6)",
1864-
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL,
1865-
NULL, NULL, NULL);
1862+
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL, NULL, NULL, NULL);
18661863
REGISTER_TUNABLE("physrep_metadb_host", "List of physical replication metadb cluster hosts.", TUNABLE_STRING,
18671864
&gbl_physrep_metadb_host, READONLY, NULL, NULL, NULL, NULL);
18681865
REGISTER_TUNABLE("physrep_metadb_name", "Physical replication metadb cluster name.",
@@ -1922,10 +1919,10 @@ REGISTER_TUNABLE("revsql_debug",
19221919
"Print extended reversql-sql trace. (Default: off)",
19231920
TUNABLE_BOOLEAN, &gbl_revsql_debug, EXPERIMENTAL | INTERNAL,
19241921
NULL, NULL, NULL, NULL);
1925-
REGISTER_TUNABLE("revsql_host_refresh_freq_sec", "The frequency at which the "
1926-
"reverse connection host list will be refreshed (Default: 5secs)",
1927-
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL,
1928-
NULL, NULL, NULL, NULL);
1922+
REGISTER_TUNABLE("revsql_host_refresh_freq_sec",
1923+
"The frequency at which the "
1924+
"reverse connection host list will be refreshed (Default: 60 secs)",
1925+
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
19291926
REGISTER_TUNABLE("admin_revsql", "Run revsql sessions as admin. (Default: Off)", TUNABLE_BOOLEAN, &gbl_admin_revsql, 0,
19301927
NULL, NULL, NULL, NULL);
19311928
REGISTER_TUNABLE("revconn_rdtimeout", "Initial ms timeout for revconn connections. (Default: 100)", TUNABLE_INTEGER,

db/phys_rep.c

Lines changed: 16 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,9 @@ int gbl_physrep_fanout = 8;
6565
int gbl_physrep_max_candidates = 6;
6666
int gbl_physrep_max_pending_replicants = 10;
6767
int gbl_deferred_phys_flag = 0;
68-
int gbl_physrep_source_nodes_refresh_freq_sec = 10;
69-
int gbl_physrep_slow_replicant_check_freq_sec = 10;
70-
int gbl_physrep_keepalive_freq_sec = 10;
71-
int gbl_physrep_check_minlog_freq_sec = 10;
72-
int gbl_physrep_hung_replicant_check_freq_sec = 10;
68+
int gbl_physrep_slow_replicant_check_freq_sec = 60;
69+
int gbl_physrep_keepalive_freq_sec = 60;
70+
int gbl_physrep_hung_replicant_check_freq_sec = 60;
7371
int gbl_physrep_hung_replicant_threshold = 60;
7472
int gbl_physrep_revconn_check_interval = 60;
7573
int gbl_physrep_update_registry_interval = 60;
@@ -78,7 +76,6 @@ int gbl_physrep_i_am_metadb = 0;
7876
int gbl_physrep_filter_by_class = 1;
7977
int gbl_started_physrep_threads = 0;
8078

81-
unsigned int physrep_min_logfile;
8279
unsigned int gbl_deferred_phys_update;
8380

8481
char *gbl_physrep_source_dbname;
@@ -1173,10 +1170,6 @@ static int send_keepalive(void)
11731170
return 0;
11741171
}
11751172

1176-
unsigned int physrep_min_filenum() {
1177-
return physrep_min_logfile;
1178-
}
1179-
11801173
extern int gbl_reverse_hosts_v2;
11811174

11821175
static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
@@ -1215,38 +1208,6 @@ static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
12151208
return (rc == 0) ? do_wait : -1;
12161209
}
12171210

1218-
void physrep_update_low_file_num(int *lowfilenum, int *local_lowfilenum) {
1219-
unsigned int physrep_minfilenum;
1220-
if ((get_dbtable_by_name("comdb2_physreps")) == NULL) {
1221-
return;
1222-
}
1223-
1224-
physrep_minfilenum = physrep_min_filenum();
1225-
if (physrep_minfilenum <= 0) {
1226-
if (gbl_physrep_debug) {
1227-
physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum unchanged (physrep_minfilenum: %d)\n",
1228-
__func__, __LINE__, physrep_minfilenum);
1229-
}
1230-
} else {
1231-
if (physrep_minfilenum <= *lowfilenum) {
1232-
if (gbl_physrep_debug) {
1233-
physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum %d being changed "
1234-
"physical replicant(s) (physrep_minfilenum: %d)\n",
1235-
__func__, __LINE__, *lowfilenum, physrep_minfilenum);
1236-
}
1237-
*lowfilenum = physrep_minfilenum - 1;
1238-
}
1239-
if (physrep_minfilenum <= *local_lowfilenum) {
1240-
*local_lowfilenum = physrep_minfilenum - 1;
1241-
}
1242-
}
1243-
1244-
if (gbl_physrep_debug) {
1245-
physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum: %d (physrep_minfilenum: %d)\n",
1246-
__func__, __LINE__, *lowfilenum, physrep_minfilenum);
1247-
}
1248-
}
1249-
12501211
static int slow_replicants_count_int(cdb2_hndl_tp *metadb, unsigned int *count)
12511212
{
12521213
char query[400];
@@ -1306,93 +1267,6 @@ static int slow_replicants_count(unsigned int *count)
13061267
return badrc ? -1 : 0;
13071268
}
13081269

1309-
static int update_min_logfile_int(cdb2_hndl_tp *metadb)
1310-
{
1311-
char cmd[120+nodes_list_sz];
1312-
char *buf;
1313-
size_t buf_len;
1314-
int bytes_written;
1315-
int rc = 0;
1316-
1317-
if (gbl_ready == 0)
1318-
return 0;
1319-
1320-
bytes_written = 0;
1321-
buf = cmd;
1322-
buf_len = sizeof(cmd);
1323-
1324-
bytes_written +=
1325-
snprintf(buf+bytes_written, buf_len-bytes_written,
1326-
"WITH RECURSIVE replication_tree(dbname, host, file) AS "
1327-
" (SELECT dbname, host, file FROM comdb2_physreps "
1328-
" WHERE dbname='%s' AND host IN (",
1329-
gbl_dbname);
1330-
if (bytes_written >= buf_len) {
1331-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
1332-
return 1;
1333-
}
1334-
1335-
bytes_written += append_quoted_local_hosts(buf+bytes_written, buf_len-bytes_written, ",");
1336-
if (bytes_written >= buf_len) {
1337-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
1338-
return 1;
1339-
}
1340-
1341-
bytes_written += snprintf(buf + bytes_written, buf_len - bytes_written,
1342-
" ) "
1343-
" UNION "
1344-
" SELECT p.dbname, p.host, p.file FROM comdb2_physreps p, "
1345-
" comdb2_physrep_connections c, replication_tree t "
1346-
" WHERE p.state = 'Active' AND p.file <> 0 AND "
1347-
" t.dbname = c.source_dbname AND c.dbname = p.dbname) "
1348-
" SELECT file FROM replication_tree WHERE file IS NOT NULL ORDER BY file LIMIT 1");
1349-
if (bytes_written >= buf_len) {
1350-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
1351-
return 1;
1352-
}
1353-
1354-
if (gbl_physrep_debug) {
1355-
physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd);
1356-
}
1357-
1358-
rc = cdb2_run_statement(metadb, cmd);
1359-
if (rc == CDB2_OK) {
1360-
while ((rc = cdb2_next_record(metadb)) == CDB2_OK) {
1361-
int64_t *minfile = (int64_t *)cdb2_column_value(metadb, 0);
1362-
physrep_min_logfile = minfile ? (unsigned int)*minfile : 0;
1363-
}
1364-
if (rc == CDB2_OK_DONE)
1365-
rc = 0;
1366-
} else {
1367-
physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to execute (rc: %d)\n", __func__, __LINE__, rc);
1368-
}
1369-
1370-
return rc;
1371-
}
1372-
1373-
static int update_min_logfile(void)
1374-
{
1375-
cdb2_hndl_tp *metadb;
1376-
int rc, altcnt = gbl_altmetadb_count;
1377-
1378-
if ((rc = physrep_get_metadb_or_local_hndl(&metadb)) != 0) {
1379-
logmsg(LOGMSG_ERROR, "%s: failed to get metadb handle rc=%d\n", __func__, rc);
1380-
} else {
1381-
update_min_logfile_int(metadb);
1382-
cdb2_close(metadb);
1383-
}
1384-
1385-
for (int i = 0; i < altcnt; i++) {
1386-
if ((rc = get_alt_metadb_hndl(&metadb, i)) != 0) {
1387-
logmsg(LOGMSG_ERROR, "%s: failed to get alt metadb handle %d rc=%d\n", __func__, i, rc);
1388-
continue;
1389-
}
1390-
update_min_logfile_int(metadb);
1391-
cdb2_close(metadb);
1392-
}
1393-
return 0;
1394-
}
1395-
13961270
/*
13971271
Check whether we need to wait for a connection from one of the nodes
13981272
in the source db.
@@ -1501,6 +1375,7 @@ static void *physrep_worker(void *args)
15011375
comdb2_name_thread(__func__);
15021376

15031377
volatile int64_t gen, highest_gen = 0;
1378+
int do_revconn;
15041379
int64_t first_lcgen = -1;
15051380
size_t sql_cmd_len = 150;
15061381
char sql_cmd[sql_cmd_len];
@@ -1557,7 +1432,7 @@ static void *physrep_worker(void *args)
15571432
physrep_logmsg(LOGMSG_USER, "%s:%d Re-checking for reverse connection\n", __func__, __LINE__);
15581433
}
15591434
if ((rc = physrep_get_metadb_or_local_hndl(&repl_metadb)) == 0) {
1560-
int do_revconn = do_wait_for_reverse_conn(repl_metadb);
1435+
do_revconn = do_wait_for_reverse_conn(repl_metadb);
15611436
cdb2_close(repl_metadb);
15621437
if (gbl_physrep_debug) {
15631438
physrep_logmsg(LOGMSG_USER, "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n",
@@ -1586,8 +1461,10 @@ static void *physrep_worker(void *args)
15861461
goto sleep_and_retry;
15871462
}
15881463

1589-
last_revconn_check = comdb2_time_epoch();
1590-
if (do_wait_for_reverse_conn(repl_metadb) == 1) {
1464+
do_revconn = do_wait_for_reverse_conn(repl_metadb);
1465+
1466+
if (do_revconn == 1) {
1467+
last_revconn_check = comdb2_time_epoch();
15911468
is_revconn = 1;
15921469
int wait_timeout_sec = 60;
15931470

@@ -1632,8 +1509,15 @@ static void *physrep_worker(void *args)
16321509

16331510
set_repl_db_connected(rev_conn_hndl->remote_dbname, rev_conn_hndl->remote_host);
16341511
} else {
1512+
/* Intentionally try to register if we fail to determine if reverse connection is needed */
1513+
if (do_revconn == 0) {
1514+
last_revconn_check = comdb2_time_epoch();
1515+
}
16351516
is_revconn = 0;
16361517
int notfound = 0;
1518+
1519+
physrep_logmsg(LOGMSG_DEBUG, "%s:%d do_revconn is 0, registering replicant\n", __func__, __LINE__);
1520+
16371521
while (stop_physrep_worker == 0) {
16381522
if ((rc = register_self(repl_metadb)) == 0)
16391523
break;
@@ -1957,7 +1841,6 @@ static void *physrep_watcher(void *args) {
19571841
static int physrep_slow_replicant_last_checked;
19581842
static int physrep_keepalive_last_sent;
19591843
static int physrep_hung_replicant_last_checked;
1960-
static int physrep_minlog_last_checked;
19611844

19621845
while (!gbl_exit && stop_physrep_watcher == 0) {
19631846
sleep(1);
@@ -1986,20 +1869,12 @@ static void *physrep_watcher(void *args) {
19861869
}
19871870
// Common:
19881871
// 1) Send keepalives
1989-
// 2) Update maximum log file number upto which it is safe to delete
19901872

19911873
// Periodically send keepalive to report its LSN.
19921874
if ((now - physrep_keepalive_last_sent) >= gbl_physrep_keepalive_freq_sec) {
19931875
send_keepalive();
19941876
physrep_keepalive_last_sent = now;
19951877
}
1996-
1997-
// Update the 'minimum log file' marker upto which it is safe to
1998-
// delete log files.
1999-
if ((now - physrep_minlog_last_checked) >= gbl_physrep_check_minlog_freq_sec) {
2000-
update_min_logfile();
2001-
physrep_minlog_last_checked = now;
2002-
}
20031878
}
20041879
return NULL;
20051880
}

db/reverse_conn.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ int gbl_revsql_allow_command_exec;
6868
int gbl_revsql_debug = 0;
6969
int gbl_revsql_cdb2_debug;
7070
// 'reverse-connection host' list refresh frequency
71-
int gbl_revsql_host_refresh_freq_sec = 5;
71+
int gbl_revsql_host_refresh_freq_sec = 60;
7272
// 'reverse-connection' worker's new connection attempt frequency
7373
int gbl_revsql_connect_freq_sec = 5;
7474

lua/lib/physrep_get_reverse_hosts.lua

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
local function main(dbname, hostname)
66
db:begin()
77

8-
-- Check whether 'comdb2_physrep_sources' table exists
9-
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
10-
local row = rs:fetch()
11-
if row.cnt == 0 then
12-
db:commit()
13-
return
14-
end
15-
168
local rs, rc = db:exec("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
179
"source_dbname = '" .. dbname .. "' AND source_host = '" .. hostname .. "'")
1810
local row = rs:fetch()

lua/lib/physrep_get_reverse_hosts_v2.lua

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
local function main(dbname, hostname)
66
db:begin()
77

8-
-- Check whether 'comdb2_physrep_sources' table exists
9-
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
10-
local row = rs:fetch()
11-
if row.cnt == 0 then
12-
db:commit()
13-
return
14-
end
15-
168
local rs, rc = db:exec("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
179
"source_dbname = '" .. dbname .. "' AND source_host = '" .. hostname .. "'")
1810
local row = rs:fetch()

lua/lib/physrep_get_revhosts_v2.lua

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
local function main(dbname, hostname, tier, cluster)
66
db:begin()
77

8-
-- Check whether 'comdb2_physrep_sources' table exists
9-
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
10-
local row = rs:fetch()
11-
if row.cnt == 0 then
12-
db:commit()
13-
return
14-
end
15-
168
local sql = ("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
179
"source_dbname = '" .. dbname .. "' AND ( source_host = '" .. hostname ..
1810
"' OR source_host = '" .. tier ..

lua/lib/physrep_should_wait_for_con.lua

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,6 @@
1010
local function main(dbname, hostname)
1111
db:begin()
1212

13-
local rs, row = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
14-
local row = rs:fetch()
15-
16-
if row.cnt == 0 then
17-
db:emit(row)
18-
db:commit()
19-
return
20-
end
21-
2213
local rs, row = db:exec("SELECT count(*) as cnt FROM comdb2_physrep_sources " ..
2314
" WHERE dbname = '" .. dbname .. "' AND " ..
2415
" host LIKE '" .. hostname .. "'")

lua/lib/physrep_shouldwait_v2.lua

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,6 @@
1010
local function main(dbname, hostname, tier, cluster)
1111
db:begin()
1212

13-
local rs, row = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
14-
local row = rs:fetch()
15-
16-
if row.cnt == 0 then
17-
db:emit(row)
18-
db:commit()
19-
return
20-
end
21-
2213
local sql = ("SELECT count(*) as cnt from comdb2_physrep_sources " ..
2314
" WHERE dbname = '" .. dbname .. "' AND " ..
2415
" ( host LIKE '" .. hostname .. "' OR " ..

0 commit comments

Comments (0)