Skip to content

Commit f077748

Browse files
author
Artem
committed
unit tests
1 parent 9a77a31 commit f077748

File tree

10 files changed

+693
-2
lines changed

10 files changed

+693
-2
lines changed

redisinsight/api/src/modules/pub-sub/constants/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export enum SubscriptionType {
1414
}
1515

1616
export enum RedisClientStatus {
17-
Connecting = 'connectiong',
17+
Connecting = 'connecting',
1818
Connected = 'connected',
1919
Error = 'error',
2020
End = 'end',
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import * as Redis from 'ioredis';
2+
import { RedisClient } from 'src/modules/pub-sub/model/redis-client';
3+
import { RedisClientEvents, RedisClientStatus } from 'src/modules/pub-sub/constants';
4+
5+
const getRedisClientFn = jest.fn();
6+
7+
const nodeClient = Object.create(Redis.prototype);
8+
nodeClient.subscribe = jest.fn();
9+
nodeClient.psubscribe = jest.fn();
10+
nodeClient.unsubscribe = jest.fn();
11+
nodeClient.punsubscribe = jest.fn();
12+
nodeClient.status = 'ready';
13+
nodeClient.disconnect = jest.fn();
14+
15+
describe('RedisClient', () => {
16+
let redisClient: RedisClient;
17+
18+
beforeEach(() => {
19+
jest.resetAllMocks();
20+
redisClient = new RedisClient('databaseId', getRedisClientFn);
21+
getRedisClientFn.mockResolvedValue(nodeClient);
22+
nodeClient.subscribe.mockResolvedValue('OK');
23+
nodeClient.psubscribe.mockResolvedValue('OK');
24+
});
25+
26+
describe('getClient', () => {
27+
let connectSpy;
28+
29+
beforeEach(() => {
30+
connectSpy = jest.spyOn(redisClient as any, 'connect');
31+
});
32+
33+
it('should connect and return client by default', async () => {
34+
expect(await redisClient.getClient()).toEqual(nodeClient);
35+
expect(connectSpy).toHaveBeenCalledTimes(1);
36+
expect(redisClient['status']).toEqual(RedisClientStatus.Connected);
37+
});
38+
it('should wait until first attempt of connection finish with success', async () => {
39+
redisClient.getClient().then().catch();
40+
expect(redisClient['status']).toEqual(RedisClientStatus.Connecting);
41+
expect(await redisClient.getClient()).toEqual(nodeClient);
42+
expect(connectSpy).toHaveBeenCalledTimes(1);
43+
expect(redisClient['status']).toEqual(RedisClientStatus.Connected);
44+
});
45+
it('should wait until first attempt of connection finish with error', async () => {
46+
try {
47+
getRedisClientFn.mockRejectedValueOnce(new Error('Connection error'));
48+
redisClient.getClient().then().catch();
49+
expect(redisClient['status']).toEqual(RedisClientStatus.Connecting);
50+
expect(await redisClient.getClient()).toEqual(nodeClient);
51+
fail();
52+
} catch (e) {
53+
expect(connectSpy).toHaveBeenCalledTimes(1);
54+
expect(redisClient['status']).toEqual(RedisClientStatus.Error);
55+
}
56+
});
57+
it('should return existing connection when status connected', async () => {
58+
expect(await redisClient.getClient()).toEqual(nodeClient);
59+
expect(connectSpy).toHaveBeenCalledTimes(1);
60+
expect(redisClient['status']).toEqual(RedisClientStatus.Connected);
61+
expect(await redisClient.getClient()).toEqual(nodeClient);
62+
expect(connectSpy).toHaveBeenCalledTimes(1);
63+
});
64+
it('should return create new connection when status end or error', async () => {
65+
expect(await redisClient.getClient()).toEqual(nodeClient);
66+
expect(connectSpy).toHaveBeenCalledTimes(1);
67+
expect(redisClient['status']).toEqual(RedisClientStatus.Connected);
68+
redisClient['status'] = RedisClientStatus.Error;
69+
expect(await redisClient.getClient()).toEqual(nodeClient);
70+
expect(connectSpy).toHaveBeenCalledTimes(2);
71+
expect(redisClient['status']).toEqual(RedisClientStatus.Connected);
72+
redisClient['status'] = RedisClientStatus.End;
73+
expect(await redisClient.getClient()).toEqual(nodeClient);
74+
expect(connectSpy).toHaveBeenCalledTimes(3);
75+
expect(redisClient['status']).toEqual(RedisClientStatus.Connected);
76+
});
77+
});
78+
79+
describe('connect', () => {
80+
it('should connect and emit connected event', async () => {
81+
expect(await new Promise((res) => {
82+
redisClient['connect']();
83+
redisClient.on(RedisClientEvents.Connected, res);
84+
})).toEqual(nodeClient);
85+
});
86+
it('should emit message event (message source)', async () => {
87+
await redisClient['connect']();
88+
const [id, message] = await new Promise((res) => {
89+
redisClient.on('message', (i, m) => res([i, m]));
90+
nodeClient.emit('message', 'channel-a', 'message-a');
91+
});
92+
93+
expect(id).toEqual('s:channel-a');
94+
expect(message.channel).toEqual('channel-a');
95+
expect(message.message).toEqual('message-a');
96+
});
97+
it('should emit message event (pmessage source)', async () => {
98+
await redisClient['connect']();
99+
const [id, message] = await new Promise((res) => {
100+
redisClient.on('message', (i, m) => res([i, m]));
101+
nodeClient.emit('pmessage', '*', 'channel-aa', 'message-aa');
102+
});
103+
expect(id).toEqual('p:*');
104+
expect(message.channel).toEqual('channel-aa');
105+
expect(message.message).toEqual('message-aa');
106+
});
107+
it('should emit end event', async () => {
108+
await redisClient['connect']();
109+
await new Promise((res) => {
110+
redisClient.on('end', () => {
111+
res(null);
112+
});
113+
114+
nodeClient.emit('end');
115+
});
116+
});
117+
});
118+
119+
describe('destroy', () => {
120+
it('should remove all listeners, disconnect, set client to null and emit end event', async () => {
121+
const removeAllListenersSpy = jest.spyOn(nodeClient, 'removeAllListeners');
122+
123+
await redisClient['connect']();
124+
redisClient.destroy();
125+
126+
expect(redisClient['client']).toEqual(null);
127+
expect(redisClient['status']).toEqual(RedisClientStatus.End);
128+
expect(removeAllListenersSpy).toHaveBeenCalled();
129+
expect(nodeClient.disconnect).toHaveBeenCalled();
130+
});
131+
});
132+
});

redisinsight/api/src/modules/pub-sub/model/redis-client.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ export class RedisClient extends EventEmitter2 {
2323
this.connectFn = connectFn;
2424
}
2525

26+
/**
27+
* Get existing client or wait until previous attempt fulfill or initiate new connection attempt
28+
* based on current status
29+
*/
2630
async getClient(): Promise<IORedis.Redis | IORedis.Cluster> {
2731
try {
2832
this.logger.debug(`Get client ${this}`);
@@ -51,6 +55,12 @@ export class RedisClient extends EventEmitter2 {
5155
}
5256
}
5357

58+
/**
59+
* Connects to redis and change current status to Connected
60+
* Also emit Connected event after success
61+
* Also subscribe to needed channels
62+
* @private
63+
*/
5464
private async connect() {
5565
this.status = RedisClientStatus.Connecting;
5666
this.client = await this.connectFn();
@@ -79,6 +89,10 @@ export class RedisClient extends EventEmitter2 {
7989
});
8090
}
8191

92+
/**
93+
* Unsubscribe all listeners and disconnect
94+
* Remove client and set current state to End
95+
*/
8296
destroy() {
8397
this.client?.removeAllListeners();
8498
this.client?.disconnect();
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import * as Redis from 'ioredis';
2+
import { mockSocket } from 'src/__mocks__';
3+
import { UserSession } from 'src/modules/pub-sub/model/user-session';
4+
import { UserClient } from 'src/modules/pub-sub/model/user-client';
5+
import { RedisClient } from 'src/modules/pub-sub/model/redis-client';
6+
import { SimpleSubscription } from 'src/modules/pub-sub/model/simple.subscription';
7+
import { SubscriptionType } from 'src/modules/pub-sub/constants';
8+
import { PatternSubscription } from 'src/modules/pub-sub/model/pattern.subscription';
9+
10+
const getRedisClientFn = jest.fn();
11+
12+
const nodeClient = Object.create(Redis.prototype);
13+
nodeClient.subscribe = jest.fn();
14+
nodeClient.psubscribe = jest.fn();
15+
nodeClient.unsubscribe = jest.fn();
16+
nodeClient.punsubscribe = jest.fn();
17+
nodeClient.status = 'ready';
18+
nodeClient.disconnect = jest.fn();
19+
20+
const mockUserClient = new UserClient('socketId', mockSocket, 'databaseId');
21+
22+
const mockRedisClient = new RedisClient('databaseId', getRedisClientFn);
23+
24+
const mockSubscriptionDto = {
25+
channel: 'channel-a',
26+
type: SubscriptionType.Subscribe,
27+
};
28+
29+
const mockPSubscriptionDto = {
30+
channel: 'channel-a',
31+
type: SubscriptionType.PSubscribe,
32+
};
33+
34+
const mockSubscription = new SimpleSubscription(mockUserClient, mockSubscriptionDto);
35+
const mockPSubscription = new PatternSubscription(mockUserClient, mockPSubscriptionDto);
36+
37+
const mockMessage = {
38+
channel: 'channel-a',
39+
message: 'message-a',
40+
time: 1234567890,
41+
};
42+
43+
describe('UserSession', () => {
44+
let userSession: UserSession;
45+
46+
beforeEach(() => {
47+
jest.resetAllMocks();
48+
userSession = new UserSession(mockUserClient, mockRedisClient);
49+
getRedisClientFn.mockResolvedValue(nodeClient);
50+
nodeClient.subscribe.mockResolvedValue('OK');
51+
nodeClient.psubscribe.mockResolvedValue('OK');
52+
});
53+
54+
describe('subscribe', () => {
55+
it('should subscribe to a channel', async () => {
56+
expect(userSession['subscriptions'].size).toEqual(0);
57+
await userSession.subscribe(mockSubscription);
58+
expect(userSession['subscriptions'].size).toEqual(1);
59+
await userSession.subscribe(mockSubscription);
60+
expect(userSession['subscriptions'].size).toEqual(1);
61+
expect(userSession['subscriptions'].get(mockSubscription.getId())).toEqual(mockSubscription);
62+
await userSession.subscribe(mockPSubscription);
63+
expect(userSession['subscriptions'].size).toEqual(2);
64+
await userSession.subscribe(mockPSubscription);
65+
expect(userSession['subscriptions'].size).toEqual(2);
66+
expect(userSession['subscriptions'].get(mockPSubscription.getId())).toEqual(mockPSubscription);
67+
});
68+
});
69+
70+
describe('unsubscribe', () => {
71+
it('should unsubscribe from a channel', async () => {
72+
expect(userSession['subscriptions'].size).toEqual(0);
73+
await userSession.subscribe(mockSubscription);
74+
expect(userSession['subscriptions'].size).toEqual(1);
75+
await userSession.subscribe(mockPSubscription);
76+
expect(userSession['subscriptions'].size).toEqual(2);
77+
await userSession.unsubscribe(mockSubscription);
78+
expect(userSession['subscriptions'].size).toEqual(1);
79+
await userSession.unsubscribe(mockSubscription);
80+
expect(userSession['subscriptions'].size).toEqual(1);
81+
await userSession.unsubscribe(mockPSubscription);
82+
expect(userSession['subscriptions'].size).toEqual(0);
83+
await userSession.unsubscribe(mockPSubscription);
84+
expect(userSession['subscriptions'].size).toEqual(0);
85+
});
86+
});
87+
88+
describe('handleMessage', () => {
89+
let handleSimpleSpy;
90+
let handlePatternSpy;
91+
92+
beforeEach(async () => {
93+
handleSimpleSpy = jest.spyOn(mockSubscription, 'pushMessage');
94+
handlePatternSpy = jest.spyOn(mockPSubscription, 'pushMessage');
95+
await userSession.subscribe(mockSubscription);
96+
await userSession.subscribe(mockPSubscription);
97+
});
98+
it('should handle message by particular subscription', async () => {
99+
userSession.handleMessage('id', mockMessage);
100+
expect(handleSimpleSpy).toHaveBeenCalledTimes(0);
101+
expect(handlePatternSpy).toHaveBeenCalledTimes(0);
102+
userSession.handleMessage(mockSubscription.getId(), mockMessage);
103+
expect(handleSimpleSpy).toHaveBeenCalledTimes(1);
104+
expect(handlePatternSpy).toHaveBeenCalledTimes(0);
105+
userSession.handleMessage(mockPSubscription.getId(), mockMessage);
106+
userSession.handleMessage(mockPSubscription.getId(), mockMessage);
107+
expect(handleSimpleSpy).toHaveBeenCalledTimes(1);
108+
expect(handlePatternSpy).toHaveBeenCalledTimes(2);
109+
// wait until debounce process
110+
await new Promise((res) => setTimeout(res, 200));
111+
});
112+
});
113+
114+
describe('handleDisconnect', () => {
115+
beforeEach(async () => {
116+
await userSession.subscribe(mockSubscription);
117+
await userSession.subscribe(mockPSubscription);
118+
});
119+
it('should handle message by particular subscription', async () => {
120+
userSession.handleDisconnect();
121+
expect(userSession['subscriptions'].size).toEqual(0);
122+
expect(nodeClient.disconnect).toHaveBeenCalled();
123+
});
124+
});
125+
});

redisinsight/api/src/modules/pub-sub/model/user-session.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ export class UserSession {
3232

3333
getRedisClient() { return this.redisClient; }
3434

35+
/**
36+
* Subscribe to a Pub/Sub channel and create Redis client connection if needed
37+
* Also add subscription to the subscriptions list
38+
* @param subscription
39+
*/
3540
async subscribe(subscription: ISubscription) {
3641
const client = await this.redisClient?.getClient();
3742

@@ -43,6 +48,11 @@ export class UserSession {
4348
}
4449
}
4550

51+
/**
52+
* Unsubscribe from a channel and remove from the list of subscriptions
53+
* Also destroy redis client when no subscriptions left
54+
* @param subscription
55+
*/
4656
async unsubscribe(subscription: ISubscription) {
4757
this.subscriptions.delete(subscription.getId());
4858

@@ -57,6 +67,13 @@ export class UserSession {
5767
}
5868
}
5969

70+
/**
71+
* Redirect message to a proper subscription from the list using id
72+
* ID is generated in this way: "p:channelName" where "p" - is a type of subscription
73+
* Subscription types: s - "subscribe", p - "psubscribe", ss - "ssubscribe"
74+
* @param id
75+
* @param message
76+
*/
6077
handleMessage(id: string, message: IMessage) {
6178
const subscription = this.subscriptions.get(id);
6279

@@ -65,6 +82,11 @@ export class UserSession {
6582
}
6683
}
6784

85+
/**
86+
* Handle socket disconnection
87+
* In this case we need to destroy entire session and cascade destroy other models inside
88+
* to be sure that there is no open connections left
89+
*/
6890
handleDisconnect() {
6991
this.userClient.getSocket().emit(
7092
PubSubServerEvents.Exception,
@@ -74,6 +96,9 @@ export class UserSession {
7496
this.destroy();
7597
}
7698

99+
/**
100+
* Reset subscriptions map and call and destroy Redis client
101+
*/
77102
destroy() {
78103
this.logger.debug(`Destroy ${this}`);
79104

0 commit comments

Comments
 (0)