From 7471346330d405d22f2c823f86de9c91200a8adc Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 21 Oct 2025 19:19:47 +0300 Subject: [PATCH 01/14] Add example cluster config --- examples/cluster/docker-compose.yml | 76 +++++++++++++++++++++++++++++ examples/cluster/readme.md | 61 +++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 examples/cluster/docker-compose.yml create mode 100644 examples/cluster/readme.md diff --git a/examples/cluster/docker-compose.yml b/examples/cluster/docker-compose.yml new file mode 100644 index 0000000..4c3e702 --- /dev/null +++ b/examples/cluster/docker-compose.yml @@ -0,0 +1,76 @@ +networks: + redis-net: + driver: bridge + ipam: + config: + - subnet: 172.25.0.0/24 + gateway: 172.25.0.1 + +services: + redis-cluster: + image: redislabs/client-libs-test:8.2 + container_name: redis-cluster + command: ["--bind", "0.0.0.0", "--cluster-announce-ip", "172.25.0.10"] + environment: + REDIS_CLUSTER: "yes" + ports: + - "3000:3000" + - "3001:3001" + - "3002:3002" + networks: + redis-net: + ipv4_address: 172.25.0.10 + # Optional healthcheck (tweak as needed) + healthcheck: + test: ["CMD", "redis-cli", "-p", "3000", "PING"] + interval: 10s + timeout: 3s + retries: 5 + + resp-proxy: + image: redislabs/client-resp-proxy + container_name: resp-proxy + environment: + LISTEN_HOST: "0.0.0.0" + LISTEN_PORT: "6379" + TARGET_HOST: "172.25.0.10" + TARGET_PORT: "3000" + API_PORT: "4000" + ENABLE_LOGGING: true + ports: + - "6379:6379" + - "6479:6479" + - "6579:6579" + - "4000:4000" + depends_on: + - redis-cluster + networks: + redis-net: + ipv4_address: 172.25.0.11 + # Optional healthcheck to verify proxy API port + healthcheck: + test: ["CMD", "sh", "-c", "wget -qO- http://localhost:4000/stats || exit 1"] + interval: 10s + timeout: 3s + retries: 5 + + resp-proxy-init: + image: curlimages/curl:8.9.1 + container_name: resp-proxy-init + depends_on: + - resp-proxy + networks: + - redis-net + command: > + sh -c ' + set -e + echo "Waiting for proxy API..."; + until curl -s http://resp-proxy:4000/stats >/dev/null 2>&1; do + sleep 1; + done; + echo "Registering node port 3001 -> listen 6479"; + curl -s -X POST http://resp-proxy:4000/nodes -H "Content-Type: application/json" -d "{\"targetHost\":\"172.25.0.10\",\"targetPort\":3001,\"listenPort\":6479}"; + echo "Registering node port 3002 -> listen 6579"; + curl -s -X POST http://resp-proxy:4000/nodes -H "Content-Type: application/json" -d "{\"targetHost\":\"172.25.0.10\",\"targetPort\":3002,\"listenPort\":6579}"; + echo "Done." + ' diff --git a/examples/cluster/readme.md b/examples/cluster/readme.md new file mode 100644 index 0000000..65cdd1e --- /dev/null +++ b/examples/cluster/readme.md @@ -0,0 +1,61 @@ +# Cluster Example + +Short example demonstrating how to use the Proxy in front of a Redis Cluster setup. + +Run the setup +```bash +docker compose up +``` +This will start a 3 node Redis Cluster (ports 3000, 3001, 3002) and a Proxy instance (ports 6379, 6479 and 6579 for proxying and 4000 for the REST API). + +Open a separate terminal + +```bash +redis-cli cluster slots +``` + +Response should be similar to the following, where the ports are the proxy listen ports ( 6379, 6479 and 6579 ): +``` +1) 1) (integer) 0 + 2) (integer) 5460 + 3) 1) "0.0.0.0" + 2) (integer) 6379 + 3) "7183e22bdcbae8338909fe5282a88ba62d88bdd4" + 4) (empty array) +2) 1) (integer) 5461 + 2) (integer) 10922 + 3) 1) "0.0.0.0" + 2) (integer) 6479 + 3) "a6a3e1859b33451c0d56569dc10a5aa6e32eef32" + 4) (empty array) +3) 1) (integer) 10923 + 2) (integer) 16383 + 3) 1) "0.0.0.0" + 2) (integer) 6579 + 3) "8ee7f4ab67b3da89575cd6f912c645f52e6b962b" + 4) (empty array) +``` + +```bash +redis-cli subscribe foo +``` + +Open another terminal + +Encode your messagee +```bash +echo '>3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r' | base64 +PjMNCiQ3DQptZXNzYWdlDQokMw0KZm9vDQokNA0KZWVlZQ0K +``` + +Push the message to all connected clients +```bash +curl -X POST "http://localhost:4000/send-to-all-clients?encoding=base64" -d "PjMNCiQ3DQptZXNzYWdlDQokMw0KZm9vDQokNA0KZWVlZQ0K" +``` + +You should see the following message in the `redis-cli subscribe` terminal: +``` +1) "message" +2) "foo" +3) "eeee" +``` From 5c27ddbacf548399883f0298c40b982fcd9b5891 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 22 Oct 2025 11:59:29 +0300 Subject: [PATCH 02/14] refactor according to proxy changes --- package.json | 2 +- src/app.ts | 51 +++++++++++++++++++++++++++++---------------------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/package.json b/package.json index 867d253..11b8639 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "dependencies": { "@hono/zod-validator": "^0.7.2", "hono": "^4.8.5", - "redis-monorepo": "github:redis/node-redis#master", + "redis-monorepo": "file:/Users/nikolay.karadzhov/Projects/node-redis", "zod": "^4.0.8" }, "devDependencies": { diff --git a/src/app.ts b/src/app.ts index 7dad8b8..43393bd 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,13 +1,15 @@ import { zValidator } from "@hono/zod-validator"; import { Hono } from "hono"; import { logger } from "hono/logger"; -import type { - Interceptor, - ProxyConfig, - ProxyStats, - SendResult, -} from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; -import { RedisProxy } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; +import { + RedisProxy, + type InterceptorDescription, + type InterceptorState, + type Next, + type ProxyConfig, + type ProxyStats, + type SendResult, +} from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import ProxyStore, { makeId } from "./proxy-store.ts"; import { connectionIdsQuerySchema, @@ -37,22 +39,27 @@ interface Mapping { const addressMapping = new Map(); -const setTransformers = (addressMapping: Map, proxyStore: ProxyStore) => { - const interceptors = []; +const setClusterOverwriteInterceptors = (addressMapping: Map, proxyStore: ProxyStore) => { + const interceptors: InterceptorDescription[] = []; for (const mapping of addressMapping.values()) { - interceptors.push(async (data: Buffer, next: Interceptor) => { - const response = await next(data); - // for example $9\r\n127.0.0.1\r\n:3000 - const from = `$${mapping.from.host.length}\r\n${mapping.from.host}\r\n:${mapping.from.port}`; - if (response.includes(from)) { - const to = `$${mapping.to.host.length}\r\n${mapping.to.host}\r\n:${mapping.to.port}`; - return Buffer.from(response.toString().replaceAll(from, to)); + interceptors.push({ + name: `ip-replacer-${mapping.to.port}`, + fn: async (data: Buffer, next: Next, state: InterceptorState) => { + state.invokeCount++; + const response = await next(data); + // for example $9\r\n127.0.0.1\r\n:3000 + const from = `$${mapping.from.host.length}\r\n${mapping.from.host}\r\n:${mapping.from.port}`; + if (response.includes(from)) { + state.matchCount++; + const to = `$${mapping.to.host.length}\r\n${mapping.to.host}\r\n:${mapping.to.port}`; + return Buffer.from(response.toString().replaceAll(from, to)); + } + return response; } - return response; }); - } + } for (const proxy of proxyStore.proxies) { - proxy.setInterceptors(interceptors); + proxy.setGlobalInterceptors(interceptors); } }; @@ -74,7 +81,7 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number port: config.listenPort, }, }); - setTransformers(addressMapping, proxyStore); + setClusterOverwriteInterceptors(addressMapping, proxyStore); app.post("/nodes", zValidator("json", proxyConfigSchema), async (c) => { const data = await c.req.json(); @@ -91,7 +98,7 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number port: cfg.listenPort, }, }); - setTransformers(addressMapping, proxyStore); + setClusterOverwriteInterceptors(addressMapping, proxyStore); return c.json({ success: true, cfg }); }); @@ -99,7 +106,7 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number const nodeId = c.req.param("id"); const success = await proxyStore.delete(nodeId); addressMapping.delete(nodeId); - setTransformers(addressMapping, proxyStore); + setClusterOverwriteInterceptors(addressMapping, proxyStore); return c.json({ success }); }); From 8571e9bec43c910092973da9d713d3655b19695c Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 27 Oct 2025 15:47:01 +0200 Subject: [PATCH 03/14] add support for hardcoded scenarios --- src/app.test.ts | 2 +- src/app.ts | 47 ++++++++-- src/scenarios.test.ts | 212 ++++++++++++++++++++++++++++++++++++++++++ src/util.ts | 5 + 4 files changed, 257 insertions(+), 9 deletions(-) create mode 100644 src/scenarios.test.ts diff --git a/src/app.test.ts b/src/app.test.ts index 7624b9f..483aac5 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -5,7 +5,7 @@ import { createClient } from "redis"; import { getFreePortNumber, type RedisProxy, -} from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; +} from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import { createApp } from "./app"; import { makeId } from "./proxy-store"; diff --git a/src/app.ts b/src/app.ts index 43393bd..a6275e5 100644 --- a/src/app.ts +++ b/src/app.ts @@ -2,7 +2,7 @@ import { zValidator } from "@hono/zod-validator"; import { Hono } from "hono"; import { logger } from "hono/logger"; import { - RedisProxy, + RedisProxy, type InterceptorDescription, type InterceptorState, type Next, @@ -18,6 +18,7 @@ import { paramSchema, parseBuffer, proxyConfigSchema, + scenarioSchema, } from "./util.ts"; const startNewProxy = (config: ProxyConfig) => { @@ -39,25 +40,28 @@ interface Mapping { const addressMapping = new Map(); -const setClusterOverwriteInterceptors = (addressMapping: Map, proxyStore: ProxyStore) => { +const setClusterOverwriteInterceptors = ( + addressMapping: Map, + proxyStore: ProxyStore, +) => { const interceptors: InterceptorDescription[] = []; for (const mapping of addressMapping.values()) { interceptors.push({ - name: `ip-replacer-${mapping.to.port}`, - fn: async (data: Buffer, next: Next, state: InterceptorState) => { - state.invokeCount++; + name: `ip-replacer-${mapping.to.port}`, + fn: async (data: Buffer, next: Next, state: InterceptorState) => { + state.invokeCount++; const response = await next(data); // for example $9\r\n127.0.0.1\r\n:3000 const from = `$${mapping.from.host.length}\r\n${mapping.from.host}\r\n:${mapping.from.port}`; if (response.includes(from)) { - state.matchCount++; + state.matchCount++; const to = `$${mapping.to.host.length}\r\n${mapping.to.host}\r\n:${mapping.to.port}`; return Buffer.from(response.toString().replaceAll(from, to)); } return response; - } + }, }); - } + } for (const proxy of proxyStore.proxies) { proxy.setGlobalInterceptors(interceptors); } @@ -196,5 +200,32 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number return c.json({ success, connectionId }); }); + app.post("/scenarios", zValidator("json", scenarioSchema), async (c) => { + const { responses, encoding } = c.req.valid("json"); + + const responsesBuffers = responses.map((response) => parseBuffer(response, encoding)); + let currentIndex = 0; + + const scenarioInterceptor: InterceptorDescription = { + name: "scenario-interceptor", + fn: async (data: Buffer, next: Next, state: InterceptorState): Promise => { + state.invokeCount++; + if (currentIndex < responsesBuffers.length) { + state.matchCount++; + const response = responsesBuffers[currentIndex]!; + currentIndex++; + return response; + } + return await next(data); + }, + }; + + for (const proxy of proxyStore.proxies) { + proxy.setGlobalInterceptors([scenarioInterceptor]); + } + + return c.json({ success: true, totalResponses: responses.length }); + }); + return { app, proxy: proxyStore.proxies[0], config }; } diff --git a/src/scenarios.test.ts b/src/scenarios.test.ts new file mode 100644 index 0000000..c11d7ef --- /dev/null +++ b/src/scenarios.test.ts @@ -0,0 +1,212 @@ +import { afterAll, beforeAll, describe, expect, test } from "bun:test"; +import type { SimpleStringReply } from "@redis/client/dist/lib/RESP/types"; +import { createClient } from "redis"; + +import { getFreePortNumber } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; +import { createApp } from "./app"; + +describe("POST /scenarios", () => { + let app: any; + let proxy: any; + let mockRedisServer: any; + let targetPort: number; + + beforeAll(async () => { + const freePort = await getFreePortNumber(); + targetPort = await getFreePortNumber(); + + mockRedisServer = Bun.listen({ + hostname: "127.0.0.1", + port: targetPort, + socket: { + data(socket, data) { + const command = data.toString(); + console.log("Mock Redis received:", command.replace(/\r\n/g, "\\r\\n")); + + const commandCount = (command.match(/\*\d+\r\n/g) || []).length; + console.log("Command count:", commandCount); + + let responses = ""; + + if (command.includes("HELLO")) { + responses += + "*7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n7.2.0\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:1\r\n"; + } + + if (command.includes("CLIENT")) { + const clientCommands = (command.match(/\*4\r\n\$6\r\nCLIENT\r\n/g) || []).length; + for (let i = 0; i < clientCommands; i++) { + responses += "+OK\r\n"; + } + } + + if (command.includes("AUTH") && !command.includes("CLIENT")) { + responses += "+OK\r\n"; + } + if (command.includes("PING") && !command.includes("CLIENT")) { + responses += "+PONG\r\n"; + } + if (command.includes("FOO")) { + responses += "+BAR\r\n"; + } + if (command.includes("SELECT") && !command.includes("CLIENT")) { + responses += "+OK\r\n"; + } + if (command.includes("INFO") && !command.includes("CLIENT")) { + responses += "$23\r\n# Server\r\nredis_version:7.2.0\r\n"; + } + + if (!responses) { + for (let i = 0; i < commandCount; i++) { + responses += "+OK\r\n"; + } + } + + console.log("Sending responses:", responses.replace(/\r\n/g, "\\r\\n")); + socket.write(responses); + }, + open() { + console.log("Mock Redis TCP connection opened"); + }, + close() { + console.log("Mock Redis TCP connection closed"); + }, + error(error) { + console.error("Mock Redis TCP error:", error); + }, + }, + }); + + const testConfig = { + listenPort: freePort, + listenHost: "127.0.0.1", + targetHost: "127.0.0.1", + targetPort: targetPort, + timeout: 30000, + enableLogging: true, + apiPort: 3001, + }; + + const appInstance = createApp(testConfig); + app = appInstance.app; + proxy = appInstance.proxy; + + await new Promise((resolve) => setTimeout(resolve, 200)); + }); + + afterAll(async () => { + if (proxy) { + await proxy.stop(); + } + if (mockRedisServer) { + mockRedisServer?.stop(true); + } + }); + + test("POST /scenarios with invalid data", async () => { + const res = await app.request("/scenarios", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({}), + }); + + expect(res.status).toBe(400); + }); + + test("POST /scenarios with empty responses", async () => { + const res = await app.request("/scenarios", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ responses: [] }), + }); + + expect(res.status).toBe(400); + }); + + test("POST /scenarios with raw encoding", async () => { + const res = await app.request("/scenarios", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + responses: ["+FIRST\r\n", "+SECOND\r\n", "+THIRD\r\n"], + encoding: "raw", + }), + }); + + expect(res.status).toBe(200); + const result = await res.json(); + expect(result.success).toBe(true); + expect(result.totalResponses).toBe(3); + }); + + test("POST /scenarios with base64 encoding", async () => { + const response1 = Buffer.from("+RESPONSE1\r\n").toString("base64"); + const response2 = Buffer.from("+RESPONSE2\r\n").toString("base64"); + + const res = await app.request("/scenarios", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + responses: [response1, response2], + encoding: "base64", + }), + }); + + expect(res.status).toBe(200); + const result = await res.json(); + expect(result.success).toBe(true); + expect(result.totalResponses).toBe(2); + }); + + test("Scenario interceptor returns responses sequentially then passes through", async () => { + const client = createClient({ + socket: { + host: "127.0.0.1", + port: proxy.config.listenPort, + }, + }); + + await client.connect(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Set up scenario with 2 responses + const scenarioRes = await app.request("/scenarios", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + responses: ["+SCENARIO1\r\n", "+SCENARIO2\r\n"], + encoding: "raw", + }), + }); + + expect(scenarioRes.status).toBe(200); + + // First command should get first scenario response + const result1 = await client.sendCommand(["PING"]); + expect(result1).toBe("SCENARIO1" as unknown as SimpleStringReply); + + // Second command should get second scenario response + const result2 = await client.sendCommand(["PING"]); + expect(result2).toBe("SCENARIO2" as unknown as SimpleStringReply); + + // Third command should pass through to real server + const result3 = await client.sendCommand(["PING"]); + expect(result3).toBe("PONG" as unknown as SimpleStringReply); + + // Fourth command should also pass through + const result4 = await client.sendCommand(["FOO"]); + expect(result4).toBe("BAR" as unknown as SimpleStringReply); + + await client.disconnect(); + }); +}); diff --git a/src/util.ts b/src/util.ts index 0a05b98..3b04e51 100644 --- a/src/util.ts +++ b/src/util.ts @@ -17,6 +17,11 @@ export const connectionIdsQuerySchema = z.object({ encoding: z.enum(["base64", "raw"]).default("base64"), }); +export const scenarioSchema = z.object({ + responses: z.array(z.string()).min(1, "At least one response is required"), + encoding: z.enum(["base64", "raw"]).default("base64"), +}); + export function parseBuffer(data: string, encoding: "base64" | "raw"): Buffer { switch (encoding) { case "base64": From 3c22a676cadd4f0d35f1dc26a3b2e58c73335190 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Mon, 27 Oct 2025 17:01:02 +0200 Subject: [PATCH 04/14] expose interceptors api --- src/app.test.ts | 66 +------------ src/app.ts | 32 ++++++- src/interceptors.test.ts | 199 +++++++++++++++++++++++++++++++++++++++ src/mock-server.ts | 66 +++++++++++++ src/scenarios.test.ts | 63 +------------ src/util.ts | 9 +- 6 files changed, 306 insertions(+), 129 deletions(-) create mode 100644 src/interceptors.test.ts create mode 100644 src/mock-server.ts diff --git a/src/app.test.ts b/src/app.test.ts index 483aac5..0af0564 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -7,6 +7,7 @@ import { type RedisProxy, } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import { createApp } from "./app"; +import createMockRedisServer from "./mock-server"; import { makeId } from "./proxy-store"; const TARGET_HOST = "127.0.0.1"; @@ -21,70 +22,7 @@ describe("Redis Proxy API", () => { const freePort = await getFreePortNumber(); targetPort = await getFreePortNumber(); - mockRedisServer = Bun.listen({ - hostname: "127.0.0.1", - port: targetPort, - socket: { - data(socket, data) { - const command = data.toString(); - console.log("Mock Redis received:", command.replace(/\r\n/g, "\\r\\n")); - - // Count how many Redis commands are in this data packet - // Each command starts with * followed by number of arguments - const commandCount = (command.match(/\*\d+\r\n/g) || []).length; - console.log("Command count:", commandCount); - - let responses = ""; - - if (command.includes("HELLO")) { - responses += - "*7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n7.2.0\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:1\r\n"; - } - - if (command.includes("CLIENT")) { - const clientCommands = (command.match(/\*4\r\n\$6\r\nCLIENT\r\n/g) || []).length; - for (let i = 0; i < clientCommands; i++) { - responses += "+OK\r\n"; - } - } - - if (command.includes("AUTH") && !command.includes("CLIENT")) { - responses += "+OK\r\n"; - } - if (command.includes("PING") && !command.includes("CLIENT")) { - responses += "+PONG\r\n"; - } - if (command.includes("FOO")) { - responses += "+BAR\r\n"; - } - if (command.includes("SELECT") && !command.includes("CLIENT")) { - responses += "+OK\r\n"; - } - if (command.includes("INFO") && !command.includes("CLIENT")) { - responses += "$23\r\n# Server\r\nredis_version:7.2.0\r\n"; - } - - // If no specific responses were generated, send OK for each command - if (!responses) { - for (let i = 0; i < commandCount; i++) { - responses += "+OK\r\n"; - } - } - - console.log("Sending responses:", responses.replace(/\r\n/g, "\\r\\n")); - socket.write(responses); - }, - open() { - console.log("Mock Redis TCP connection opened"); - }, - close() { - console.log("Mock Redis TCP connection closed"); - }, - error(error) { - console.error("Mock Redis TCP error:", error); - }, - }, - }); + mockRedisServer = createMockRedisServer(targetPort); const testConfig = { listenPort: freePort, diff --git a/src/app.ts b/src/app.ts index a6275e5..fe0de64 100644 --- a/src/app.ts +++ b/src/app.ts @@ -2,12 +2,12 @@ import { zValidator } from "@hono/zod-validator"; import { Hono } from "hono"; import { logger } from "hono/logger"; import { - RedisProxy, type InterceptorDescription, type InterceptorState, type Next, type ProxyConfig, type ProxyStats, + RedisProxy, type SendResult, } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import ProxyStore, { makeId } from "./proxy-store.ts"; @@ -15,6 +15,7 @@ import { connectionIdsQuerySchema, encodingSchema, getConfig, + interceptorSchema, paramSchema, parseBuffer, proxyConfigSchema, @@ -212,7 +213,7 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number state.invokeCount++; if (currentIndex < responsesBuffers.length) { state.matchCount++; - const response = responsesBuffers[currentIndex]!; + const response = responsesBuffers[currentIndex] as Buffer; currentIndex++; return response; } @@ -221,11 +222,36 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number }; for (const proxy of proxyStore.proxies) { - proxy.setGlobalInterceptors([scenarioInterceptor]); + proxy.addGlobalInterceptor(scenarioInterceptor); } return c.json({ success: true, totalResponses: responses.length }); }); + app.post("/interceptors", zValidator("json", interceptorSchema), async (c) => { + const { name, match, response, encoding } = c.req.valid("json"); + + const responseBuffer = parseBuffer(response, encoding); + const matchBuffer = parseBuffer(match, encoding); + + const interceptor: InterceptorDescription = { + name, + fn: async (data: Buffer, next: Next, state: InterceptorState): Promise => { + state.invokeCount++; + if (data.equals(matchBuffer)) { + state.matchCount++; + return responseBuffer; + } + return next(data); + }, + }; + + for (const proxy of proxyStore.proxies) { + proxy.addGlobalInterceptor(interceptor); + } + + return c.json({ success: true, name }); + }); + return { app, proxy: proxyStore.proxies[0], config }; } diff --git a/src/interceptors.test.ts b/src/interceptors.test.ts new file mode 100644 index 0000000..285da44 --- /dev/null +++ b/src/interceptors.test.ts @@ -0,0 +1,199 @@ +import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import type { SimpleStringReply } from "@redis/client/dist/lib/RESP/types"; +import { createClient } from "redis"; + +import { getFreePortNumber } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; +import { createApp } from "./app"; +import createMockRedisServer from "./mock-server"; + +describe("POST /interceptors", () => { + let app: any; + let proxy: any; + let mockRedisServer: any; + let targetPort: number; + + beforeAll(async () => { + const freePort = await getFreePortNumber(); + targetPort = await getFreePortNumber(); + + mockRedisServer = createMockRedisServer(targetPort); + + const testConfig = { + listenPort: freePort, + listenHost: "127.0.0.1", + targetHost: "127.0.0.1", + targetPort: targetPort, + timeout: 30000, + enableLogging: true, + apiPort: 3001, + }; + + const appInstance = createApp(testConfig); + app = appInstance.app; + proxy = appInstance.proxy; + + await new Promise((resolve) => setTimeout(resolve, 200)); + }); + + afterAll(async () => { + if (proxy) { + await proxy.stop(); + } + if (mockRedisServer) { + mockRedisServer?.stop(true); + } + }); + + beforeEach(() => { + proxy.setGlobalInterceptors([]); + }); + + test("Interceptor matches command and returns custom response", async () => { + const client = createClient({ + socket: { + host: "127.0.0.1", + port: proxy.config.listenPort, + }, + }); + + await client.connect(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Set up interceptor for PING command + const pingMatch = Buffer.from("*1\r\n$4\r\nPING\r\n").toString("base64"); + const pingResponse = Buffer.from("+INTERCEPTED_PING\r\n").toString("base64"); + + const interceptorRes = await app.request("/interceptors", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: "ping-interceptor", + match: pingMatch, + response: pingResponse, + encoding: "base64", + }), + }); + + expect(interceptorRes.status).toBe(200); + + // Command should be intercepted + const result = await client.sendCommand(["PING"]); + expect(result).toBe("INTERCEPTED_PING" as unknown as SimpleStringReply); + + // Non-matching command should pass through + const fooResult = await client.sendCommand(["FOO"]); + expect(fooResult).toBe("BAR" as unknown as SimpleStringReply); + + await client.disconnect(); + }); + + test("Multiple interceptors can be added and work independently", async () => { + const client = createClient({ + socket: { + host: "127.0.0.1", + port: proxy.config.listenPort, + }, + }); + + await client.connect(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Add first interceptor for GET command + const getMatch = Buffer.from("*2\r\n$3\r\nGET\r\n$7\r\nTESTKEY\r\n").toString("base64"); + const getResponse = Buffer.from("$11\r\nINTERCEPTED\r\n").toString("base64"); + + const interceptor1Res = await app.request("/interceptors", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: "get-interceptor", + match: getMatch, + response: getResponse, + encoding: "base64", + }), + }); + + expect(interceptor1Res.status).toBe(200); + + // Add second interceptor for SET command + const setMatch = Buffer.from( + "*3\r\n$3\r\nSET\r\n$7\r\nTESTKEY\r\n$9\r\nTESTVALUE\r\n", + ).toString("base64"); + const setResponse = Buffer.from("+INTERCEPTED_SET\r\n").toString("base64"); + + const interceptor2Res = await app.request("/interceptors", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: "set-interceptor", + match: setMatch, + response: setResponse, + encoding: "base64", + }), + }); + + expect(interceptor2Res.status).toBe(200); + + // GET command should be intercepted by first interceptor + const getResult = await client.get("TESTKEY"); + expect(getResult).toBe("INTERCEPTED"); + + // SET command should be intercepted by second interceptor + const setResult = await client.set("TESTKEY", "TESTVALUE"); + expect(setResult).toBe("INTERCEPTED_SET"); + + // Non-matching commands should still pass through + const pingResult = await client.sendCommand(["PING"]); + expect(pingResult).toBe("PONG" as unknown as SimpleStringReply); + + await client.disconnect(); + }); + + test("Interceptor does not affect pass-through commands", async () => { + const client = createClient({ + socket: { + host: "127.0.0.1", + port: proxy.config.listenPort, + }, + }); + + await client.connect(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Add interceptor for specific command + const match = Buffer.from("*1\r\n$5\r\nINTER\r\n").toString("base64"); + const response = Buffer.from("+CAUGHT\r\n").toString("base64"); + + await app.request("/interceptors", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: "selective-interceptor", + match, + response, + encoding: "base64", + }), + }); + + // All normal commands should still work + const pingResult = await client.sendCommand(["PING"]); + expect(pingResult).toBe("PONG" as unknown as SimpleStringReply); + + const fooResult = await client.sendCommand(["FOO"]); + expect(fooResult).toBe("BAR" as unknown as SimpleStringReply); + + // Only the specific intercepted command should be affected + const interResult = await client.sendCommand(["INTER"]); + expect(interResult).toBe("CAUGHT" as unknown as SimpleStringReply); + + await client.disconnect(); + }); +}); diff --git a/src/mock-server.ts b/src/mock-server.ts new file mode 100644 index 0000000..c4c07c8 --- /dev/null +++ b/src/mock-server.ts @@ -0,0 +1,66 @@ +export default function createMockRedisServer(targetPort: number) { + return Bun.listen({ + hostname: "127.0.0.1", + port: targetPort, + socket: { + data(socket, data) { + const command = data.toString(); + console.log("Mock Redis received:", command.replace(/\r\n/g, "\\r\\n")); + + // Count how many Redis commands are in this data packet + // Each command starts with * followed by number of arguments + const commandCount = (command.match(/\*\d+\r\n/g) || []).length; + console.log("Command count:", commandCount); + + let responses = ""; + + if (command.includes("HELLO")) { + responses += + "*7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n7.2.0\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:1\r\n"; + } + + if (command.includes("CLIENT")) { + const clientCommands = (command.match(/\*4\r\n\$6\r\nCLIENT\r\n/g) || []).length; + for (let i = 0; i < clientCommands; i++) { + responses += "+OK\r\n"; + } + } + + if (command.includes("AUTH") && !command.includes("CLIENT")) { + responses += "+OK\r\n"; + } + if (command.includes("PING") && !command.includes("CLIENT")) { + responses += "+PONG\r\n"; + } + if (command.includes("FOO")) { + responses += "+BAR\r\n"; + } + if (command.includes("SELECT") && !command.includes("CLIENT")) { + responses += "+OK\r\n"; + } + if (command.includes("INFO") && !command.includes("CLIENT")) { + responses += "$23\r\n# Server\r\nredis_version:7.2.0\r\n"; + } + + // If no specific responses were generated, send OK for each command + if (!responses) { + for (let i = 0; i < commandCount; i++) { + responses += "+OK\r\n"; + } + } + + console.log("Sending responses:", responses.replace(/\r\n/g, "\\r\\n")); + socket.write(responses); + }, + open() { + console.log("Mock Redis TCP connection opened"); + }, + close() { + console.log("Mock Redis TCP connection closed"); + }, + error(error) { + console.error("Mock Redis TCP error:", error); + }, + }, + }); +} diff --git a/src/scenarios.test.ts b/src/scenarios.test.ts index c11d7ef..52bd867 100644 --- a/src/scenarios.test.ts +++ b/src/scenarios.test.ts @@ -4,6 +4,7 @@ import { createClient } from "redis"; import { getFreePortNumber } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import { createApp } from "./app"; +import createMockRedisServer from "./mock-server"; describe("POST /scenarios", () => { let app: any; @@ -15,67 +16,7 @@ describe("POST /scenarios", () => { const freePort = await getFreePortNumber(); targetPort = await getFreePortNumber(); - mockRedisServer = Bun.listen({ - hostname: "127.0.0.1", - port: targetPort, - socket: { - data(socket, data) { - const command = data.toString(); - console.log("Mock Redis received:", command.replace(/\r\n/g, "\\r\\n")); - - const commandCount = (command.match(/\*\d+\r\n/g) || []).length; - console.log("Command count:", commandCount); - - let responses = ""; - - if (command.includes("HELLO")) { - responses += - "*7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n7.2.0\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:1\r\n"; - } - - if (command.includes("CLIENT")) { - const clientCommands = (command.match(/\*4\r\n\$6\r\nCLIENT\r\n/g) || []).length; - for (let i = 0; i < clientCommands; i++) { - responses += "+OK\r\n"; - } - } - - if (command.includes("AUTH") && !command.includes("CLIENT")) { - responses += "+OK\r\n"; - } - if (command.includes("PING") && !command.includes("CLIENT")) { - responses += "+PONG\r\n"; - } - if (command.includes("FOO")) { - responses += "+BAR\r\n"; - } - if (command.includes("SELECT") && !command.includes("CLIENT")) { - responses += "+OK\r\n"; - } - if (command.includes("INFO") && !command.includes("CLIENT")) { - responses += "$23\r\n# Server\r\nredis_version:7.2.0\r\n"; - } - - if (!responses) { - for (let i = 0; i < commandCount; i++) { - responses += "+OK\r\n"; - } - } - - console.log("Sending responses:", responses.replace(/\r\n/g, "\\r\\n")); - socket.write(responses); - }, - open() { - console.log("Mock Redis TCP connection opened"); - }, - close() { - console.log("Mock Redis TCP connection closed"); - }, - error(error) { - console.error("Mock Redis TCP error:", error); - }, - }, - }); + mockRedisServer = createMockRedisServer(targetPort); const testConfig = { listenPort: freePort, diff --git a/src/util.ts b/src/util.ts index 3b04e51..bdccac4 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,4 +1,4 @@ -import type { ProxyConfig } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; +import type { ProxyConfig } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import { z } from "zod"; export const encodingSchema = z.object({ @@ -22,6 +22,13 @@ export const scenarioSchema = z.object({ encoding: z.enum(["base64", "raw"]).default("base64"), }); +export const interceptorSchema = z.object({ + name: z.string(), + encoding: z.enum(["raw", "base64"]), + match: z.string(), + response: z.string(), +}); + export function parseBuffer(data: string, encoding: "base64" | "raw"): Buffer { switch (encoding) { case "base64": From 05fe821e5fd0dfff04083b7944d93fa265adafdb Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 28 Oct 2025 11:11:16 +0200 Subject: [PATCH 05/14] add option to provide multiple listenPorts --- src/app.test.ts | 24 ++++++++++++++++-------- src/app.ts | 35 ++++++++++++++++++++++------------- src/proxy-store.ts | 5 +++-- src/util.ts | 31 ++++++++++++++++++++++--------- 4 files changed, 63 insertions(+), 32 deletions(-) diff --git a/src/app.test.ts b/src/app.test.ts index 0af0564..41c33c0 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -17,9 +17,11 @@ describe("Redis Proxy API", () => { let proxy: RedisProxy; let mockRedisServer: any; let targetPort: number; + let listenPort: number; beforeAll(async () => { const freePort = await getFreePortNumber(); + listenPort = freePort; targetPort = await getFreePortNumber(); mockRedisServer = createMockRedisServer(targetPort); @@ -55,7 +57,7 @@ describe("Redis Proxy API", () => { const res = await app.request("/stats"); expect(res.status).toBe(200); - const stats = (await res.json())[makeId(TARGET_HOST, targetPort)]; + const stats = (await res.json())[makeId(TARGET_HOST, targetPort, listenPort)]; expect(stats).toHaveProperty("activeConnections"); expect(stats).toHaveProperty("totalConnections"); expect(stats).toHaveProperty("connections"); @@ -67,7 +69,7 @@ describe("Redis Proxy API", () => { const res = await app.request("/connections"); expect(res.status).toBe(200); - const result = (await res.json())[makeId(TARGET_HOST, targetPort)]; + const result = (await res.json())[makeId(TARGET_HOST, targetPort, listenPort)]; expect(result).toBeArray(); expect(result.length).toBe(0); }); @@ -158,14 +160,16 @@ describe("Redis Proxy API", () => { const statsRes = await app.request("/stats"); expect(statsRes.status).toBe(200); - const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort)]; + const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort, listenPort)]; expect(stats.activeConnections).toBe(1); expect(stats.totalConnections).toBeGreaterThanOrEqual(1); expect(stats.connections.length).toBe(1); const connectionsRes = await app.request("/connections"); expect(connectionsRes.status).toBe(200); - const connectionsResult = (await connectionsRes.json())[makeId(TARGET_HOST, targetPort)]; + const connectionsResult = (await connectionsRes.json())[ + makeId(TARGET_HOST, targetPort, listenPort) + ]; expect(connectionsResult.length).toBe(1); const connectionId = connectionsResult[0]; expect(typeof connectionId).toBe("string"); @@ -196,7 +200,9 @@ describe("Redis Proxy API", () => { await new Promise((resolve) => setTimeout(resolve, 100)); const finalStatsRes = await app.request("/stats"); - const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort)]; + const finalStats = (await finalStatsRes.json())[ + makeId(TARGET_HOST, targetPort, listenPort) + ]; expect(finalStats.activeConnections).toBe(0); resolve(); @@ -230,13 +236,15 @@ describe("Redis Proxy API", () => { await new Promise((resolve) => setTimeout(resolve, 100)); const statsRes = await app.request("/stats"); - const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort)]; + const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort, listenPort)]; expect(stats.activeConnections).toBe(1); expect(stats.totalConnections).toBeGreaterThanOrEqual(1); expect(stats.connections.length).toBe(1); const connectionsRes = await app.request("/connections"); - const connectionsResult = (await connectionsRes.json())[makeId(TARGET_HOST, targetPort)]; + const connectionsResult = (await connectionsRes.json())[ + makeId(TARGET_HOST, targetPort, listenPort) + ]; expect(connectionsResult.length).toBe(1); const connectionId = connectionsResult[0]; @@ -260,7 +268,7 @@ describe("Redis Proxy API", () => { await new Promise((resolve) => setTimeout(resolve, 100)); const finalStatsRes = await app.request("/stats"); - const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort)]; + const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort, listenPort)]; expect(finalStats.activeConnections).toBe(0); }); }); diff --git a/src/app.ts b/src/app.ts index fe0de64..f769ece 100644 --- a/src/app.ts +++ b/src/app.ts @@ -12,6 +12,7 @@ import { } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import ProxyStore, { makeId } from "./proxy-store.ts"; import { + type ExtendedProxyConfig, connectionIdsQuerySchema, encodingSchema, getConfig, @@ -68,24 +69,32 @@ const setClusterOverwriteInterceptors = ( } }; -export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number }) { +export function createApp(testConfig?: ExtendedProxyConfig) { const config = testConfig || getConfig(); const app = new Hono(); app.use(logger()); const proxyStore = new ProxyStore(); - const nodeId = makeId(config.targetHost, config.targetPort); - proxyStore.add(nodeId, startNewProxy(config)); - addressMapping.set(nodeId, { - from: { - host: config.targetHost, - port: config.targetPort, - }, - to: { - host: config.listenHost ?? "127.0.0.1", - port: config.listenPort, - }, - }); + + // Handle both single port and array of ports + const listenPorts = Array.isArray(config.listenPort) ? config.listenPort : [config.listenPort]; + + for (const port of listenPorts) { + const proxyConfig: ProxyConfig = { ...config, listenPort: port }; + const nodeId = makeId(config.targetHost, config.targetPort, port); + proxyStore.add(nodeId, startNewProxy(proxyConfig)); + addressMapping.set(nodeId, { + from: { + host: config.targetHost, + port: config.targetPort, + }, + to: { + host: config.listenHost ?? "127.0.0.1", + port: port, + }, + }); + } + setClusterOverwriteInterceptors(addressMapping, proxyStore); app.post("/nodes", zValidator("json", proxyConfigSchema), async (c) => { diff --git a/src/proxy-store.ts b/src/proxy-store.ts index 4244c2e..84a3404 100644 --- a/src/proxy-store.ts +++ b/src/proxy-store.ts @@ -1,6 +1,7 @@ -import type { RedisProxy } from "redis-monorepo/packages/test-utils/lib/redis-proxy"; +import type { RedisProxy } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy"; -export const makeId = (host: string, port: number) => `${host}:${port}`; +export const makeId = (host: string, port: number, listenPort?: number) => + listenPort ? `${host}:${port}@${listenPort}` : `${host}:${port}`; export default class ProxyStore { #proxies = new Map(); diff --git a/src/util.ts b/src/util.ts index bdccac4..61f452a 100644 --- a/src/util.ts +++ b/src/util.ts @@ -1,6 +1,12 @@ import type { ProxyConfig } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import { z } from "zod"; +// Extended ProxyConfig that supports multiple listen ports +export type ExtendedProxyConfig = Omit & { + listenPort: number | number[]; + readonly apiPort?: number; +}; + export const encodingSchema = z.object({ encoding: z.enum(["base64", "raw"]).default("base64"), }); @@ -46,7 +52,9 @@ export const DEFAULT_ENABLE_LOGGING = false; export const DEFAULT_API_PORT = 3000; export const proxyConfigSchema = z.object({ - listenPort: z.coerce.number().default(DEFAULT_LISTEN_PORT), + listenPort: z + .union([z.coerce.number(), z.array(z.coerce.number()).min(1)]) + .default(DEFAULT_LISTEN_PORT), listenHost: z.string().optional().default(DEFAULT_LISTEN_HOST), targetHost: z.string(), targetPort: z.coerce.number(), @@ -68,8 +76,8 @@ const envSchema = z.object({ API_PORT: z.coerce.number().optional().default(DEFAULT_API_PORT), }); -export function parseCliArgs(argv: string[]): Record { - const args: Record = {}; +export function parseCliArgs(argv: string[]): Record { + const args: Record = {}; for (let i = 0; i < argv.length; i++) { const arg = argv[i]; if (arg?.startsWith("--")) { @@ -90,9 +98,13 @@ export function parseCliArgs(argv: string[]): Record Number(v.trim())); + if (parts.every((n) => !Number.isNaN(n))) return parts; + } const num = Number(value); if (!Number.isNaN(num)) return num; return value; @@ -103,9 +115,9 @@ export function printUsage() { Usage: bun run proxy [options] Required options: - --listenPort Port to listen on - --targetHost Target host to forward to - --targetPort Target port to forward to + --listenPort Port(s) to listen on (comma-separated for multiple) + --targetHost Target host to forward to + --targetPort Target port to forward to Optional options: --listenHost Host to listen on (default: 127.0.0.1) @@ -119,14 +131,15 @@ Or configure using environment variables: Examples: bun run proxy --listenPort=6379 --targetHost=localhost --targetPort=6380 + bun run proxy --listenPort=6379,6380,6381 --targetHost=localhost --targetPort=6382 docker run -p 3000:3000 -p 6379:6379 -e LISTEN_PORT=6379 -e TARGET_HOST=host.docker.internal -e TARGET_PORT=6380 your-image-name `); } -export function getConfig(): ProxyConfig & { readonly apiPort?: number } { +export function getConfig(): ExtendedProxyConfig { const cliArgs = parseCliArgs(Bun.argv.slice(2)); - let configSource: Record; + let configSource: Record; if (Object.keys(cliArgs).length > 0) { console.log("Using configuration from command-line arguments."); From 79688d385f4e9c5985e1a68790c845df83fac7cf Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 28 Oct 2025 14:20:23 +0200 Subject: [PATCH 06/14] fixes --- bun.lock | 4 +- examples/cluster/readme.md | 50 +++++++++++-------- package.json | 2 +- src/app.ts | 98 ++++++++++++++------------------------ src/proxy-store.ts | 2 +- src/util.ts | 23 +++++++-- 6 files changed, 90 insertions(+), 89 deletions(-) diff --git a/bun.lock b/bun.lock index bda4945..75419fe 100644 --- a/bun.lock +++ b/bun.lock @@ -6,7 +6,7 @@ "dependencies": { "@hono/zod-validator": "^0.7.2", "hono": "^4.8.5", - "redis-monorepo": "github:redis/node-redis#master", + "redis-monorepo": "github:nkaradzhov/node-redis#proxy-improvements", "zod": "^4.0.8", }, "devDependencies": { @@ -66,7 +66,7 @@ "redis": ["redis@5.8.3", "", { "dependencies": { "@redis/bloom": "5.8.3", "@redis/client": "5.8.3", "@redis/json": "5.8.3", "@redis/search": "5.8.3", "@redis/time-series": "5.8.3" } }, "sha512-MfSrfV6+tEfTw8c4W0yFp6XWX8Il4laGU7Bx4kvW4uiYM1AuZ3KGqEGt1LdQHeD1nEyLpIWetZ/SpY3kkbgrYw=="], - "redis-monorepo": ["redis-monorepo@github:redis/node-redis#e6025b1", {}, "redis-node-redis-e6025b1"], + "redis-monorepo": ["redis-monorepo@github:nkaradzhov/node-redis#b5c9b07", {}, "nkaradzhov-node-redis-b5c9b07"], "undici-types": ["undici-types@7.16.0", "", {}, "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="], diff --git a/examples/cluster/readme.md b/examples/cluster/readme.md index 65cdd1e..17679ce 100644 --- a/examples/cluster/readme.md +++ b/examples/cluster/readme.md @@ -1,12 +1,26 @@ # Cluster Example -Short example demonstrating how to use the Proxy in front of a Redis Cluster setup. +Short example demonstrating how to use the Proxy in cluster simulating mode in front of a standalone Redis. -Run the setup +First, start a standalone Redis server with the client libs test image: +``` +docker run -p 3000:3000 redislabs/client-libs-test:8.2 +``` + +Run the proxy in cluster mode: ```bash -docker compose up +docker run \ + -p 6379:6379 -p 6380:6380 -p 6381:6381 -p 4000:4000 \ + -e TARGET_HOST=127.0.0.1 \ + -e TARGET_PORT=3000 \ + -e TIMEOUT=0 \ + -e API_PORT=4000 \ + -e SIMULATE_CLUSTER=yes \ + redislabs/client-resp-proxy + ``` -This will start a 3 node Redis Cluster (ports 3000, 3001, 3002) and a Proxy instance (ports 6379, 6479 and 6579 for proxying and 4000 for the REST API). +This will start a Proxy instance (ports 6379, 6380 and 6381 for proxying and 4000 for the REST API). +The proxy will simulate a cluster with 3 nodes running on ports 6379, 6479 and 6579 by intercepting the `cluster slots` command and returning a fake response. Open a separate terminal @@ -20,20 +34,17 @@ Response should be similar to the following, where the ports are the proxy liste 2) (integer) 5460 3) 1) "0.0.0.0" 2) (integer) 6379 - 3) "7183e22bdcbae8338909fe5282a88ba62d88bdd4" - 4) (empty array) + 3) "proxy-id-6379" 2) 1) (integer) 5461 2) (integer) 10922 3) 1) "0.0.0.0" - 2) (integer) 6479 - 3) "a6a3e1859b33451c0d56569dc10a5aa6e32eef32" - 4) (empty array) + 2) (integer) 6380 + 3) "proxy-id-6380" 3) 1) (integer) 10923 2) (integer) 16383 3) 1) "0.0.0.0" - 2) (integer) 6579 - 3) "8ee7f4ab67b3da89575cd6f912c645f52e6b962b" - 4) (empty array) + 2) (integer) 6381 + 3) "proxy-id-6381" ``` ```bash @@ -42,15 +53,9 @@ redis-cli subscribe foo Open another terminal -Encode your messagee -```bash -echo '>3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r' | base64 -PjMNCiQ3DQptZXNzYWdlDQokMw0KZm9vDQokNA0KZWVlZQ0K -``` - -Push the message to all connected clients +Push a message to all connected clients ```bash -curl -X POST "http://localhost:4000/send-to-all-clients?encoding=base64" -d "PjMNCiQ3DQptZXNzYWdlDQokMw0KZm9vDQokNA0KZWVlZQ0K" +curl -X POST "http://localhost:4000/send-to-all-clients?encoding=raw" --data-binary ">3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r\n" ``` You should see the following message in the `redis-cli subscribe` terminal: @@ -59,3 +64,8 @@ You should see the following message in the `redis-cli subscribe` terminal: 2) "foo" 3) "eeee" ``` + +Change cluster topology. This is done by adding an interceptor that will catch the `cluster slots` command and return a different response. In this case we swapped the ports of node 2 and node 3. +``` +curl -X POST "http://localhost:4000/interceptors" -H 'Content-Type: application/json' -d '{"name":"test", "match":"*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n", "response":"*3\r\n*3\r\n:0\r\n:5460\r\n*3\r\n$9\r\n127.0.0.1\r\n:6381\r\n$13\r\nproxy-id-6379\r\n*3\r\n:5461\r\n:10921\r\n*3\r\n$9\r\n127.0.0.1\r\n:6380\r\n$13\r\nproxy-id-6380\r\n*3\r\n:10922\r\n:16383\r\n*3\r\n$9\r\n127.0.0.1\r\n:6379\r\n$13\r\nproxy-id-6381\r\n", "encoding":"raw"}' +``` diff --git a/package.json b/package.json index 11b8639..e98910c 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "dependencies": { "@hono/zod-validator": "^0.7.2", "hono": "^4.8.5", - "redis-monorepo": "file:/Users/nikolay.karadzhov/Projects/node-redis", + "redis-monorepo": "github:nkaradzhov/node-redis#proxy-improvements", "zod": "^4.0.8" }, "devDependencies": { diff --git a/src/app.ts b/src/app.ts index f769ece..f8fe52c 100644 --- a/src/app.ts +++ b/src/app.ts @@ -12,8 +12,8 @@ import { } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts"; import ProxyStore, { makeId } from "./proxy-store.ts"; import { - type ExtendedProxyConfig, connectionIdsQuerySchema, + type ExtendedProxyConfig, encodingSchema, getConfig, interceptorSchema, @@ -29,43 +29,37 @@ const startNewProxy = (config: ProxyConfig) => { return proxy; }; -interface Mapping { - from: { - host: string; - port: number; - }; - to: { - host: string; - port: number; +const setClusterSimulateInterceptor = (proxyStore: ProxyStore) => { + const interceptor: InterceptorDescription = { + name: `cluster-simulation-interceptor`, + fn: async (data: Buffer, next: Next, state: InterceptorState) => { + state.invokeCount++; + + if (data.toString().toLowerCase() !== "*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n") { + return next(data); + } + + state.matchCount++; + + const proxies = proxyStore.proxies; + const slotLenght = Math.floor(16384 / proxies.length); + + let current = -1; + const mapping = proxyStore.proxies.map((proxy, i) => { + const from = current + 1; + const to = i === proxies.length - 1 ? 16383 : current + slotLenght; + current = to; + const id = `proxy-id-${proxy.config.listenPort}`; + return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${proxy.config.listenHost.length}\r\n${proxy.config.listenHost}\r\n:${proxy.config.listenPort}\r\n$${id.length}\r\n${id}\r\n`; + }); + + const response = `*${proxies.length}\r\n${mapping.join("")}`; + return Buffer.from(response); + }, }; -} -const addressMapping = new Map(); - -const setClusterOverwriteInterceptors = ( - addressMapping: Map, - proxyStore: ProxyStore, -) => { - const interceptors: InterceptorDescription[] = []; - for (const mapping of addressMapping.values()) { - interceptors.push({ - name: `ip-replacer-${mapping.to.port}`, - fn: async (data: Buffer, next: Next, state: InterceptorState) => { - state.invokeCount++; - const response = await next(data); - // for example $9\r\n127.0.0.1\r\n:3000 - const from = `$${mapping.from.host.length}\r\n${mapping.from.host}\r\n:${mapping.from.port}`; - if (response.includes(from)) { - state.matchCount++; - const to = `$${mapping.to.host.length}\r\n${mapping.to.host}\r\n:${mapping.to.port}`; - return Buffer.from(response.toString().replaceAll(from, to)); - } - return response; - }, - }); - } for (const proxy of proxyStore.proxies) { - proxy.setGlobalInterceptors(interceptors); + proxy.setGlobalInterceptors([interceptor]); } }; @@ -73,6 +67,7 @@ export function createApp(testConfig?: ExtendedProxyConfig) { const config = testConfig || getConfig(); const app = new Hono(); app.use(logger()); + console.log(config); const proxyStore = new ProxyStore(); @@ -83,44 +78,23 @@ export function createApp(testConfig?: ExtendedProxyConfig) { const proxyConfig: ProxyConfig = { ...config, listenPort: port }; const nodeId = makeId(config.targetHost, config.targetPort, port); proxyStore.add(nodeId, startNewProxy(proxyConfig)); - addressMapping.set(nodeId, { - from: { - host: config.targetHost, - port: config.targetPort, - }, - to: { - host: config.listenHost ?? "127.0.0.1", - port: port, - }, - }); } - setClusterOverwriteInterceptors(addressMapping, proxyStore); + config.simulateCluster && setClusterSimulateInterceptor(proxyStore); app.post("/nodes", zValidator("json", proxyConfigSchema), async (c) => { const data = await c.req.json(); const cfg: ProxyConfig = { ...config, ...data }; - const nodeId = makeId(cfg.targetHost, cfg.targetPort); + const nodeId = makeId(cfg.targetHost, cfg.targetPort, cfg.listenPort); proxyStore.add(nodeId, startNewProxy(cfg)); - addressMapping.set(nodeId, { - from: { - host: cfg.targetHost, - port: cfg.targetPort, - }, - to: { - host: cfg.listenHost ?? "127.0.0.1", - port: cfg.listenPort, - }, - }); - setClusterOverwriteInterceptors(addressMapping, proxyStore); + config.simulateCluster && setClusterSimulateInterceptor(proxyStore); return c.json({ success: true, cfg }); }); app.delete("/nodes/:id", async (c) => { const nodeId = c.req.param("id"); const success = await proxyStore.delete(nodeId); - addressMapping.delete(nodeId); - setClusterOverwriteInterceptors(addressMapping, proxyStore); + config.simulateCluster && setClusterSimulateInterceptor(proxyStore); return c.json({ success }); }); @@ -247,7 +221,7 @@ export function createApp(testConfig?: ExtendedProxyConfig) { name, fn: async (data: Buffer, next: Next, state: InterceptorState): Promise => { state.invokeCount++; - if (data.equals(matchBuffer)) { + if (data.toString().toLowerCase() === matchBuffer.toString().toLowerCase()) { state.matchCount++; return responseBuffer; } @@ -262,5 +236,5 @@ export function createApp(testConfig?: ExtendedProxyConfig) { return c.json({ success: true, name }); }); - return { app, proxy: proxyStore.proxies[0], config }; + return { app, proxy: proxyStore.proxies[0] as RedisProxy, config }; } diff --git a/src/proxy-store.ts b/src/proxy-store.ts index 84a3404..e6b09f4 100644 --- a/src/proxy-store.ts +++ b/src/proxy-store.ts @@ -1,6 +1,6 @@ import type { RedisProxy } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy"; -export const makeId = (host: string, port: number, listenPort?: number) => +export const makeId = (host: string, port: number, listenPort: number) => listenPort ? `${host}:${port}@${listenPort}` : `${host}:${port}`; export default class ProxyStore { diff --git a/src/util.ts b/src/util.ts index 61f452a..5abd9f4 100644 --- a/src/util.ts +++ b/src/util.ts @@ -5,6 +5,7 @@ import { z } from "zod"; export type ExtendedProxyConfig = Omit & { listenPort: number | number[]; readonly apiPort?: number; + simulateCluster?: boolean; }; export const encodingSchema = z.object({ @@ -50,6 +51,7 @@ export const DEFAULT_LISTEN_PORT = 6379; export const DEFAULT_LISTEN_HOST = "127.0.0.1"; export const DEFAULT_ENABLE_LOGGING = false; export const DEFAULT_API_PORT = 3000; +export const DEFAULT_SIMULATE_CLUSTER = false; export const proxyConfigSchema = z.object({ listenPort: z @@ -61,10 +63,13 @@ export const proxyConfigSchema = z.object({ timeout: z.coerce.number().optional(), enableLogging: z.boolean().optional().default(DEFAULT_ENABLE_LOGGING), apiPort: z.number().optional().default(DEFAULT_API_PORT), + simulateCluster: z.boolean().optional().default(DEFAULT_SIMULATE_CLUSTER), }); const envSchema = z.object({ - LISTEN_PORT: z.coerce.number().default(DEFAULT_LISTEN_PORT), + LISTEN_PORT: z + .union([z.coerce.number(), z.array(z.coerce.number()).min(1)]) + .default(DEFAULT_LISTEN_PORT), TARGET_HOST: z.string(), TARGET_PORT: z.coerce.number(), LISTEN_HOST: z.string().optional(), @@ -74,6 +79,14 @@ const envSchema = z.object({ .transform((val) => val === "true") .optional(), API_PORT: z.coerce.number().optional().default(DEFAULT_API_PORT), + SIMULATE_CLUSTER: z + .enum(["true", "false"]) + .transform((val) => { + console.log('transform', val, val === "true"); + return val === "true"; + }) + .optional() + .default(DEFAULT_SIMULATE_CLUSTER), }); export function parseCliArgs(argv: string[]): Record { @@ -124,14 +137,15 @@ Optional options: --timeout Connection timeout in milliseconds --enableLogging Enable verbose logging --apiPort Port to start the http on (default: 3000 ) + --simulateCluster Simulate Redis Cluster behavior like \`cluster slots\` (default: false) Or configure using environment variables: LISTEN_PORT, TARGET_HOST, TARGET_PORT (required) - LISTEN_HOST, TIMEOUT, ENABLE_LOGGING, API_PORT (optional) + LISTEN_HOST, TIMEOUT, ENABLE_LOGGING, API_PORT, SIMULATE_CLUSTER (optional) Examples: bun run proxy --listenPort=6379 --targetHost=localhost --targetPort=6380 - bun run proxy --listenPort=6379,6380,6381 --targetHost=localhost --targetPort=6382 + bun run proxy --listenPort=6379,6380,6381 --simulateCluster --targetHost=localhost --targetPort=6382 docker run -p 3000:3000 -p 6379:6379 -e LISTEN_PORT=6379 -e TARGET_HOST=host.docker.internal -e TARGET_PORT=6380 your-image-name `); } @@ -146,7 +160,9 @@ export function getConfig(): ExtendedProxyConfig { configSource = cliArgs; } else { console.log("Using configuration from environment variables."); + console.log('process.env', process.env.SIMULATE_CLUSTER); const parsedEnv = envSchema.parse(process.env); + console.log('parsedEnv', parsedEnv); configSource = { listenPort: parsedEnv.LISTEN_PORT, listenHost: parsedEnv.LISTEN_HOST, @@ -155,6 +171,7 @@ export function getConfig(): ExtendedProxyConfig { timeout: parsedEnv.TIMEOUT, enableLogging: parsedEnv.ENABLE_LOGGING, apiPort: parsedEnv.API_PORT, + simulateCluster: parsedEnv.SIMULATE_CLUSTER }; } From c13075e0956729984cb57f8f39c5a9541dae6d53 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 29 Oct 2025 16:44:06 +0200 Subject: [PATCH 07/14] Switch data endpoints to accept JSON payloads --- README.md | 39 ++++++++++++++++++++++++--------------- src/app.ts | 13 +++++++------ src/util.ts | 4 ++++ 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 3f81419..0e613c0 100644 --- a/README.md +++ b/README.md @@ -39,14 +39,15 @@ RESP3 Push notification: `>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n` **cURL Example:** ```bash curl -X POST "http://localhost:3000/send-to-all-clients?encoding=raw" \ - --data-binary ">4\r\n\$6\r\nMOVING\r\n:1\r\n:2\r\n\$6\r\nhost:3\r\n" + -H 'Content-Type: application/json' \ + -d '{"data" : ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n"}' ``` **TypeScript Example:** ```typescript const response = await fetch('http://localhost:3000/send-to-all-clients?encoding=raw', { method: 'POST', - body: '>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n' + body: { data: '>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n' } }); const result = await response.json(); @@ -58,16 +59,19 @@ console.log(result.success ? 'Injected' : 'Failed'); package main import ( + "bytes" + "encoding/json" "io" "net/http" "strings" ) func main() { - payload := strings.NewReader(">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n") - resp, _ := http.Post("http://localhost:3000/send-to-all-clients?encoding=raw", "", payload) + data := map[string]string{"data": ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n"} + jsonData, _ := json.Marshal(data) + resp, _ := http.Post("http://localhost:3000/send-to-all-clients?encoding=raw", "application/json", bytes.NewBuffer(jsonData)) defer resp.Body.Close() - + body, _ := io.ReadAll(resp.Body) if strings.Contains(string(body), `"success":true`) { println("Injected") @@ -83,10 +87,11 @@ import java.net.URI; public class RespProxyClient { public static void main(String[] args) throws Exception { var client = HttpClient.newHttpClient(); + var json = "{\"data\":\">4\\r\\n$6\\r\\nMOVING\\r\\n:1\\r\\n:2\\r\\n$6\\r\\nhost:3\\r\\n\"}"; var request = HttpRequest.newBuilder() .uri(URI.create("http://localhost:3000/send-to-all-clients?encoding=raw")) - .POST(HttpRequest.BodyPublishers.ofString( - ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n")) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(json)) .build(); var response = client.send(request, HttpResponse.BodyHandlers.ofString()); @@ -103,8 +108,9 @@ public class RespProxyClient { import json import urllib.request -data = b">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n" -req = urllib.request.Request("http://localhost:3000/send-to-all-clients?encoding=raw", data) +payload = {"data": ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n"} +data = json.dumps(payload).encode('utf-8') +req = urllib.request.Request("http://localhost:3000/send-to-all-clients?encoding=raw", data, {"Content-Type": "application/json"}) with urllib.request.urlopen(req) as response: result = json.loads(response.read()) @@ -255,17 +261,19 @@ Send Redis protocol data to a specific client connection. - `connectionId` (path): Target connection ID - `encoding` (query): Data encoding format (`base64` or `raw`, default: `base64`) -**Body:** Raw data or base64-encoded data +**Body:** JSON object with "data" property containing the payload **Example:** ```bash # Send PING command (base64 encoded) curl -X POST "http://localhost:3000/send-to-client/conn_123?encoding=base64" \ - -d "KjENCiQ0DQpQSU5HDQo=" + -H 'Content-Type: application/json' \ + -d '{"data": "KjENCiQ0DQpQSU5HDQo="}' # Send raw binary data curl -X POST "http://localhost:3000/send-to-client/conn_123?encoding=raw" \ - --data-binary "*1\r\n$4\r\nPING\r\n" + -H 'Content-Type: application/json' \ + -d '{"data": "*1\r\n$4\r\nPING\r\n"}' ``` **Response:** @@ -289,7 +297,8 @@ Send data to multiple specific client connections. **Example:** ```bash curl -X POST "http://localhost:3000/send-to-clients?connectionIds=conn_123,conn_456&encoding=base64" \ - -d "KjENCiQ0DQpQSU5HDQo=" + -H 'Content-Type: application/json' \ + -d '{"data": "KjENCiQ0DQpQSU5HDQo="}' ``` #### Send Data to All Clients @@ -301,7 +310,8 @@ Broadcast data to all active client connections. **Example:** ```bash curl -X POST "http://localhost:3000/send-to-all-clients?encoding=base64" \ - -d "KjENCiQ0DQpQSU5HDQo=" + -H 'Content-Type: application/json' \ + -d '{"data": "KjENCiQ0DQpQSU5HDQo="}' ``` #### Close Connection @@ -328,4 +338,3 @@ Send custom Redis responses to specific clients for testing scenarios. ### Protocol Analysis Analyze Redis protocol communication patterns. - diff --git a/src/app.ts b/src/app.ts index f8fe52c..f9b68c2 100644 --- a/src/app.ts +++ b/src/app.ts @@ -21,6 +21,7 @@ import { parseBuffer, proxyConfigSchema, scenarioSchema, + dataSchema, } from "./util.ts"; const startNewProxy = (config: ProxyConfig) => { @@ -67,7 +68,6 @@ export function createApp(testConfig?: ExtendedProxyConfig) { const config = testConfig || getConfig(); const app = new Hono(); app.use(logger()); - console.log(config); const proxyStore = new ProxyStore(); @@ -128,10 +128,11 @@ export function createApp(testConfig?: ExtendedProxyConfig) { "/send-to-client/:connectionId", zValidator("param", paramSchema), zValidator("query", encodingSchema), + zValidator("json", dataSchema), async (c) => { const { connectionId } = c.req.valid("param"); const { encoding } = c.req.valid("query"); - const data = await c.req.text(); + const { data } = c.req.valid("json"); const buffer = parseBuffer(data, encoding); @@ -148,9 +149,9 @@ export function createApp(testConfig?: ExtendedProxyConfig) { }, ); - app.post("/send-to-clients", zValidator("query", connectionIdsQuerySchema), async (c) => { + app.post("/send-to-clients", zValidator("query", connectionIdsQuerySchema), zValidator("json", dataSchema), async (c) => { const { connectionIds, encoding } = c.req.valid("query"); - const data = await c.req.text(); + const { data } = c.req.valid("json"); const buffer = parseBuffer(data, encoding); @@ -161,9 +162,9 @@ export function createApp(testConfig?: ExtendedProxyConfig) { return c.json({ results }); }); - app.post("/send-to-all-clients", zValidator("query", encodingSchema), async (c) => { + app.post("/send-to-all-clients", zValidator("query", encodingSchema), zValidator("json", dataSchema), async (c) => { const { encoding } = c.req.valid("query"); - const data = await c.req.text(); + const { data } = c.req.valid("json"); const buffer = parseBuffer(data, encoding); const results: SendResult[] = []; for (const proxy of proxyStore.proxies) { diff --git a/src/util.ts b/src/util.ts index 5abd9f4..0291d2f 100644 --- a/src/util.ts +++ b/src/util.ts @@ -8,6 +8,10 @@ export type ExtendedProxyConfig = Omit & { simulateCluster?: boolean; }; +export const dataSchema = z.object({ + data: z.string().min(1, "Data is required"), +}); + export const encodingSchema = z.object({ encoding: z.enum(["base64", "raw"]).default("base64"), }); From 4acf6c6fea657958774eb581063bd195329494d7 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 29 Oct 2025 16:46:03 +0200 Subject: [PATCH 08/14] Simplify LISTEN_PORT parsing and validation Update docker-compose and docs for multi-port setup Remove legacy cluster node registration logic --- examples/cluster/docker-compose.yml | 62 ++++++++++------------------- examples/cluster/readme.md | 31 +++++++++++---- src/app.ts | 5 +-- src/util.ts | 25 +++++------- 4 files changed, 55 insertions(+), 68 deletions(-) diff --git a/examples/cluster/docker-compose.yml b/examples/cluster/docker-compose.yml index 4c3e702..8106032 100644 --- a/examples/cluster/docker-compose.yml +++ b/examples/cluster/docker-compose.yml @@ -1,26 +1,15 @@ networks: redis-net: driver: bridge - ipam: - config: - - subnet: 172.25.0.0/24 - gateway: 172.25.0.1 services: - redis-cluster: + redis: image: redislabs/client-libs-test:8.2 - container_name: redis-cluster - command: ["--bind", "0.0.0.0", "--cluster-announce-ip", "172.25.0.10"] - environment: - REDIS_CLUSTER: "yes" + container_name: redis ports: - "3000:3000" - - "3001:3001" - - "3002:3002" networks: - redis-net: - ipv4_address: 172.25.0.10 - # Optional healthcheck (tweak as needed) + - redis-net healthcheck: test: ["CMD", "redis-cli", "-p", "3000", "PING"] interval: 10s @@ -32,45 +21,34 @@ services: container_name: resp-proxy environment: LISTEN_HOST: "0.0.0.0" - LISTEN_PORT: "6379" - TARGET_HOST: "172.25.0.10" + LISTEN_PORT: "6379,6380,6381" + TARGET_HOST: "redis" TARGET_PORT: "3000" API_PORT: "4000" ENABLE_LOGGING: true + SIMULATE_CLUSTER: true ports: - "6379:6379" - - "6479:6479" - - "6579:6579" + - "6380:6380" + - "6381:6381" - "4000:4000" depends_on: - - redis-cluster + - redis networks: - redis-net: - ipv4_address: 172.25.0.11 - # Optional healthcheck to verify proxy API port + - redis-net healthcheck: test: ["CMD", "sh", "-c", "wget -qO- http://localhost:4000/stats || exit 1"] interval: 10s timeout: 3s retries: 5 - resp-proxy-init: - image: curlimages/curl:8.9.1 - container_name: resp-proxy-init - depends_on: - - resp-proxy - networks: - - redis-net - command: > - sh -c ' - set -e - echo "Waiting for proxy API..."; - until curl -s http://resp-proxy:4000/stats >/dev/null 2>&1; do - sleep 1; - done; - echo "Registering node port 3001 -> listen 6479"; - curl -s -X POST http://resp-proxy:4000/nodes -H "Content-Type: application/json" -d "{\"targetHost\":\"172.25.0.10\",\"targetPort\":3001,\"listenPort\":6479}"; - echo "Registering node port 3002 -> listen 6579"; - curl -s -X POST http://resp-proxy:4000/nodes -H "Content-Type: application/json" -d "{\"targetHost\":\"172.25.0.10\",\"targetPort\":3002,\"listenPort\":6579}"; - echo "Done." - ' + # debug: + # image: nicolaka/netshoot:latest + # container_name: debug + # depends_on: + # - resp-proxy + # networks: + # - redis-net + # command: sleep infinity + # stdin_open: true + # tty: true diff --git a/examples/cluster/readme.md b/examples/cluster/readme.md index 17679ce..cd01067 100644 --- a/examples/cluster/readme.md +++ b/examples/cluster/readme.md @@ -2,17 +2,26 @@ Short example demonstrating how to use the Proxy in cluster simulating mode in front of a standalone Redis. -First, start a standalone Redis server with the client libs test image: -``` -docker run -p 3000:3000 redislabs/client-libs-test:8.2 +## Step 1: Setup redis + proxy + +- Option 1: Using Docker Compose + +Running the provided `docker-compose.yml` file: + +```bash +docker-compose up ``` -Run the proxy in cluster mode: +- Option 2: Using external Redis server + +1. Start a standalone Redis server on port 3000 + +2. Run the proxy in cluster mode: ```bash docker run \ -p 6379:6379 -p 6380:6380 -p 6381:6381 -p 4000:4000 \ - -e TARGET_HOST=127.0.0.1 \ - -e TARGET_PORT=3000 \ + -e TARGET_HOST= \ + -e TARGET_PORT= \ -e TIMEOUT=0 \ -e API_PORT=4000 \ -e SIMULATE_CLUSTER=yes \ @@ -22,6 +31,8 @@ docker run \ This will start a Proxy instance (ports 6379, 6380 and 6381 for proxying and 4000 for the REST API). The proxy will simulate a cluster with 3 nodes running on ports 6379, 6479 and 6579 by intercepting the `cluster slots` command and returning a fake response. +## Step 2: Check if `cluster slots` reports correctly + Open a separate terminal ```bash @@ -47,6 +58,8 @@ Response should be similar to the following, where the ports are the proxy liste 3) "proxy-id-6381" ``` +### Step 3: Test push + ```bash redis-cli subscribe foo ``` @@ -55,7 +68,7 @@ Open another terminal Push a message to all connected clients ```bash -curl -X POST "http://localhost:4000/send-to-all-clients?encoding=raw" --data-binary ">3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r\n" +curl -X POST "http://localhost:4000/send-to-all-clients?encoding=raw" -H 'Content-Type: application/json' -d '{"data": ">3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r\n"}' ``` You should see the following message in the `redis-cli subscribe` terminal: @@ -65,7 +78,9 @@ You should see the following message in the `redis-cli subscribe` terminal: 3) "eeee" ``` -Change cluster topology. This is done by adding an interceptor that will catch the `cluster slots` command and return a different response. In this case we swapped the ports of node 2 and node 3. +### Step 4: Test topology change + +Changing cluster topology is done by adding an interceptor that will catch the `cluster slots` command and return a different response. In this case we swapped the ports of node 2 and node 3. ``` curl -X POST "http://localhost:4000/interceptors" -H 'Content-Type: application/json' -d '{"name":"test", "match":"*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n", "response":"*3\r\n*3\r\n:0\r\n:5460\r\n*3\r\n$9\r\n127.0.0.1\r\n:6381\r\n$13\r\nproxy-id-6379\r\n*3\r\n:5461\r\n:10921\r\n*3\r\n$9\r\n127.0.0.1\r\n:6380\r\n$13\r\nproxy-id-6380\r\n*3\r\n:10922\r\n:16383\r\n*3\r\n$9\r\n127.0.0.1\r\n:6379\r\n$13\r\nproxy-id-6381\r\n", "encoding":"raw"}' ``` diff --git a/src/app.ts b/src/app.ts index f9b68c2..898b488 100644 --- a/src/app.ts +++ b/src/app.ts @@ -71,10 +71,7 @@ export function createApp(testConfig?: ExtendedProxyConfig) { const proxyStore = new ProxyStore(); - // Handle both single port and array of ports - const listenPorts = Array.isArray(config.listenPort) ? config.listenPort : [config.listenPort]; - - for (const port of listenPorts) { + for (const port of config.listenPort) { const proxyConfig: ProxyConfig = { ...config, listenPort: port }; const nodeId = makeId(config.targetHost, config.targetPort, port); proxyStore.add(nodeId, startNewProxy(proxyConfig)); diff --git a/src/util.ts b/src/util.ts index 0291d2f..6958589 100644 --- a/src/util.ts +++ b/src/util.ts @@ -3,7 +3,7 @@ import { z } from "zod"; // Extended ProxyConfig that supports multiple listen ports export type ExtendedProxyConfig = Omit & { - listenPort: number | number[]; + listenPort: number[]; readonly apiPort?: number; simulateCluster?: boolean; }; @@ -51,16 +51,20 @@ export function parseBuffer(data: string, encoding: "base64" | "raw"): Buffer { } } -export const DEFAULT_LISTEN_PORT = 6379; +export const DEFAULT_LISTEN_PORT = [6379]; export const DEFAULT_LISTEN_HOST = "127.0.0.1"; export const DEFAULT_ENABLE_LOGGING = false; export const DEFAULT_API_PORT = 3000; export const DEFAULT_SIMULATE_CLUSTER = false; +const listenPortSchema = z.preprocess(value => { + if (Array.isArray(value)) return value; + if (typeof value === 'string') return value.split(','); + return [value] +}, z.array(z.coerce.number()).min(1)).default(DEFAULT_LISTEN_PORT) + export const proxyConfigSchema = z.object({ - listenPort: z - .union([z.coerce.number(), z.array(z.coerce.number()).min(1)]) - .default(DEFAULT_LISTEN_PORT), + listenPort: listenPortSchema, listenHost: z.string().optional().default(DEFAULT_LISTEN_HOST), targetHost: z.string(), targetPort: z.coerce.number(), @@ -71,9 +75,7 @@ export const proxyConfigSchema = z.object({ }); const envSchema = z.object({ - LISTEN_PORT: z - .union([z.coerce.number(), z.array(z.coerce.number()).min(1)]) - .default(DEFAULT_LISTEN_PORT), + LISTEN_PORT: listenPortSchema, TARGET_HOST: z.string(), TARGET_PORT: z.coerce.number(), LISTEN_HOST: z.string().optional(), @@ -85,10 +87,7 @@ const envSchema = z.object({ API_PORT: z.coerce.number().optional().default(DEFAULT_API_PORT), SIMULATE_CLUSTER: z .enum(["true", "false"]) - .transform((val) => { - console.log('transform', val, val === "true"); - return val === "true"; - }) + .transform((val) => val === "true") .optional() .default(DEFAULT_SIMULATE_CLUSTER), }); @@ -164,9 +163,7 @@ export function getConfig(): ExtendedProxyConfig { configSource = cliArgs; } else { console.log("Using configuration from environment variables."); - console.log('process.env', process.env.SIMULATE_CLUSTER); const parsedEnv = envSchema.parse(process.env); - console.log('parsedEnv', parsedEnv); configSource = { listenPort: parsedEnv.LISTEN_PORT, listenHost: parsedEnv.LISTEN_HOST, From 87d5989413198c9844cf3c092a0775d751c0fa7e Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 29 Oct 2025 16:47:36 +0200 Subject: [PATCH 09/14] fix lint/formatting --- src/app.ts | 52 +++++++++++++++++++++++++++++++--------------------- src/util.ts | 26 ++++++++++++++------------ 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/app.ts b/src/app.ts index 898b488..5655996 100644 --- a/src/app.ts +++ b/src/app.ts @@ -13,6 +13,7 @@ import { import ProxyStore, { makeId } from "./proxy-store.ts"; import { connectionIdsQuerySchema, + dataSchema, type ExtendedProxyConfig, encodingSchema, getConfig, @@ -21,7 +22,6 @@ import { parseBuffer, proxyConfigSchema, scenarioSchema, - dataSchema, } from "./util.ts"; const startNewProxy = (config: ProxyConfig) => { @@ -146,29 +146,39 @@ export function createApp(testConfig?: ExtendedProxyConfig) { }, ); - app.post("/send-to-clients", zValidator("query", connectionIdsQuerySchema), zValidator("json", dataSchema), async (c) => { - const { connectionIds, encoding } = c.req.valid("query"); - const { data } = c.req.valid("json"); + app.post( + "/send-to-clients", + zValidator("query", connectionIdsQuerySchema), + zValidator("json", dataSchema), + async (c) => { + const { connectionIds, encoding } = c.req.valid("query"); + const { data } = c.req.valid("json"); - const buffer = parseBuffer(data, encoding); + const buffer = parseBuffer(data, encoding); - const results: SendResult[] = []; - for (const [proxy, matchingConIds] of proxyStore.getProxiesByConnectionIds(connectionIds)) { - results.push(...proxy.sendToClients(matchingConIds, buffer)); - } - return c.json({ results }); - }); + const results: SendResult[] = []; + for (const [proxy, matchingConIds] of proxyStore.getProxiesByConnectionIds(connectionIds)) { + results.push(...proxy.sendToClients(matchingConIds, buffer)); + } + return c.json({ results }); + }, + ); - app.post("/send-to-all-clients", zValidator("query", encodingSchema), zValidator("json", dataSchema), async (c) => { - const { encoding } = c.req.valid("query"); - const { data } = c.req.valid("json"); - const buffer = parseBuffer(data, encoding); - const results: SendResult[] = []; - for (const proxy of proxyStore.proxies) { - results.push(...proxy.sendToAllClients(buffer)); - } - return c.json({ results }); - }); + app.post( + "/send-to-all-clients", + zValidator("query", encodingSchema), + zValidator("json", dataSchema), + async (c) => { + const { encoding } = c.req.valid("query"); + const { data } = c.req.valid("json"); + const buffer = parseBuffer(data, encoding); + const results: SendResult[] = []; + for (const proxy of proxyStore.proxies) { + results.push(...proxy.sendToAllClients(buffer)); + } + return c.json({ results }); + }, + ); app.delete("/connections/:id", (c) => { const connectionId = c.req.param("id"); diff --git a/src/util.ts b/src/util.ts index 6958589..4301c79 100644 --- a/src/util.ts +++ b/src/util.ts @@ -9,7 +9,7 @@ export type ExtendedProxyConfig = Omit & { }; export const dataSchema = z.object({ - data: z.string().min(1, "Data is required"), + data: z.string().min(1, "Data is required"), }); export const encodingSchema = z.object({ @@ -57,11 +57,13 @@ export const DEFAULT_ENABLE_LOGGING = false; export const DEFAULT_API_PORT = 3000; export const DEFAULT_SIMULATE_CLUSTER = false; -const listenPortSchema = z.preprocess(value => { - if (Array.isArray(value)) return value; - if (typeof value === 'string') return value.split(','); - return [value] -}, z.array(z.coerce.number()).min(1)).default(DEFAULT_LISTEN_PORT) +const listenPortSchema = z + .preprocess((value) => { + if (Array.isArray(value)) return value; + if (typeof value === "string") return value.split(","); + return [value]; + }, z.array(z.coerce.number()).min(1)) + .default(DEFAULT_LISTEN_PORT); export const proxyConfigSchema = z.object({ listenPort: listenPortSchema, @@ -85,11 +87,11 @@ const envSchema = z.object({ .transform((val) => val === "true") .optional(), API_PORT: z.coerce.number().optional().default(DEFAULT_API_PORT), - SIMULATE_CLUSTER: z - .enum(["true", "false"]) - .transform((val) => val === "true") - .optional() - .default(DEFAULT_SIMULATE_CLUSTER), + SIMULATE_CLUSTER: z + .enum(["true", "false"]) + .transform((val) => val === "true") + .optional() + .default(DEFAULT_SIMULATE_CLUSTER), }); export function parseCliArgs(argv: string[]): Record { @@ -172,7 +174,7 @@ export function getConfig(): ExtendedProxyConfig { timeout: parsedEnv.TIMEOUT, enableLogging: parsedEnv.ENABLE_LOGGING, apiPort: parsedEnv.API_PORT, - simulateCluster: parsedEnv.SIMULATE_CLUSTER + simulateCluster: parsedEnv.SIMULATE_CLUSTER, }; } From b9a318f0f1c278ddd03afd252965150c234f8845 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Wed, 29 Oct 2025 17:07:14 +0200 Subject: [PATCH 10/14] fix tests --- src/app.test.ts | 32 +++++++++++++++++++++++++------- src/interceptors.test.ts | 2 +- src/scenarios.test.ts | 2 +- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/app.test.ts b/src/app.test.ts index 41c33c0..2cc43a4 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -27,7 +27,7 @@ describe("Redis Proxy API", () => { mockRedisServer = createMockRedisServer(targetPort); const testConfig = { - listenPort: freePort, + listenPort: [freePort], listenHost: "127.0.0.1", targetHost: TARGET_HOST, targetPort: targetPort, @@ -79,7 +79,10 @@ describe("Redis Proxy API", () => { const res = await app.request("/send-to-client/non-existent-connection?encoding=base64", { method: "POST", - body: testData, + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ data: testData }), }); expect(res.status).toBe(200); @@ -94,7 +97,10 @@ describe("Redis Proxy API", () => { const res = await app.request("/send-to-clients", { method: "POST", - body: testData, + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ data: testData }), }); expect(res.status).toBe(400); // Should fail validation due to missing connectionIds @@ -105,7 +111,10 @@ describe("Redis Proxy API", () => { const res = await app.request("/send-to-clients?connectionIds=conn1,conn2&encoding=base64", { method: "POST", - body: testData, + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ data: testData }), }); expect(res.status).toBe(200); @@ -119,7 +128,10 @@ describe("Redis Proxy API", () => { const res = await app.request("/send-to-all-clients?encoding=base64", { method: "POST", - body: testData, + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ data: testData }), }); expect(res.status).toBe(200); @@ -183,7 +195,10 @@ describe("Redis Proxy API", () => { const pingCommand = Buffer.from("*1\r\n$4\r\nPING\r\n").toString("base64"); const sendRes = await app.request(`/send-to-client/${connectionId}?encoding=base64`, { method: "POST", - body: pingCommand, + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ data: pingCommand }), }); expect(sendRes.status).toBe(200); @@ -254,7 +269,10 @@ describe("Redis Proxy API", () => { const pingCommand = Buffer.from("*1\r\n$4\r\nPING\r\n").toString("base64"); const sendRes = await app.request(`/send-to-client/${connectionId}?encoding=base64`, { method: "POST", - body: pingCommand, + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ data: pingCommand }), }); expect(sendRes.status).toBe(200); diff --git a/src/interceptors.test.ts b/src/interceptors.test.ts index 285da44..366aaaf 100644 --- a/src/interceptors.test.ts +++ b/src/interceptors.test.ts @@ -19,7 +19,7 @@ describe("POST /interceptors", () => { mockRedisServer = createMockRedisServer(targetPort); const testConfig = { - listenPort: freePort, + listenPort: [freePort], listenHost: "127.0.0.1", targetHost: "127.0.0.1", targetPort: targetPort, diff --git a/src/scenarios.test.ts b/src/scenarios.test.ts index 52bd867..1b57cbb 100644 --- a/src/scenarios.test.ts +++ b/src/scenarios.test.ts @@ -19,7 +19,7 @@ describe("POST /scenarios", () => { mockRedisServer = createMockRedisServer(targetPort); const testConfig = { - listenPort: freePort, + listenPort: [freePort], listenHost: "127.0.0.1", targetHost: "127.0.0.1", targetPort: targetPort, From 3d3c1d65b36f14bd9066ce5e732770522ca2ef65 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 30 Oct 2025 14:53:48 +0200 Subject: [PATCH 11/14] Revert "Switch data endpoints to accept JSON payloads" This reverts commit c13075e0956729984cb57f8f39c5a9541dae6d53. --- README.md | 39 +++++++++++++++----------------------- examples/cluster/readme.md | 4 +++- src/app.test.ts | 24 ++++++----------------- src/app.ts | 39 ++++++++++++++------------------------ src/util.ts | 4 ---- 5 files changed, 38 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index 0e613c0..3f81419 100644 --- a/README.md +++ b/README.md @@ -39,15 +39,14 @@ RESP3 Push notification: `>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n` **cURL Example:** ```bash curl -X POST "http://localhost:3000/send-to-all-clients?encoding=raw" \ - -H 'Content-Type: application/json' \ - -d '{"data" : ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n"}' + --data-binary ">4\r\n\$6\r\nMOVING\r\n:1\r\n:2\r\n\$6\r\nhost:3\r\n" ``` **TypeScript Example:** ```typescript const response = await fetch('http://localhost:3000/send-to-all-clients?encoding=raw', { method: 'POST', - body: { data: '>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n' } + body: '>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n' }); const result = await response.json(); @@ -59,19 +58,16 @@ console.log(result.success ? 'Injected' : 'Failed'); package main import ( - "bytes" - "encoding/json" "io" "net/http" "strings" ) func main() { - data := map[string]string{"data": ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n"} - jsonData, _ := json.Marshal(data) - resp, _ := http.Post("http://localhost:3000/send-to-all-clients?encoding=raw", "application/json", bytes.NewBuffer(jsonData)) + payload := strings.NewReader(">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n") + resp, _ := http.Post("http://localhost:3000/send-to-all-clients?encoding=raw", "", payload) defer resp.Body.Close() - + body, _ := io.ReadAll(resp.Body) if strings.Contains(string(body), `"success":true`) { println("Injected") @@ -87,11 +83,10 @@ import java.net.URI; public class RespProxyClient { public static void main(String[] args) throws Exception { var client = HttpClient.newHttpClient(); - var json = "{\"data\":\">4\\r\\n$6\\r\\nMOVING\\r\\n:1\\r\\n:2\\r\\n$6\\r\\nhost:3\\r\\n\"}"; var request = HttpRequest.newBuilder() .uri(URI.create("http://localhost:3000/send-to-all-clients?encoding=raw")) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(json)) + .POST(HttpRequest.BodyPublishers.ofString( + ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n")) .build(); var response = client.send(request, HttpResponse.BodyHandlers.ofString()); @@ -108,9 +103,8 @@ public class RespProxyClient { import json import urllib.request -payload = {"data": ">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n"} -data = json.dumps(payload).encode('utf-8') -req = urllib.request.Request("http://localhost:3000/send-to-all-clients?encoding=raw", data, {"Content-Type": "application/json"}) +data = b">4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n" +req = urllib.request.Request("http://localhost:3000/send-to-all-clients?encoding=raw", data) with urllib.request.urlopen(req) as response: result = json.loads(response.read()) @@ -261,19 +255,17 @@ Send Redis protocol data to a specific client connection. - `connectionId` (path): Target connection ID - `encoding` (query): Data encoding format (`base64` or `raw`, default: `base64`) -**Body:** JSON object with "data" property containing the payload +**Body:** Raw data or base64-encoded data **Example:** ```bash # Send PING command (base64 encoded) curl -X POST "http://localhost:3000/send-to-client/conn_123?encoding=base64" \ - -H 'Content-Type: application/json' \ - -d '{"data": "KjENCiQ0DQpQSU5HDQo="}' + -d "KjENCiQ0DQpQSU5HDQo=" # Send raw binary data curl -X POST "http://localhost:3000/send-to-client/conn_123?encoding=raw" \ - -H 'Content-Type: application/json' \ - -d '{"data": "*1\r\n$4\r\nPING\r\n"}' + --data-binary "*1\r\n$4\r\nPING\r\n" ``` **Response:** @@ -297,8 +289,7 @@ Send data to multiple specific client connections. **Example:** ```bash curl -X POST "http://localhost:3000/send-to-clients?connectionIds=conn_123,conn_456&encoding=base64" \ - -H 'Content-Type: application/json' \ - -d '{"data": "KjENCiQ0DQpQSU5HDQo="}' + -d "KjENCiQ0DQpQSU5HDQo=" ``` #### Send Data to All Clients @@ -310,8 +301,7 @@ Broadcast data to all active client connections. **Example:** ```bash curl -X POST "http://localhost:3000/send-to-all-clients?encoding=base64" \ - -H 'Content-Type: application/json' \ - -d '{"data": "KjENCiQ0DQpQSU5HDQo="}' + -d "KjENCiQ0DQpQSU5HDQo=" ``` #### Close Connection @@ -338,3 +328,4 @@ Send custom Redis responses to specific clients for testing scenarios. ### Protocol Analysis Analyze Redis protocol communication patterns. + diff --git a/examples/cluster/readme.md b/examples/cluster/readme.md index cd01067..618ca2a 100644 --- a/examples/cluster/readme.md +++ b/examples/cluster/readme.md @@ -68,7 +68,9 @@ Open another terminal Push a message to all connected clients ```bash -curl -X POST "http://localhost:4000/send-to-all-clients?encoding=raw" -H 'Content-Type: application/json' -d '{"data": ">3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r\n"}' +echo '>3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$4\r\neeee\r' | base64 +# PjMNCiQ3DQptZXNzYWdlDQokMw0KZm9vDQokNA0KZWVlZQ0K +curl -X POST "http://localhost:4000/send-to-all-clients?encoding=base64" -d "PjMNCiQ3DQptZXNzYWdlDQokMw0KZm9vDQokNA0KZWVlZQ0K" ``` You should see the following message in the `redis-cli subscribe` terminal: diff --git a/src/app.test.ts b/src/app.test.ts index 2cc43a4..a723c54 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -79,10 +79,7 @@ describe("Redis Proxy API", () => { const res = await app.request("/send-to-client/non-existent-connection?encoding=base64", { method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ data: testData }), + testData, }); expect(res.status).toBe(200); @@ -97,10 +94,7 @@ describe("Redis Proxy API", () => { const res = await app.request("/send-to-clients", { method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ data: testData }), + testData, }); expect(res.status).toBe(400); // Should fail validation due to missing connectionIds @@ -114,7 +108,7 @@ describe("Redis Proxy API", () => { headers: { "Content-Type": "application/json", }, - body: JSON.stringify({ data: testData }), + testData, }); expect(res.status).toBe(200); @@ -131,7 +125,7 @@ describe("Redis Proxy API", () => { headers: { "Content-Type": "application/json", }, - body: JSON.stringify({ data: testData }), + testData, }); expect(res.status).toBe(200); @@ -195,10 +189,7 @@ describe("Redis Proxy API", () => { const pingCommand = Buffer.from("*1\r\n$4\r\nPING\r\n").toString("base64"); const sendRes = await app.request(`/send-to-client/${connectionId}?encoding=base64`, { method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ data: pingCommand }), + body: pingCommand, }); expect(sendRes.status).toBe(200); @@ -269,10 +260,7 @@ describe("Redis Proxy API", () => { const pingCommand = Buffer.from("*1\r\n$4\r\nPING\r\n").toString("base64"); const sendRes = await app.request(`/send-to-client/${connectionId}?encoding=base64`, { method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ data: pingCommand }), + pingCommand, }); expect(sendRes.status).toBe(200); diff --git a/src/app.ts b/src/app.ts index 5655996..ef0aa31 100644 --- a/src/app.ts +++ b/src/app.ts @@ -13,7 +13,6 @@ import { import ProxyStore, { makeId } from "./proxy-store.ts"; import { connectionIdsQuerySchema, - dataSchema, type ExtendedProxyConfig, encodingSchema, getConfig, @@ -125,11 +124,10 @@ export function createApp(testConfig?: ExtendedProxyConfig) { "/send-to-client/:connectionId", zValidator("param", paramSchema), zValidator("query", encodingSchema), - zValidator("json", dataSchema), async (c) => { const { connectionId } = c.req.valid("param"); const { encoding } = c.req.valid("query"); - const { data } = c.req.valid("json"); + const data = await c.req.text(); const buffer = parseBuffer(data, encoding); @@ -146,13 +144,9 @@ export function createApp(testConfig?: ExtendedProxyConfig) { }, ); - app.post( - "/send-to-clients", - zValidator("query", connectionIdsQuerySchema), - zValidator("json", dataSchema), - async (c) => { - const { connectionIds, encoding } = c.req.valid("query"); - const { data } = c.req.valid("json"); + app.post("/send-to-clients", zValidator("query", connectionIdsQuerySchema), async (c) => { + const { connectionIds, encoding } = c.req.valid("query"); + const data = await c.req.text(); const buffer = parseBuffer(data, encoding); @@ -164,21 +158,16 @@ export function createApp(testConfig?: ExtendedProxyConfig) { }, ); - app.post( - "/send-to-all-clients", - zValidator("query", encodingSchema), - zValidator("json", dataSchema), - async (c) => { - const { encoding } = c.req.valid("query"); - const { data } = c.req.valid("json"); - const buffer = parseBuffer(data, encoding); - const results: SendResult[] = []; - for (const proxy of proxyStore.proxies) { - results.push(...proxy.sendToAllClients(buffer)); - } - return c.json({ results }); - }, - ); + app.post("/send-to-all-clients", zValidator("query", encodingSchema), async (c) => { + const { encoding } = c.req.valid("query"); + const data = await c.req.text(); + const buffer = parseBuffer(data, encoding); + const results: SendResult[] = []; + for (const proxy of proxyStore.proxies) { + results.push(...proxy.sendToAllClients(buffer)); + } + return c.json({ results }); + }); app.delete("/connections/:id", (c) => { const connectionId = c.req.param("id"); diff --git a/src/util.ts b/src/util.ts index 4301c79..a40f0a2 100644 --- a/src/util.ts +++ b/src/util.ts @@ -8,10 +8,6 @@ export type ExtendedProxyConfig = Omit & { simulateCluster?: boolean; }; -export const dataSchema = z.object({ - data: z.string().min(1, "Data is required"), -}); - export const encodingSchema = z.object({ encoding: z.enum(["base64", "raw"]).default("base64"), }); From e536a7dc3ea8ef9e1a30db8e3433b7a9b1363489 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 30 Oct 2025 15:32:11 +0200 Subject: [PATCH 12/14] add examples --- README.md | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 107 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3f81419..2bcbdc4 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ const response = await fetch('http://localhost:3000/send-to-all-clients?encoding }); const result = await response.json(); -console.log(result.success ? 'Injected' : 'Failed'); +console.log(result.results.length > 0 ? 'Injected' : 'Failed'); ``` **Go Example:** @@ -69,7 +69,7 @@ func main() { defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) - if strings.Contains(string(body), `"success":true`) { + if strings.Contains(string(body), `"results"`) { println("Injected") } } @@ -91,7 +91,7 @@ public class RespProxyClient { var response = client.send(request, HttpResponse.BodyHandlers.ofString()); - if (response.body().contains("\"success\":true")) { + if (response.body().contains("\"results\"")) { System.out.println("Injected"); } } @@ -108,7 +108,7 @@ req = urllib.request.Request("http://localhost:3000/send-to-all-clients?encoding with urllib.request.urlopen(req) as response: result = json.loads(response.read()) - print("Injected" if result["success"] else "Failed") + print("Injected" if len(result["results"]) > 0 else "Failed") ``` Key Endpoints: `POST /send-to-client/{id}`, `POST /send-to-all-clients`, `GET /connections`, `GET /stats` @@ -318,6 +318,109 @@ Forcefully close a specific client connection. } ``` +#### Add Interceptor +```http +POST /interceptors +``` +Add a custom interceptor to match commands and return custom responses. + +**Example:** +```bash +curl -X POST "http://localhost:3000/interceptors" \ + -H "Content-Type: application/json" \ + -d '{"name":"ping-interceptor","match":"*1\r\n$4\r\nPING\r\n","response":"+CUSTOM PONG\r\n","encoding":"raw"}' +``` + +**Response:** +```json +{ + "success": true, + "name": "ping-interceptor" +} +``` + +#### Create Scenario +```http +POST /scenarios +``` +Set up automated response sequence for testing. + +**Example:** +```bash +curl -X POST "http://localhost:3000/scenarios" \ + -H "Content-Type: application/json" \ + -d '{"responses":["+OK\r\n",":42\r\n"],"encoding":"raw"}' +``` + +**Response:** +```json +{ + "success": true, + "totalResponses": 2 +} +``` + +#### Get Nodes +```http +GET /nodes +``` +List all proxy node IDs. + +**Example:** +```bash +curl http://localhost:3000/nodes +``` + +**Response:** +```json +{ + "ids": ["localhost:6379:6379", "localhost:6379:6380"] +} +``` + +#### Add Node +```http +POST /nodes +``` +Add a new proxy node dynamically. + +**Example:** +```bash +curl -X POST "http://localhost:3000/nodes" \ + -H "Content-Type: application/json" \ + -d '{"listenPort":6380,"targetHost":"localhost","targetPort":6379}' +``` + +**Response:** +```json +{ + "success": true, + "cfg": { + "listenPort": 6380, + "targetHost": "localhost", + "targetPort": 6379 + } +} +``` + +#### Delete Node +```http +DELETE /nodes/{nodeId} +``` +Remove a proxy node. + +**Example:** +```bash +curl -X DELETE "http://localhost:3000/nodes/localhost:6379:6380" +``` + +**Response:** +```json +{ + "success": true +} +``` + ## Use Cases ### Testing Redis Applications From 535aa9039652b5f9b89ad7ae9f746cca79d56efb Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 30 Oct 2025 15:48:06 +0200 Subject: [PATCH 13/14] address PR comments --- src/proxy-store.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/proxy-store.ts b/src/proxy-store.ts index e6b09f4..42af960 100644 --- a/src/proxy-store.ts +++ b/src/proxy-store.ts @@ -1,35 +1,35 @@ import type { RedisProxy } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy"; -export const makeId = (host: string, port: number, listenPort: number) => +export const makeId = (host: string, port: number, listenPort: number): string => listenPort ? `${host}:${port}@${listenPort}` : `${host}:${port}`; export default class ProxyStore { #proxies = new Map(); - add(id: string, proxy: RedisProxy) { + add(id: string, proxy: RedisProxy): void { this.#proxies.set(id, proxy); } - async delete(id: string) { + async delete(id: string): Promise { const proxy = this.#proxies.get(id); if (!proxy) return false; await proxy.stop(); - this.#proxies.delete(id); + return this.#proxies.delete(id); } - get nodeIds() { + get nodeIds(): string[] { return Array.from(this.#proxies.keys()); } - get proxies() { + get proxies(): RedisProxy[] { return Array.from(this.#proxies.values()); } - get entries() { + get entries(): [string, RedisProxy][] { return Array.from(this.#proxies.entries()); } - getProxyByConnectionId(connectionId: string) { + getProxyByConnectionId(connectionId: string): RedisProxy | undefined { for (const proxy of this.#proxies.values()) { if (proxy.getActiveConnectionIds().includes(connectionId)) { return proxy; @@ -37,7 +37,7 @@ export default class ProxyStore { } } - getProxiesByConnectionIds(connectionIds: string[]) { + getProxiesByConnectionIds(connectionIds: string[]): [RedisProxy, string[]][] { const result: [RedisProxy, string[]][] = []; for (const proxy of this.#proxies.values()) { const activeIds = proxy.getActiveConnectionIds(); From 9e31935dc1c74bcf19d8b6068002b44974200347 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 30 Oct 2025 16:31:03 +0200 Subject: [PATCH 14/14] fix formatting --- src/app.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/app.ts b/src/app.ts index ef0aa31..1f4b956 100644 --- a/src/app.ts +++ b/src/app.ts @@ -148,15 +148,14 @@ export function createApp(testConfig?: ExtendedProxyConfig) { const { connectionIds, encoding } = c.req.valid("query"); const data = await c.req.text(); - const buffer = parseBuffer(data, encoding); + const buffer = parseBuffer(data, encoding); - const results: SendResult[] = []; - for (const [proxy, matchingConIds] of proxyStore.getProxiesByConnectionIds(connectionIds)) { - results.push(...proxy.sendToClients(matchingConIds, buffer)); - } - return c.json({ results }); - }, - ); + const results: SendResult[] = []; + for (const [proxy, matchingConIds] of proxyStore.getProxiesByConnectionIds(connectionIds)) { + results.push(...proxy.sendToClients(matchingConIds, buffer)); + } + return c.json({ results }); + }); app.post("/send-to-all-clients", zValidator("query", encodingSchema), async (c) => { const { encoding } = c.req.valid("query");