Skip to content

Commit 874afb9

Browse files
author
Artem
committed
flush logs + refactoring
1 parent aa42770 commit 874afb9

File tree

11 files changed

+240
-128
lines changed

11 files changed

+240
-128
lines changed

redisinsight/api/src/modules/profiler/emitters/client.logs-emitter.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Socket } from 'socket.io';
22
import { ILogsEmitter } from 'src/modules/profiler/interfaces/logs-emitter.interface';
33
import { ProfilerServerEvents } from 'src/modules/profiler/constants';
44

5-
class ClientLogsEmitter implements ILogsEmitter {
5+
export class ClientLogsEmitter implements ILogsEmitter {
66
private readonly client: Socket;
77

88
public readonly id: string;
@@ -16,9 +16,9 @@ class ClientLogsEmitter implements ILogsEmitter {
1616
return this.client.emit(ProfilerServerEvents.Data, items);
1717
}
1818

19-
public addClientObserver() {}
19+
public addProfilerClient() {}
2020

21-
public removeClientObserver() {}
22-
}
21+
public removeProfilerClient() {}
2322

24-
export default ClientLogsEmitter;
23+
public flushLogs() {}
24+
}

redisinsight/api/src/modules/profiler/emitters/file.logs-emitter.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class FileLogsEmitter implements ILogsEmitter {
1414
/**
1515
* Write batch of logs to a file
1616
*/
17-
public async emit(items: any[]) {
17+
async emit(items: any[]) {
1818
try {
1919
if (!this.logFile.getWriteStream()) {
2020
return;
@@ -31,12 +31,16 @@ class FileLogsEmitter implements ILogsEmitter {
3131
}
3232
}
3333

34-
public addClientObserver(id: string) {
35-
return this.logFile.addClientObserver(id);
34+
async addProfilerClient(id: string) {
35+
return this.logFile.addProfilerClient(id);
3636
}
3737

38-
public removeClientObserver(id: string) {
39-
return this.logFile.removeClientObserver(id);
38+
async removeProfilerClient(id: string) {
39+
return this.logFile.removeProfilerClient(id);
40+
}
41+
42+
async flushLogs() {
43+
return this.logFile.destroy();
4044
}
4145
}
4246
export default FileLogsEmitter;
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export interface ILogsEmitter {
22
id: string;
33
emit: (items: any[]) => void;
4-
addClientObserver: (id: string) => void;
5-
removeClientObserver: (id: string) => void;
4+
addProfilerClient: (id: string) => void;
5+
removeProfilerClient: (id: string) => void;
6+
flushLogs: () => void;
67
}

redisinsight/api/src/modules/profiler/models/log-file.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ export class LogFile {
7272
this.alias = alias;
7373
}
7474

75-
public addClientObserver(id: string) {
75+
addProfilerClient(id: string) {
7676
this.clientObservers.set(id, id);
7777
this.idleSince = 0;
7878
}
7979

80-
public removeClientObserver(id: string) {
80+
removeProfilerClient(id: string) {
8181
this.clientObservers.delete(id);
8282

8383
if (!this.clientObservers.size) {
@@ -92,10 +92,11 @@ export class LogFile {
9292
}
9393

9494
/**
95-
* Remove file after finish
95+
* Remove file and delete write stream after finish
9696
*/
9797
async destroy() {
9898
try {
99+
this.writeStream?.close();
99100
this.writeStream = null;
100101
await fs.unlink(this.filePath);
101102
} catch (e) {

redisinsight/api/src/modules/profiler/models/profiler.client.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,14 @@ export class ProfilerClient {
5656

5757
public addLogsEmitter(emitter: ILogsEmitter) {
5858
this.logsEmitters.set(emitter.id, emitter);
59-
emitter.addClientObserver(this.id);
59+
emitter.addProfilerClient(this.id);
60+
}
61+
62+
async flushLogs() {
63+
this.logsEmitters.forEach((emitter) => emitter.flushLogs());
6064
}
6165

6266
public destroy() {
63-
this.logsEmitters.forEach((emitter) => emitter.removeClientObserver(this.id));
67+
this.logsEmitters.forEach((emitter) => emitter.removeProfilerClient(this.id));
6468
}
6569
}

redisinsight/api/src/modules/profiler/models/redis.observer.ts

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import ERROR_MESSAGES from 'src/constants/error-messages';
99
export class RedisObserver {
1010
private readonly redis: IORedis.Redis | IORedis.Cluster;
1111

12-
private clientMonitorObservers: Map<string, ProfilerClient> = new Map();
12+
private profilerClients: Map<string, ProfilerClient> = new Map();
1313

1414
private shardsObservers: IShardObserver[] = [];
1515

@@ -20,65 +20,82 @@ export class RedisObserver {
2020
this.status = RedisObserverStatus.Wait;
2121
}
2222

23-
public async subscribe(client: ProfilerClient) {
23+
/**
24+
* Create "monitor" clients for each shard if not exists
25+
* Subscribe profiler client to each each shard
26+
* Ignore when profiler client with such id already exists
27+
* @param profilerClient
28+
*/
29+
public async subscribe(profilerClient: ProfilerClient) {
2430
if (this.status !== RedisObserverStatus.Ready) {
2531
await this.connect();
2632
}
27-
if (this.clientMonitorObservers.has(client.id)) {
33+
34+
if (this.profilerClients.has(profilerClient.id)) {
2835
return;
2936
}
3037

3138
this.shardsObservers.forEach((observer) => {
3239
observer.on('monitor', (time, args, source, database) => {
33-
client.handleOnData({
40+
profilerClient.handleOnData({
3441
time, args, database, source, shardOptions: observer.options,
3542
});
3643
});
3744
observer.on('end', () => {
38-
client.handleOnDisconnect();
45+
profilerClient.handleOnDisconnect();
3946
this.clear();
4047
});
4148
});
42-
this.clientMonitorObservers.set(client.id, client);
49+
this.profilerClients.set(profilerClient.id, profilerClient);
4350
}
4451

4552
public unsubscribe(id: string) {
46-
this.clientMonitorObservers.delete(id);
47-
if (this.clientMonitorObservers.size === 0) {
53+
this.profilerClients.delete(id);
54+
if (this.profilerClients.size === 0) {
4855
this.clear();
4956
}
5057
}
5158

5259
public disconnect(id: string) {
53-
const userClient = this.clientMonitorObservers.get(id);
60+
const userClient = this.profilerClients.get(id);
5461
if (userClient) {
5562
userClient.destroy();
5663
}
57-
this.clientMonitorObservers.delete(id);
58-
if (this.clientMonitorObservers.size === 0) {
64+
this.profilerClients.delete(id);
65+
if (this.profilerClients.size === 0) {
5966
this.clear();
6067
}
6168
}
6269

6370
public clear() {
64-
this.clientMonitorObservers.clear();
65-
this.shardsObservers.forEach((observer) => observer.disconnect());
71+
this.profilerClients.clear();
72+
this.shardsObservers.forEach((observer) => {
73+
observer.removeAllListeners('end');
74+
observer.disconnect();
75+
});
6676
this.shardsObservers = [];
6777
this.status = RedisObserverStatus.End;
6878
}
6979

70-
public getSize(): number {
71-
return this.clientMonitorObservers.size;
80+
/**
81+
* Return number of profilerClients for current Redis Observer instance
82+
*/
83+
public getProfilerClientsSize(): number {
84+
return this.profilerClients.size;
7285
}
7386

87+
/**
88+
* Create shard observer for each Redis shard to receive "monitor" data
89+
* @private
90+
*/
7491
private async connect(): Promise<void> {
7592
try {
7693
if (this.redis instanceof IORedis.Cluster) {
7794
this.shardsObservers = await Promise.all(
78-
this.redis.nodes('all').filter((node) => node.status === 'ready').map(this.createShardObserver),
95+
this.redis.nodes('all').filter((node) => node.status === 'ready').map(RedisObserver.createShardObserver),
7996
);
8097
} else {
81-
this.shardsObservers = [await this.createShardObserver(this.redis)];
98+
this.shardsObservers = [await RedisObserver.createShardObserver(this.redis)];
8299
}
83100
this.status = RedisObserverStatus.Ready;
84101
} catch (error) {
@@ -92,12 +109,20 @@ export class RedisObserver {
92109
}
93110
}
94111

95-
private async createShardObserver(redis: IORedis.Redis): Promise<IShardObserver> {
96-
// HACK: ioredis impropriety throw error a user has no permissions to run the 'monitor' command
112+
/**
113+
* Create and return shard observer using IORedis common client
114+
* @param redis
115+
*/
116+
static async createShardObserver(redis: IORedis.Redis): Promise<IShardObserver> {
97117
await RedisObserver.isMonitorAvailable(redis);
98118
return await redis.monitor() as IShardObserver;
99119
}
100120

121+
/**
122+
* HACK: ioredis do not handle error when a user has no permissions to run the 'monitor' command
123+
* Here we try to send "monitor" command directly to throw error (like NOPERM) if any
124+
* @param redis
125+
*/
101126
static async isMonitorAvailable(redis: IORedis.Redis): Promise<boolean> {
102127
// @ts-ignore
103128
const duplicate = redis.duplicate({

redisinsight/api/src/modules/profiler/profiler.gateway.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class ProfilerGateway implements OnGatewayConnection, OnGatewayDisconnect
3030
await this.service.addListenerForInstance(ProfilerGateway.getInstanceId(client), client, settings);
3131
return { status: 'ok' };
3232
} catch (error) {
33+
this.logger.error('Unable to add listener', error);
3334
throw new WsException(error);
3435
}
3536
}
@@ -40,16 +41,18 @@ export class ProfilerGateway implements OnGatewayConnection, OnGatewayDisconnect
4041
await this.service.removeListenerFromInstance(ProfilerGateway.getInstanceId(client), client.id);
4142
return { status: 'ok' };
4243
} catch (error) {
44+
this.logger.error('Unable to pause monitor', error);
4345
throw new WsException(error);
4446
}
4547
}
4648

4749
@SubscribeMessage(ProfilerClientEvents.FlushLogs)
4850
async flushLogs(client: Socket): Promise<any> {
4951
try {
50-
await this.service.flushLogs(ProfilerGateway.getInstanceId(client), client.id);
52+
await this.service.flushLogs(client.id);
5153
return { status: 'ok' };
5254
} catch (error) {
55+
this.logger.error('Unable to flush logs', error);
5356
throw new WsException(error);
5457
}
5558
}

redisinsight/api/src/modules/profiler/profiler.module.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ import { Module } from '@nestjs/common';
22
import { SharedModule } from 'src/modules/shared/shared.module';
33
import { LogFileProvider } from 'src/modules/profiler/providers/log-file.provider';
44
import { ProfilerController } from 'src/modules/profiler/profiler.controller';
5+
import { RedisObserverProvider } from 'src/modules/profiler/providers/redis-observer.provider';
6+
import { ProfilerClientProvider } from 'src/modules/profiler/providers/profiler-client.provider';
57
import { ProfilerGateway } from './profiler.gateway';
68
import { ProfilerService } from './profiler.service';
79

810
@Module({
911
imports: [SharedModule],
1012
providers: [
13+
RedisObserverProvider,
14+
ProfilerClientProvider,
15+
LogFileProvider,
1116
ProfilerGateway,
1217
ProfilerService,
13-
LogFileProvider,
1418
],
1519
controllers: [ProfilerController],
1620
})

0 commit comments

Comments
 (0)