Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions db/db_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ struct comdb2_metrics_store {
int64_t not_durable_commits;
int64_t incoherent_slow_skips;
int64_t inmem_repdb_memory;
int64_t physrep_metadb_sql_count;

int64_t page_reads;
int64_t page_writes;
Expand Down Expand Up @@ -321,6 +322,8 @@ comdb2_metric gbl_metrics[] = {
STATISTIC_COLLECTION_TYPE_CUMULATIVE, &stats.incoherent_slow_skips, NULL},
{"inmem_repdb_memory", "Memory utilized by in-memory repdb", STATISTIC_INTEGER, STATISTIC_COLLECTION_TYPE_LATEST,
&stats.inmem_repdb_memory, NULL},
{"physrep_metadb_sql_count", "Count of SQL statements executed against physrep metadb", STATISTIC_INTEGER,
STATISTIC_COLLECTION_TYPE_CUMULATIVE, &stats.physrep_metadb_sql_count, NULL},
{"page_reads", "Total page reads", STATISTIC_INTEGER, STATISTIC_COLLECTION_TYPE_CUMULATIVE, &stats.page_reads,
NULL},
{"page_writes", "Total page writes", STATISTIC_INTEGER, STATISTIC_COLLECTION_TYPE_CUMULATIVE, &stats.page_writes,
Expand Down Expand Up @@ -446,6 +449,7 @@ extern int64_t gbl_distributed_commit_count;
extern int64_t gbl_not_durable_commit_count;
extern int64_t gbl_incoherent_slow_skips;
extern int64_t gbl_inmem_repdb_memory;
extern int64_t gbl_physrep_metadb_sql_count;

static void update_sqllogfill_metrics()
{
Expand Down Expand Up @@ -656,6 +660,7 @@ int refresh_metrics(void)
stats.not_durable_commits = gbl_not_durable_commit_count;
stats.incoherent_slow_skips = gbl_incoherent_slow_skips;
stats.inmem_repdb_memory = gbl_inmem_repdb_memory;
stats.physrep_metadb_sql_count = gbl_physrep_metadb_sql_count;
struct global_stats gstats = {0};

global_request_stats(&gstats);
Expand Down
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ extern int gbl_physrep_update_registry_interval;
extern int gbl_physrep_i_am_metadb;
extern int gbl_physrep_keepalive_v2;
extern int gbl_physrep_keepalive_freq_sec;
extern int gbl_physrep_slow_replicant_check_freq_sec;
extern int gbl_physrep_max_candidates;
extern int gbl_physrep_reconnect_penalty;
extern int gbl_physrep_reconnect_interval;
Expand Down
23 changes: 11 additions & 12 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,7 @@ REGISTER_TUNABLE("tranlog_incoherent_timeout", "Timeout in seconds for incoheren
TUNABLE_INTEGER, &gbl_tranlog_incoherent_timeout, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("tranlog_maxpoll", "Tranlog timeout in seconds for blocking poll. (Default: 60)", TUNABLE_INTEGER,
&gbl_tranlog_maxpoll, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_check_minlog_freq_sec", "Check the minimum log number to keep this often. (Default: 10)",
REGISTER_TUNABLE("physrep_check_minlog_freq_sec", "Check the minimum log number to keep this often. (Default: 600)",
TUNABLE_INTEGER, &gbl_physrep_check_minlog_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_debug", "Print extended physrep trace. (Default: off)", TUNABLE_BOOLEAN, &gbl_physrep_debug,
0, NULL, NULL, NULL, NULL);
Expand All @@ -1830,10 +1830,8 @@ REGISTER_TUNABLE("physrep_exit_on_invalid_logstream", "Exit physreps on invalid
REGISTER_TUNABLE("physrep_fanout",
"Maximum number of physical replicants that a node can service (Default: 8)",
TUNABLE_INTEGER, &gbl_physrep_fanout, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec",
"Check for hung physical replicant this often. (Default: 10)",
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL,
NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_hung_replicant_check_freq_sec", "Check for hung physical replicant this often. (Default: 60)",
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_hung_replicant_threshold",
"Report if the physical replicant has been inactive for this duration. (Default: 60)",
TUNABLE_INTEGER, &gbl_physrep_hung_replicant_threshold, 0, NULL,
Expand All @@ -1856,13 +1854,14 @@ REGISTER_TUNABLE("physrep_i_am_metadb", "I am physical replication metadb (Defau
REGISTER_TUNABLE("physrep_keepalive_v2", "Use version 2 of keepalive which includes first lsn. (Default: off)",
TUNABLE_BOOLEAN, &gbl_physrep_keepalive_v2, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_keepalive_freq_sec",
"Periodically send lsn to source node after this interval. (Default: 10)", TUNABLE_INTEGER,
"Periodically send lsn to source node after this interval. (Default: 60)", TUNABLE_INTEGER,
&gbl_physrep_keepalive_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_slow_replicant_check_freq_sec", "Check for slow physical replicant this often. (Default: 60)",
TUNABLE_INTEGER, &gbl_physrep_slow_replicant_check_freq_sec, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_max_candidates",
"Maximum number of candidates that should be returned to a "
"new physical replicant during registration. (Default: 6)",
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL,
NULL, NULL, NULL);
TUNABLE_INTEGER, &gbl_physrep_max_candidates, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_metadb_host", "List of physical replication metadb cluster hosts.", TUNABLE_STRING,
&gbl_physrep_metadb_host, READONLY, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("physrep_metadb_name", "Physical replication metadb cluster name.",
Expand Down Expand Up @@ -1922,10 +1921,10 @@ REGISTER_TUNABLE("revsql_debug",
"Print extended reversql-sql trace. (Default: off)",
TUNABLE_BOOLEAN, &gbl_revsql_debug, EXPERIMENTAL | INTERNAL,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("revsql_host_refresh_freq_sec", "The frequency at which the "
"reverse connection host list will be refreshed (Default: 5secs)",
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("revsql_host_refresh_freq_sec",
"The frequency at which the "
"reverse connection host list will be refreshed (Default: 60 secs)",
TUNABLE_INTEGER, &gbl_revsql_host_refresh_freq_sec, EXPERIMENTAL | INTERNAL, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("admin_revsql", "Run revsql sessions as admin. (Default: Off)", TUNABLE_BOOLEAN, &gbl_admin_revsql, 0,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("revconn_rdtimeout", "Initial ms timeout for revconn connections. (Default: 100)", TUNABLE_INTEGER,
Expand Down
46 changes: 20 additions & 26 deletions db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include <parse_lsn.h>
#include <logmsg.h>
#include <comdb2_atomic.h>

/* internal implementation */
typedef struct DB_Connection {
Expand All @@ -61,15 +62,15 @@ int gbl_physrep_debug = 0;
int gbl_physrep_reconnect_interval = 3600; // force re-registration every hour
int gbl_physrep_reconnect_penalty = 0;
int gbl_blocking_physrep = 1;
int64_t gbl_physrep_metadb_sql_count = 0;
int gbl_physrep_fanout = 8;
int gbl_physrep_max_candidates = 6;
int gbl_physrep_max_pending_replicants = 10;
int gbl_deferred_phys_flag = 0;
int gbl_physrep_source_nodes_refresh_freq_sec = 10;
int gbl_physrep_slow_replicant_check_freq_sec = 10;
int gbl_physrep_keepalive_freq_sec = 10;
int gbl_physrep_check_minlog_freq_sec = 10;
int gbl_physrep_hung_replicant_check_freq_sec = 10;
int gbl_physrep_slow_replicant_check_freq_sec = 60;
int gbl_physrep_keepalive_freq_sec = 60;
int gbl_physrep_hung_replicant_check_freq_sec = 60;
int gbl_physrep_check_minlog_freq_sec = 600;
int gbl_physrep_hung_replicant_threshold = 60;
int gbl_physrep_revconn_check_interval = 60;
int gbl_physrep_update_registry_interval = 60;
Expand Down Expand Up @@ -535,6 +536,7 @@ static int update_registry(cdb2_hndl_tp *repl_metadb, const char *remote_dbname,
physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd);
}

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
if ((rc = cdb2_run_statement(repl_metadb, cmd)) == CDB2_OK) {
while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK)
;
Expand Down Expand Up @@ -600,6 +602,7 @@ static int send_reset_nodes_int(cdb2_hndl_tp *hndl, const char *state)
physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd);
}

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
if ((rc = cdb2_run_statement(hndl, cmd)) == CDB2_OK) {
while ((rc = cdb2_next_record(hndl)) == CDB2_OK)
;
Expand Down Expand Up @@ -924,6 +927,7 @@ static int register_self(cdb2_hndl_tp *repl_metadb)
}

int candidate_leaders_count = 0;
ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
if ((rc = cdb2_run_statement(repl_metadb, cmd)) == CDB2_OK) {
while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK) {
char *dbname = (char *)cdb2_column_value(repl_metadb, 1);
Expand Down Expand Up @@ -1102,26 +1106,10 @@ int gbl_physrep_keepalive_v2 = 0;

static int send_keepalive_int(cdb2_hndl_tp *metadb)
{
int rc = 0, use_v2 = 0;
int rc = 0, use_v2 = gbl_physrep_keepalive_v2;
char cmd[600];
LOG_INFO info;

/* TODO: remove after v2 enabled & new-schema is everywhere */
if (gbl_physrep_keepalive_v2) {
rc = snprintf(
cmd, sizeof(cmd),
"select count(*) from comdb2_columns where tablename='comdb2_physreps' and columnname='firstfile'");
rc = cdb2_run_statement(metadb, cmd);
if (rc != CDB2_OK) {
physrep_logmsg(LOGMSG_ERROR, "%s:%d Failed to execute cmd %s (rc: %d)\n", __func__, __LINE__, cmd, rc);
return rc;
}
if (cdb2_next_record(metadb) == CDB2_OK) {
int64_t val = *(int64_t *)cdb2_column_value(metadb, 0);
use_v2 = (val != 0) ? 1 : 0;
}
}

info = get_last_lsn(thedb->bdb_env);
if (use_v2) {
LOG_INFO first_info;
Expand All @@ -1140,6 +1128,7 @@ static int send_keepalive_int(cdb2_hndl_tp *metadb)
if (gbl_physrep_debug)
physrep_logmsg(LOGMSG_USER, "%s:%d: Executing: %s\n", __func__, __LINE__, cmd);

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
rc = cdb2_run_statement(metadb, cmd);
if (rc == CDB2_OK) {
while (cdb2_next_record(metadb) == CDB2_OK) {
Expand Down Expand Up @@ -1200,6 +1189,7 @@ static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
if (gbl_physrep_debug)
physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd);

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
if ((rc = cdb2_run_statement(hndl, cmd)) == CDB2_OK) {
while ((rc = cdb2_next_record(hndl)) == CDB2_OK) {
int64_t val = *(int64_t *)cdb2_column_value(hndl, 0);
Expand Down Expand Up @@ -1261,6 +1251,7 @@ static int slow_replicants_count_int(cdb2_hndl_tp *metadb, unsigned int *count)
"AND now() - last_keepalive >= %d",
gbl_dbname, gbl_physrep_hung_replicant_threshold);

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
rc = cdb2_run_statement(metadb, query);
if (rc == CDB2_OK) {
while ((rc = cdb2_next_record(metadb)) == CDB2_OK) {
Expand Down Expand Up @@ -1355,6 +1346,7 @@ static int update_min_logfile_int(cdb2_hndl_tp *metadb)
physrep_logmsg(LOGMSG_USER, "%s:%d Executing: %s\n", __func__, __LINE__, cmd);
}

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
rc = cdb2_run_statement(metadb, cmd);
if (rc == CDB2_OK) {
while ((rc = cdb2_next_record(metadb)) == CDB2_OK) {
Expand Down Expand Up @@ -1563,14 +1555,15 @@ static void *physrep_worker(void *args)
physrep_logmsg(LOGMSG_USER, "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n",
__func__, __LINE__, do_revconn, is_revconn);
}

/* The call might have failed. That's okay, don't hammer metadb */
last_revconn_check = comdb2_time_epoch();

if (do_revconn == -1) {
logmsg(LOGMSG_ERROR, "%s:%d Failed to contact physrep metadb- keeping do_revconn the same: %d\n",
logmsg(LOGMSG_DEBUG, "%s:%d Failed to contact physrep metadb- keeping do_revconn the same: %d\n",
__func__, __LINE__, is_revconn);
} else {

/* Only update timestamp on successful check */
last_revconn_check = comdb2_time_epoch();

if ((do_revconn && !is_revconn) || (!do_revconn && is_revconn)) {
logmsg(LOGMSG_USER, "Revconn changed, do_revconn=%d, is_revconn=%d\n", do_revconn, is_revconn);
close_repl_connection(repl_db_cnct, repl_db, __func__, __LINE__);
Expand Down Expand Up @@ -1926,6 +1919,7 @@ static void am_i_hung(time_t cur_time) {
return;
}

ATOMIC_ADD64(gbl_physrep_metadb_sql_count, 1);
rc = cdb2_run_statement(repl_metadb, query);
if (rc == CDB2_OK) {
while ((rc = cdb2_next_record(repl_metadb)) == CDB2_OK) {
Expand Down
2 changes: 1 addition & 1 deletion db/reverse_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ int gbl_revsql_allow_command_exec;
int gbl_revsql_debug = 0;
int gbl_revsql_cdb2_debug;
// 'reverse-connection host' list refresh frequency
int gbl_revsql_host_refresh_freq_sec = 5;
int gbl_revsql_host_refresh_freq_sec = 60;
// 'reverse-connection' worker's new connection attempt frequency
int gbl_revsql_connect_freq_sec = 5;

Expand Down
6 changes: 3 additions & 3 deletions docs/pages/operating/physical_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ CREATE TABLE comdb2_physrep_sources(dbname CSTRING(60),
## Tunables

* blocking_physrep: The `SELECT .. FROM comdb2_transaction_logs` query executed by physical replicants blocks for the next log record. (Default: `true`)
* physrep_check_minlog_freq_sec: Check the minimum log number to keep this often. (Default: `10`)
* physrep_check_minlog_freq_sec: Check the minimum log number to keep this often. (Default: `600`)
* physrep_debug: Print extended physrep trace. (Default: `off`)
* physrep_exit_on_invalid_logstream: Exit physreps on invalid logstream. (Default: off)
* physrep_fanout: Maximum number of physical replicants that a node can service (Default: `8`)
Expand All @@ -189,9 +189,9 @@ CREATE TABLE comdb2_physrep_sources(dbname CSTRING(60),
* physrep_source_host: List of physical replication source cluster hosts.
* revsql_allow_command_execution: Allow processing and execution of a command that has come in over the `reverse connection` as part of the request. This is mostly intended for testing. (Default: off)
* revsql_cdb2_debug: Print extended reversql-sql cdb2 related trace. (Default: off)
* revsql_connect_freq_sec: This node will attempt to `reverse connect` to the remote host at this frequency. (Default: 5secs)
* revsql_connect_freq_sec: This node will attempt to `reverse connect` to the remote host at this frequency. (Default: 5 secs)
* revsql_debug: Print extended reversql-sql trace. (Default: off)
* revsql_host_refresh_freq_sec: The frequency at which the reverse connection host list will be refreshed. (Default: 5secs)
* revsql_host_refresh_freq_sec: The frequency at which the reverse connection host list will be refreshed. (Default: 60 secs)
* revsql_force_rte: Force rte-mode for all reverse connections. (Default: on)
* connect_remote_rte: Force rte-mode for both fdb and revsql connections. (Default: off)

Expand Down
8 changes: 0 additions & 8 deletions lua/lib/physrep_get_reverse_hosts.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@
local function main(dbname, hostname)
db:begin()

-- Check whether 'comdb2_physrep_sources' table exists
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
local row = rs:fetch()
if row.cnt == 0 then
db:commit()
return
end

local rs, rc = db:exec("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
"source_dbname = '" .. dbname .. "' AND source_host = '" .. hostname .. "'")
local row = rs:fetch()
Expand Down
8 changes: 0 additions & 8 deletions lua/lib/physrep_get_reverse_hosts_v2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@
local function main(dbname, hostname)
db:begin()

-- Check whether 'comdb2_physrep_sources' table exists
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
local row = rs:fetch()
if row.cnt == 0 then
db:commit()
return
end

local rs, rc = db:exec("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
"source_dbname = '" .. dbname .. "' AND source_host = '" .. hostname .. "'")
local row = rs:fetch()
Expand Down
8 changes: 0 additions & 8 deletions lua/lib/physrep_get_revhosts_v2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@
local function main(dbname, hostname, tier, cluster)
db:begin()

-- Check whether 'comdb2_physrep_sources' table exists
local rs, rc = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
local row = rs:fetch()
if row.cnt == 0 then
db:commit()
return
end

local sql = ("SELECT dbname, host FROM comdb2_physrep_sources WHERE " ..
"source_dbname = '" .. dbname .. "' AND ( source_host = '" .. hostname ..
"' OR source_host = '" .. tier ..
Expand Down
9 changes: 0 additions & 9 deletions lua/lib/physrep_should_wait_for_con.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@
local function main(dbname, hostname)
db:begin()

local rs, row = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
local row = rs:fetch()

if row.cnt == 0 then
db:emit(row)
db:commit()
return
end

local rs, row = db:exec("SELECT count(*) as cnt FROM comdb2_physrep_sources " ..
" WHERE dbname = '" .. dbname .. "' AND " ..
" host LIKE '" .. hostname .. "'")
Expand Down
9 changes: 0 additions & 9 deletions lua/lib/physrep_shouldwait_v2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@
local function main(dbname, hostname, tier, cluster)
db:begin()

local rs, row = db:exec("SELECT count(*)=1 AS cnt FROM comdb2_tables WHERE tablename = 'comdb2_physrep_sources'")
local row = rs:fetch()

if row.cnt == 0 then
db:emit(row)
db:commit()
return
end

local sql = ("SELECT count(*) as cnt from comdb2_physrep_sources " ..
" WHERE dbname = '" .. dbname .. "' AND " ..
" ( host LIKE '" .. hostname .. "' OR " ..
Expand Down
1 change: 1 addition & 0 deletions tests/phys_rep_tiered.test/firstfile.testopts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
physrep_keepalive_v2 1
4 changes: 3 additions & 1 deletion tests/phys_rep_tiered.test/lrl.options
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ logmsg level debug
dtastripe 1
debug_drop_nth_rep_message 10000
incoherent_slow_inactive_timeout 0
# test requires a faster revconn host-list refresh than the default (60 secs)
revsql_host_refresh_freq_sec 5
revsql_fake_connect_failure 1
revsql_debug 1
physrep_debug 1
Expand All @@ -10,7 +12,7 @@ forbid_remote_admin 0
ctrace_dbdir 1
allow_lua_print 1
reverse_hosts_v2 1
physrep_keepalive_v2 1
physrep_keepalive_v2 0
enable_snapshot_isolation

# Reproduced 'incoherent-physrep' bug with this removed.
Expand Down
Loading
Loading