Skip to content

Commit bcd894d

Browse files
authored
Merge branch 'master' into greenkeeper/@types/mocha-7.0.0
2 parents e3e5149 + 0ea938b commit bcd894d

File tree

5 files changed

+131
-16
lines changed

5 files changed

+131
-16
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ language: node_js
22
node_js:
33
- "node"
44
- "lts/*"
5-
- "6"
65

76
cache:
87
directories:

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "graphql-redis-subscriptions",
3-
"version": "2.1.2",
3+
"version": "2.2.0",
44
"description": "A graphql-subscriptions PubSub Engine using redis",
55
"main": "dist/index.js",
66
"repository": {
@@ -47,10 +47,10 @@
4747
},
4848
"devDependencies": {
4949
"@types/chai": "^4.1.6",
50-
"@types/ioredis": "4.14.1",
50+
"@types/ioredis": "4.14.7",
5151
"@types/chai-as-promised": "7.1.1",
5252
"@types/mocha": "^7.0.0",
53-
"@types/node": "12.12.19",
53+
"@types/node": "13.5.3",
5454
"@types/simple-mock": "0.8.0",
5555
"chai": "^4.2.0",
5656
"chai-as-promised": "^7.1.1",

src/redis-pubsub.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ export interface PubSubRedisOptions {
99
publisher?: RedisClient;
1010
subscriber?: RedisClient;
1111
reviver?: Reviver;
12+
serializer?: Serializer;
13+
deserializer?: Deserializer;
1214
}
1315

1416
export class RedisPubSub implements PubSubEngine {
17+
private readonly serializer?: Serializer;
18+
private readonly deserializer?: Deserializer;
19+
1520
constructor(options: PubSubRedisOptions = {}) {
1621
const {
1722
triggerTransform,
@@ -20,10 +25,19 @@ export class RedisPubSub implements PubSubEngine {
2025
subscriber,
2126
publisher,
2227
reviver,
28+
serializer,
29+
deserializer,
2330
} = options;
2431

2532
this.triggerTransform = triggerTransform || (trigger => trigger as string);
33+
34+
if (reviver && deserializer) {
35+
throw new Error("Reviver and deserializer can't be used together");
36+
}
37+
2638
this.reviver = reviver;
39+
this.serializer = serializer;
40+
this.deserializer = deserializer;
2741

2842
if (subscriber && publisher) {
2943
this.redisPublisher = publisher;
@@ -53,15 +67,15 @@ export class RedisPubSub implements PubSubEngine {
5367
// handle messages received via psubscribe and subscribe
5468
this.redisSubscriber.on('pmessage', this.onMessage.bind(this));
5569
// partially applied function passes undefined for pattern arg since 'message' event won't provide it:
56-
this.redisSubscriber.on('message', this.onMessage.bind(this, undefined));
70+
this.redisSubscriber.on('message', this.onMessage.bind(this, undefined));
5771

5872
this.subscriptionMap = {};
5973
this.subsRefsMap = {};
6074
this.currentSubscriptionId = 0;
6175
}
6276

6377
public async publish<T>(trigger: string, payload: T): Promise<void> {
64-
await this.redisPublisher.publish(trigger, JSON.stringify(payload));
78+
await this.redisPublisher.publish(trigger, this.serializer ? this.serializer(payload) : JSON.stringify(payload));
6579
}
6680

6781
public subscribe(
@@ -108,7 +122,7 @@ export class RedisPubSub implements PubSubEngine {
108122
// unsubscribe from specific channel and pattern match
109123
this.redisSubscriber.unsubscribe(triggerName);
110124
this.redisSubscriber.punsubscribe(triggerName);
111-
125+
112126
delete this.subsRefsMap[triggerName];
113127
} else {
114128
const index = refs.indexOf(subId);
@@ -146,7 +160,7 @@ export class RedisPubSub implements PubSubEngine {
146160

147161
let parsedMessage;
148162
try {
149-
parsedMessage = JSON.parse(message, this.reviver);
163+
parsedMessage = this.deserializer ? this.deserializer(message) : JSON.parse(message, this.reviver);
150164
} catch (e) {
151165
parsedMessage = message;
152166
}
@@ -174,3 +188,5 @@ export type TriggerTransform = (
174188
channelOptions?: Object,
175189
) => string;
176190
export type Reviver = (key: any, value: any) => any;
191+
export type Serializer = (source: any) => string;
192+
export type Deserializer = (source: string) => any;

src/test/tests.ts

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as chai from 'chai';
22
import * as chaiAsPromised from 'chai-as-promised';
3-
import { spy, restore } from 'simple-mock';
3+
import { spy, restore, stub } from 'simple-mock';
44
import { isAsyncIterable } from 'iterall';
55
import { RedisPubSub } from '../redis-pubsub';
66
import * as IORedis from 'ioredis';
@@ -272,6 +272,106 @@ describe('RedisPubSub', () => {
272272
});
273273
});
274274

275+
it('refuses custom reviver with a deserializer', done => {
276+
const reviver = stub();
277+
const deserializer = stub();
278+
279+
try {
280+
expect(() => new RedisPubSub({...mockOptions, reviver, deserializer}))
281+
.to.throw("Reviver and deserializer can't be used together");
282+
done();
283+
} catch (e) {
284+
done(e);
285+
}
286+
});
287+
288+
it('allows to use a custom serializer', done => {
289+
const serializer = stub();
290+
const serializedPayload = `{ "hello": "custom" }`;
291+
serializer.returnWith(serializedPayload);
292+
293+
const pubSub = new RedisPubSub({...mockOptions, serializer });
294+
295+
try {
296+
pubSub.subscribe('TOPIC', message => {
297+
try {
298+
expect(message).to.eql({hello: 'custom'});
299+
done();
300+
} catch (e) {
301+
done(e);
302+
}
303+
}).then(() => {
304+
pubSub.publish('TOPIC', {hello: 'world'});
305+
});
306+
} catch (e) {
307+
done(e);
308+
}
309+
});
310+
311+
it('custom serializer can throw an error', done => {
312+
const serializer = stub();
313+
serializer.throwWith(new Error('Custom serialization error'));
314+
315+
const pubSub = new RedisPubSub({...mockOptions, serializer });
316+
317+
try {
318+
pubSub.publish('TOPIC', {hello: 'world'}).then(() => {
319+
done(new Error('Expected error to be thrown upon publish'));
320+
}, err => {
321+
expect(err.message).to.eql('Custom serialization error');
322+
done();
323+
});
324+
} catch (e) {
325+
done(e);
326+
}
327+
});
328+
329+
it('allows to use a custom deserializer', done => {
330+
const deserializer = stub();
331+
const deserializedPayload = { hello: 'custom' };
332+
deserializer.returnWith(deserializedPayload);
333+
334+
const pubSub = new RedisPubSub({...mockOptions, deserializer });
335+
336+
try {
337+
pubSub.subscribe('TOPIC', message => {
338+
try {
339+
expect(message).to.eql({hello: 'custom'});
340+
done();
341+
} catch (e) {
342+
done(e);
343+
}
344+
}).then(() => {
345+
pubSub.publish('TOPIC', {hello: 'world'});
346+
});
347+
} catch (e) {
348+
done(e);
349+
}
350+
});
351+
352+
it('unparsed payload is returned if custom deserializer throws an error', done => {
353+
const deserializer = stub();
354+
deserializer.throwWith(new Error('Custom deserialization error'));
355+
356+
const pubSub = new RedisPubSub({...mockOptions, deserializer });
357+
358+
try {
359+
pubSub.subscribe('TOPIC', message => {
360+
try {
361+
expect(message).to.be.a('string');
362+
expect(message).to.eql('{"hello":"world"}');
363+
done();
364+
} catch (e) {
365+
done(e);
366+
}
367+
}).then(() => {
368+
pubSub.publish('TOPIC', {hello: 'world'});
369+
});
370+
} catch (e) {
371+
done(e);
372+
}
373+
});
374+
275375
it('throws if you try to unsubscribe with an unknown id', () => {
276376
const pubSub = new RedisPubSub(mockOptions);
277377
return expect(() => pubSub.unsubscribe(123))

0 commit comments

Comments
 (0)