1
1
import IORedis from 'ioredis' ;
2
- import { ForbiddenException , ServiceUnavailableException } from '@nestjs/common' ;
2
+ import { ForbiddenException , Logger , ServiceUnavailableException } from '@nestjs/common' ;
3
3
import { RedisErrorCodes } from 'src/constants' ;
4
4
import { ProfilerClient } from 'src/modules/profiler/models/profiler.client' ;
5
5
import { RedisObserverStatus } from 'src/modules/profiler/constants' ;
6
6
import { IShardObserver } from 'src/modules/profiler/interfaces/shard-observer.interface' ;
7
7
import ERROR_MESSAGES from 'src/constants/error-messages' ;
8
+ import { EventEmitter2 } from '@nestjs/event-emitter' ;
8
9
9
- export class RedisObserver {
10
- private readonly redis : IORedis . Redis | IORedis . Cluster ;
10
+ export class RedisObserver extends EventEmitter2 {
11
+ private logger = new Logger ( 'RedisObserver' ) ;
12
+
13
+ private redis : IORedis . Redis | IORedis . Cluster ;
11
14
12
15
private profilerClients : Map < string , ProfilerClient > = new Map ( ) ;
13
16
17
+ private profilerClientsListeners : Map < string , any [ ] > = new Map ( ) ;
18
+
14
19
private shardsObservers : IShardObserver [ ] = [ ] ;
15
20
16
21
public status : RedisObserverStatus ;
17
22
18
- constructor ( redis : IORedis . Redis | IORedis . Cluster ) {
19
- this . redis = redis ;
20
- this . status = RedisObserverStatus . Wait ;
23
+ constructor ( ) {
24
+ super ( ) ;
25
+ this . status = RedisObserverStatus . Empty ;
26
+ }
27
+
28
+ init ( func : ( ) => Promise < IORedis . Redis | IORedis . Cluster > ) {
29
+ this . status = RedisObserverStatus . Initializing ;
30
+
31
+ func ( )
32
+ . then ( ( redis ) => {
33
+ this . redis = redis ;
34
+ this . status = RedisObserverStatus . Connected ;
35
+ this . emit ( 'connect' ) ;
36
+ } )
37
+ . catch ( ( err ) => {
38
+ this . emit ( 'connect_error' , err ) ;
39
+ } ) ;
21
40
}
22
41
23
42
/**
@@ -35,36 +54,84 @@ export class RedisObserver {
35
54
return ;
36
55
}
37
56
57
+ if ( ! this . profilerClientsListeners . get ( profilerClient . id ) ) {
58
+ this . profilerClientsListeners . set ( profilerClient . id , [ ] ) ;
59
+ }
60
+
61
+ const profilerListeners = this . profilerClientsListeners . get ( profilerClient . id ) ;
62
+
38
63
this . shardsObservers . forEach ( ( observer ) => {
39
- observer . on ( 'monitor' , ( time , args , source , database ) => {
64
+ const monitorListenerFn = ( time , args , source , database ) => {
40
65
profilerClient . handleOnData ( {
41
66
time, args, database, source, shardOptions : observer . options ,
42
67
} ) ;
43
- } ) ;
44
- observer . on ( 'end' , ( ) => {
68
+ } ;
69
+ const endListenerFn = ( ) => {
45
70
profilerClient . handleOnDisconnect ( ) ;
46
71
this . clear ( ) ;
47
- } ) ;
72
+ } ;
73
+
74
+ observer . on ( 'monitor' , monitorListenerFn ) ;
75
+ observer . on ( 'end' , endListenerFn ) ;
76
+
77
+ profilerListeners . push ( monitorListenerFn , endListenerFn ) ;
78
+ this . logger . debug ( `Subscribed to shard observer. Current listeners: ${ observer . listenerCount ( 'monitor' ) } ` ) ;
48
79
} ) ;
49
80
this . profilerClients . set ( profilerClient . id , profilerClient ) ;
81
+
82
+ this . logger . debug ( `Profiler Client with id:${ profilerClient . id } was added` ) ;
83
+ this . logCurrentState ( ) ;
84
+ }
85
+
86
+ public removeShardsListeners ( profilerClientId : string ) {
87
+ this . shardsObservers . forEach ( ( observer ) => {
88
+ ( this . profilerClientsListeners . get ( profilerClientId ) || [ ] ) . forEach ( ( listener ) => {
89
+ observer . removeListener ( 'monitor' , listener ) ;
90
+ observer . removeListener ( 'end' , listener ) ;
91
+ } ) ;
92
+
93
+ this . logger . debug (
94
+ `Unsubscribed from from shard observer. Current listeners: ${ observer . listenerCount ( 'monitor' ) } ` ,
95
+ ) ;
96
+ } ) ;
50
97
}
51
98
52
99
public unsubscribe ( id : string ) {
100
+ this . removeShardsListeners ( id ) ;
53
101
this . profilerClients . delete ( id ) ;
102
+ this . profilerClientsListeners . delete ( id ) ;
54
103
if ( this . profilerClients . size === 0 ) {
55
104
this . clear ( ) ;
56
105
}
106
+
107
+ this . logger . debug ( `Profiler Client with id:${ id } was unsubscribed` ) ;
108
+ this . logCurrentState ( ) ;
57
109
}
58
110
59
111
public disconnect ( id : string ) {
60
- const userClient = this . profilerClients . get ( id ) ;
61
- if ( userClient ) {
62
- userClient . destroy ( ) ;
112
+ this . removeShardsListeners ( id ) ;
113
+ const profilerClient = this . profilerClients . get ( id ) ;
114
+ if ( profilerClient ) {
115
+ profilerClient . destroy ( ) ;
63
116
}
64
117
this . profilerClients . delete ( id ) ;
118
+ this . profilerClientsListeners . delete ( id ) ;
65
119
if ( this . profilerClients . size === 0 ) {
66
120
this . clear ( ) ;
67
121
}
122
+
123
+ this . logger . debug ( `Profiler Client with id:${ id } was disconnected` ) ;
124
+ this . logCurrentState ( ) ;
125
+ }
126
+
127
+ /**
128
+ * Logs useful inforation about current state for debug purposes
129
+ * @private
130
+ */
131
+ private logCurrentState ( ) {
132
+ this . logger . debug (
133
+ `Status: ${ this . status } ; Shards: ${ this . shardsObservers . length } ; Listeners: ${ this . getProfilerClientsSize ( ) } ` ,
134
+ ) ;
68
135
}
69
136
70
137
public clear ( ) {
@@ -98,6 +165,13 @@ export class RedisObserver {
98
165
} else {
99
166
this . shardsObservers = [ await RedisObserver . createShardObserver ( this . redis ) ] ;
100
167
}
168
+
169
+ this . shardsObservers . forEach ( ( observer ) => {
170
+ observer . on ( 'error' , ( e ) => {
171
+ this . logger . error ( 'Error on shard observer' , e ) ;
172
+ } ) ;
173
+ } ) ;
174
+
101
175
this . status = RedisObserverStatus . Ready ;
102
176
} catch ( error ) {
103
177
this . status = RedisObserverStatus . Error ;
@@ -130,6 +204,7 @@ export class RedisObserver {
130
204
...redis . options ,
131
205
monitor : false ,
132
206
lazyLoading : false ,
207
+ connectionName : `redisinsight-monitor-perm-check-${ Math . random ( ) } ` ,
133
208
} ) ;
134
209
135
210
await duplicate . send_command ( 'monitor' ) ;
0 commit comments