@@ -58,18 +58,18 @@ typedef struct DB_Connection {
5858 } while (0)
5959
6060int gbl_physrep_debug = 0 ;
61+ int gbl_physrep_fake_check_revconn_error = 0 ;
62+ int gbl_physrep_fake_register_self_error = 0 ;
6163int gbl_physrep_reconnect_interval = 3600 ; // force re-registration every hour
6264int gbl_physrep_reconnect_penalty = 0 ;
6365int gbl_blocking_physrep = 1 ;
6466int gbl_physrep_fanout = 8 ;
6567int gbl_physrep_max_candidates = 6 ;
6668int gbl_physrep_max_pending_replicants = 10 ;
6769int gbl_deferred_phys_flag = 0 ;
68- int gbl_physrep_source_nodes_refresh_freq_sec = 10 ;
69- int gbl_physrep_slow_replicant_check_freq_sec = 10 ;
70- int gbl_physrep_keepalive_freq_sec = 10 ;
71- int gbl_physrep_check_minlog_freq_sec = 10 ;
72- int gbl_physrep_hung_replicant_check_freq_sec = 10 ;
70+ int gbl_physrep_slow_replicant_check_freq_sec = 60 ;
71+ int gbl_physrep_keepalive_freq_sec = 60 ;
72+ int gbl_physrep_hung_replicant_check_freq_sec = 60 ;
7373int gbl_physrep_hung_replicant_threshold = 60 ;
7474int gbl_physrep_revconn_check_interval = 60 ;
7575int gbl_physrep_update_registry_interval = 60 ;
@@ -78,7 +78,6 @@ int gbl_physrep_i_am_metadb = 0;
7878int gbl_physrep_filter_by_class = 1 ;
7979int gbl_started_physrep_threads = 0 ;
8080
81- unsigned int physrep_min_logfile ;
8281unsigned int gbl_deferred_phys_update ;
8382
8483char * gbl_physrep_source_dbname ;
@@ -871,6 +870,11 @@ static int register_self(cdb2_hndl_tp *repl_metadb)
871870 physrep_logmsg (LOGMSG_USER , "%s:%d Registering self\n" , __func__ , __LINE__ );
872871 }
873872
873+ if (gbl_physrep_fake_register_self_error ) {
874+ logmsg (LOGMSG_USER , "%s:%d: Fake error in register_self\n" , __func__ , __LINE__ );
875+ return -1 ;
876+ }
877+
874878 // Reset all the nodes from this physical replication cluster; and mark them
875879 // 'Inactive'.
876880 //
@@ -1173,17 +1177,18 @@ static int send_keepalive(void)
11731177 return 0 ;
11741178}
11751179
1176- unsigned int physrep_min_filenum () {
1177- return physrep_min_logfile ;
1178- }
1179-
11801180extern int gbl_reverse_hosts_v2 ;
11811181
11821182static int check_for_reverse_conn (cdb2_hndl_tp * hndl ) {
11831183 int rc ;
11841184 char cmd [400 ];
11851185 int do_wait = 0 ;
11861186
1187+ if (gbl_physrep_fake_check_revconn_error ) {
1188+ logmsg (LOGMSG_USER , "%s:%d: Fake error checking for reverse connection\n" , __func__ , __LINE__ );
1189+ return -1 ;
1190+ }
1191+
11871192 if (!gbl_reverse_hosts_v2 ) {
11881193 rc = snprintf (cmd , sizeof (cmd ), "exec procedure sys.physrep.should_wait_for_con('%s', '%s')" , gbl_dbname ,
11891194 (gbl_machine_class ) ? gbl_machine_class : gbl_myhostname );
@@ -1215,38 +1220,6 @@ static int check_for_reverse_conn(cdb2_hndl_tp *hndl) {
12151220 return (rc == 0 ) ? do_wait : -1 ;
12161221}
12171222
1218- void physrep_update_low_file_num (int * lowfilenum , int * local_lowfilenum ) {
1219- unsigned int physrep_minfilenum ;
1220- if ((get_dbtable_by_name ("comdb2_physreps" )) == NULL ) {
1221- return ;
1222- }
1223-
1224- physrep_minfilenum = physrep_min_filenum ();
1225- if (physrep_minfilenum <= 0 ) {
1226- if (gbl_physrep_debug ) {
1227- physrep_logmsg (LOGMSG_USER , "%s:%d: lowfilenum unchanged (physrep_minfilenum: %d)\n" ,
1228- __func__ , __LINE__ , physrep_minfilenum );
1229- }
1230- } else {
1231- if (physrep_minfilenum <= * lowfilenum ) {
1232- if (gbl_physrep_debug ) {
1233- physrep_logmsg (LOGMSG_USER , "%s:%d: lowfilenum %d being changed "
1234- "physical replicant(s) (physrep_minfilenum: %d)\n" ,
1235- __func__ , __LINE__ , * lowfilenum , physrep_minfilenum );
1236- }
1237- * lowfilenum = physrep_minfilenum - 1 ;
1238- }
1239- if (physrep_minfilenum <= * local_lowfilenum ) {
1240- * local_lowfilenum = physrep_minfilenum - 1 ;
1241- }
1242- }
1243-
1244- if (gbl_physrep_debug ) {
1245- physrep_logmsg (LOGMSG_USER , "%s:%d: lowfilenum: %d (physrep_minfilenum: %d)\n" ,
1246- __func__ , __LINE__ , * lowfilenum , physrep_minfilenum );
1247- }
1248- }
1249-
12501223static int slow_replicants_count_int (cdb2_hndl_tp * metadb , unsigned int * count )
12511224{
12521225 char query [400 ];
@@ -1306,93 +1279,6 @@ static int slow_replicants_count(unsigned int *count)
13061279 return badrc ? -1 : 0 ;
13071280}
13081281
1309- static int update_min_logfile_int (cdb2_hndl_tp * metadb )
1310- {
1311- char cmd [120 + nodes_list_sz ];
1312- char * buf ;
1313- size_t buf_len ;
1314- int bytes_written ;
1315- int rc = 0 ;
1316-
1317- if (gbl_ready == 0 )
1318- return 0 ;
1319-
1320- bytes_written = 0 ;
1321- buf = cmd ;
1322- buf_len = sizeof (cmd );
1323-
1324- bytes_written +=
1325- snprintf (buf + bytes_written , buf_len - bytes_written ,
1326- "WITH RECURSIVE replication_tree(dbname, host, file) AS "
1327- " (SELECT dbname, host, file FROM comdb2_physreps "
1328- " WHERE dbname='%s' AND host IN (" ,
1329- gbl_dbname );
1330- if (bytes_written >= buf_len ) {
1331- physrep_logmsg (LOGMSG_ERROR , "%s:%d Buffer is not long enough!\n" , __func__ , __LINE__ );
1332- return 1 ;
1333- }
1334-
1335- bytes_written += append_quoted_local_hosts (buf + bytes_written , buf_len - bytes_written , "," );
1336- if (bytes_written >= buf_len ) {
1337- physrep_logmsg (LOGMSG_ERROR , "%s:%d Buffer is not long enough!\n" , __func__ , __LINE__ );
1338- return 1 ;
1339- }
1340-
1341- bytes_written += snprintf (buf + bytes_written , buf_len - bytes_written ,
1342- " ) "
1343- " UNION "
1344- " SELECT p.dbname, p.host, p.file FROM comdb2_physreps p, "
1345- " comdb2_physrep_connections c, replication_tree t "
1346- " WHERE p.state = 'Active' AND p.file <> 0 AND "
1347- " t.dbname = c.source_dbname AND c.dbname = p.dbname) "
1348- " SELECT file FROM replication_tree WHERE file IS NOT NULL ORDER BY file LIMIT 1" );
1349- if (bytes_written >= buf_len ) {
1350- physrep_logmsg (LOGMSG_ERROR , "%s:%d Buffer is not long enough!\n" , __func__ , __LINE__ );
1351- return 1 ;
1352- }
1353-
1354- if (gbl_physrep_debug ) {
1355- physrep_logmsg (LOGMSG_USER , "%s:%d Executing: %s\n" , __func__ , __LINE__ , cmd );
1356- }
1357-
1358- rc = cdb2_run_statement (metadb , cmd );
1359- if (rc == CDB2_OK ) {
1360- while ((rc = cdb2_next_record (metadb )) == CDB2_OK ) {
1361- int64_t * minfile = (int64_t * )cdb2_column_value (metadb , 0 );
1362- physrep_min_logfile = minfile ? (unsigned int )* minfile : 0 ;
1363- }
1364- if (rc == CDB2_OK_DONE )
1365- rc = 0 ;
1366- } else {
1367- physrep_logmsg (LOGMSG_ERROR , "%s:%d Failed to execute (rc: %d)\n" , __func__ , __LINE__ , rc );
1368- }
1369-
1370- return rc ;
1371- }
1372-
1373- static int update_min_logfile (void )
1374- {
1375- cdb2_hndl_tp * metadb ;
1376- int rc , altcnt = gbl_altmetadb_count ;
1377-
1378- if ((rc = physrep_get_metadb_or_local_hndl (& metadb )) != 0 ) {
1379- logmsg (LOGMSG_ERROR , "%s: failed to get metadb handle rc=%d\n" , __func__ , rc );
1380- } else {
1381- update_min_logfile_int (metadb );
1382- cdb2_close (metadb );
1383- }
1384-
1385- for (int i = 0 ; i < altcnt ; i ++ ) {
1386- if ((rc = get_alt_metadb_hndl (& metadb , i )) != 0 ) {
1387- logmsg (LOGMSG_ERROR , "%s: failed to get alt metadb handle %d rc=%d\n" , __func__ , i , rc );
1388- continue ;
1389- }
1390- update_min_logfile_int (metadb );
1391- cdb2_close (metadb );
1392- }
1393- return 0 ;
1394- }
1395-
13961282/*
13971283 Check whether we need to wait for a connection from one of the nodes
13981284 in the source db.
@@ -1501,6 +1387,7 @@ static void *physrep_worker(void *args)
15011387 comdb2_name_thread (__func__ );
15021388
15031389 volatile int64_t gen , highest_gen = 0 ;
1390+ int do_revconn ;
15041391 int64_t first_lcgen = -1 ;
15051392 size_t sql_cmd_len = 150 ;
15061393 char sql_cmd [sql_cmd_len ];
@@ -1557,7 +1444,7 @@ static void *physrep_worker(void *args)
15571444 physrep_logmsg (LOGMSG_USER , "%s:%d Re-checking for reverse connection\n" , __func__ , __LINE__ );
15581445 }
15591446 if ((rc = physrep_get_metadb_or_local_hndl (& repl_metadb )) == 0 ) {
1560- int do_revconn = do_wait_for_reverse_conn (repl_metadb );
1447+ do_revconn = do_wait_for_reverse_conn (repl_metadb );
15611448 cdb2_close (repl_metadb );
15621449 if (gbl_physrep_debug ) {
15631450 physrep_logmsg (LOGMSG_USER , "%s:%d Reverse connection check: do-revcon=%d, is-revcon=%d\n" ,
@@ -1586,8 +1473,10 @@ static void *physrep_worker(void *args)
15861473 goto sleep_and_retry ;
15871474 }
15881475
1589- last_revconn_check = comdb2_time_epoch ();
1590- if (do_wait_for_reverse_conn (repl_metadb ) == 1 ) {
1476+ do_revconn = do_wait_for_reverse_conn (repl_metadb );
1477+
1478+ if (do_revconn == 1 ) {
1479+ last_revconn_check = comdb2_time_epoch ();
15911480 is_revconn = 1 ;
15921481 int wait_timeout_sec = 60 ;
15931482
@@ -1632,6 +1521,13 @@ static void *physrep_worker(void *args)
16321521
16331522 set_repl_db_connected (rev_conn_hndl -> remote_dbname , rev_conn_hndl -> remote_host );
16341523 } else {
1524+ /* Intentionally try to register if we fail to determine if reverse connection is needed */
1525+ if (do_revconn == 0 ) {
1526+ last_revconn_check = comdb2_time_epoch ();
1527+ } else {
1528+ physrep_logmsg (LOGMSG_DEBUG , "%s:%d do_revconn is %d, registering replicant\n" , __func__ , __LINE__ ,
1529+ do_revconn );
1530+ }
16351531 is_revconn = 0 ;
16361532 int notfound = 0 ;
16371533 while (stop_physrep_worker == 0 ) {
@@ -1650,6 +1546,12 @@ static void *physrep_worker(void *args)
16501546 physrep_logmsg (level , "%s:%d Failed to register against cluster, attempt %d\n" ,
16511547 __func__ , __LINE__ , notfound );
16521548
1549+ if (do_revconn == -1 ) {
1550+ cdb2_close (repl_metadb );
1551+ repl_metadb = NULL ;
1552+ goto sleep_and_retry ;
1553+ }
1554+
16531555 do {
16541556 cdb2_close (repl_metadb );
16551557 repl_metadb = NULL ;
@@ -1957,7 +1859,6 @@ static void *physrep_watcher(void *args) {
19571859 static int physrep_slow_replicant_last_checked ;
19581860 static int physrep_keepalive_last_sent ;
19591861 static int physrep_hung_replicant_last_checked ;
1960- static int physrep_minlog_last_checked ;
19611862
19621863 while (!gbl_exit && stop_physrep_watcher == 0 ) {
19631864 sleep (1 );
@@ -1986,20 +1887,12 @@ static void *physrep_watcher(void *args) {
19861887 }
19871888 // Common:
19881889 // 1) Send keepalives
1989- // 2) Update maximum log file number upto which it is safe to delete
19901890
19911891 // Periodically send keepalive to report its LSN.
19921892 if ((now - physrep_keepalive_last_sent ) >= gbl_physrep_keepalive_freq_sec ) {
19931893 send_keepalive ();
19941894 physrep_keepalive_last_sent = now ;
19951895 }
1996-
1997- // Update the 'minimum log file' marker upto which it is safe to
1998- // delete log files.
1999- if ((now - physrep_minlog_last_checked ) >= gbl_physrep_check_minlog_freq_sec ) {
2000- update_min_logfile ();
2001- physrep_minlog_last_checked = now ;
2002- }
20031896 }
20041897 return NULL ;
20051898}
0 commit comments