Skip to content

Commit 3c3a196

Browse files
ChiliPaneerVenkat Pamulapatihpatro
authored
Perform data cleanup during RDB load on successful version/signature validation (#2600)
Addresses: #2588 ## Overview Previously we called `emptyData()` during a full sync before validating that the RDB version is compatible. This change adds an rdb flag that allows us to flush the database from within `rdbLoadRioWithLoadingCtx`. This provides the option to only flush the data if the rdb has a valid version and signature. In the case where we do have an invalid version or signature, we don't call `emptyData()`, so if a full sync fails for that reason a replica can still serve stale data instead of clients experiencing cache misses. ## Changes - Added a new flag `RDBFLAGS_EMPTY_DATA` that signals to flush the database after rdb validation - Added logic to call `emptyData` in `rdbLoadRioWithLoadingCtx` in `rdb.c` - Added logic to not clear data if the RDB validation fails in `replication.c` using the new return value `RDB_INCOMPATIBLE` - Changed `rdbLoadRioWithLoadingCtx` to return RDB status codes (`RDB_OK`, `RDB_INCOMPATIBLE`, `RDB_FAILED`) instead of `C_OK`/`C_ERR` and updated all call sites. ## Testing Added a tcl test that uses the debug command `reload nosave` to load from an RDB that has a future version number. This triggers the same code path that full syncs will use, and verifies that we don't flush the data until after the validation is complete. A test already exists that checks that the data is flushed: https://github.com/valkey-io/valkey/blob/unstable/tests/integration/replication.tcl#L1504 --------- Signed-off-by: Venkat Pamulapati <[email protected]> Signed-off-by: Venkat Pamulapati <[email protected]> Co-authored-by: Venkat Pamulapati <[email protected]> Co-authored-by: Harkrishn Patro <[email protected]>
1 parent 5789266 commit 3c3a196

File tree

8 files changed

+97
-44
lines changed

8 files changed

+97
-44
lines changed

src/aof.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1459,7 +1459,7 @@ int loadSingleAppendOnlyFile(char *filename) {
14591459
if (server.repl_backlog == NULL) createReplicationBacklog();
14601460
rdb_flags |= RDBFLAGS_FEED_REPL;
14611461
}
1462-
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) {
1462+
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != RDB_OK) {
14631463
if (old_style)
14641464
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
14651465
filename);

src/debug.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ void debugCommand(client *c) {
586586
/* The default behavior is to remove the current dataset from
587587
* memory before loading the RDB file, however when MERGE is
588588
* used together with NOFLUSH, we are able to merge two datasets. */
589-
if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
589+
if (flush) flags |= RDBFLAGS_EMPTY_DATA;
590590

591591
protectClient(c);
592592
int ret = rdbLoad(server.rdb_filename, NULL, flags);

src/module.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13469,16 +13469,14 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
1346913469
* will prevent COW memory issue. */
1347013470
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
1347113471

13472-
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
13473-
1347413472
/* rdbLoad() can go back to the networking and process network events. If
1347513473
* VM_RdbLoad() is called inside a command callback, we don't want to
1347613474
* process the current client. Otherwise, we may free the client or try to
1347713475
* process next message while we are already in the command callback. */
1347813476
if (server.current_client) protectClient(server.current_client);
1347913477

1348013478
serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE);
13481-
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE);
13479+
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA);
1348213480

1348313481
if (server.current_client) unprotectClient(server.current_client);
1348413482

src/rdb.c

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ extern int rdbCheckMode;
7575
void rdbCheckError(const char *fmt, ...);
7676
void rdbCheckSetError(const char *fmt, ...);
7777
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
78+
void replicationEmptyDbCallback(hashtable *ht);
7879

7980
/* Returns true if the RDB version is valid and accepted, false otherwise. This
8081
* function takes configuration into account. The parameter `is_valkey_magic`
@@ -3082,8 +3083,10 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi,
30823083
return retval;
30833084
}
30843085

3085-
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
3086-
* otherwise C_ERR is returned.
3086+
/* Load an RDB file from the rio stream 'rdb'. We return one of the following:
3087+
* - RDB_OK On success
3088+
* - RDB_INCOMPATIBLE If the RDB has an invalid signature or version
3089+
* - RDB_FAILED in all other failure cases
30873090
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
30883091
* currently it only allow to set db object and functionLibCtx to which the data
30893092
* will be loaded (in the future it might contains more such objects). */
@@ -3092,10 +3095,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
30923095
int type, rdbver;
30933096
uint64_t db_size = 0, expires_size = 0;
30943097
int should_expand_db = 0;
3095-
if (rdb_loading_ctx->dbarray[0] == NULL) {
3096-
rdb_loading_ctx->dbarray[0] = createDatabase(0);
3097-
}
3098-
serverDb *db = rdb_loading_ctx->dbarray[0];
30993098
char buf[1024];
31003099
int error;
31013100
long long empty_keys_skipped = 0;
@@ -3111,14 +3110,30 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
31113110
is_valkey_magic = true;
31123111
} else {
31133112
serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf);
3114-
return C_ERR;
3113+
/* Signal to terminate the rdbLoad without clearing existing data */
3114+
return RDB_INCOMPATIBLE;
31153115
}
31163116
rdbver = atoi(buf + 6);
31173117
if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) {
31183118
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
3119-
return C_ERR;
3119+
return RDB_INCOMPATIBLE;
3120+
}
3121+
3122+
/* Only empty data if RDBFLAGS_EMPTY_DATA is set */
3123+
if (rdbflags & RDBFLAGS_EMPTY_DATA) {
3124+
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
3125+
serverLog(LL_NOTICE, "RDB signature and version check passed. Flushing old data");
3126+
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
3127+
3128+
/* functionsLibCtx is cleared when we call emptyData, reinitialize here. */
3129+
rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent();
31203130
}
31213131

3132+
if (rdb_loading_ctx->dbarray[0] == NULL) {
3133+
rdb_loading_ctx->dbarray[0] = createDatabase(0);
3134+
}
3135+
serverDb *db = rdb_loading_ctx->dbarray[0];
3136+
31223137
/* Key-specific attributes, set by opcodes before the key type. */
31233138
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
31243139
long long lru_clock = LRU_CLOCK();
@@ -3134,7 +3149,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
31343149
if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) {
31353150
serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d",
31363151
type, rdbver);
3137-
return C_ERR;
3152+
return RDB_FAILED;
31383153
}
31393154

31403155
/* Handle special types. */
@@ -3418,7 +3433,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
34183433
} else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) {
34193434
sdsfree(key);
34203435
serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now.");
3421-
return C_ERR;
3436+
return RDB_FAILED;
34223437
} else {
34233438
sdsfree(key);
34243439
goto eoferr;
@@ -3502,7 +3517,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
35023517
"got (%llx). Aborting now.",
35033518
(unsigned long long)expected, (unsigned long long)cksum);
35043519
rdbReportCorruptRDB("RDB CRC error");
3505-
return C_ERR;
3520+
return RDB_FAILED;
35063521
}
35073522
}
35083523
}
@@ -3514,7 +3529,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
35143529
serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
35153530
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
35163531
}
3517-
return C_OK;
3532+
return RDB_OK;
35183533

35193534
/* Unexpected end of file is handled here calling rdbReportReadError():
35203535
* this will in turn either abort the server in most cases, or if we are loading
@@ -3523,7 +3538,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
35233538
eoferr:
35243539
serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
35253540
rdbReportReadError("Unexpected EOF reading RDB file");
3526-
return C_ERR;
3541+
return RDB_FAILED;
35273542
}
35283543

35293544
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
@@ -3556,14 +3571,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
35563571
retval = rdbLoadRio(&rdb, rdbflags, rsi);
35573572

35583573
fclose(fp);
3559-
stopLoading(retval == C_OK);
3574+
stopLoading(retval == RDB_OK);
35603575
/* Reclaim the cache backed by rdb */
3561-
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
3576+
if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
35623577
/* TODO: maybe we could combine the fopen and open into one in the future */
35633578
rdb_fd = open(filename, O_RDONLY);
35643579
if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1);
35653580
}
3566-
return (retval == C_OK) ? RDB_OK : RDB_FAILED;
3581+
3582+
return retval;
35673583
}
35683584

35693585
/* A background saving child (BGSAVE) terminated its work. Handle this.

src/rdb.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ enum RdbType {
166166
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
167167
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
168168
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
169+
#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/
169170

170171
/* When rdbLoadObject() returns NULL, the err flag is
171172
* set to hold the type of error that occurred */
@@ -213,5 +214,6 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
213214
ssize_t rdbSaveFunctions(rio *rdb);
214215
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
215216
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
217+
void replicationEmptyDbCallback(hashtable *ht);
216218

217219
#endif

src/replication.c

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2069,8 +2069,8 @@ void replicationSendNewlineToPrimary(void) {
20692069
/* Callback used by emptyData() while flushing away old data to load
20702070
* the new dataset received by the primary and by discardTempDb()
20712071
* after loading succeeded or failed. */
2072-
void replicationEmptyDbCallback(hashtable *d) {
2073-
UNUSED(d);
2072+
void replicationEmptyDbCallback(hashtable *ht) {
2073+
UNUSED(ht);
20742074
if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary();
20752075
}
20762076

@@ -2366,11 +2366,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
23662366
/* We will soon start loading the RDB from socket, the replication history is changed,
23672367
* we must discard the cached primary structure and force resync of sub-replicas. */
23682368
replicationAttachToNewPrimary();
2369-
2370-
/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
2371-
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
2372-
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2373-
23742369
dbarray = server.db;
23752370
functions_lib_ctx = functionsLibCtxGetCurrent();
23762371
}
@@ -2387,7 +2382,11 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
23872382
if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
23882383
int loadingFailed = 0;
23892384
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
2390-
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) {
2385+
/* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */
2386+
int flags = RDBFLAGS_REPLICATION;
2387+
if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) flags |= RDBFLAGS_EMPTY_DATA;
2388+
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, flags, rsi, &loadingCtx);
2389+
if (retval != RDB_OK) {
23912390
/* RDB loading failed. */
23922391
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
23932392
"from socket, check server logs.");
@@ -2413,9 +2412,14 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
24132412
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
24142413
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
24152414
} else {
2416-
/* Remove the half-loaded data in case we started with an empty replica. */
2417-
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
2418-
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2415+
/* If we received RDB_INCOMPATIBLE, the old data was preserved */
2416+
if (retval == RDB_INCOMPATIBLE) {
2417+
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved");
2418+
} else {
2419+
/* Remove the half-loaded data in case the load failed for other reasons. */
2420+
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
2421+
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2422+
}
24192423
}
24202424

24212425
/* Note that there's no point in restarting the AOF on SYNC
@@ -2496,14 +2500,12 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
24962500
* we must discard the cached primary structure and force resync of sub-replicas. */
24972501
replicationAttachToNewPrimary();
24982502

2499-
/* Empty the databases only after the RDB file is ok, that is, before the RDB file
2500-
* is actually loaded, in case we encounter an error and drop the replication stream
2501-
* and leave an empty database. */
2502-
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
2503-
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2504-
2503+
/* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility
2504+
* and before loading the data from the RDB */
25052505
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
2506-
if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
2506+
int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA);
2507+
2508+
if (retval != RDB_OK) {
25072509
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
25082510
"DB from disk, check server logs.");
25092511
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
@@ -2513,9 +2515,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
25132515
bg_unlink(server.rdb_filename);
25142516
}
25152517

2516-
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
2517-
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
2518-
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2518+
/* If RDB failed compatibility check, we did not load the new data set or flush our old data. */
2519+
if (retval == RDB_INCOMPATIBLE) {
2520+
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded.");
2521+
} else {
2522+
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
2523+
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
2524+
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2525+
}
2526+
25192527

25202528
/* Note that there's no point in restarting the AOF on sync failure,
25212529
* it'll be restarted when sync succeeds or replica promoted. */

src/server.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
347347

348348
/* RDB return values for rdbLoad. */
349349
#define RDB_OK 0
350-
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
351-
#define RDB_FAILED 2 /* Failed to load the RDB file. */
350+
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
351+
#define RDB_INCOMPATIBLE 2 /* RDB version or signature is not compatible */
352+
#define RDB_FAILED 3 /* Failed to load the RDB file. */
352353

353354
/* Command doc flags */
354355
#define CMD_DOC_NONE 0

tests/integration/rdb.tcl

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,4 +567,32 @@ start_server {} {
567567
} {OK}
568568
}
569569

570+
start_server {} {
571+
test {RDB Load from incompatible version preserves data} {
572+
# Set test keys
573+
r set testkey1 "value1"
574+
r set testkey2 "value2"
575+
576+
# Use RDB with version 987.
577+
# This emulates a full sync from a server with a future version
578+
set server_dir [lindex [r config get dir] 1]
579+
set rdb_filename [lindex [r config get dbfilename] 1]
580+
set rdb_path "$server_dir/$rdb_filename"
581+
exec cp tests/assets/encodings-rdb987.rdb $rdb_path
582+
583+
# Reload will trigger the rdbLoad code path with the RDBFLAGS_EMPTY_DATA flag
584+
catch {r debug reload nosave}
585+
586+
# Check that version error appears in logs
587+
verify_log_message 0 "*Can't handle RDB format version*" 0
588+
589+
# Verify we don't enter the flushing code path
590+
verify_no_log_message 0 "*RDB signature and version check passed*" 0
591+
592+
# Verify our original data is not flushed
593+
assert_equal [r get testkey1] "value1"
594+
assert_equal [r get testkey2] "value2"
595+
}
596+
}
597+
570598
} ;# tags

0 commit comments

Comments
 (0)