@@ -15,7 +15,8 @@ import { Observable } from 'rxjs';
1515
1616export class RedisStreamStrategy
1717 extends Server
18- implements CustomTransportStrategy {
18+ implements CustomTransportStrategy
19+ {
1920 private streamHandlerMap = { } ;
2021
2122 private redis : RedisInstance ;
@@ -34,23 +35,21 @@ export class RedisStreamStrategy
3435 this . handleError ( this . redis ) ;
3536 this . handleError ( this . client ) ;
3637
37- // when both instances connect
38+ // when server instnce connect, bind handlers.
3839 this . redis . on ( CONNECT_EVENT , ( ) => {
39- this . client . on ( CONNECT_EVENT , ( ) => {
40- this . logger . log (
41- 'Redis connected successfully on ' +
40+ // this.client.on(CONNECT_EVENT, () => {});
41+
42+ this . logger . log (
43+ 'Redis connected successfully on ' +
4244 ( this . options . connection ?. url ??
43- this . options . connection . host +
44- ':' +
45- this . options . connection . port ) ,
46- ) ;
45+ this . options . connection . host + ':' + this . options . connection . port ) ,
46+ ) ;
4747
48- this . bindHandlers ( ) ;
48+ this . bindHandlers ( ) ;
4949
50- // Essential. or await app.listen() will hang forever.
51- // Any code after it won't work.
52- callback ( ) ;
53- } ) ;
50+ // Essential. or await app.listen() will hang forever.
51+ // Any code after it won't work.
52+ callback ( ) ;
5453 } ) ;
5554 }
5655
@@ -103,9 +102,9 @@ export class RedisStreamStrategy
103102 if ( error ?. message . includes ( 'BUSYGROUP' ) ) {
104103 this . logger . debug (
105104 'Consumer Group "' +
106- consumerGroup +
107- '" already exists for stream: ' +
108- stream ,
105+ consumerGroup +
106+ '" already exists for stream: ' +
107+ stream ,
109108 ) ;
110109 return true ;
111110 } else {
0 commit comments