Skip to content

Commit d8cb5de

Browse files
committed
fix #2563 - add support for MONITOR
1 parent 5204417 commit d8cb5de

File tree

4 files changed

+177
-44
lines changed

4 files changed

+177
-44
lines changed

packages/client/lib/RESP/decoder.ts

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,18 @@ interface DecoderOptions {
4949
}
5050

5151
export class Decoder {
52-
private readonly _config;
53-
52+
onReply;
53+
onErrorReply;
54+
onPush;
55+
getTypeMapping;
5456
private _cursor = 0;
55-
5657
private _next;
5758

5859
constructor(config: DecoderOptions) {
59-
this._config = config;
60+
this.onReply = config.onReply;
61+
this.onErrorReply = config.onErrorReply;
62+
this.onPush = config.onPush;
63+
this.getTypeMapping = config.getTypeMapping;
6064
}
6165

6266
reset() {
@@ -99,102 +103,102 @@ export class Decoder {
99103
private _decodeTypeValue(type, chunk) {
100104
switch (type) {
101105
case RESP_TYPES.NULL:
102-
this._config.onReply(this._decodeNull());
106+
this.onReply(this._decodeNull());
103107
return false;
104108

105109
case RESP_TYPES.BOOLEAN:
106110
return this._handleDecodedValue(
107-
this._config.onReply,
111+
this.onReply,
108112
this._decodeBoolean(chunk)
109113
);
110114

111115
case RESP_TYPES.NUMBER:
112116
return this._handleDecodedValue(
113-
this._config.onReply,
117+
this.onReply,
114118
this._decodeNumber(
115-
this._config.getTypeMapping()[RESP_TYPES.NUMBER],
119+
this.getTypeMapping()[RESP_TYPES.NUMBER],
116120
chunk
117121
)
118122
);
119123

120124
case RESP_TYPES.BIG_NUMBER:
121125
return this._handleDecodedValue(
122-
this._config.onReply,
126+
this.onReply,
123127
this._decodeBigNumber(
124-
this._config.getTypeMapping()[RESP_TYPES.BIG_NUMBER],
128+
this.getTypeMapping()[RESP_TYPES.BIG_NUMBER],
125129
chunk
126130
)
127131
);
128132

129133
case RESP_TYPES.DOUBLE:
130134
return this._handleDecodedValue(
131-
this._config.onReply,
135+
this.onReply,
132136
this._decodeDouble(
133-
this._config.getTypeMapping()[RESP_TYPES.DOUBLE],
137+
this.getTypeMapping()[RESP_TYPES.DOUBLE],
134138
chunk
135139
)
136140
);
137141

138142
case RESP_TYPES.SIMPLE_STRING:
139143
return this._handleDecodedValue(
140-
this._config.onReply,
144+
this.onReply,
141145
this._decodeSimpleString(
142-
this._config.getTypeMapping()[RESP_TYPES.SIMPLE_STRING],
146+
this.getTypeMapping()[RESP_TYPES.SIMPLE_STRING],
143147
chunk
144148
)
145149
);
146150

147151
case RESP_TYPES.BLOB_STRING:
148152
return this._handleDecodedValue(
149-
this._config.onReply,
153+
this.onReply,
150154
this._decodeBlobString(
151-
this._config.getTypeMapping()[RESP_TYPES.BLOB_STRING],
155+
this.getTypeMapping()[RESP_TYPES.BLOB_STRING],
152156
chunk
153157
)
154158
);
155159

156160
case RESP_TYPES.VERBATIM_STRING:
157161
return this._handleDecodedValue(
158-
this._config.onReply,
162+
this.onReply,
159163
this._decodeVerbatimString(
160-
this._config.getTypeMapping()[RESP_TYPES.VERBATIM_STRING],
164+
this.getTypeMapping()[RESP_TYPES.VERBATIM_STRING],
161165
chunk
162166
)
163167
);
164168

165169
case RESP_TYPES.SIMPLE_ERROR:
166170
return this._handleDecodedValue(
167-
this._config.onErrorReply,
171+
this.onErrorReply,
168172
this._decodeSimpleError(chunk)
169173
);
170174

171175
case RESP_TYPES.BLOB_ERROR:
172176
return this._handleDecodedValue(
173-
this._config.onErrorReply,
177+
this.onErrorReply,
174178
this._decodeBlobError(chunk)
175179
);
176180

177181
case RESP_TYPES.ARRAY:
178182
return this._handleDecodedValue(
179-
this._config.onReply,
180-
this._decodeArray(this._config.getTypeMapping(), chunk)
183+
this.onReply,
184+
this._decodeArray(this.getTypeMapping(), chunk)
181185
);
182186

183187
case RESP_TYPES.SET:
184188
return this._handleDecodedValue(
185-
this._config.onReply,
186-
this._decodeSet(this._config.getTypeMapping(), chunk)
189+
this.onReply,
190+
this._decodeSet(this.getTypeMapping(), chunk)
187191
);
188192

189193
case RESP_TYPES.MAP:
190194
return this._handleDecodedValue(
191-
this._config.onReply,
192-
this._decodeMap(this._config.getTypeMapping(), chunk)
195+
this.onReply,
196+
this._decodeMap(this.getTypeMapping(), chunk)
193197
);
194198

195199
case RESP_TYPES.PUSH:
196200
return this._handleDecodedValue(
197-
this._config.onPush,
201+
this.onPush,
198202
this._decodeArray(PUSH_TYPE_MAPPING, chunk)
199203
);
200204

packages/client/lib/client/commands-queue.ts

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP
55
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply } from '../errors';
77
import { EventEmitter } from 'node:stream';
8+
import { MonitorCallback } from '.';
89

910
export interface CommandOptions<T = TypeMapping> {
1011
chainId?: symbol;
@@ -23,6 +24,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
2324
signal: AbortSignal;
2425
listener: () => unknown;
2526
};
27+
resolveOnWrite?: boolean;
2628
}
2729

2830
interface CommandWaitingForReply {
@@ -151,8 +153,33 @@ export default class RedisCommandsQueue {
151153
}
152154
});
153155
}
156+
157+
async monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) {
158+
await this.addCommand(
159+
['MONITOR'],
160+
{ asap },
161+
true
162+
);
163+
164+
const { onReply, getTypeMapping } = this.decoder;
165+
this.decoder.onReply = callback;
166+
this.decoder.getTypeMapping = () => typeMapping;
167+
return () => new Promise<void>(async resolve => {
168+
await this.addCommand(['RESET'], undefined, true);
169+
this.decoder.onReply = (reply: string) => {
170+
if (reply !== 'RESET') return callback(reply);
171+
this.decoder.onReply = onReply;
172+
this.decoder.getTypeMapping = getTypeMapping;
173+
resolve();
174+
};
175+
});
176+
}
154177

155-
addCommand<T>(args: CommandArguments, options?: CommandOptions): Promise<T> {
178+
addCommand<T>(
179+
args: CommandArguments,
180+
options?: CommandOptions,
181+
resolveOnWrite?: boolean
182+
): Promise<T> {
156183
if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) {
157184
return Promise.reject(new Error('The queue is full'));
158185
} else if (options?.abortSignal?.aborted) {
@@ -164,10 +191,12 @@ export default class RedisCommandsQueue {
164191
const value: CommandToWrite = {
165192
args,
166193
chainId: options?.chainId,
167-
typeMapping: options?.typeMapping,
194+
abort: undefined,
195+
resolveOnWrite,
168196
resolve,
169197
reject,
170-
abort: undefined
198+
channelsCounter: undefined,
199+
typeMapping: options?.typeMapping
171200
};
172201

173202
const signal = options?.abortSignal;
@@ -245,16 +274,19 @@ export default class RedisCommandsQueue {
245274
return new Promise<void>((resolve, reject) => {
246275
this._toWrite.push({
247276
args: command.args,
248-
channelsCounter: command.channelsCounter,
249-
typeMapping: PUSH_TYPE_MAPPING,
250-
resolve: () => {
277+
chainId: undefined,
278+
abort: undefined,
279+
resolveOnWrite: false,
280+
resolve() {
251281
command.resolve();
252282
resolve();
253283
},
254-
reject: err => {
284+
reject(err) {
255285
command.reject?.();
256286
reject(err);
257-
}
287+
},
288+
channelsCounter: command.channelsCounter,
289+
typeMapping: PUSH_TYPE_MAPPING
258290
});
259291
});
260292
}
@@ -279,13 +311,19 @@ export default class RedisCommandsQueue {
279311
RedisCommandsQueue._removeAbortListener(toSend);
280312
toSend.abort = undefined;
281313
}
282-
283-
// TODO reuse `toSend` or create new object?
284-
(toSend as any).args = undefined;
285-
(toSend as any).chainId = undefined;
286-
287-
this._waitingForReply.push(toSend);
288-
this._chainInExecution = toSend.chainId;
314+
315+
if (toSend.resolveOnWrite) {
316+
toSend.resolve();
317+
} else {
318+
// TODO reuse `toSend` or create new object?
319+
(toSend as any).args = undefined;
320+
321+
this._chainInExecution = toSend.chainId;
322+
toSend.chainId = undefined;
323+
324+
this._waitingForReply.push(toSend);
325+
}
326+
289327
yield encoded;
290328
toSend = this._toWrite.shift();
291329
}

packages/client/lib/client/index.spec.ts

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import RedisClient, { RedisClientType } from '.';
55
// import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
66
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
77
import { defineScript } from '../lua-script';
8-
// import { spy } from 'sinon';
8+
import { spy } from 'sinon';
99
import { once } from 'node:events';
1010
// import { ClientKillFilters } from '../commands/CLIENT_KILL';
1111
// import { promisify } from 'node:util';
@@ -741,4 +741,66 @@ describe('Client', () => {
741741
// },
742742
// disableClientSetup: true
743743
// });
744+
745+
describe('MONITOR', () => {
746+
testUtils.testWithClient('should be able to monitor commands', async client => {
747+
const duplicate = await client.duplicate().connect(),
748+
listener = spy(message => assert.equal(typeof message, 'string'));
749+
await duplicate.monitor(listener);
750+
751+
try {
752+
await Promise.all([
753+
waitTillBeenCalled(listener),
754+
client.ping()
755+
]);
756+
} finally {
757+
duplicate.destroy();
758+
}
759+
}, GLOBAL.SERVERS.OPEN);
760+
761+
testUtils.testWithClient('should keep monitoring after reconnection', async client => {
762+
const duplicate = await client.duplicate().connect(),
763+
listener = spy(message => assert.equal(typeof message, 'string'));
764+
await duplicate.monitor(listener);
765+
766+
try {
767+
await Promise.all([
768+
once(duplicate, 'error'),
769+
client.clientKill({
770+
filter: 'SKIPME',
771+
skipMe: true
772+
})
773+
]);
774+
await Promise.all([
775+
waitTillBeenCalled(listener),
776+
client.ping()
777+
]);
778+
} finally {
779+
duplicate.destroy();
780+
}
781+
}, GLOBAL.SERVERS.OPEN);
782+
783+
testUtils.testWithClient('should be able to go back to "normal mode"', async client => {
784+
const off = await client.monitor(() => {});
785+
await off();
786+
await assert.doesNotReject(client.ping());
787+
}, GLOBAL.SERVERS.OPEN);
788+
789+
testUtils.testWithClient('should respect type mapping', async client => {
790+
const duplicate = await client.duplicate().connect(),
791+
listener = spy(message => assert.ok(message instanceof Buffer));
792+
await duplicate.withTypeMapping({
793+
[RESP_TYPES.SIMPLE_STRING]: Buffer
794+
}).monitor(listener);
795+
796+
try {
797+
await Promise.all([
798+
waitTillBeenCalled(listener),
799+
client.ping()
800+
]);
801+
} finally {
802+
duplicate.destroy();
803+
}
804+
}, GLOBAL.SERVERS.OPEN);
805+
});
744806
});

0 commit comments

Comments
 (0)