Skip to content

Commit 6548e10

Browse files
authored
Use client pool for redis connections (#1247)
* feat(socket|redis): create a redis client pool for setex operations * feat: use client pool for leave-room and bump pool size * chore: remove refreshSlotsCache and error logs
1 parent 663052a commit 6548e10

File tree

7 files changed

+84
-66
lines changed

7 files changed

+84
-66
lines changed

packages/sdk-socket-server-next/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"dotenv": "^16.3.1",
4848
"express": "^4.18.2",
4949
"express-rate-limit": "^7.1.5",
50+
"generic-pool": "^3.9.0",
5051
"helmet": "^5.1.1",
5152
"ioredis": "^5.3.2",
5253
"logform": "^2.6.0",

packages/sdk-socket-server-next/src/analytics-api.ts

Lines changed: 60 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
incrementAnalyticsEvents,
2626
incrementRedisCacheOperation,
2727
} from './metrics';
28+
import genericPool from "generic-pool";
2829

2930
const logger = getLogger();
3031

@@ -54,8 +55,6 @@ if (redisNodes.length === 0) {
5455
process.exit(1);
5556
}
5657

57-
let redisClient: Cluster | Redis | undefined;
58-
5958
export const getRedisOptions = (
6059
isTls: boolean,
6160
password: string | undefined,
@@ -79,15 +78,6 @@ export const getRedisOptions = (
7978
const targetErrors = [/MOVED/, /READONLY/, /ETIMEDOUT/];
8079

8180
logger.error('Redis reconnect error:', error);
82-
if (error.message.includes('MOVED') && redisClient instanceof Cluster) {
83-
logger.error('Refreshing Redis Cluster slots cache');
84-
try {
85-
redisClient?.refreshSlotsCache();
86-
} catch (error) {
87-
logger.error('Error refreshing Redis Cluster slots cache:', error);
88-
}
89-
}
90-
9181
return targetErrors.some((targetError) =>
9282
targetError.test(error.message),
9383
);
@@ -99,83 +89,97 @@ export const getRedisOptions = (
9989
return options;
10090
};
10191

102-
export const getRedisClient = () => {
103-
if (!redisClient) {
104-
if (redisCluster) {
105-
logger.info('Connecting to Redis Cluster...');
92+
export const buildRedisClient = (usePipelining: boolean = true) => {
93+
let newRedisClient: Cluster | Redis | undefined;
10694

107-
const redisOptions = getRedisOptions(
108-
redisTLS,
109-
process.env.REDIS_PASSWORD,
110-
);
111-
const redisClusterOptions: ClusterOptions = {
112-
dnsLookup: (address, callback) => callback(null, address),
113-
scaleReads: 'slave',
114-
slotsRefreshTimeout: 5000,
115-
showFriendlyErrorStack: true,
116-
slotsRefreshInterval: 2000,
117-
clusterRetryStrategy: (times) => Math.min(times * 30, 1000),
118-
enableAutoPipelining: true,
119-
redisOptions,
120-
};
95+
if (redisCluster) {
96+
logger.info('Connecting to Redis Cluster...');
12197

122-
logger.debug(
123-
'Redis Cluster options:',
124-
JSON.stringify(redisClusterOptions, null, 2),
125-
);
98+
const redisOptions = getRedisOptions(
99+
redisTLS,
100+
process.env.REDIS_PASSWORD,
101+
);
102+
const redisClusterOptions: ClusterOptions = {
103+
dnsLookup: (address, callback) => callback(null, address),
104+
scaleReads: 'slave',
105+
slotsRefreshTimeout: 5000,
106+
showFriendlyErrorStack: true,
107+
slotsRefreshInterval: 2000,
108+
clusterRetryStrategy: (times) => Math.min(times * 30, 1000),
109+
enableAutoPipelining: usePipelining,
110+
redisOptions,
111+
};
126112

127-
redisClient = new Cluster(redisNodes, redisClusterOptions);
128-
} else {
129-
logger.info('Connecting to single Redis node');
130-
redisClient = new Redis(redisNodes[0]);
131-
}
113+
logger.debug(
114+
'Redis Cluster options:',
115+
JSON.stringify(redisClusterOptions, null, 2),
116+
);
117+
118+
newRedisClient = new Cluster(redisNodes, redisClusterOptions);
119+
} else {
120+
logger.info('Connecting to single Redis node');
121+
newRedisClient = new Redis(redisNodes[0]);
132122
}
133123

134-
redisClient.on('ready', () => {
124+
newRedisClient.on('ready', () => {
135125
logger.info('Redis ready');
136-
137-
if (redisClient instanceof Cluster) {
138-
logger.error('Refreshing Redis Cluster slots cache');
139-
try {
140-
redisClient?.refreshSlotsCache();
141-
} catch (error) {
142-
logger.error('Error refreshing Redis Cluster slots cache:', error);
143-
}
144-
}
145126
});
146127

147-
redisClient.on('error', (error) => {
128+
newRedisClient.on('error', (error) => {
148129
logger.error('Redis error:', error);
149130
});
150131

151-
redisClient.on('connect', () => {
132+
newRedisClient.on('connect', () => {
152133
logger.info('Connected to Redis Cluster successfully');
153134
});
154135

155-
redisClient.on('close', () => {
136+
newRedisClient.on('close', () => {
156137
logger.info('Disconnected from Redis Cluster');
157138
});
158139

159-
redisClient.on('reconnecting', () => {
140+
newRedisClient.on('reconnecting', () => {
160141
logger.info('Reconnecting to Redis Cluster');
161142
});
162143

163-
redisClient.on('end', () => {
144+
newRedisClient.on('end', () => {
164145
logger.info('Redis Cluster connection ended');
165146
});
166147

167-
redisClient.on('wait', () => {
148+
newRedisClient.on('wait', () => {
168149
logger.info('Redis Cluster waiting for connection');
169150
});
170151

171-
redisClient.on('select', (node) => {
152+
newRedisClient.on('select', (node) => {
172153
logger.info('Redis Cluster selected node:', node);
173154
});
174155

156+
return newRedisClient;
157+
}
158+
159+
const redisFactory = {
160+
create: () => {
161+
return Promise.resolve(buildRedisClient(false));
162+
},
163+
destroy: (client: Cluster | Redis) => {
164+
return Promise.resolve(client.disconnect());
165+
},
166+
};
167+
168+
let redisClient: Cluster | Redis | undefined;
169+
170+
export const getGlobalRedisClient = () => {
171+
if (!redisClient) {
172+
redisClient = buildRedisClient();
173+
}
174+
175175
return redisClient;
176176
};
177177

178-
export const pubClient = getRedisClient();
178+
export const pubClient = getGlobalRedisClient();
179+
export const pubClientPool = genericPool.createPool(redisFactory, {
180+
max: 35,
181+
min: 15,
182+
});
179183

180184
const app = express();
181185

packages/sdk-socket-server-next/src/protocol/handleChannelRejected.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Server, Socket } from 'socket.io';
2-
import { pubClient } from '../analytics-api';
2+
import { pubClient, pubClientPool } from '../analytics-api';
33
import { config } from '../config';
44
import { getLogger } from '../logger';
55
import { ChannelConfig } from './handleJoinChannel';
@@ -78,13 +78,17 @@ export const handleChannelRejected = async (
7878
},
7979
);
8080

81+
const client = await pubClientPool.acquire();
82+
8183
// Update redis channel config to inform dApp of rejection
82-
await pubClient.setex(
84+
await client.setex(
8385
channelConfigKey,
8486
config.rejectedChannelExpiry,
8587
JSON.stringify(channelConfig),
8688
);
8789

90+
await pubClientPool.release(client);
91+
8892
// Also broadcast to dapp if it is connected
8993
socket.broadcast.to(channelId).emit(`rejected-${channelId}`, { channelId });
9094

packages/sdk-socket-server-next/src/protocol/handleJoinChannel.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// protocol/handleJoinChannel.ts
22
import { Server, Socket } from 'socket.io';
33
import { validate } from 'uuid';
4-
import { pubClient } from '../analytics-api';
4+
import { pubClient, pubClientPool } from '../analytics-api';
55
import { MAX_CLIENTS_PER_ROOM, config, isDevelopment } from '../config';
66
import { getLogger } from '../logger';
77
import { rateLimiter } from '../rate-limiter';
@@ -182,11 +182,15 @@ export const handleJoinChannel = async ({
182182
JSON.stringify(channelConfig),
183183
);
184184

185-
await pubClient.setex(
185+
const client = await pubClientPool.acquire();
186+
187+
await client.setex(
186188
channelConfigKey,
187189
config.channelExpiry,
188190
JSON.stringify(channelConfig),
189191
); // 1 week expiration
192+
193+
await pubClientPool.release(client);
190194
}
191195
}
192196

packages/sdk-socket-server-next/src/redis-check.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import dotenv from 'dotenv';
33
// Dotenv must be loaded before importing local files
44
dotenv.config();
5-
import { getRedisClient } from './analytics-api';
5+
import { getGlobalRedisClient } from './analytics-api';
66

77
import { createLogger } from './logger';
88

@@ -34,7 +34,7 @@ if (redisNodes.length === 0) {
3434
async function testRedisOperations() {
3535
try {
3636
// Connect to Redis
37-
const cluster = getRedisClient();
37+
const cluster = getGlobalRedisClient();
3838
logger.info('Connected to Redis Cluster successfully');
3939

4040
// Set a key in Redis

packages/sdk-socket-server-next/src/socket-config.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { createAdapter } from '@socket.io/redis-adapter';
66

77
import { Server, Socket } from 'socket.io';
88
import { validate } from 'uuid';
9-
import { pubClient } from './analytics-api';
9+
import { pubClient, pubClientPool } from './analytics-api';
1010
import { getLogger } from './logger';
1111
import { ACKParams, handleAck } from './protocol/handleAck';
1212
import {
@@ -95,12 +95,14 @@ export const configureSocketServer = async (
9595
// Ignore invalid room IDs
9696
return;
9797
}
98+
99+
const client = await pubClientPool.acquire();
98100

99101
// Force keys into the same hash slot in Redis Cluster, using a hash tag (a substring enclosed in curly braces {})
100102
const channelOccupancyKey = `channel_occupancy:{${roomId}}`;
101103

102104
// Decrement the number of clients in the room
103-
const channelOccupancy = await pubClient.incrby(channelOccupancyKey, -1);
105+
const channelOccupancy = await client.incrby(channelOccupancyKey, -1);
104106

105107
logger.debug(
106108
`'leave-room' socket ${socketId} has left room ${roomId} --> channelOccupancy=${channelOccupancy}`,
@@ -112,14 +114,16 @@ export const configureSocketServer = async (
112114
const channelOccupancyKey = `channel_occupancy:{${roomId}}`;
113115

114116
// remove from redis
115-
await pubClient.del(channelOccupancyKey);
117+
await client.del(channelOccupancyKey);
116118
} else {
117119
logger.info(
118120
`'leave-room' Room ${roomId} kept alive with ${channelOccupancy} clients`,
119121
);
120122
// Inform the room of the disconnection
121123
io.to(roomId).emit(`clients_disconnected-${roomId}`);
122124
}
125+
126+
await pubClientPool.release(client);
123127
});
124128

125129
io.on('connection', (socket: Socket) => {

yarn.lock

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11613,6 +11613,7 @@ __metadata:
1161311613
eslint-plugin-prettier: ^3.4.0
1161411614
express: ^4.18.2
1161511615
express-rate-limit: ^7.1.5
11616+
generic-pool: ^3.9.0
1161611617
helmet: ^5.1.1
1161711618
ioredis: ^5.3.2
1161811619
jest: ^29.6.4
@@ -32806,7 +32807,7 @@ __metadata:
3280632807
languageName: node
3280732808
linkType: hard
3280832809

32809-
"generic-pool@npm:3.9.0":
32810+
"generic-pool@npm:3.9.0, generic-pool@npm:^3.9.0":
3281032811
version: 3.9.0
3281132812
resolution: "generic-pool@npm:3.9.0"
3281232813
checksum: 3d89e9b2018d2e3bbf44fec78c76b2b7d56d6a484237aa9daf6ff6eedb14b0899dadd703b5d810219baab2eb28e5128fb18b29e91e602deb2eccac14492d8ca8

0 commit comments

Comments
 (0)