@@ -9,9 +9,14 @@ export interface PubSubRedisOptions {
9
9
publisher ?: RedisClient ;
10
10
subscriber ?: RedisClient ;
11
11
reviver ?: Reviver ;
12
+ serializer ?: Serializer ;
13
+ deserializer ?: Deserializer ;
12
14
}
13
15
14
16
export class RedisPubSub implements PubSubEngine {
17
+ private readonly serializer ?: Serializer ;
18
+ private readonly deserializer ?: Deserializer ;
19
+
15
20
constructor ( options : PubSubRedisOptions = { } ) {
16
21
const {
17
22
triggerTransform,
@@ -20,10 +25,19 @@ export class RedisPubSub implements PubSubEngine {
20
25
subscriber,
21
26
publisher,
22
27
reviver,
28
+ serializer,
29
+ deserializer,
23
30
} = options ;
24
31
25
32
this . triggerTransform = triggerTransform || ( trigger => trigger as string ) ;
33
+
34
+ if ( reviver && deserializer ) {
35
+ throw new Error ( "Reviver and deserializer can't be used together" ) ;
36
+ }
37
+
26
38
this . reviver = reviver ;
39
+ this . serializer = serializer ;
40
+ this . deserializer = deserializer ;
27
41
28
42
if ( subscriber && publisher ) {
29
43
this . redisPublisher = publisher ;
@@ -53,15 +67,15 @@ export class RedisPubSub implements PubSubEngine {
53
67
// handle messages received via psubscribe and subscribe
54
68
this . redisSubscriber . on ( 'pmessage' , this . onMessage . bind ( this ) ) ;
55
69
// partially applied function passes undefined for pattern arg since 'message' event won't provide it:
56
- this . redisSubscriber . on ( 'message' , this . onMessage . bind ( this , undefined ) ) ;
70
+ this . redisSubscriber . on ( 'message' , this . onMessage . bind ( this , undefined ) ) ;
57
71
58
72
this . subscriptionMap = { } ;
59
73
this . subsRefsMap = { } ;
60
74
this . currentSubscriptionId = 0 ;
61
75
}
62
76
63
77
public async publish < T > ( trigger : string , payload : T ) : Promise < void > {
64
- await this . redisPublisher . publish ( trigger , JSON . stringify ( payload ) ) ;
78
+ await this . redisPublisher . publish ( trigger , this . serializer ? this . serializer ( payload ) : JSON . stringify ( payload ) ) ;
65
79
}
66
80
67
81
public subscribe (
@@ -108,7 +122,7 @@ export class RedisPubSub implements PubSubEngine {
108
122
// unsubscribe from specific channel and pattern match
109
123
this . redisSubscriber . unsubscribe ( triggerName ) ;
110
124
this . redisSubscriber . punsubscribe ( triggerName ) ;
111
-
125
+
112
126
delete this . subsRefsMap [ triggerName ] ;
113
127
} else {
114
128
const index = refs . indexOf ( subId ) ;
@@ -146,7 +160,7 @@ export class RedisPubSub implements PubSubEngine {
146
160
147
161
let parsedMessage ;
148
162
try {
149
- parsedMessage = JSON . parse ( message , this . reviver ) ;
163
+ parsedMessage = this . deserializer ? this . deserializer ( message ) : JSON . parse ( message , this . reviver ) ;
150
164
} catch ( e ) {
151
165
parsedMessage = message ;
152
166
}
@@ -174,3 +188,5 @@ export type TriggerTransform = (
174
188
channelOptions ?: Object ,
175
189
) => string ;
176
190
export type Reviver = ( key : any , value : any ) => any ;
191
+ export type Serializer = ( source : any ) => string ;
192
+ export type Deserializer = ( source : string ) => any ;
0 commit comments