@@ -58,18 +58,17 @@ typedef struct DB_Connection {
5858 } while (0)
5959
6060int gbl_physrep_debug = 0 ;
61+ int gbl_physrep_fake_check_revconn_error = 0 ;
6162int gbl_physrep_reconnect_interval = 3600 ; // force re-registration every hour
6263int gbl_physrep_reconnect_penalty = 0 ;
6364int gbl_blocking_physrep = 1 ;
6465int gbl_physrep_fanout = 8 ;
6566int gbl_physrep_max_candidates = 6 ;
6667int gbl_physrep_max_pending_replicants = 10 ;
6768int gbl_deferred_phys_flag = 0 ;
68- int gbl_physrep_source_nodes_refresh_freq_sec = 10 ;
69- int gbl_physrep_slow_replicant_check_freq_sec = 10 ;
70- int gbl_physrep_keepalive_freq_sec = 10 ;
71- int gbl_physrep_check_minlog_freq_sec = 10 ;
72- int gbl_physrep_hung_replicant_check_freq_sec = 10 ;
69+ int gbl_physrep_slow_replicant_check_freq_sec = 60 ;
70+ int gbl_physrep_keepalive_freq_sec = 60 ;
71+ int gbl_physrep_hung_replicant_check_freq_sec = 60 ;
7372int gbl_physrep_hung_replicant_threshold = 60 ;
7473int gbl_physrep_revconn_check_interval = 60 ;
7574int gbl_physrep_update_registry_interval = 60 ;
@@ -78,7 +77,6 @@ int gbl_physrep_i_am_metadb = 0;
7877int gbl_physrep_filter_by_class = 1 ;
7978int gbl_started_physrep_threads = 0 ;
8079
81- unsigned int physrep_min_logfile ;
8280unsigned int gbl_deferred_phys_update ;
8381
8482char * gbl_physrep_source_dbname ;
@@ -1173,17 +1171,18 @@ static int send_keepalive(void)
11731171 return 0 ;
11741172}
11751173
1176- unsigned int physrep_min_filenum () {
1177- return physrep_min_logfile ;
1178- }
1179-
11801174extern int gbl_reverse_hosts_v2 ;
11811175
11821176static int check_for_reverse_conn (cdb2_hndl_tp * hndl ) {
11831177 int rc ;
11841178 char cmd [400 ];
11851179 int do_wait = 0 ;
11861180
1181+ if (gbl_physrep_fake_check_revconn_error ) {
1182+ logmsg (LOGMSG_USER , "%s:%d: Fake error checking for reverse connection\n" , __func__ , __LINE__ );
1183+ return -1 ;
1184+ }
1185+
11871186 if (!gbl_reverse_hosts_v2 ) {
11881187 rc = snprintf (cmd , sizeof (cmd ), "exec procedure sys.physrep.should_wait_for_con('%s', '%s')" , gbl_dbname ,
11891188 (gbl_machine_class ) ? gbl_machine_class : gbl_myhostname );
@@ -1215,38 +1214,6 @@ static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
12151214 return (rc == 0 ) ? do_wait : -1 ;
12161215}
12171216
1218- void physrep_update_low_file_num (int * lowfilenum , int * local_lowfilenum ) {
1219- unsigned int physrep_minfilenum ;
1220- if ((get_dbtable_by_name ("comdb2_physreps" )) == NULL ) {
1221- return ;
1222- }
1223-
1224- physrep_minfilenum = physrep_min_filenum ();
1225- if (physrep_minfilenum <= 0 ) {
1226- if (gbl_physrep_debug ) {
1227- physrep_logmsg (LOGMSG_USER , "%s:%d: lowfilenum unchanged (physrep_minfilenum: %d)\n" ,
1228- __func__ , __LINE__ , physrep_minfilenum );
1229- }
1230- } else {
1231- if (physrep_minfilenum <= * lowfilenum ) {
1232- if (gbl_physrep_debug ) {
1233- physrep_logmsg (LOGMSG_USER , "%s:%d: lowfilenum %d being changed "
1234- "physical replicant(s) (physrep_minfilenum: %d)\n" ,
1235- __func__ , __LINE__ , * lowfilenum , physrep_minfilenum );
1236- }
1237- * lowfilenum = physrep_minfilenum - 1 ;
1238- }
1239- if (physrep_minfilenum <= * local_lowfilenum ) {
1240- * local_lowfilenum = physrep_minfilenum - 1 ;
1241- }
1242- }
1243-
1244- if (gbl_physrep_debug ) {
1245- physrep_logmsg (LOGMSG_USER , "%s:%d: lowfilenum: %d (physrep_minfilenum: %d)\n" ,
1246- __func__ , __LINE__ , * lowfilenum , physrep_minfilenum );
1247- }
1248- }
1249-
12501217static int slow_replicants_count_int (cdb2_hndl_tp * metadb , unsigned int * count )
12511218{
12521219 char query [400 ];
@@ -1306,93 +1273,6 @@ static int slow_replicants_count(unsigned int *count)
13061273 return badrc ? -1 : 0 ;
13071274}
13081275
1309- static int update_min_logfile_int (cdb2_hndl_tp * metadb )
1310- {
1311- char cmd [120 + nodes_list_sz ];
1312- char * buf ;
1313- size_t buf_len ;
1314- int bytes_written ;
1315- int rc = 0 ;
1316-
1317- if (gbl_ready == 0 )
1318- return 0 ;
1319-
1320- bytes_written = 0 ;
1321- buf = cmd ;
1322- buf_len = sizeof (cmd );
1323-
1324- bytes_written +=
1325- snprintf (buf + bytes_written , buf_len - bytes_written ,
1326- "WITH RECURSIVE replication_tree(dbname, host, file) AS "
1327- " (SELECT dbname, host, file FROM comdb2_physreps "
1328- " WHERE dbname='%s' AND host IN (" ,
1329- gbl_dbname );
1330- if (bytes_written >= buf_len ) {
1331- physrep_logmsg (LOGMSG_ERROR , "%s:%d Buffer is not long enough!\n" , __func__ , __LINE__ );
1332- return 1 ;
1333- }
1334-
1335- bytes_written += append_quoted_local_hosts (buf + bytes_written , buf_len - bytes_written , "," );
1336- if (bytes_written >= buf_len ) {
1337- physrep_logmsg (LOGMSG_ERROR , "%s:%d Buffer is not long enough!\n" , __func__ , __LINE__ );
1338- return 1 ;
1339- }
1340-
1341- bytes_written += snprintf (buf + bytes_written , buf_len - bytes_written ,
1342- " ) "
1343- " UNION "
1344- " SELECT p.dbname, p.host, p.file FROM comdb2_physreps p, "
1345- " comdb2_physrep_connections c, replication_tree t "
1346- " WHERE p.state = 'Active' AND p.file <> 0 AND "
1347- " t.dbname = c.source_dbname AND c.dbname = p.dbname) "
1348- " SELECT file FROM replication_tree WHERE file IS NOT NULL ORDER BY file LIMIT 1" );
1349- if (bytes_written >= buf_len ) {
1350- physrep_logmsg (LOGMSG_ERROR , "%s:%d Buffer is not long enough!\n" , __func__ , __LINE__ );
1351- return 1 ;
1352- }
1353-
1354- if (gbl_physrep_debug ) {
1355- physrep_logmsg (LOGMSG_USER , "%s:%d Executing: %s\n" , __func__ , __LINE__ , cmd );
1356- }
1357-
1358- rc = cdb2_run_statement (metadb , cmd );
1359- if (rc == CDB2_OK ) {
1360- while ((rc = cdb2_next_record (metadb )) == CDB2_OK ) {
1361- int64_t * minfile = (int64_t * )cdb2_column_value (metadb , 0 );
1362- physrep_min_logfile = minfile ? (unsigned int )* minfile : 0 ;
1363- }
1364- if (rc == CDB2_OK_DONE )
1365- rc = 0 ;
1366- } else {
1367- physrep_logmsg (LOGMSG_ERROR , "%s:%d Failed to execute (rc: %d)\n" , __func__ , __LINE__ , rc );
1368- }
1369-
1370- return rc ;
1371- }
1372-
1373- static int update_min_logfile (void )
1374- {
1375- cdb2_hndl_tp * metadb ;
1376- int rc , altcnt = gbl_altmetadb_count ;
1377-
1378- if ((rc = physrep_get_metadb_or_local_hndl (& metadb )) != 0 ) {
1379- logmsg (LOGMSG_ERROR , "%s: failed to get metadb handle rc=%d\n" , __func__ , rc );
1380- } else {
1381- update_min_logfile_int (metadb );
1382- cdb2_close (metadb );
1383- }
1384-
1385- for (int i = 0 ; i < altcnt ; i ++ ) {
1386- if ((rc = get_alt_metadb_hndl (& metadb , i )) != 0 ) {
1387- logmsg (LOGMSG_ERROR , "%s: failed to get alt metadb handle %d rc=%d\n" , __func__ , i , rc );
1388- continue ;
1389- }
1390- update_min_logfile_int (metadb );
1391- cdb2_close (metadb );
1392- }
1393- return 0 ;
1394- }
1395-
13961276/*
13971277 Check whether we need to wait for a connection from one of the nodes
13981278 in the source db.
@@ -1501,6 +1381,7 @@ static void *physrep_worker(void *args)
15011381 comdb2_name_thread (__func__ );
15021382
15031383 volatile int64_t gen , highest_gen = 0 ;
1384+ int do_revconn ;
15041385 int64_t first_lcgen = -1 ;
15051386 size_t sql_cmd_len = 150 ;
15061387 char sql_cmd [sql_cmd_len ];
@@ -1557,7 +1438,7 @@ static void *physrep_worker(void *args)
15571438 physrep_logmsg (LOGMSG_USER , "%s:%d Re-checking for reverse connection\n" , __func__ , __LINE__ );
15581439 }
15591440 if ((rc = physrep_get_metadb_or_local_hndl (& repl_metadb )) == 0 ) {
1560- int do_revconn = do_wait_for_reverse_conn (repl_metadb );
1441+ do_revconn = do_wait_for_reverse_conn (repl_metadb );
15611442 cdb2_close (repl_metadb );
15621443 if (gbl_physrep_debug ) {
15631444 physrep_logmsg (LOGMSG_USER , "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n" ,
@@ -1586,8 +1467,10 @@ static void *physrep_worker(void *args)
15861467 goto sleep_and_retry ;
15871468 }
15881469
1589- last_revconn_check = comdb2_time_epoch ();
1590- if (do_wait_for_reverse_conn (repl_metadb ) == 1 ) {
1470+ do_revconn = do_wait_for_reverse_conn (repl_metadb );
1471+
1472+ if (do_revconn == 1 ) {
1473+ last_revconn_check = comdb2_time_epoch ();
15911474 is_revconn = 1 ;
15921475 int wait_timeout_sec = 60 ;
15931476
@@ -1631,9 +1514,13 @@ static void *physrep_worker(void *args)
16311514 do_truncate = 1 ;
16321515
16331516 set_repl_db_connected (rev_conn_hndl -> remote_dbname , rev_conn_hndl -> remote_host );
1634- } else {
1517+ } else if (do_revconn == 0 ) {
1518+ last_revconn_check = comdb2_time_epoch ();
16351519 is_revconn = 0 ;
16361520 int notfound = 0 ;
1521+
1522+ physrep_logmsg (LOGMSG_DEBUG , "%s:%d do_revconn is 0, registering replicant\n" , __func__ , __LINE__ );
1523+
16371524 while (stop_physrep_worker == 0 ) {
16381525 if ((rc = register_self (repl_metadb )) == 0 )
16391526 break ;
@@ -1668,6 +1555,11 @@ static void *physrep_worker(void *args)
16681555
16691556 /* Perform truncation to start fresh */
16701557 do_truncate = 1 ;
1558+ } else {
1559+ physrep_logmsg (LOGMSG_ERROR , "%s:%d Failed to determine if reverse connection is needed\n" , __func__ ,
1560+ __LINE__ );
1561+ cdb2_close (repl_metadb );
1562+ goto sleep_and_retry ;
16711563 }
16721564
16731565 // Close the connection to metadb as we now have a connection to the
@@ -1957,7 +1849,6 @@ static void *physrep_watcher(void *args) {
19571849 static int physrep_slow_replicant_last_checked ;
19581850 static int physrep_keepalive_last_sent ;
19591851 static int physrep_hung_replicant_last_checked ;
1960- static int physrep_minlog_last_checked ;
19611852
19621853 while (!gbl_exit && stop_physrep_watcher == 0 ) {
19631854 sleep (1 );
@@ -1986,20 +1877,12 @@ static void *physrep_watcher(void *args) {
19861877 }
19871878 // Common:
19881879 // 1) Send keepalives
1989- // 2) Update maximum log file number upto which it is safe to delete
19901880
19911881 // Periodically send keepalive to report its LSN.
19921882 if ((now - physrep_keepalive_last_sent ) >= gbl_physrep_keepalive_freq_sec ) {
19931883 send_keepalive ();
19941884 physrep_keepalive_last_sent = now ;
19951885 }
1996-
1997- // Update the 'minimum log file' marker upto which it is safe to
1998- // delete log files.
1999- if ((now - physrep_minlog_last_checked ) >= gbl_physrep_check_minlog_freq_sec ) {
2000- update_min_logfile ();
2001- physrep_minlog_last_checked = now ;
2002- }
20031886 }
20041887 return NULL ;
20051888}
0 commit comments