Skip to content

Commit 95673f3

Browse files
committed
feat: add cluster support
1 parent a278293 commit 95673f3

File tree

4 files changed

+197
-29
lines changed

4 files changed

+197
-29
lines changed

src/app.test.ts

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,19 @@ import {
77
type RedisProxy,
88
} from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts";
99
import { createApp } from "./app";
10+
import { makeId } from "./proxy-store";
11+
12+
const TARGET_HOST = "127.0.0.1"
1013

1114
describe("Redis Proxy API", () => {
1215
let app: any;
1316
let proxy: RedisProxy;
1417
let mockRedisServer: any;
18+
let targetPort: number;
1519

1620
beforeAll(async () => {
1721
const freePort = await getFreePortNumber();
18-
const targetPort = await getFreePortNumber();
22+
targetPort = await getFreePortNumber();
1923

2024
mockRedisServer = Bun.listen({
2125
hostname: "127.0.0.1",
@@ -85,7 +89,7 @@ describe("Redis Proxy API", () => {
8589
const testConfig = {
8690
listenPort: freePort,
8791
listenHost: "127.0.0.1",
88-
targetHost: "127.0.0.1",
92+
targetHost: TARGET_HOST,
8993
targetPort: targetPort,
9094
timeout: 30000,
9195
enableLogging: true,
@@ -113,7 +117,7 @@ describe("Redis Proxy API", () => {
113117
const res = await app.request("/stats");
114118
expect(res.status).toBe(200);
115119

116-
const stats = await res.json();
120+
const stats = (await res.json())[makeId(TARGET_HOST, targetPort)];
117121
expect(stats).toHaveProperty("activeConnections");
118122
expect(stats).toHaveProperty("totalConnections");
119123
expect(stats).toHaveProperty("connections");
@@ -125,10 +129,9 @@ describe("Redis Proxy API", () => {
125129
const res = await app.request("/connections");
126130
expect(res.status).toBe(200);
127131

128-
const result = await res.json();
129-
expect(result).toHaveProperty("connectionIds");
130-
expect(Array.isArray(result.connectionIds)).toBe(true);
131-
expect(result.connectionIds.length).toBe(0);
132+
const result = (await res.json())[makeId(TARGET_HOST, targetPort)];
133+
expect(result).toBeArray();
134+
expect(result.length).toBe(0);
132135
});
133136

134137
test("POST /send-to-client with invalid connection", async () => {
@@ -217,16 +220,16 @@ describe("Redis Proxy API", () => {
217220

218221
const statsRes = await app.request("/stats");
219222
expect(statsRes.status).toBe(200);
220-
const stats = await statsRes.json();
223+
const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort)];
221224
expect(stats.activeConnections).toBe(1);
222225
expect(stats.totalConnections).toBeGreaterThanOrEqual(1);
223226
expect(stats.connections.length).toBe(1);
224227

225228
const connectionsRes = await app.request("/connections");
226229
expect(connectionsRes.status).toBe(200);
227-
const connectionsResult = await connectionsRes.json();
228-
expect(connectionsResult.connectionIds.length).toBe(1);
229-
const connectionId = connectionsResult.connectionIds[0];
230+
const connectionsResult = (await connectionsRes.json())[makeId(TARGET_HOST, targetPort)];
231+
expect(connectionsResult.length).toBe(1);
232+
const connectionId = connectionsResult[0];
230233
expect(typeof connectionId).toBe("string");
231234
expect(connectionId.length).toBeGreaterThan(0);
232235

@@ -255,7 +258,7 @@ describe("Redis Proxy API", () => {
255258
await new Promise((resolve) => setTimeout(resolve, 100));
256259

257260
const finalStatsRes = await app.request("/stats");
258-
const finalStats = await finalStatsRes.json();
261+
const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort)];
259262
expect(finalStats.activeConnections).toBe(0);
260263

261264
resolve();
@@ -289,15 +292,15 @@ describe("Redis Proxy API", () => {
289292
await new Promise((resolve) => setTimeout(resolve, 100));
290293

291294
const statsRes = await app.request("/stats");
292-
const stats = await statsRes.json();
295+
const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort)];
293296
expect(stats.activeConnections).toBe(1);
294297
expect(stats.totalConnections).toBeGreaterThanOrEqual(1);
295298
expect(stats.connections.length).toBe(1);
296299

297300
const connectionsRes = await app.request("/connections");
298-
const connectionsResult = await connectionsRes.json();
299-
expect(connectionsResult.connectionIds.length).toBe(1);
300-
const connectionId = connectionsResult.connectionIds[0];
301+
const connectionsResult = (await connectionsRes.json())[makeId(TARGET_HOST, targetPort)];
302+
expect(connectionsResult.length).toBe(1);
303+
const connectionId = connectionsResult[0];
301304

302305
const result = await client.sendCommand(["FOO"]);
303306
expect(result).toBe("BAR" as unknown as SimpleStringReply);
@@ -319,7 +322,7 @@ describe("Redis Proxy API", () => {
319322
await new Promise((resolve) => setTimeout(resolve, 100));
320323

321324
const finalStatsRes = await app.request("/stats");
322-
const finalStats = await finalStatsRes.json();
325+
const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort)];
323326
expect(finalStats.activeConnections).toBe(0);
324327
});
325328
});

src/app.ts

Lines changed: 125 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { zValidator } from "@hono/zod-validator";
22
import { Hono } from "hono";
33
import { logger } from "hono/logger";
4-
import type { ProxyConfig } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts";
4+
import type { Interceptor, ProxyConfig, ProxyStats, SendResult } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts";
55
import { RedisProxy } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts";
66

77
import {
@@ -10,21 +10,116 @@ import {
1010
getConfig,
1111
paramSchema,
1212
parseBuffer,
13+
proxyConfigSchema,
1314
} from "./util.ts";
15+
import ProxyStore, { makeId } from "./proxy-store.ts";
16+
17+
const startNewProxy = (config: ProxyConfig) => {
18+
const proxy = new RedisProxy(config);
19+
proxy.start().catch(console.error);
20+
return proxy;
21+
}
22+
23+
interface Mapping {
24+
from: {
25+
host: string,
26+
port: number
27+
},
28+
to: {
29+
host: string,
30+
port: number
31+
}
32+
}
33+
34+
const addressMapping = new Map<string, Mapping>();
35+
36+
const setTransformers = (addressMapping: Map<string, Mapping>, proxyStore: ProxyStore) => {
37+
const interceptors = [];
38+
for(const mapping of addressMapping.values()) {
39+
interceptors.push(async (data: Buffer, next: Interceptor) => {
40+
const response = await next(data);
41+
// for example $9\r\n127.0.0.1\r\n:3000
42+
const from = `$${mapping.from.host.length}\r\n${mapping.from.host}\r\n:${mapping.from.port}`;
43+
if(response.includes(from)) {
44+
const to = `$${mapping.to.host.length}\r\n${mapping.to.host}\r\n:${mapping.to.port}`;
45+
return Buffer.from(response.toString().replaceAll(from, to));
46+
}
47+
return response;
48+
});
49+
}
50+
for(const proxy of proxyStore.proxies) {
51+
proxy.setInterceptors(interceptors);
52+
}
53+
}
54+
1455

1556
export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number }) {
1657
const config = testConfig || getConfig();
1758
const app = new Hono();
1859
app.use(logger());
1960

20-
const proxy = new RedisProxy(config);
21-
proxy.start().catch(console.error);
61+
const proxyStore = new ProxyStore();
62+
const nodeId = makeId(config.targetHost, config.targetPort)
63+
proxyStore.add(nodeId, startNewProxy(config));
64+
addressMapping.set(nodeId, {
65+
from: {
66+
host: config.targetHost,
67+
port: config.targetPort
68+
},
69+
to: {
70+
host: config.listenHost ?? '127.0.0.1',
71+
port: config.listenPort
72+
}
73+
});
74+
setTransformers(addressMapping, proxyStore);
75+
76+
app.post("/nodes",
77+
zValidator("json", proxyConfigSchema),
78+
async (c) => {
79+
const data = await c.req.json();
80+
const cfg: ProxyConfig = { ...config, ...data };
81+
const nodeId = makeId(cfg.targetHost, cfg.targetPort);
82+
proxyStore.add(nodeId, startNewProxy(cfg));
83+
addressMapping.set(nodeId, {
84+
from: {
85+
host: cfg.targetHost,
86+
port: cfg.targetPort
87+
},
88+
to: {
89+
host: cfg.listenHost ?? '127.0.0.1',
90+
port: cfg.listenPort
91+
}
92+
});
93+
setTransformers(addressMapping, proxyStore);
94+
return c.json({ success: true, cfg });
95+
});
96+
97+
app.delete("/nodes/:id", async (c) => {
98+
const nodeId = c.req.param("id");
99+
const success = await proxyStore.delete(nodeId);
100+
addressMapping.delete(nodeId);
101+
setTransformers(addressMapping, proxyStore);
102+
return c.json({ success });
103+
});
104+
105+
app.get("/nodes", (c) => {
106+
return c.json({ ids: proxyStore.nodeIds });
107+
});
108+
22109
app.get("/stats", (c) => {
23-
return c.json(proxy.getStats());
110+
const response = proxyStore.entries.reduce((acc, [id, proxy]) => {
111+
acc[id] = proxy.getStats();
112+
return acc;
113+
}, {} as Record<string, ProxyStats>);
114+
return c.json(response);
24115
});
25116

26117
app.get("/connections", (c) => {
27-
return c.json({ connectionIds: proxy.getActiveConnectionIds() });
118+
const response = proxyStore.entries.reduce((acc, [id, proxy]) => {
119+
acc[id] = proxy.getActiveConnectionIds();
120+
return acc;
121+
}, {} as Record<string, readonly string[]>)
122+
return c.json(response);
28123
});
29124

30125
app.post(
@@ -37,6 +132,14 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number
37132
const data = await c.req.text();
38133

39134
const buffer = parseBuffer(data, encoding);
135+
136+
const proxy = proxyStore.getProxyByConnectionId(connectionId);
137+
if (!proxy) return c.json({
138+
success: false,
139+
error: 'Connection not found',
140+
connectionId
141+
});
142+
40143
const result = proxy.sendToClient(connectionId, buffer);
41144
return c.json(result);
42145
},
@@ -47,24 +150,35 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number
47150
const data = await c.req.text();
48151

49152
const buffer = parseBuffer(data, encoding);
50-
const results = proxy.sendToClients(connectionIds, buffer);
153+
154+
const results: SendResult[] = [];
155+
for(const [proxy, matchingConIds] of proxyStore.getProxiesByConnectionIds(connectionIds)) {
156+
results.push(...proxy.sendToClients(matchingConIds, buffer));
157+
}
51158
return c.json({ results });
52159
});
53160

54161
app.post("/send-to-all-clients", zValidator("query", encodingSchema), async (c) => {
55162
const { encoding } = c.req.valid("query");
56163
const data = await c.req.text();
57-
58164
const buffer = parseBuffer(data, encoding);
59-
const results = proxy.sendToAllClients(buffer);
165+
const results: SendResult[] = [];
166+
for(const proxy of proxyStore.proxies) {
167+
results.push(...proxy.sendToAllClients(buffer));
168+
}
60169
return c.json({ results });
61170
});
62171

63-
app.delete("/connections/:id", (c) => {
64-
const connectionId = c.req.param("id");
172+
app.delete("/connections/:id", (c) => {
173+
const connectionId = c.req.param("id");
174+
const proxy = proxyStore.getProxyByConnectionId(connectionId);
175+
if (!proxy) return c.json({
176+
success: false,
177+
connectionId
178+
});
65179
const success = proxy.closeConnection(connectionId);
66180
return c.json({ success, connectionId });
67181
});
68182

69-
return { app, proxy, config };
183+
return { app, proxy: proxyStore.proxies[0]!, config };
70184
}

src/proxy-store.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import type { RedisProxy } from "redis-monorepo/packages/test-utils/lib/redis-proxy";
2+
3+
export const makeId = (host: string, port: number) => `${host}:${port}`;
4+
5+
export default class ProxyStore {
6+
#proxies = new Map<string, RedisProxy>();
7+
8+
add(id: string, proxy: RedisProxy) {
9+
this.#proxies.set(id, proxy);
10+
}
11+
12+
async delete(id: string) {
13+
const proxy = this.#proxies.get(id);
14+
if (!proxy) return false;
15+
await proxy.stop();
16+
this.#proxies.delete(id);
17+
}
18+
19+
get nodeIds() {
20+
return Array.from(this.#proxies.keys());
21+
}
22+
23+
get proxies() {
24+
return Array.from(this.#proxies.values());
25+
}
26+
27+
get entries() {
28+
return Array.from(this.#proxies.entries());
29+
}
30+
31+
getProxyByConnectionId(connectionId: string) {
32+
for (const proxy of this.#proxies.values()) {
33+
if (proxy.getActiveConnectionIds().includes(connectionId)) {
34+
return proxy;
35+
}
36+
}
37+
}
38+
39+
getProxiesByConnectionIds(connectionIds: string[]) {
40+
const result: [RedisProxy, string[]][] = [];
41+
for (const proxy of this.#proxies.values()) {
42+
const activeIds = proxy.getActiveConnectionIds();
43+
const matchingIds = connectionIds.filter((id) => activeIds.includes(id));
44+
if (matchingIds.length > 0) {
45+
result.push([proxy, matchingIds]);
46+
}
47+
}
48+
return result;
49+
}
50+
51+
}

src/util.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export const DEFAULT_LISTEN_HOST = "127.0.0.1";
3333
export const DEFAULT_ENABLE_LOGGING = false;
3434
export const DEFAULT_API_PORT = 3000;
3535

36-
const proxyConfigSchema = z.object({
36+
export const proxyConfigSchema = z.object({
3737
listenPort: z.coerce.number().default(DEFAULT_LISTEN_PORT),
3838
listenHost: z.string().optional().default(DEFAULT_LISTEN_HOST),
3939
targetHost: z.string(),

0 commit comments

Comments
 (0)