Skip to content

Commit 09c2020

Browse files
feat(cluster): smart client handoffs oss (hitless upgrades) (redis#3142)
* handle smigrating smigrating notification should effect in increased command and socket timeout for the given connection * first approximation to handling smigrated * deduplicate notifications based on sequence id * add slotnumber to commands * add support for extracting commands from queue * parse notification * work on main algo * fix: handle string values in push message reply comparison Buffer.equals() was failing when reply[0] was a string instead of a Buffer, causing hangs on push notifications. Now converts strings to Buffers before comparison in PubSub and commands-queue handlers. Changes: - PubSub.isStatusReply: convert reply[0] to Buffer if string - PubSub.isShardedUnsubscribe: convert reply[0] to Buffer if string - PubSub.handleMessageReply: convert reply[0] to Buffer if string - commands-queue PONG handler: convert reply[0] to Buffer if string * parse SMIGRATED according to new format * comply with the new notification structure * refine algo * handle pubSubNode replacement * tests: merge all `after` functions into one * tests: add `testWithProxiedCluster()` function * Update index.ts * tests: add ProxyController for easier proxy comms * fix: access private queue through _self proxy and guard client close calls * test(cluster): add fault injector infrastructure for hitless upgrade testing * feat(test-utils): add RE database management and test utilities * fix: fix command queue extraction and prepend logic * test: add slot migration tests and refactor proxied fault injector * fix: wait for ALL ports while spawning proxied redis * fix: handle partial PubSubListeners in resubscribeAllPubSubListeners * refactor: maintenance tests and enhance fault injector client Test Infrastructure: - Migrate maintenance tests from maintenance.spec.ts to dedicated e2e test files - Add maintenance.e2e.ts for direct RE cluster testing with testWithRECluster helper - Add maintenance.proxy.e2e.ts for proxy-based cluster testing - Dynamically generate tests based on available action triggers from fault injector API Fault Injector Client: - Add listActionTriggers() to query available triggers by action and effect - Add selectDbConfig() and createAndSelectDatabase() for database context management - Auto-resolve bdb_id from selected database when not explicitly provided - Support trigger-specific database configurations from requirements Test Utils: - Export REClusterTestOptions interface - Refactor testWithRECluster to reset cluster state before each test - Add cluster reset and cleanup between tests for isolation RESP Decoder & Socket: - Add wire-level debug logging for troubleshooting Cluster: - Add debug logging for command execution and MOVED error handling - Add debug logging for slot discovery and client routing Enterprise Maintenance Manager: - Add debug logging for push message handling * refactor: improve SMIGRATED push message parsing and add comprehensive tests - Extract parseSMigratedPush into static method with proper type definitions - Add Address, Destination, and SMigratedEntry interfaces for better type safety - Support multiple source entries in SMIGRATED events (previously only handled single source) - Add comprehensive test suite covering single slots, ranges, multiple sources/destinations - Update cluster-slots to iterate over all entries in SMIGRATED event - Remove debug console.log statements from production code * refactor: #handleSmigrated: move source cleanup outside destinations loop - Track all moving slots and destination nodes during destinations loop - Wait for inflight commands AFTER all destinations are processed - Extract commands and handle source cleanup once per entry, not per destination - Unpause all destination nodes at the end of entry processing This fixes an issue where source nodes were being unpaused prematurely when multiple destinations existed, potentially allowing new commands to queue before all slot migrations were complete. * refactor: add error handling to #handleSmigrated with try-catch-finally - Wrap entry processing in try-catch to handle async operation failures - Unpause source node in catch block to prevent deadlock on error - Move destination unpause to finally block to ensure cleanup always runs - Re-throw error after cleanup to propagate failures - Remove debug console.log statements * refactor: replace hardcoded node ID 'asdff' with meaningful smigrated-host:port * fix: merge conflict residuals * refactor: remove extra db deletion dbs are deleted as part of the reset_cluster action * test: iterate over all trigger requirements and improve test naming * uncomment tests * test: refactor test naming to use single baseTestName variable with improved format * remove debug logs * fix: prevent PubSub subscription loss during cluster maintenance - Handle pubSubNode replacement BEFORE destroying source connections to ensure subscriptions are resubscribed on a new node while we can still read listeners from the old client - Create new pubSubClient before destroying old one to prevent window where pubSubNode is undefined - Use destroy() instead of close() for source node connections since close() can hang when the server is unresponsive during removal * Fix PubSub test hangs by awaiting publish batches The publish loops in PubSub tests were using a fire-and-forget pattern, creating promises without awaiting them. During slot migration, this caused unbounded accumulation of pending promises which blocked the Node.js event loop, preventing fault injector polling from continuing. Changes: - Fix all 8 publish loops to use await Promise.all(batchPromises) - Add 30-second timeout to fault injector fetch requests - Fix misleading assertion message in enterprise-maintenance-manager * Fix slot migration hangs during SMIGRATED handling Two bugs fixed: 1. extractCommandsForSlots infinite loop: When iterating through the linked list, if a command's slot was NOT in the moving slots set, the code never advanced 'current' to 'current.next', causing an infinite loop. Added the missing else branch. 2. Commands stuck in waitingForReply: During slot migration, commands sent to the source node could get stuck waiting for replies that never come. Added timeout and flushOnTimeout options to waitForInflightCommandsToComplete() - when timeout fires with flushOnTimeout=true, pending commands are rejected with TimeoutError instead of blocking forever. * improve FI debug logs * implement unrelaxation * chore: delete temp arch files * fix: address PR comments * fix: route commands to correct destinations during SMIGRATED handling Previously, when handling SMIGRATED events with multiple destinations, commands were extracted for ALL moving slots and sent to only the LAST destination. This caused commands targeting slots on destination A to incorrectly be sent to destination B. The fix moves command extraction inside the destination loop so each destination receives only the commands for its specific slots: 1. Inside the destination loop: - Convert destination's slots to a Set<number> - Update this.slots mappings to point to the destination - Extract commands from source for this destination's specific slots - Prepend those commands to this destination's queue - Unpause this destination immediately 2. After the loop: - If source has no slots left: extract remaining slotless commands and send to last destination - Handle pubsub listeners (unchanged - uses allMovingSlots) - Clean up source if needed 3. Removed obsolete finally block since destinations are now unpaused inside the loop. Also updated JSDoc for parseSMigratedPush to document the result structure guarantees: - Each source address appears in exactly one entry (deduplicated) - Within each entry, each destination address appears exactly once - Each destination contains the complete list of slots that moved from that source to that destination * fix: ensure slotNumber is passed to commands when options is undefined Previously, slotNumber was only set when options was defined, causing commands like SPUBLISH to be treated as slotless during cluster slot migration. This led to incorrect command routing during SMIGRATED handling. Now always create an options object and set slotNumber, ensuring proper command routing during slot migration. * fix: schedule writes after moving slotless commands to destination node After moving slotless commands to a destination node via prependCommandsToWrite, no write was scheduled because the destination was already unpaused before the commands were added. This caused the moved commands to remain queued without being sent. The fix calls _unpause() after prependCommandsToWrite to trigger write scheduling for the moved commands, ensuring they are processed immediately. * fix: properly emit error event * fix: make cache check resilient to options object creation The cache check used strict object identity (===) to determine if default type mapping should be used. This broke when cluster code created a new options object to pass slotNumber, even when the user passed no options. Changed the check to also accept options where typeMapping matches, since slotNumber is an internal property that shouldn't affect caching. * fix: resolve flaky tests The comprehensive_stats_run test was incorrectly wrapped in an it() block while also using testWithClient() which internally creates its own it() block. This nested it() structure causes Mocha to mishandle before/after hooks, leading to intermittent 'after all' hook timeouts. Changed the outer it() to describe() to properly scope the test. The global after() hooks that clean up Docker containers were using Mocha's default 2000ms timeout, which can be exceeded when cleaning up many containers in CI. Increased to 30 seconds. * feat: enable test filtering - Rename maintenance.e2e.ts to smart-client-handoffs-oss.e2e.ts - Add filterTriggersByArgs() to test-scenario.util.ts - Support --effect, --trigger, --db/--database CLI filters - Simplify test file by using shared utility function * docs: add usage comment to smart-client-handoffs-oss.e2e.ts --------- Co-authored-by: Pavel Pashov <pavel.pashov@redis.com>
1 parent 7f256b0 commit 09c2020

22 files changed

+3188
-177
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ dump.rdb
99
documentation/
1010
tsconfig.tsbuildinfo
1111
junit-results/
12+
*.log

packages/client/lib/RESP/decoder.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export class Decoder {
9898
this.#next = undefined;
9999
return this.#decodeTypeValue(type, chunk);
100100
}
101-
101+
102102
#decodeTypeValue(type, chunk) {
103103
switch (type) {
104104
case RESP_TYPES.NULL:
@@ -128,7 +128,7 @@ export class Decoder {
128128
chunk
129129
)
130130
);
131-
131+
132132
case RESP_TYPES.DOUBLE:
133133
return this.#handleDecodedValue(
134134
this.onReply,
@@ -137,7 +137,7 @@ export class Decoder {
137137
chunk
138138
)
139139
);
140-
140+
141141
case RESP_TYPES.SIMPLE_STRING:
142142
return this.#handleDecodedValue(
143143
this.onReply,
@@ -146,7 +146,7 @@ export class Decoder {
146146
chunk
147147
)
148148
);
149-
149+
150150
case RESP_TYPES.BLOB_STRING:
151151
return this.#handleDecodedValue(
152152
this.onReply,
@@ -170,7 +170,7 @@ export class Decoder {
170170
this.onErrorReply,
171171
this.#decodeSimpleError(chunk)
172172
);
173-
173+
174174
case RESP_TYPES.BLOB_ERROR:
175175
return this.#handleDecodedValue(
176176
this.onErrorReply,
@@ -188,7 +188,7 @@ export class Decoder {
188188
this.onReply,
189189
this.#decodeSet(this.getTypeMapping(), chunk)
190190
);
191-
191+
192192
case RESP_TYPES.MAP:
193193
return this.#handleDecodedValue(
194194
this.onReply,
@@ -421,17 +421,17 @@ export class Decoder {
421421
return this.#cursor === chunk.length ?
422422
this.#decodeDoubleExponent.bind(this, d) :
423423
this.#decodeDoubleExponent(d, chunk);
424-
424+
425425
case ASCII['\r']:
426426
this.#cursor = cursor + 2; // skip \r\n
427427
return isNegative ? -double : double;
428428
}
429-
429+
430430
if (decimalIndex < Decoder.#DOUBLE_DECIMAL_MULTIPLIERS.length) {
431431
double += (byte - ASCII['0']) * Decoder.#DOUBLE_DECIMAL_MULTIPLIERS[decimalIndex++];
432432
}
433433
} while (++cursor < chunk.length);
434-
434+
435435
this.#cursor = cursor;
436436
return this.#decodeDoubleDecimal.bind(this, isNegative, decimalIndex, double);
437437
}
@@ -613,7 +613,7 @@ export class Decoder {
613613
}
614614

615615
#decodeVerbatimStringFormat(stringLength, chunk) {
616-
const formatCb = this.#decodeStringWithLength.bind(this, 3, 1, String);
616+
const formatCb = this.#decodeStringWithLength.bind(this, 3, 1, String);
617617
return this.#cursor >= chunk.length ?
618618
this.#continueDecodeVerbatimStringFormat.bind(this, stringLength, formatCb) :
619619
this.#continueDecodeVerbatimStringFormat(stringLength, formatCb, chunk);
@@ -689,13 +689,13 @@ export class Decoder {
689689

690690
case RESP_TYPES.BIG_NUMBER:
691691
return this.#decodeBigNumber(typeMapping[RESP_TYPES.BIG_NUMBER], chunk);
692-
692+
693693
case RESP_TYPES.DOUBLE:
694694
return this.#decodeDouble(typeMapping[RESP_TYPES.DOUBLE], chunk);
695-
695+
696696
case RESP_TYPES.SIMPLE_STRING:
697697
return this.#decodeSimpleString(typeMapping[RESP_TYPES.SIMPLE_STRING], chunk);
698-
698+
699699
case RESP_TYPES.BLOB_STRING:
700700
return this.#decodeBlobString(typeMapping[RESP_TYPES.BLOB_STRING], chunk);
701701

@@ -704,7 +704,7 @@ export class Decoder {
704704

705705
case RESP_TYPES.SIMPLE_ERROR:
706706
return this.#decodeSimpleError(chunk);
707-
707+
708708
case RESP_TYPES.BLOB_ERROR:
709709
return this.#decodeBlobError(chunk);
710710

@@ -713,7 +713,7 @@ export class Decoder {
713713

714714
case RESP_TYPES.SET:
715715
return this.#decodeSet(typeMapping, chunk);
716-
716+
717717
case RESP_TYPES.MAP:
718718
return this.#decodeMap(typeMapping, chunk);
719719

@@ -997,7 +997,7 @@ export class Decoder {
997997
// decode simple string map key as string (and not as buffer)
998998
case RESP_TYPES.SIMPLE_STRING:
999999
return this.#decodeSimpleString(String, chunk);
1000-
1000+
10011001
// decode blob string map key as string (and not as buffer)
10021002
case RESP_TYPES.BLOB_STRING:
10031003
return this.#decodeBlobString(String, chunk);
@@ -1028,7 +1028,7 @@ export class Decoder {
10281028
this.#decodeNestedType.bind(this, typeMapping),
10291029
typeMapping
10301030
);
1031-
}
1031+
}
10321032

10331033
const value = this.#decodeNestedType(typeMapping, chunk);
10341034
if (typeof value === 'function') {

packages/client/lib/client/cache.spec.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -609,13 +609,12 @@ describe("Client Side Cache", () => {
609609
});
610610
});
611611
});
612-
it('should reflect comprehensive cache operations in stats via BasicClientSideCache', async function () {
613-
612+
describe('comprehensive stats', () => {
614613
const csc = new BasicClientSideCache({
615614
maxEntries: 2, // Small size to easily trigger evictions
616615
});
617616

618-
testUtils.testWithClient('comprehensive_stats_run', async client => {
617+
testUtils.testWithClient('should reflect comprehensive cache operations in stats via BasicClientSideCache', async client => {
619618

620619
// --- Phase 1: Initial misses and loads ---
621620
await client.set('keyA', 'valueA_1');

packages/client/lib/client/commands-queue.ts

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ export interface CommandOptions<T = TypeMapping> {
1919
* Timeout for the command in milliseconds
2020
*/
2121
timeout?: number;
22+
/**
23+
* @internal
24+
* The slot the command is targeted to (if any)
25+
*/
26+
slotNumber?: number;
2227
}
2328

2429
export interface CommandToWrite extends CommandWaitingForReply {
@@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3338
listener: () => unknown;
3439
originalTimeout: number | undefined;
3540
} | undefined;
41+
slotNumber?: number
3642
}
3743

3844
interface CommandWaitingForReply {
@@ -186,14 +192,34 @@ export default class RedisCommandsQueue {
186192
this.#pushHandlers.push(handler);
187193
}
188194

189-
async waitForInflightCommandsToComplete(): Promise<void> {
195+
async waitForInflightCommandsToComplete(options?: { timeoutMs?: number, flushOnTimeout?: boolean }): Promise<void> {
190196
// In-flight commands already completed
191197
if(this.#waitingForReply.length === 0) {
192198
return
193199
};
194200
// Otherwise wait for in-flight commands to fire `empty` event
195201
return new Promise(resolve => {
196-
this.#waitingForReply.events.on('empty', resolve)
202+
const onEmpty = () => {
203+
if (timeoutId) clearTimeout(timeoutId);
204+
resolve();
205+
};
206+
207+
let timeoutId: ReturnType<typeof setTimeout> | undefined;
208+
const timeoutMs = options?.timeoutMs;
209+
if (timeoutMs !== undefined && timeoutMs > 0) {
210+
timeoutId = setTimeout(() => {
211+
this.#waitingForReply.events.off('empty', onEmpty);
212+
const pendingCount = this.#waitingForReply.length;
213+
dbgMaintenance(`waitForInflightCommandsToComplete timed out after ${timeoutMs}ms with ${pendingCount} commands still waiting`);
214+
if (options?.flushOnTimeout && pendingCount > 0) {
215+
dbgMaintenance(`Flushing ${pendingCount} commands that timed out waiting for reply`);
216+
this.#flushWaitingForReply(new TimeoutError());
217+
}
218+
resolve(); // Resolve instead of reject - we don't want to fail the migration
219+
}, timeoutMs);
220+
}
221+
222+
this.#waitingForReply.events.once('empty', onEmpty);
197223
});
198224
}
199225

@@ -219,6 +245,7 @@ export default class RedisCommandsQueue {
219245
channelsCounter: undefined,
220246
typeMapping: options?.typeMapping
221247
};
248+
value.slotNumber = options?.slotNumber
222249

223250
// If #maintenanceCommandTimeout was explicitly set, we should
224251
// use it instead of the timeout provided by the command
@@ -283,7 +310,8 @@ export default class RedisCommandsQueue {
283310
if (Array.isArray(reply)) {
284311
if (this.#onPush(reply)) return;
285312

286-
if (PONG.equals(reply[0] as Buffer)) {
313+
const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0];
314+
if (PONG.equals(firstElement as Buffer)) {
287315
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
288316
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
289317
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
@@ -342,6 +370,10 @@ export default class RedisCommandsQueue {
342370
return this.#pubSub.removeAllListeners();
343371
}
344372

373+
removeShardedPubSubListenersForSlots(slots: Set<number>) {
374+
return this.#pubSub.removeShardedPubSubListenersForSlots(slots);
375+
}
376+
345377
resubscribe(chainId?: symbol) {
346378
const commands = this.#pubSub.resubscribe();
347379
if (!commands.length) return;
@@ -541,4 +573,54 @@ export default class RedisCommandsQueue {
541573
this.#waitingForReply.length === 0
542574
);
543575
}
576+
577+
/**
578+
*
579+
* Extracts commands for the given slots from the toWrite queue.
580+
* Some commands dont have "slotNumber", which means they are not designated to particular slot/node.
581+
* We ignore those.
582+
*/
583+
extractCommandsForSlots(slots: Set<number>): CommandToWrite[] {
584+
const result: CommandToWrite[] = [];
585+
let current = this.#toWrite.head;
586+
while(current !== undefined) {
587+
if(current.value.slotNumber !== undefined && slots.has(current.value.slotNumber)) {
588+
result.push(current.value);
589+
const toRemove = current;
590+
current = current.next;
591+
this.#toWrite.remove(toRemove);
592+
} else {
593+
// Move to next node even if we don't extract this command
594+
current = current.next;
595+
}
596+
}
597+
return result;
598+
}
599+
600+
/**
601+
* Gets all commands from the write queue without removing them.
602+
*/
603+
extractAllCommands(): CommandToWrite[] {
604+
const result: CommandToWrite[] = [];
605+
let current = this.#toWrite.head;
606+
while(current) {
607+
result.push(current.value);
608+
this.#toWrite.remove(current);
609+
current = current.next;
610+
}
611+
return result;
612+
}
613+
614+
/**
615+
* Prepends commands to the write queue in reverse.
616+
*/
617+
prependCommandsToWrite(commands: CommandToWrite[]) {
618+
if (!commands.length) {
619+
return;
620+
}
621+
622+
for (let i = commands.length - 1; i >= 0; i--) {
623+
this.#toWrite.unshift(commands[i]);
624+
}
625+
}
544626
}

0 commit comments

Comments
 (0)