Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_library(bdb
genid.c
import.c
info.c
log_info.c
lite.c
ll.c
llmeta.c
Expand All @@ -50,6 +51,7 @@ add_library(bdb
rowlocks.c
rowlocks_util.c
serializable.c
signallogfill.c
summarize.c
temphash.c
temptable.c
Expand Down
3 changes: 1 addition & 2 deletions bdb/bdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ int bdb_get_seqnum(bdb_state_type *bdb_state, seqnum_type *seqnum)
return outrc;
}

int bdb_get_lsn_node(bdb_state_type *bdb_state, char *host, int *logfile,
int *offset)
int bdb_get_lsn_node(bdb_state_type *bdb_state, char *host, uint32_t *logfile, uint32_t *offset)
{
struct interned_string *host_interned = intern_ptr(host);
struct hostinfo *h = retrieve_hostinfo(host_interned);
Expand Down
6 changes: 5 additions & 1 deletion bdb/bdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ enum {
BDB_CALLBACK_SERIALCHECK,
BDB_CALLBACK_ADMIN_APPSOCK,
BDB_CALLBACK_SYNCMODE,
BDB_CALLBACK_NODEUP_DRTEST
BDB_CALLBACK_NODEUP_DRTEST,
BDB_CALLBACK_SIGNAL_LOGFILL
};

enum { BDB_REPFAIL_NET, BDB_REPFAIL_TIMEOUT, BDB_REPFAIL_RMTBDB };
Expand Down Expand Up @@ -410,6 +411,9 @@ typedef int (*SYNCMODE)(bdb_state_type *);
/* Callback to dr-test aware rtcpu */
typedef int (*NODEUP_DRTEST)(bdb_state_type *, const char *hode, int *isdrtest);

/* Callback to signal logfill */
typedef int (*SIGNAL_LOGFILL)(bdb_state_type *);

typedef int (*BDB_CALLBACK_FP)();
bdb_callback_type *bdb_callback_create(void);
void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
Expand Down
1 change: 1 addition & 0 deletions bdb/bdb_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ struct bdb_callback_tag {
SERIALCHECK serialcheck_rtn;
SYNCMODE syncmode_rtn;
NODEUP_DRTEST nodeup_drtest_rtn;
SIGNAL_LOGFILL signal_logfill_rtn;
};

struct waiting_for_lsn {
Expand Down
4 changes: 4 additions & 0 deletions bdb/callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
bdb_callback->nodeup_drtest_rtn = (NODEUP_DRTEST)callback_rtn;
break;

case BDB_CALLBACK_SIGNAL_LOGFILL:
bdb_callback->signal_logfill_rtn = (SIGNAL_LOGFILL)callback_rtn;
break;

default:
break;
}
Expand Down
10 changes: 9 additions & 1 deletion bdb/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <dirent.h>
#include <fcntl.h>

#include <signallogfill.h>

#ifdef __sun
/* for PTHREAD_STACK_MIN on Solaris */
#define __EXTENSIONS__
Expand Down Expand Up @@ -110,6 +112,7 @@
#include <phys_rep_lsn.h>

#include <log_trigger.h>
#include <sqllogfill.h>

extern int gbl_bdblock_debug;
extern int gbl_keycompr;
Expand Down Expand Up @@ -2422,6 +2425,7 @@ int bdb_is_standalone(void *dbenv, void *in_bdb_state)
}

extern int gbl_commit_delay_trace;
extern int gbl_sql_logfill;
int gbl_skip_catchup_logic = 0;
int gbl_debug_downgrade_cluster_at_open = 0;

Expand Down Expand Up @@ -2676,7 +2680,7 @@ static DB_ENV *dbenv_open(bdb_state_type *bdb_state)
dbenv->set_rep_transport(dbenv, bdb_state->repinfo->myhost,
berkdb_send_rtn);
dbenv->set_rep_send_ack(dbenv, comdb2_early_ack);

dbenv->set_rep_signal_logfill(dbenv, comdb2_signal_logfill);
dbenv->set_check_standalone(dbenv, comdb2_is_standalone);
dbenv->set_truncate_sc_callback(dbenv, comdb2_reload_schemas);
dbenv->set_rep_truncate_callback(dbenv, comdb2_replicated_truncate);
Expand Down Expand Up @@ -3090,6 +3094,10 @@ if (!is_real_netinfo(bdb_state->repinfo->netinfo))

master_host = bdb_state->repinfo->master_host;

if (!gbl_exit && gbl_sql_logfill) {
create_sql_logfill_threads(bdb_state);
}

if ((master_host == db_eid_invalid) || (master_host == bdb_master_dupe))
goto waitformaster;

Expand Down
85 changes: 85 additions & 0 deletions bdb/log_info.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2025 Bloomberg Finance L.P.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <build/db.h>
#include <bdb_int.h>
#include <log_info.h>
#include <strings.h>
#include <logmsg.h>
#include <string.h>

static LOG_INFO get_lsn_internal(bdb_state_type *bdb_state, int flags)
{
int rc;

/* get db internals */
DB_LOGC *logc;
DBT logrec;
DB_LSN log_lsn;
LOG_INFO log_info = {0};

rc = bdb_state->dbenv->log_cursor(bdb_state->dbenv, &logc, 0);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: Can't get log cursor rc %d\n", __func__, rc);
return log_info;
}
bzero(&logrec, sizeof(DBT));
logrec.flags = DB_DBT_MALLOC;
rc = logc->get(logc, &log_lsn, &logrec, flags);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: Can't get last log record rc %d\n", __func__, rc);
logc->close(logc, 0);
return log_info;
}

log_info.file = log_lsn.file;
log_info.offset = log_lsn.offset;
log_info.size = logrec.size;

if (logrec.data)
free(logrec.data);

logc->close(logc, 0);

return log_info;
}

LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
{
return get_lsn_internal(bdb_state, DB_LAST);
}

LOG_INFO get_first_lsn(bdb_state_type *bdb_state)
{
return get_lsn_internal(bdb_state, DB_FIRST);
}

uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
{
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
}

int log_info_compare(LOG_INFO *a, LOG_INFO *b)
{
if (a->file < b->file)
return -1;
if (a->file > b->file)
return 1;
if (a->offset < b->offset)
return -1;
if (a->offset > b->offset)
return 1;
return 0;
}
39 changes: 39 additions & 0 deletions bdb/log_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2025 Bloomberg Finance L.P.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef LOG_INFO_H
#define LOG_INFO_H

struct __db_env;
struct bdb_state_tag;

#include <stdint.h>

typedef struct LOG_INFO LOG_INFO;

struct LOG_INFO {
uint32_t file;
uint32_t offset;
uint32_t size;
uint32_t gen;
};

LOG_INFO get_last_lsn(struct bdb_state_tag *);
LOG_INFO get_first_lsn(struct bdb_state_tag *);
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
int log_info_compare(LOG_INFO *a, LOG_INFO *b);

#endif
57 changes: 0 additions & 57 deletions bdb/phys_rep_lsn.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,58 +29,6 @@ extern int gbl_physrep_debug;
int gbl_physrep_exit_on_invalid_logstream = 0;
int gbl_physrep_ignore_queues = 1;

static LOG_INFO get_lsn_internal(bdb_state_type *bdb_state, int flags)
{
int rc;

/* get db internals */
DB_LOGC *logc;
DBT logrec;
DB_LSN log_lsn;
LOG_INFO log_info = {0};

rc = bdb_state->dbenv->log_cursor(bdb_state->dbenv, &logc, 0);
if (rc) {
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get log cursor rc %d\n", __func__, rc);
return log_info;
}
bzero(&logrec, sizeof(DBT));
logrec.flags = DB_DBT_MALLOC;
rc = logc->get(logc, &log_lsn, &logrec, flags);
if (rc) {
physrep_logmsg(LOGMSG_ERROR, "%s: Can't get last log record rc %d\n", __func__,
rc);
logc->close(logc, 0);
return log_info;
}
#if 0
if (gbl_physrep_debug)
physrep_logmsg(LOGMSG_USER, "%s: LSN %u:%u\n", __func__, log_lsn.file,
log_lsn.offset);
#endif

log_info.file = log_lsn.file;
log_info.offset = log_lsn.offset;
log_info.size = logrec.size;

if (logrec.data)
free(logrec.data);

logc->close(logc, 0);

return log_info;
}

LOG_INFO get_last_lsn(bdb_state_type *bdb_state)
{
return get_lsn_internal(bdb_state, DB_LAST);
}

LOG_INFO get_first_lsn(bdb_state_type *bdb_state)
{
return get_lsn_internal(bdb_state, DB_FIRST);
}

int compare_log(DBT *logrec, void *blob, unsigned int blob_len)
{
int rc;
Expand Down Expand Up @@ -252,11 +200,6 @@ static int get_next_matchable(DB_LOGC *logc, LOG_INFO *info, int check_current,

/* generator code */

uint32_t get_next_offset(DB_ENV *dbenv, LOG_INFO log_info)
{
return log_info.offset + log_info.size + dbenv->get_log_header_size(dbenv);
}

int apply_log(bdb_state_type *bdb_state, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len)
{
Expand Down
13 changes: 2 additions & 11 deletions bdb/phys_rep_lsn.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@
#include <stdint.h>
#include <time.h>

typedef struct LOG_INFO LOG_INFO;
struct LOG_INFO {
uint32_t file;
uint32_t offset;
uint32_t size;
uint32_t gen;
};

struct __db_env;
struct bdb_state_tag;

#include <log_info.h>

typedef unsigned char u_int8_t;

/* Mark this table to be ignored (not replicated to) */
Expand All @@ -32,9 +26,6 @@ int physrep_ignore_table_count(void);
/* List ignored tables */
int physrep_list_ignored_tables(void);

LOG_INFO get_last_lsn(struct bdb_state_tag *);
LOG_INFO get_first_lsn(struct bdb_state_tag *);
uint32_t get_next_offset(struct __db_env *, LOG_INFO log_info);
int apply_log(struct bdb_state_tag *, unsigned int file, unsigned int offset, int64_t rectype, void *blob,
int blob_len);
int truncate_log_lock(struct bdb_state_tag *, unsigned int file,
Expand Down
10 changes: 10 additions & 0 deletions bdb/rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -3852,6 +3852,8 @@ static void rem_rep_mon(struct rep_mon *rm)
Pthread_mutex_unlock(&rep_mon_lk);
}

extern __thread char *rep_apply_caller;

#if defined _LINUX_SOURCE && !defined __APPLE__
static __thread pid_t process_berkdb_tid = 0;
#endif
Expand Down Expand Up @@ -3965,9 +3967,11 @@ static int process_berkdb(bdb_state_type *bdb_state, char *host, DBT *control, D
logmsg(LOGMSG_INFO, "%s:%d dropping message!\n", __func__, __LINE__);
r = 0;
} else {
rep_apply_caller = "replication";
r = bdb_state->dbenv->rep_process_message(bdb_state->dbenv, control, rec,
&host, &permlsn,
&commit_generation, &newgen, &newmaster, online);
rep_apply_caller = NULL;
}

if (got_vote2lock) {
Expand Down Expand Up @@ -5210,6 +5214,12 @@ static int berkdb_receive_rtn_int(void *ack_handle, void *usr_ptr,
recbufsz = p_rep_type_berkdb_rep_buf_hdr.recbufsz;
recbufcrc = p_rep_type_berkdb_rep_buf_hdr.recbufcrc;

if (recbufsz < 0 || recbufsz > dtalen) {
logmsg(LOGMSG_ERROR, "invalid recbufsz %d\n", recbufsz);
logmsg(LOGMSG_ERROR, "%p %p %d\n", p_buf, dta, dtalen);
return -1;
}

recbuf = (char *)p_buf;
p_buf += recbufsz;

Expand Down
26 changes: 26 additions & 0 deletions bdb/signallogfill.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright 2025 Bloomberg Finance L.P.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <bdb_int.h>
#include <signallogfill.h>

void comdb2_signal_logfill(DB_ENV *dbenv, DB_LSN *wlsn)
{
bdb_state_type *bdb_state = dbenv->app_private;
if (bdb_state->callback->signal_logfill_rtn) {
bdb_state->callback->signal_logfill_rtn(bdb_state);
}
return;
}
Loading