Skip to content

Commit fb0573c

Browse files
committed
Apply missing log records from dedicated thread
Signed-off-by: Mark Hannum <[email protected]>
1 parent 6889c92 commit fb0573c

33 files changed

+1144
-125
lines changed

bdb/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ add_library(bdb
3030
genid.c
3131
import.c
3232
info.c
33+
log_info.c
3334
lite.c
3435
ll.c
3536
llmeta.c
@@ -50,6 +51,7 @@ add_library(bdb
5051
rowlocks.c
5152
rowlocks_util.c
5253
serializable.c
54+
signallogfill.c
5355
summarize.c
5456
temphash.c
5557
temptable.c

bdb/bdb.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,7 @@ int bdb_get_seqnum(bdb_state_type *bdb_state, seqnum_type *seqnum)
200200
return outrc;
201201
}
202202

203-
int bdb_get_lsn_node(bdb_state_type *bdb_state, char *host, int *logfile,
204-
int *offset)
203+
int bdb_get_lsn_node(bdb_state_type *bdb_state, char *host, uint32_t *logfile, uint32_t *offset)
205204
{
206205
struct interned_string *host_interned = intern_ptr(host);
207206
struct hostinfo *h = retrieve_hostinfo(host_interned);

bdb/bdb_api.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ enum {
105105
BDB_CALLBACK_SERIALCHECK,
106106
BDB_CALLBACK_ADMIN_APPSOCK,
107107
BDB_CALLBACK_SYNCMODE,
108-
BDB_CALLBACK_NODEUP_DRTEST
108+
BDB_CALLBACK_NODEUP_DRTEST,
109+
BDB_CALLBACK_SIGNAL_LOGFILL
109110
};
110111

111112
enum { BDB_REPFAIL_NET, BDB_REPFAIL_TIMEOUT, BDB_REPFAIL_RMTBDB };
@@ -410,6 +411,9 @@ typedef int (*SYNCMODE)(bdb_state_type *);
410411
/* Callback to dr-test aware rtcpu */
411412
typedef int (*NODEUP_DRTEST)(bdb_state_type *, const char *hode, int *isdrtest);
412413

414+
/* Callback to signal logfill */
415+
typedef int (*SIGNAL_LOGFILL)(bdb_state_type *);
416+
413417
typedef int (*BDB_CALLBACK_FP)();
414418
bdb_callback_type *bdb_callback_create(void);
415419
void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,

bdb/bdb_int.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,7 @@ struct bdb_callback_tag {
696696
SERIALCHECK serialcheck_rtn;
697697
SYNCMODE syncmode_rtn;
698698
NODEUP_DRTEST nodeup_drtest_rtn;
699+
SIGNAL_LOGFILL signal_logfill_rtn;
699700
};
700701

701702
struct waiting_for_lsn {

bdb/callback.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
111111
bdb_callback->nodeup_drtest_rtn = (NODEUP_DRTEST)callback_rtn;
112112
break;
113113

114+
case BDB_CALLBACK_SIGNAL_LOGFILL:
115+
bdb_callback->signal_logfill_rtn = (SIGNAL_LOGFILL)callback_rtn;
116+
break;
117+
114118
default:
115119
break;
116120
}

bdb/file.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
#include <dirent.h>
3636
#include <fcntl.h>
3737

38+
#include <signallogfill.h>
39+
3840
#ifdef __sun
3941
/* for PTHREAD_STACK_MIN on Solaris */
4042
#define __EXTENSIONS__
@@ -110,6 +112,7 @@
110112
#include <phys_rep_lsn.h>
111113

112114
#include <log_trigger.h>
115+
#include <sqllogfill.h>
113116

114117
extern int gbl_bdblock_debug;
115118
extern int gbl_keycompr;
@@ -2422,6 +2425,7 @@ int bdb_is_standalone(void *dbenv, void *in_bdb_state)
24222425
}
24232426

24242427
extern int gbl_commit_delay_trace;
2428+
extern int gbl_sql_logfill;
24252429
int gbl_skip_catchup_logic = 0;
24262430
int gbl_debug_downgrade_cluster_at_open = 0;
24272431

@@ -2676,7 +2680,7 @@ static DB_ENV *dbenv_open(bdb_state_type *bdb_state)
26762680
dbenv->set_rep_transport(dbenv, bdb_state->repinfo->myhost,
26772681
berkdb_send_rtn);
26782682
dbenv->set_rep_send_ack(dbenv, comdb2_early_ack);
2679-
2683+
dbenv->set_rep_signal_logfill(dbenv, comdb2_signal_logfill);
26802684
dbenv->set_check_standalone(dbenv, comdb2_is_standalone);
26812685
dbenv->set_truncate_sc_callback(dbenv, comdb2_reload_schemas);
26822686
dbenv->set_rep_truncate_callback(dbenv, comdb2_replicated_truncate);
@@ -3090,6 +3094,10 @@ if (!is_real_netinfo(bdb_state->repinfo->netinfo))
30903094

30913095
master_host = bdb_state->repinfo->master_host;
30923096

3097+
if (gbl_sql_logfill) {
3098+
create_sql_logfill_threads(bdb_state);
3099+
}
3100+
30933101
if ((master_host == db_eid_invalid) || (master_host == bdb_master_dupe))
30943102
goto waitformaster;
30953103

bdb/log_info.c

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2025 Bloomberg Finance L.P.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
#include <build/db.h>
17+
#include <bdb_int.h>
18+
#include <log_info.h>
19+
#include <strings.h>
20+
#include <logmsg.h>
21+
#include <string.h>
22+
23+
static LOG_INFO get_lsn_internal(bdb_state_type *bdb_state, int flags)
24+
{
25+
int rc;
26+
27+
/* get db internals */
28+
DB_LOGC *logc;
29+
DBT logrec;
30+
DB_LSN log_lsn;
31+
LOG_INFO log_info = {0};
32+
33+
rc = bdb_state->dbenv->log_cursor(bdb_state->dbenv, &logc, 0);
34+
if (rc) {
35+
logmsg(LOGMSG_ERROR, "%s: Can't get log cursor rc %d\n", __func__, rc);
36+
return log_info;
37+
}
38+
bzero(&logrec, sizeof(DBT));
39+
logrec.flags = DB_DBT_MALLOC;
40+
rc = logc->get(logc, &log_lsn, &logrec, flags);
41+
if (rc) {
42+
logmsg(LOGMSG_ERROR, "%s: Can't get last log record rc %d\n", __func__, rc);
43+
logc->close(logc, 0);
44+
return log_info;
45+
}
46+
47+
log_info.file = log_lsn.file;
48+
log_info.offset = log_lsn.offset;
49+
log_info.size = logrec.size;
50+
51+
if (logrec.data)
52+
free(logrec.data);
53+
54+
logc->close(logc, 0);
55+
56+
return log_info;
57+
}
58+
59+
LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
60+
{
61+
return get_lsn_internal(bdb_state, DB_LAST);
62+
}
63+
64+
LOG_INFO get_first_lsn(bdb_state_type *bdb_state)
65+
{
66+
return get_lsn_internal(bdb_state, DB_FIRST);
67+
}
68+
69+
uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
70+
{
71+
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
72+
}
73+
74+
int log_info_compare(LOG_INFO *a, LOG_INFO *b)
75+
{
76+
if (a->file < b->file)
77+
return -1;
78+
if (a->file > b->file)
79+
return 1;
80+
if (a->offset < b->offset)
81+
return -1;
82+
if (a->offset > b->offset)
83+
return 1;
84+
return 0;
85+
}

bdb/log_info.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright 2025 Bloomberg Finance L.P.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#ifndef LOG_INFO_H
18+
#define LOG_INFO_H
19+
20+
struct __db_env;
21+
struct bdb_state_tag;
22+
23+
#include <stdint.h>
24+
25+
typedef struct LOG_INFO LOG_INFO;
26+
27+
struct LOG_INFO {
28+
uint32_t file;
29+
uint32_t offset;
30+
uint32_t size;
31+
uint32_t gen;
32+
};
33+
34+
LOG_INFO get_last_lsn(struct bdb_state_tag *);
35+
LOG_INFO get_first_lsn(struct bdb_state_tag *);
36+
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
37+
int log_info_compare(LOG_INFO *a, LOG_INFO *b);
38+
39+
#endif

bdb/phys_rep_lsn.c

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -29,58 +29,6 @@ extern int gbl_physrep_debug;
2929
int gbl_physrep_exit_on_invalid_logstream = 0;
3030
int gbl_physrep_ignore_queues = 1;
3131

32-
static LOG_INFO get_lsn_internal(bdb_state_type *bdb_state, int flags)
33-
{
34-
int rc;
35-
36-
/* get db internals */
37-
DB_LOGC *logc;
38-
DBT logrec;
39-
DB_LSN log_lsn;
40-
LOG_INFO log_info = {0};
41-
42-
rc = bdb_state->dbenv->log_cursor(bdb_state->dbenv, &logc, 0);
43-
if (rc) {
44-
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get log cursor rc %d\n", __func__, rc);
45-
return log_info;
46-
}
47-
bzero(&logrec, sizeof(DBT));
48-
logrec.flags = DB_DBT_MALLOC;
49-
rc = logc->get(logc, &log_lsn, &logrec, flags);
50-
if (rc) {
51-
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get last log record rc %d\n", __func__,
52-
rc);
53-
logc->close(logc, 0);
54-
return log_info;
55-
}
56-
#if 0
57-
if (gbl_physrep_debug)
58-
physrep_logmsg(LOGMSG_USER, "%s: LSN %u:%u\n", __func__, log_lsn.file,
59-
log_lsn.offset);
60-
#endif
61-
62-
log_info.file = log_lsn.file;
63-
log_info.offset = log_lsn.offset;
64-
log_info.size = logrec.size;
65-
66-
if (logrec.data)
67-
free(logrec.data);
68-
69-
logc->close(logc, 0);
70-
71-
return log_info;
72-
}
73-
74-
LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
75-
{
76-
return get_lsn_internal(bdb_state, DB_LAST);
77-
}
78-
79-
LOG_INFO get_first_lsn(bdb_state_type *bdb_state)
80-
{
81-
return get_lsn_internal(bdb_state, DB_FIRST);
82-
}
83-
8432
int compare_log(DBT *logrec, void *blob, unsigned int blob_len)
8533
{
8634
int rc;
@@ -252,11 +200,6 @@ static int get_next_matchable(DB_LOGC *logc, LOG_INFO *info, int check_current,
252200

253201
/* generator code */
254202

255-
uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
256-
{
257-
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
258-
}
259-
260203
int apply_log(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
261204
int blob_len)
262205
{

bdb/phys_rep_lsn.h

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,11 @@
44
#include <stdint.h>
55
#include <time.h>
66

7-
typedef struct LOG_INFO LOG_INFO;
8-
struct LOG_INFO {
9-
uint32_t file;
10-
uint32_t offset;
11-
uint32_t size;
12-
uint32_t gen;
13-
};
14-
157
struct __db_env;
168
struct bdb_state_tag;
179

10+
#include <log_info.h>
11+
1812
typedef unsigned char u_int8_t;
1913

2014
/* Mark this table to be ignored (not replicated to) */
@@ -32,9 +26,6 @@ int physrep_ignore_table_count(void);
3226
/* List ignored tables */
3327
int physrep_list_ignored_tables(void);
3428

35-
LOG_INFO get_last_lsn(struct bdb_state_tag *);
36-
LOG_INFO get_first_lsn(struct bdb_state_tag *);
37-
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
3829
int apply_log(struct bdb_state_tag *, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
3930
int blob_len);
4031
int truncate_log_lock(struct bdb_state_tag *, unsigned int file,

0 commit comments

Comments
 (0)