Commit 68a666c

Prevent exposure of importing keys on replicas during atomic slot migration (#2635)
# Problem

In the current slot migration design, replicas are completely unaware of the slot migration. Because of this, they do not know to hide importing keys, which results in exposure of these keys to commands like KEYS, SCAN, RANDOMKEY, and DBSIZE.

# Design

The main part of the design is that we will now listen for and process the `SYNCSLOTS ESTABLISH` command on the replica. When a `SYNCSLOTS ESTABLISH` command is received from the primary, we begin tracking a new slot import in a special `SLOT_IMPORT_OCCURRING_ON_PRIMARY` state. Replicas use this state to track the import, and wait for a future `SYNCSLOTS FINISH` message that tells them whether the import succeeded or failed.

## Success Case

```
Source                                        Target                        Target Replica
|                                                |                                 |
|------------ SYNCSLOTS ESTABLISH -------------->|                                 |
|                                                |----- SYNCSLOTS ESTABLISH ------>|
|<-------------------- +OK ----------------------|                                 |
|                                                |                                 |
|~~~~~~~~~~~~~~ snapshot as AOF ~~~~~~~~~~~~~~~~>|                                 |
|                                                |~~~~~~ forward snapshot ~~~~~~~~>|
|----------- SYNCSLOTS SNAPSHOT-EOF ------------>|                                 |
|                                                |                                 |
|<----------- SYNCSLOTS REQUEST-PAUSE -----------|                                 |
|                                                |                                 |
|~~~~~~~~~~~~ incremental changes ~~~~~~~~~~~~~~>|                                 |
|                                                |~~~~~~ forward changes ~~~~~~~~~>|
|--------------- SYNCSLOTS PAUSED -------------->|                                 |
|                                                |                                 |
|<---------- SYNCSLOTS REQUEST-FAILOVER ---------|                                 |
|                                                |                                 |
|---------- SYNCSLOTS FAILOVER-GRANTED --------->|                                 |
|                                                |                                 |
|                                      (performs takeover &                        |
|                                      propagates topology)                        |
|                                                |                                 |
|                                                |------- SYNCSLOTS FINISH ------->|
(finds out about topology                        |                                 |
change & marks migration done)                   |                                 |
|                                                |                                 |
```

## Failure Case

```
Source                                        Target                        Target Replica
|                                                |                                 |
|------------ SYNCSLOTS ESTABLISH -------------->|                                 |
|                                                |----- SYNCSLOTS ESTABLISH ------>|
|<-------------------- +OK ----------------------|                                 |
...                                             ...                               ...
|                                                |                                 |
|                                            <FAILURE>                             |
|                                                |                                 |
|                                       (performs cleanup)                         |
|                                                | ~~~~~~ UNLINK <key> ... ~~~~~~~>|
|                                                |                                 |
|                                                | ------ SYNCSLOTS FINISH ------->|
|                                                |                                 |
```

## Full Sync, Partial Sync, and RDB

In order to ensure replicas that resync during the import are still aware of the import, the slot import is serialized to a new `cluster-slot-imports` aux field. The encoding includes the job name, the source node name, and the slot ranges being imported. Upon loading an RDB with the `cluster-slot-imports` aux field, replicas will add a new migration in the `SLOT_IMPORT_OCCURRING_ON_PRIMARY` state.

It's important to note that a previously saved RDB file can be used as the basis for partial sync with a primary. Because of this, whenever we load an RDB file with the `cluster-slot-imports` aux field, even from disk, we will still add a new migration to track the import. If, after loading the RDB, the Valkey node is a primary, it will cancel the slot migration. Having this tracking state loaded on primaries will ensure that replicas partial syncing to a restarted primary still get their `SYNCSLOTS FINISH` message in the replication stream.

## AOF

Since AOF cannot be used as the basis for a partial sync, we don't necessarily need to persist the `SYNCSLOTS ESTABLISH` and `FINISH` commands to the AOF. However, considering there is work to change this (#59 #1901), this design doesn't make any assumptions about this. We will propagate the `ESTABLISH` and `FINISH` commands to the AOF, and ensure that they can be properly replayed on AOF load to get to the right state. Similar to RDB, if there are any pending `ESTABLISH` commands that don't have a `FINISH` afterwards upon becoming primary, we will make sure to fail those in `verifyClusterConfigWithData`.

Additionally, there was a bug in the existing slot migration where slot import clients were not having their commands persisted to AOF. This has been fixed by ensuring we still propagate to AOF even for slot import clients.

## Promotion & Demotion

Since the primary is solely responsible for cleaning up unowned slots, primaries that are demoted will not clean up previously active slot imports. The promoted replica will be responsible for both cleaning up the slot (`verifyClusterConfigWithData`) and sending a `SYNCSLOTS FINISH`.

# Other Options Considered

I also considered tracking "dirty" slots rather than using the slot import state machine. In this setup, primaries and replicas would simply mark each slot's hashtable in the kvstore as dirty when something is written to it and we do not currently own that slot. This approach is simpler, but has a problem in that modules loaded on the replica would still not get slot migration start/end notifications. If the modules on the replica do not get such notifications, they will not be able to properly contain these dirty keys during slot migration events.

---------

Signed-off-by: Jacob Murphy <[email protected]>
1 parent b25f87b commit 68a666c
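The replica-side tracking described in the Design section of the commit message can be pictured as a small state machine. The sketch below is only an illustration under stated assumptions: the struct, the function names, and every state other than `SLOT_IMPORT_OCCURRING_ON_PRIMARY` are hypothetical, and a single slot range stands in for the list of ranges a real job would carry. The actual logic lives in `src/cluster_migrateslots.c`, whose diff is not rendered on this page.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define CLUSTER_SLOTS 16384
#define JOB_NAME_LEN 40 /* assumed name length, for illustration only */

/* Hypothetical, simplified states. Only SLOT_IMPORT_OCCURRING_ON_PRIMARY is
 * named in the commit message; the others are illustrative. */
typedef enum {
    SLOT_IMPORT_NONE = 0,
    SLOT_IMPORT_OCCURRING_ON_PRIMARY,
    SLOT_IMPORT_SUCCEEDED,
    SLOT_IMPORT_FAILED
} slotImportState;

typedef struct {
    char name[JOB_NAME_LEN + 1];
    int first_slot, last_slot; /* inclusive range */
    slotImportState state;
} trackedImport;

/* The replica saw SYNCSLOTS ESTABLISH in the replication stream. */
bool trackedImportEstablish(trackedImport *t, const char *name, int first, int last) {
    assert(t != NULL && name != NULL);
    if (t->state == SLOT_IMPORT_OCCURRING_ON_PRIMARY) return false; /* already tracking */
    if (first < 0 || last < first || last >= CLUSTER_SLOTS) return false;
    snprintf(t->name, sizeof(t->name), "%s", name);
    t->first_slot = first;
    t->last_slot = last;
    t->state = SLOT_IMPORT_OCCURRING_ON_PRIMARY;
    return true;
}

/* The replica saw SYNCSLOTS FINISH for the named job. */
bool trackedImportFinish(trackedImport *t, const char *name, bool success) {
    assert(t != NULL && name != NULL);
    if (t->state != SLOT_IMPORT_OCCURRING_ON_PRIMARY) return false;
    if (strcmp(t->name, name) != 0) return false; /* FINISH for another job */
    t->state = success ? SLOT_IMPORT_SUCCEEDED : SLOT_IMPORT_FAILED;
    return true;
}

/* Keys in a slot are hidden from KEYS/SCAN/RANDOMKEY/DBSIZE only while the
 * import is still in flight on the primary. */
bool slotIsHiddenOnReplica(const trackedImport *t, int slot) {
    return t->state == SLOT_IMPORT_OCCURRING_ON_PRIMARY &&
           slot >= t->first_slot && slot <= t->last_slot;
}
```

A caller would zero-initialize a `trackedImport`, call `trackedImportEstablish` when `ESTABLISH` arrives, and consult `slotIsHiddenOnReplica` from keyspace-wide commands until `trackedImportFinish` is called. On success the slot becomes visible because the replica now serves it; on failure it is no longer hidden because the primary has already unlinked the keys.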

16 files changed: +1004 -337 lines changed

src/cluster_legacy.c

Lines changed: 4 additions & 1 deletion
@@ -6385,6 +6385,9 @@ int verifyClusterConfigWithData(void) {
      * completely depend on the replication stream. */
     if (nodeIsReplica(myself)) return C_OK;

+    /* Allow slot migrations to clean up after reloading */
+    clusterCleanSlotImportsAfterLoad();
+
     /* Check that all the slots we see populated memory have a corresponding
      * entry in the cluster table. Otherwise fix the table. */
     for (j = 0; j < CLUSTER_SLOTS; j++) {
@@ -6464,7 +6467,7 @@ static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_requ
     removeAllNotOwnedShardChannelSubscriptions();
     resetManualFailover();

-    /* Becoming a replica cancels all in progress imports and exports */
+    /* Perform needed slot migration state transitions */
     clusterUpdateSlotExportsOnOwnershipChange();
     clusterUpdateSlotImportsOnOwnershipChange();

src/cluster_migrateslots.c

Lines changed: 499 additions & 173 deletions
Large diffs are not rendered by default.

src/cluster_migrateslots.h

Lines changed: 6 additions & 1 deletion
@@ -27,12 +27,17 @@ void clusterCommandCancelSlotMigrations(client *c);
 void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal);
 void clusterUpdateSlotExportsOnOwnershipChange(void);
 void clusterUpdateSlotImportsOnOwnershipChange(void);
-void clusterCleanupSlotMigrationLog(void);
 void clusterHandleFlushDuringSlotMigration(void);
 size_t clusterGetTotalSlotExportBufferMemory(void);
 bool clusterSlotFailoverGranted(int slot);
 void clusterFailAllSlotExportsWithMessage(char *message);
 void clusterHandleSlotMigrationErrorResponse(slotMigrationJob *job);
 void killSlotMigrationChild(void);
+void clusterCleanSlotImportsOnFullSync(void);
+void clusterCleanSlotImportsOnPromotion(void);
+void clusterCleanSlotImportsBeforeLoad(void);
+void clusterCleanSlotImportsAfterLoad(void);
+int clusterRDBSaveSlotImports(rio *rdb);
+int clusterRDBLoadSlotImport(rio *rdb);

 #endif /* __CLUSTER_MIGRATESLOTS_H */

src/commands.def

Lines changed: 1 addition & 1 deletion
@@ -1169,7 +1169,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
 {MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
 {MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
 {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
-{MAKE_CMD("syncslots","A container for internal slot migration commands.","Depends on subcommand.","9.0.0",CMD_DOC_SYSCMD,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SYNCSLOTS_History,0,CLUSTER_SYNCSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SYNCSLOTS_Keyspecs,0,NULL,0)},
+{MAKE_CMD("syncslots","A container for internal slot migration commands.","Depends on subcommand.","9.0.0",CMD_DOC_SYSCMD,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SYNCSLOTS_History,0,CLUSTER_SYNCSLOTS_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SYNCSLOTS_Keyspecs,0,NULL,0)},
 {0}
 };

src/commands/cluster-syncslots.json

Lines changed: 2 additions & 1 deletion
@@ -13,7 +13,8 @@
         "command_flags": [
             "NO_ASYNC_LOADING",
             "ADMIN",
-            "STALE"
+            "STALE",
+            "MAY_REPLICATE"
         ]
     }
 }

src/db.c

Lines changed: 8 additions & 12 deletions
@@ -671,6 +671,14 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
      * there. */
     signalFlushedDb(dbnum, async);

+    if (clusterIsAnySlotImporting() || clusterIsAnySlotExporting()) {
+        /* On flush, in progress migrations will be cancelled, and should be
+         * retried by operators. We also may emptyData when reloading an RDB, in
+         * which case we will remove active slot imports. Replicas will get a
+         * new set of slot imports from their primary. */
+        clusterHandleFlushDuringSlotMigration();
+    }
+
     /* Empty the database structure. */
     removed = emptyDbStructure(server.db, dbnum, async, callback);

@@ -825,12 +833,6 @@ void flushdbCommand(client *c) {

     if (getFlushCommandFlags(c, &flags) == C_ERR) return;

-    if (clusterIsAnySlotImporting() || clusterIsAnySlotExporting()) {
-        /* In progress migrations will be cancelled, and should be retried by
-         * operators. */
-        clusterHandleFlushDuringSlotMigration();
-    }
-
     /* flushdb should not flush the functions */
     server.dirty += emptyData(c->db->id, flags | EMPTYDB_NOFUNCTIONS, NULL);

@@ -855,12 +857,6 @@ void flushallCommand(client *c) {
     int flags;
     if (getFlushCommandFlags(c, &flags) == C_ERR) return;

-    if (clusterIsAnySlotImporting() || clusterIsAnySlotExporting()) {
-        /* In progress migrations will be cancelled, and should be retried by
-         * operators. */
-        clusterHandleFlushDuringSlotMigration();
-    }
-
     /* flushall should not flush the functions */
     flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);

src/rdb.c

Lines changed: 9 additions & 0 deletions
@@ -1472,6 +1472,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {

     /* save all databases, skip this if we're in functions-only mode */
     if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA)) {
+        /* RDB slot import info is encoded in a required opcode since exposing
+         * importing slots is a consistency problem. */
+        if (clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr;
         for (j = 0; j < server.dbnum; j++) {
             if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
         }
@@ -2923,6 +2926,9 @@ void startLoading(size_t size, int rdbflags, int async) {
     server.rdb_last_load_keys_loaded = 0;
     blockingOperationStarts();

+    /* Cleanup slot migrations (we need a clean state for the incoming load) */
+    clusterCleanSlotImportsBeforeLoad();
+
     /* Fire the loading modules start event. */
     int subevent;
     if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
@@ -3194,6 +3200,9 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
                 should_expand_db = 0;
             }
             continue; /* Read next opcode. */
+        } else if (type == RDB_OPCODE_SLOT_IMPORT) {
+            if (clusterRDBLoadSlotImport(rdb) == C_ERR) goto eoferr;
+            continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_AUX) {
             /* AUX: generic string-string fields. Use to add state to RDB
              * which is backward compatible. Implementations of RDB loading
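The commit message says the persisted slot import carries the job name, the source node name, and the slot ranges being imported. The real wire format is defined by `clusterRDBSaveSlotImports` and `clusterRDBLoadSlotImport`, which are not shown on this page, so the following is only a sketch of what a round-trippable record with those three fields could look like. The fixed 40-byte names, the big-endian integers, the range cap, and all identifiers here are assumptions made for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define CLUSTER_SLOTS 16384
#define NAME_LEN 40   /* assumed fixed-width name field */
#define MAX_RANGES 8  /* arbitrary cap for this sketch */
#define HEADER_LEN (2 * NAME_LEN + 2)

typedef struct { uint16_t first, last; } slotRange; /* inclusive */

typedef struct {
    char job_name[NAME_LEN + 1];
    char source_node[NAME_LEN + 1];
    uint16_t num_ranges;
    slotRange ranges[MAX_RANGES];
} slotImportRecord;

static void putU16(uint8_t *p, uint16_t v) {
    p[0] = (uint8_t)(v >> 8);
    p[1] = (uint8_t)(v & 0xff);
}

static uint16_t getU16(const uint8_t *p) {
    return (uint16_t)((p[0] << 8) | p[1]);
}

/* Hypothetical layout: job_name[40] | source_node[40] | u16 count |
 * count * (u16 first, u16 last). Returns bytes written, or 0 on error. */
size_t encodeSlotImport(const slotImportRecord *r, uint8_t *buf, size_t cap) {
    if (r->num_ranges > MAX_RANGES) return 0;
    size_t need = HEADER_LEN + (size_t)r->num_ranges * 4;
    if (need > cap) return 0;
    memcpy(buf, r->job_name, NAME_LEN);
    memcpy(buf + NAME_LEN, r->source_node, NAME_LEN);
    uint8_t *p = buf + 2 * NAME_LEN;
    putU16(p, r->num_ranges);
    p += 2;
    for (uint16_t i = 0; i < r->num_ranges; i++, p += 4) {
        putU16(p, r->ranges[i].first);
        putU16(p + 2, r->ranges[i].last);
    }
    return need;
}

/* Returns bytes consumed, or 0 if the buffer is truncated or invalid.
 * On failure the contents of *r are unspecified. */
size_t decodeSlotImport(slotImportRecord *r, const uint8_t *buf, size_t len) {
    if (len < HEADER_LEN) return 0;
    memset(r, 0, sizeof(*r));
    memcpy(r->job_name, buf, NAME_LEN);
    memcpy(r->source_node, buf + NAME_LEN, NAME_LEN);
    const uint8_t *p = buf + 2 * NAME_LEN;
    uint16_t n = getU16(p);
    p += 2;
    if (n > MAX_RANGES || len < HEADER_LEN + (size_t)n * 4) return 0;
    for (uint16_t i = 0; i < n; i++, p += 4) {
        r->ranges[i].first = getU16(p);
        r->ranges[i].last = getU16(p + 2);
        if (r->ranges[i].first > r->ranges[i].last || r->ranges[i].last >= CLUSTER_SLOTS) return 0;
    }
    r->num_ranges = n;
    return HEADER_LEN + (size_t)n * 4;
}
```

Validating the range bounds on load mirrors the idea in the hunk above that a bad record should abort the load (`goto eoferr`) rather than leave a replica tracking a nonsensical import.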

src/rdb.h

Lines changed: 1 addition & 0 deletions
@@ -131,6 +131,7 @@ enum RdbType {

 /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType).
  * These are special RDB types, but they start from 255 and grow down. */
+#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state. */
 #define RDB_OPCODE_SLOT_INFO 244 /* Foreign slot info, safe to ignore. */
 #define RDB_OPCODE_FUNCTION2 245 /* function library data */
 #define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */

src/replication.c

Lines changed: 6 additions & 0 deletions
@@ -2196,6 +2196,9 @@ void replicationAttachToNewPrimary(void) {
     serverAssert(server.primary == NULL);
     replicationDiscardCachedPrimary();

+    /* Cancel any in progress imports (we will now use the primary's) */
+    clusterCleanSlotImportsOnFullSync();
+
     disconnectReplicas(); /* Force our replicas to resync with us as well. */
     freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */
 }
@@ -4422,6 +4425,9 @@ void replicationUnsetPrimary(void) {
     /* Restart the AOF subsystem in case we shut it down during a sync when
      * we were still a replica. */
     if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
+
+    /* Cancel any ongoing atomic slot migrations */
+    clusterCleanSlotImportsOnPromotion();
 }

 /* This function is called when the replica lose the connection with the

src/server.c

Lines changed: 21 additions & 6 deletions
@@ -2151,6 +2151,8 @@ void createSharedObjects(void) {
     shared.persist = createSharedString("PERSIST");
     shared.set = createSharedString("SET");
     shared.eval = createSharedString("EVAL");
+    shared.cluster = createSharedString("CLUSTER");
+    shared.syncslots = createSharedString("SYNCSLOTS");

     /* Shared command argument */
     shared.left = createSharedString("left");
@@ -2174,6 +2176,12 @@ void createSharedObjects(void) {
     shared.special_equals = createSharedString("=");
     shared.redacted = createSharedString("(redacted)");
     shared.fields = createSharedString("FIELDS");
+    shared.finish = createSharedString("FINISH");
+    shared.state = createSharedString("STATE");
+    shared.success = createSharedString("SUCCESS");
+    shared.failed = createSharedString("FAILED");
+    shared.name = createSharedString("NAME");
+    shared.message = createSharedString("MESSAGE");

     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
         shared.integers[j] = makeObjectShared(createObject(OBJ_STRING, (void *)(long)j));
@@ -3582,8 +3590,15 @@ void alsoPropagate(int dbid, robj **argv, int argc, int target, int slot) {
     if (!shouldPropagate(target)) return;

     /* Don't propagate commands on slot migration clients, these will be proxied
-     * in replicationFeedStreamFromPrimaryStream() */
-    if (server.current_client != NULL && server.current_client->slot_migration_job) return;
+     * in replicationFeedStreamFromPrimaryStream().
+     *
+     * However, if we need to propagate to AOF, we should still do that. */
+    bool propagate_aof = (target & PROPAGATE_AOF) && server.aof_state != AOF_OFF;
+    if (server.current_client != NULL && server.current_client->slot_migration_job) {
+        if (!propagate_aof) return;
+        /* Disable propagation to replication (just do the AOF) */
+        target &= ~PROPAGATE_REPL;
+    }

     argvcopy = zmalloc(sizeof(robj *) * argc);
     for (j = 0; j < argc; j++) {
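The `alsoPropagate` change above amounts to a small decision on the `target` bitmask: commands from slot migration clients are never sent to replicas from here (they are proxied elsewhere), but they still reach the AOF when it is on. A minimal standalone sketch of that decision follows. The function name and boolean parameters are hypothetical, and the flag values are stand-ins defined locally so the snippet compiles without the server headers.

```c
#include <assert.h>
#include <stdbool.h>

/* Local stand-ins for the server's propagation flags. */
#define PROPAGATE_NONE 0
#define PROPAGATE_AOF 1
#define PROPAGATE_REPL 2

/* Returns the propagation mask that should actually be used, or
 * PROPAGATE_NONE when the command should not be propagated at all.
 * `from_slot_migration_client` models
 * `server.current_client->slot_migration_job != NULL`, and `aof_on`
 * models `server.aof_state != AOF_OFF`. */
int effectivePropagationTarget(int target, bool from_slot_migration_client, bool aof_on) {
    if (!from_slot_migration_client) return target;
    bool propagate_aof = (target & PROPAGATE_AOF) && aof_on;
    if (!propagate_aof) return PROPAGATE_NONE;
    /* Keep the AOF bit, drop the replication bit. */
    return target & ~PROPAGATE_REPL;
}
```

For an ordinary client the mask passes through unchanged. For a slot migration client, `PROPAGATE_AOF | PROPAGATE_REPL` collapses to `PROPAGATE_AOF` when the AOF is enabled and to nothing otherwise, which is the AOF persistence fix described in the commit message.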
@@ -4484,10 +4499,10 @@ int processCommand(client *c) {
         return C_OK;
     }

-    /* If the server is paused, block the client until
-     * the pause has ended. Replicas are never paused. */
-    if (!c->flag.replica && ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) ||
-                             ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) {
+    /* If the server is paused, block the client until the pause has ended. Replicas and slot
+     * export clients are never paused to allow failover/slot migration to succeed. */
+    if (!c->flag.replica && (!c->slot_migration_job || isImportSlotMigrationJob(c->slot_migration_job)) &&
+        ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) {
         blockPostponeClient(c);
         return C_OK;
     }
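The new pause condition in `processCommand` is easier to read as a truth function. The sketch below restates it with plain booleans; the enum, the function name, and the parameter names are hypothetical and exist only to mirror the expression in the hunk above.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical classification of the client's slot migration job. */
typedef enum { JOB_NONE, JOB_IMPORT, JOB_EXPORT } migrationJobKind;

/* Mirrors the condition in the hunk: a client is postponed only if it is
 * not a replica, is not a slot export client, and the active pause covers
 * the command being run. */
bool shouldPostponeClient(bool is_replica,
                          migrationJobKind job,
                          bool paused_all,
                          bool paused_write,
                          bool is_may_replicate_command) {
    if (is_replica) return false;        /* replicas are never paused */
    if (job == JOB_EXPORT) return false; /* export clients are never paused */
    return paused_all || (paused_write && is_may_replicate_command);
}
```

Clients with no job and clients with an import job are both subject to pauses. Since this commit also adds `CMD_MAY_REPLICATE` to `CLUSTER SYNCSLOTS`, a write pause now covers that command for any client that is not exempt.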
