Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions bdb/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -3663,8 +3663,6 @@ static void delete_log_files_int(bdb_state_type *bdb_state)
}
}

physrep_update_low_file_num(&lowfilenum, &local_lowfilenum);

/* debug: print filenums from other nodes */

/* if we have a maximum filenum defined in bdb attributes which is lower,
Expand Down
3 changes: 3 additions & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,14 @@ extern int gbl_physrep_hung_replicant_threshold;
extern int gbl_physrep_revconn_check_interval;
extern int gbl_physrep_fake_revconn_populate_error;
extern int gbl_physrep_fake_revconn_populate_error_once;
extern int gbl_physrep_fake_check_revconn_error;
extern int gbl_physrep_fake_register_self_error;
extern int gbl_debug_fake_rte_failure;
extern int gbl_physrep_update_registry_interval;
extern int gbl_physrep_i_am_metadb;
extern int gbl_physrep_keepalive_v2;
extern int gbl_physrep_keepalive_freq_sec;
extern int gbl_physrep_slow_replicant_check_freq_sec;
extern int gbl_physrep_max_candidates;
extern int gbl_physrep_reconnect_penalty;
extern int gbl_physrep_reconnect_interval;
Expand Down
29 changes: 16 additions & 13 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1821,19 +1821,15 @@ REGISTER_TUNABLE("tranlog_incoherent_timeout", "Timeout in seconds for incoheren
TUNABLE_INTEGER, &gbl_tranlog_incoherent_timeout, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("tranlog_maxpoll", "Tranlog timeout in seconds for blocking poll. (Default: 60)", TUNABLE_INTEGER,
&gbl_tranlog_maxpoll, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_check_minlog_freq_sec", "Check the minimum log number to keep this often. (Default: 10)",
TUNABLE_INTEGER, &gbl_physrep_check_minlog_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_debug", "Print extended physrep trace. (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_debug,
0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_exit_on_invalid_logstream", "Exit physreps on invalid logstream. (Default: off)",
TUNABLE_BOOLEAN, &gbl_physrep_exit_on_invalid_logstream, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_fanout",
"Maximum number of physical replicants that a node can service (Default: 8)",
TUNABLE_INTEGER, &gbl_physrep_fanout, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec",
"Check for hung physical replicant this often. (Default: 10)",
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL,
NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec", "Check for hung physical replicant this often. (Default: 60)",
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_hung_replicant_threshold",
"Report if the physical replicant has been inactive for this duration. (Default: 60)",
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_threshold, 0, NULL,
Expand All @@ -1849,20 +1845,27 @@ REGISTER_TUNABLE("physrep_fake_revconn_populate_error_once",
"Fake physrep error on first attempt only, allowing retry to succeed. (Default: off)",
TUNABLE_BOOLEAN, &gbl_physrep_fake_revconn_populate_error_once, EXPERIMENTAL | INTERNAL, NULL, NULL,
NULL, NULL);
REGISTER_TUNABLE("physrep_fake_check_revconn_error",
"Fake physrep error when asking if we should wait for a revconn. (Default: off)", TUNABLE_BOOLEAN,
&gbl_physrep_fake_check_revconn_error, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_fake_register_self_error", "Fake physrep error when calling register_self. (Default: off)",
TUNABLE_BOOLEAN, &gbl_physrep_fake_register_self_error, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL,
NULL);
REGISTER_TUNABLE("debug_fake_rte_failure", "Fake rte failures in connect-remote-db. (Default: off)", TUNABLE_BOOLEAN,
&gbl_debug_fake_rte_failure, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_i_am_metadb", "I am physical replication metadb (Default: off)",
TUNABLE_BOOLEAN, &gbl_physrep_i_am_metadb, NOARG, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_keepalive_v2", "Use version 2 of keepalive which includes first lsn. (Default: off)",
TUNABLE_BOOLEAN, &gbl_physrep_keepalive_v2, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_keepalive_freq_sec",
"Periodically send lsn to source node after this interval. (Default: 10)", TUNABLE_INTEGER,
"Periodically send lsn to source node after this interval. (Default: 60)", TUNABLE_INTEGER,
&gbl_physrep_keepalive_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_slow_replicant_check_freq_sec", "Check for slow physical replicant this often. (Default: 60)",
TUNABLE_INTEGER, &gbl_physrep_slow_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_max_candidates",
"Maximum number of candidates that should be returned to a "
"new physical replicant during registration. (Default: 6)",
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL,
NULL, NULL, NULL);
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_metadb_host", "List of physical replication metadb cluster hosts.", TUNABLE_STRING,
&gbl_physrep_metadb_host, READONLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_metadb_name", "Physical replication metadb cluster name.",
Expand Down Expand Up @@ -1922,10 +1925,10 @@ REGISTER_TUNABLE("revsql_debug",
"Print extended reversql-sql trace. (Default: off)",
TUNABLE_BOOLEAN, &gbl_revsql_debug, EXPERIMENTAL | INTERNAL,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("revsql_host_refresh_freq_sec", "The frequency at which the "
"reverse connection host list will be refreshed (Default: 5secs)",
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("revsql_host_refresh_freq_sec",
"The frequency at which the "
"reverse connection host list will be refreshed (Default: 60 secs)",
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("admin_revsql", "Run revsql sessions as admin. (Default: Off)", TUNABLE_BOOLEAN, &gbl_admin_revsql, 0,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("revconn_rdtimeout", "Initial ms timeout for revconn connections. (Default: 100)", TUNABLE_INTEGER,
Expand Down
175 changes: 34 additions & 141 deletions db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,18 @@ typedef struct DB_Connection {
} while (0)

int gbl_physrep_debug = 0;
int gbl_physrep_fake_check_revconn_error = 0;
int gbl_physrep_fake_register_self_error = 0;
int gbl_physrep_reconnect_interval = 3600; // force re-registration every hour
int gbl_physrep_reconnect_penalty = 0;
int gbl_blocking_physrep = 1;
int gbl_physrep_fanout = 8;
int gbl_physrep_max_candidates = 6;
int gbl_physrep_max_pending_replicants = 10;
int gbl_deferred_phys_flag = 0;
int gbl_physrep_source_nodes_refresh_freq_sec = 10;
int gbl_physrep_slow_replicant_check_freq_sec = 10;
int gbl_physrep_keepalive_freq_sec = 10;
int gbl_physrep_check_minlog_freq_sec = 10;
int gbl_physrep_hung_replicant_check_freq_sec = 10;
int gbl_physrep_slow_replicant_check_freq_sec = 60;
int gbl_physrep_keepalive_freq_sec = 60;
int gbl_physrep_hung_replicant_check_freq_sec = 60;
int gbl_physrep_hung_replicant_threshold = 60;
int gbl_physrep_revconn_check_interval = 60;
int gbl_physrep_update_registry_interval = 60;
Expand All @@ -78,7 +78,6 @@ int gbl_physrep_i_am_metadb = 0;
int gbl_physrep_filter_by_class = 1;
int gbl_started_physrep_threads = 0;

unsigned int physrep_min_logfile;
unsigned int gbl_deferred_phys_update;

char *gbl_physrep_source_dbname;
Expand Down Expand Up @@ -871,6 +870,11 @@ static int register_self(cdb2_hndl_tp *repl_metadb)
physrep_logmsg(LOGMSG_USER, "%s:%d Registering self\n", __func__, __LINE__);
}

if (gbl_physrep_fake_register_self_error) {
logmsg(LOGMSG_USER, "%s:%d: Fake error in register_self\n", __func__, __LINE__);
return -1;
}

// Reset all the nodes from this physical replication cluster; and mark them
// 'Inactive'.
//
Expand Down Expand Up @@ -1173,17 +1177,18 @@ static int send_keepalive(void)
return 0;
}

/*
 * Return the current value of the file-scope 'physrep_min_logfile' marker
 * (the value most recently stored by update_min_logfile_int()).
 *
 * Declared with an explicit (void) parameter list so the definition also
 * serves as a prototype and calls with stray arguments are diagnosed.
 */
unsigned int physrep_min_filenum(void)
{
    return physrep_min_logfile;
}

extern int gbl_reverse_hosts_v2;

static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
int rc;
char cmd[400];
int do_wait = 0;

if (gbl_physrep_fake_check_revconn_error) {
logmsg(LOGMSG_USER, "%s:%d: Fake error checking for reverse connection\n", __func__, __LINE__);
return -1;
}

if (!gbl_reverse_hosts_v2) {
rc = snprintf(cmd, sizeof(cmd), "exec procedure sys.physrep.should_wait_for_con('%s', '%s')", gbl_dbname,
(gbl_machine_class) ? gbl_machine_class : gbl_myhostname);
Expand Down Expand Up @@ -1215,38 +1220,6 @@ static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
return (rc == 0) ? do_wait : -1;
}

/*
 * Lower the caller's log-file low-water marks so that they stay strictly
 * below the minimum log file reported by physrep_min_filenum().
 *
 * lowfilenum        in/out: set to (min - 1) when min <= *lowfilenum.
 * local_lowfilenum  in/out: set to (min - 1) when min <= *local_lowfilenum.
 *
 * Nothing is changed when the 'comdb2_physreps' table cannot be found, or
 * when the reported minimum is 0 (treated as "no minimum known").
 * A marker is only ever lowered by this function, never raised.
 */
void physrep_update_low_file_num(int *lowfilenum, int *local_lowfilenum)
{
    if (get_dbtable_by_name("comdb2_physreps") == NULL) {
        return;
    }

    const unsigned int physrep_minfilenum = physrep_min_filenum();

    if (physrep_minfilenum == 0) {
        if (gbl_physrep_debug) {
            physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum unchanged (physrep_minfilenum: %u)\n",
                           __func__, __LINE__, physrep_minfilenum);
        }
    } else {
        /*
         * Compare in a wide signed type.  Comparing the unsigned minimum
         * directly against an int converts a negative marker to a huge
         * unsigned value, which would make the test succeed and *raise*
         * the marker.  In int64_t a negative marker is simply left alone,
         * and any value we do store is known to fit in an int because it
         * is smaller than the int it replaces.
         */
        const int64_t minfile = (int64_t)physrep_minfilenum;

        if (minfile <= (int64_t)*lowfilenum) {
            if (gbl_physrep_debug) {
                physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum %d being changed "
                               "physical replicant(s) (physrep_minfilenum: %u)\n",
                               __func__, __LINE__, *lowfilenum, physrep_minfilenum);
            }
            *lowfilenum = (int)(minfile - 1);
        }
        if (minfile <= (int64_t)*local_lowfilenum) {
            *local_lowfilenum = (int)(minfile - 1);
        }
    }

    if (gbl_physrep_debug) {
        physrep_logmsg(LOGMSG_USER, "%s:%d: lowfilenum: %d (physrep_minfilenum: %u)\n",
                       __func__, __LINE__, *lowfilenum, physrep_minfilenum);
    }
}

static int slow_replicants_count_int(cdb2_hndl_tp *metadb, unsigned int *count)
{
char query[400];
Expand Down Expand Up @@ -1306,93 +1279,6 @@ static int slow_replicants_count(unsigned int *count)
return badrc ? -1 : 0;
}

/*
 * Build and run, against 'metadb', a recursive query over comdb2_physreps /
 * comdb2_physrep_connections that selects the smallest non-NULL 'file'
 * value, and store the result in the file-scope 'physrep_min_logfile'
 * (0 when the returned column value is NULL).
 *
 * Returns 0 when gbl_ready is not yet set (nothing is done), 1 when the
 * query text does not fit in the local buffer, 0 when the result set was
 * read to completion, and otherwise the non-OK code returned by
 * cdb2_run_statement() / cdb2_next_record().
 */
static int update_min_logfile_int(cdb2_hndl_tp *metadb)
{
    char query[120+nodes_list_sz];
    const size_t query_sz = sizeof(query);
    int used = 0;
    int rc;

    if (!gbl_ready) {
        return 0;
    }

    /* Head of the CTE: seed rows for this database on the local hosts. */
    used += snprintf(query + used, query_sz - used,
                     "WITH RECURSIVE replication_tree(dbname, host, file) AS "
                     " (SELECT dbname, host, file FROM comdb2_physreps "
                     " WHERE dbname='%s' AND host IN (",
                     gbl_dbname);
    if (used >= query_sz) {
        physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
        return 1;
    }

    /* Comma-separated, quoted host list supplied by the helper. */
    used += append_quoted_local_hosts(query + used, query_sz - used, ",");
    if (used >= query_sz) {
        physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
        return 1;
    }

    /* Recursive arm plus the final "smallest file" selection. */
    used += snprintf(query + used, query_sz - used,
                     " ) "
                     " UNION "
                     " SELECT p.dbname, p.host, p.file FROM comdb2_physreps p, "
                     " comdb2_physrep_connections c, replication_tree t "
                     " WHERE p.state = 'Active' AND p.file <> 0 AND "
                     " t.dbname = c.source_dbname AND c.dbname = p.dbname) "
                     " SELECT file FROM replication_tree WHERE file IS NOT NULL ORDER BY file LIMIT 1");
    if (used >= query_sz) {
        physrep_logmsg(LOGMSG_ERROR, "%s:%d Buffer is not long enough!\n", __func__, __LINE__);
        return 1;
    }

    if (gbl_physrep_debug) {
        physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, query);
    }

    rc = cdb2_run_statement(metadb, query);
    if (rc != CDB2_OK) {
        physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to execute (rc: %d)\n", __func__, __LINE__, rc);
        return rc;
    }

    /* Drain the result set; the last row read determines the stored value. */
    for (rc = cdb2_next_record(metadb); rc == CDB2_OK; rc = cdb2_next_record(metadb)) {
        int64_t *file = (int64_t *)cdb2_column_value(metadb, 0);
        if (file != NULL) {
            physrep_min_logfile = (unsigned int)*file;
        } else {
            physrep_min_logfile = 0;
        }
    }

    return (rc == CDB2_OK_DONE) ? 0 : rc;
}

/*
 * Run update_min_logfile_int() against the handle obtained from
 * physrep_get_metadb_or_local_hndl(), then against each of the
 * gbl_altmetadb_count alternate metadb handles.
 *
 * Best effort: a handle that cannot be obtained is logged and skipped, and
 * the result of update_min_logfile_int() is not inspected.  Always returns 0.
 */
static int update_min_logfile(void)
{
    /* Snapshot the count up front, before any handle is opened. */
    const int alt_total = gbl_altmetadb_count;
    cdb2_hndl_tp *hndl;
    int rc;

    rc = physrep_get_metadb_or_local_hndl(&hndl);
    if (rc == 0) {
        update_min_logfile_int(hndl);
        cdb2_close(hndl);
    } else {
        logmsg(LOGMSG_ERROR, "%s: failed to get metadb handle rc=%d\n", __func__, rc);
    }

    int idx = 0;
    while (idx < alt_total) {
        rc = get_alt_metadb_hndl(&hndl, idx);
        if (rc == 0) {
            update_min_logfile_int(hndl);
            cdb2_close(hndl);
        } else {
            logmsg(LOGMSG_ERROR, "%s: failed to get alt metadb handle %d rc=%d\n", __func__, idx, rc);
        }
        idx++;
    }

    return 0;
}

/*
Check whether we need to wait for a connection from one of the nodes
in the source db.
Expand Down Expand Up @@ -1501,6 +1387,7 @@ static void *physrep_worker(void *args)
comdb2_name_thread(__func__);

volatile int64_t gen, highest_gen = 0;
int do_revconn;
int64_t first_lcgen = -1;
size_t sql_cmd_len = 150;
char sql_cmd[sql_cmd_len];
Expand Down Expand Up @@ -1557,7 +1444,7 @@ static void *physrep_worker(void *args)
physrep_logmsg(LOGMSG_USER, "%s:%d Re-checking for reverse connection\n", __func__, __LINE__);
}
if ((rc = physrep_get_metadb_or_local_hndl(&repl_metadb)) == 0) {
int do_revconn = do_wait_for_reverse_conn(repl_metadb);
do_revconn = do_wait_for_reverse_conn(repl_metadb);
cdb2_close(repl_metadb);
if (gbl_physrep_debug) {
physrep_logmsg(LOGMSG_USER, "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n",
Expand Down Expand Up @@ -1586,8 +1473,10 @@ static void *physrep_worker(void *args)
goto sleep_and_retry;
}

last_revconn_check = comdb2_time_epoch();
if (do_wait_for_reverse_conn(repl_metadb) == 1) {
do_revconn = do_wait_for_reverse_conn(repl_metadb);

if (do_revconn == 1) {
last_revconn_check = comdb2_time_epoch();
is_revconn = 1;
int wait_timeout_sec = 60;

Expand Down Expand Up @@ -1632,6 +1521,13 @@ static void *physrep_worker(void *args)

set_repl_db_connected(rev_conn_hndl->remote_dbname, rev_conn_hndl->remote_host);
} else {
/* Intentionally try to register if we fail to determine if reverse connection is needed */
if (do_revconn == 0) {
last_revconn_check = comdb2_time_epoch();
} else {
physrep_logmsg(LOGMSG_DEBUG, "%s:%d do_revconn is %d, registering replicant\n", __func__, __LINE__,
do_revconn);
}
is_revconn = 0;
int notfound = 0;
while (stop_physrep_worker == 0) {
Expand All @@ -1650,6 +1546,12 @@ static void *physrep_worker(void *args)
physrep_logmsg(level, "%s:%d Failed to register against cluster, attempt %d\n",
__func__, __LINE__, notfound);

if (do_revconn == -1) {
cdb2_close(repl_metadb);
repl_metadb = NULL;
goto sleep_and_retry;
}

do {
cdb2_close(repl_metadb);
repl_metadb = NULL;
Expand Down Expand Up @@ -1957,7 +1859,6 @@ static void *physrep_watcher(void *args) {
static int physrep_slow_replicant_last_checked;
static int physrep_keepalive_last_sent;
static int physrep_hung_replicant_last_checked;
static int physrep_minlog_last_checked;

while (!gbl_exit && stop_physrep_watcher == 0) {
sleep(1);
Expand Down Expand Up @@ -1986,20 +1887,12 @@ static void *physrep_watcher(void *args) {
}
// Common:
// 1) Send keepalives
// 2) Update maximum log file number up to which it is safe to delete

// Periodically send keepalive to report its LSN.
if ((now - physrep_keepalive_last_sent) >= gbl_physrep_keepalive_freq_sec) {
send_keepalive();
physrep_keepalive_last_sent = now;
}

// Update the 'minimum log file' marker up to which it is safe to
// delete log files.
if ((now - physrep_minlog_last_checked) >= gbl_physrep_check_minlog_freq_sec) {
update_min_logfile();
physrep_minlog_last_checked = now;
}
}
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion db/reverse_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ int gbl_revsql_allow_command_exec;
int gbl_revsql_debug = 0;
int gbl_revsql_cdb2_debug;
// 'reverse-connection host' list refresh frequency
int gbl_revsql_host_refresh_freq_sec = 5;
int gbl_revsql_host_refresh_freq_sec = 60;
// 'reverse-connection' worker's new connection attempt frequency
int gbl_revsql_connect_freq_sec = 5;

Expand Down
Loading
Loading