Skip to content

Commit 43ff153

Browse files
authored
Merge pull request #199 from quentinus95/custom-serialize-deserialize
Allow to pass a custom serializer and deserializer
2 parents 3977a97 + 4f0e9cd commit 43ff153

File tree

2 files changed

+121
-5
lines changed

2 files changed

+121
-5
lines changed

src/redis-pubsub.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ export interface PubSubRedisOptions {
99
publisher?: RedisClient;
1010
subscriber?: RedisClient;
1111
reviver?: Reviver;
12+
serializer?: Serializer;
13+
deserializer?: Deserializer;
1214
}
1315

1416
export class RedisPubSub implements PubSubEngine {
17+
private readonly serializer?: Serializer;
18+
private readonly deserializer?: Deserializer;
19+
1520
constructor(options: PubSubRedisOptions = {}) {
1621
const {
1722
triggerTransform,
@@ -20,10 +25,19 @@ export class RedisPubSub implements PubSubEngine {
2025
subscriber,
2126
publisher,
2227
reviver,
28+
serializer,
29+
deserializer,
2330
} = options;
2431

2532
this.triggerTransform = triggerTransform || (trigger => trigger as string);
33+
34+
if (reviver && deserializer) {
35+
throw new Error("Reviver and deserializer can't be used together");
36+
}
37+
2638
this.reviver = reviver;
39+
this.serializer = serializer;
40+
this.deserializer = deserializer;
2741

2842
if (subscriber && publisher) {
2943
this.redisPublisher = publisher;
@@ -53,15 +67,15 @@ export class RedisPubSub implements PubSubEngine {
5367
// handle messages received via psubscribe and subscribe
5468
this.redisSubscriber.on('pmessage', this.onMessage.bind(this));
5569
// partially applied function passes undefined for pattern arg since 'message' event won't provide it:
56-
this.redisSubscriber.on('message', this.onMessage.bind(this, undefined));
70+
this.redisSubscriber.on('message', this.onMessage.bind(this, undefined));
5771

5872
this.subscriptionMap = {};
5973
this.subsRefsMap = {};
6074
this.currentSubscriptionId = 0;
6175
}
6276

6377
public async publish<T>(trigger: string, payload: T): Promise<void> {
64-
await this.redisPublisher.publish(trigger, JSON.stringify(payload));
78+
await this.redisPublisher.publish(trigger, this.serializer ? this.serializer(payload) : JSON.stringify(payload));
6579
}
6680

6781
public subscribe(
@@ -108,7 +122,7 @@ export class RedisPubSub implements PubSubEngine {
108122
// unsubscribe from specific channel and pattern match
109123
this.redisSubscriber.unsubscribe(triggerName);
110124
this.redisSubscriber.punsubscribe(triggerName);
111-
125+
112126
delete this.subsRefsMap[triggerName];
113127
} else {
114128
const index = refs.indexOf(subId);
@@ -146,7 +160,7 @@ export class RedisPubSub implements PubSubEngine {
146160

147161
let parsedMessage;
148162
try {
149-
parsedMessage = JSON.parse(message, this.reviver);
163+
parsedMessage = this.deserializer ? this.deserializer(message) : JSON.parse(message, this.reviver);
150164
} catch (e) {
151165
parsedMessage = message;
152166
}
@@ -174,3 +188,5 @@ export type TriggerTransform = (
174188
channelOptions?: Object,
175189
) => string;
176190
export type Reviver = (key: any, value: any) => any;
191+
export type Serializer = (source: any) => string;
192+
export type Deserializer = (source: string) => any;

src/test/tests.ts

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as chai from 'chai';
22
import * as chaiAsPromised from 'chai-as-promised';
3-
import { spy, restore } from 'simple-mock';
3+
import { spy, restore, stub } from 'simple-mock';
44
import { isAsyncIterable } from 'iterall';
55
import { RedisPubSub } from '../redis-pubsub';
66
import * as IORedis from 'ioredis';
@@ -272,6 +272,106 @@ describe('RedisPubSub', () => {
272272
});
273273
});
274274

275+
it('refuses custom reviver with a deserializer', done => {
276+
const reviver = stub();
277+
const deserializer = stub();
278+
279+
try {
280+
expect(() => new RedisPubSub({...mockOptions, reviver, deserializer}))
281+
.to.throw("Reviver and deserializer can't be used together");
282+
done();
283+
} catch (e) {
284+
done(e);
285+
}
286+
});
287+
288+
it('allows to use a custom serializer', done => {
289+
const serializer = stub();
290+
const serializedPayload = `{ "hello": "custom" }`;
291+
serializer.returnWith(serializedPayload);
292+
293+
const pubSub = new RedisPubSub({...mockOptions, serializer });
294+
295+
try {
296+
pubSub.subscribe('TOPIC', message => {
297+
try {
298+
expect(message).to.eql({hello: 'custom'});
299+
done();
300+
} catch (e) {
301+
done(e);
302+
}
303+
}).then(() => {
304+
pubSub.publish('TOPIC', {hello: 'world'});
305+
});
306+
} catch (e) {
307+
done(e);
308+
}
309+
});
310+
311+
it('custom serializer can throw an error', done => {
312+
const serializer = stub();
313+
serializer.throwWith(new Error('Custom serialization error'));
314+
315+
const pubSub = new RedisPubSub({...mockOptions, serializer });
316+
317+
try {
318+
pubSub.publish('TOPIC', {hello: 'world'}).then(() => {
319+
done(new Error('Expected error to be thrown upon publish'));
320+
}, err => {
321+
expect(err.message).to.eql('Custom serialization error');
322+
done();
323+
});
324+
} catch (e) {
325+
done(e);
326+
}
327+
});
328+
329+
it('allows to use a custom deserializer', done => {
330+
const deserializer = stub();
331+
const deserializedPayload = { hello: 'custom' };
332+
deserializer.returnWith(deserializedPayload);
333+
334+
const pubSub = new RedisPubSub({...mockOptions, deserializer });
335+
336+
try {
337+
pubSub.subscribe('TOPIC', message => {
338+
try {
339+
expect(message).to.eql({hello: 'custom'});
340+
done();
341+
} catch (e) {
342+
done(e);
343+
}
344+
}).then(() => {
345+
pubSub.publish('TOPIC', {hello: 'world'});
346+
});
347+
} catch (e) {
348+
done(e);
349+
}
350+
});
351+
352+
it('unparsed payload is returned if custom deserializer throws an error', done => {
353+
const deserializer = stub();
354+
deserializer.throwWith(new Error('Custom deserialization error'));
355+
356+
const pubSub = new RedisPubSub({...mockOptions, deserializer });
357+
358+
try {
359+
pubSub.subscribe('TOPIC', message => {
360+
try {
361+
expect(message).to.be.a('string');
362+
expect(message).to.eql('{"hello":"world"}');
363+
done();
364+
} catch (e) {
365+
done(e);
366+
}
367+
}).then(() => {
368+
pubSub.publish('TOPIC', {hello: 'world'});
369+
});
370+
} catch (e) {
371+
done(e);
372+
}
373+
});
374+
275375
it('throws if you try to unsubscribe with an unknown id', () => {
276376
const pubSub = new RedisPubSub(mockOptions);
277377
return expect(() => pubSub.unsubscribe(123))

0 commit comments

Comments
 (0)