diff --git a/bun.lock b/bun.lock index 08da17d..bda4945 100644 --- a/bun.lock +++ b/bun.lock @@ -6,7 +6,7 @@ "dependencies": { "@hono/zod-validator": "^0.7.2", "hono": "^4.8.5", - "redis-monorepo": "github:redis/node-redis", + "redis-monorepo": "github:redis/node-redis#master", "zod": "^4.0.8", }, "devDependencies": { @@ -20,56 +20,56 @@ "@biomejs/biome", ], "packages": { - "@biomejs/biome": ["@biomejs/biome@2.1.2", "", { "optionalDependencies": { "@biomejs/cli-darwin-arm64": "2.1.2", "@biomejs/cli-darwin-x64": "2.1.2", "@biomejs/cli-linux-arm64": "2.1.2", "@biomejs/cli-linux-arm64-musl": "2.1.2", "@biomejs/cli-linux-x64": "2.1.2", "@biomejs/cli-linux-x64-musl": "2.1.2", "@biomejs/cli-win32-arm64": "2.1.2", "@biomejs/cli-win32-x64": "2.1.2" }, "bin": { "biome": "bin/biome" } }, "sha512-yq8ZZuKuBVDgAS76LWCfFKHSYIAgqkxVB3mGVVpOe2vSkUTs7xG46zXZeNPRNVjiJuw0SZ3+J2rXiYx0RUpfGg=="], + "@biomejs/biome": ["@biomejs/biome@2.2.6", "", { "optionalDependencies": { "@biomejs/cli-darwin-arm64": "2.2.6", "@biomejs/cli-darwin-x64": "2.2.6", "@biomejs/cli-linux-arm64": "2.2.6", "@biomejs/cli-linux-arm64-musl": "2.2.6", "@biomejs/cli-linux-x64": "2.2.6", "@biomejs/cli-linux-x64-musl": "2.2.6", "@biomejs/cli-win32-arm64": "2.2.6", "@biomejs/cli-win32-x64": "2.2.6" }, "bin": { "biome": "bin/biome" } }, "sha512-yKTCNGhek0rL5OEW1jbLeZX8LHaM8yk7+3JRGv08my+gkpmtb5dDE+54r2ZjZx0ediFEn1pYBOJSmOdDP9xtFw=="], - "@biomejs/cli-darwin-arm64": ["@biomejs/cli-darwin-arm64@2.1.2", "", { "os": "darwin", "cpu": "arm64" }, "sha512-leFAks64PEIjc7MY/cLjE8u5OcfBKkcDB0szxsWUB4aDfemBep1WVKt0qrEyqZBOW8LPHzrFMyDl3FhuuA0E7g=="], + "@biomejs/cli-darwin-arm64": ["@biomejs/cli-darwin-arm64@2.2.6", "", { "os": "darwin", "cpu": "arm64" }, "sha512-UZPmn3M45CjTYulgcrFJFZv7YmK3pTxTJDrFYlNElT2FNnkkX4fsxjExTSMeWKQYoZjvekpH5cvrYZZlWu3yfA=="], - "@biomejs/cli-darwin-x64": ["@biomejs/cli-darwin-x64@2.1.2", "", { "os": "darwin", "cpu": "x64" }, "sha512-Nmmv7wRX5Nj7lGmz0FjnWdflJg4zii8Ivruas6PBKzw5SJX/q+Zh2RfnO+bBnuKLXpj8kiI2x2X12otpH6a32A=="], + "@biomejs/cli-darwin-x64": ["@biomejs/cli-darwin-x64@2.2.6", "", { "os": "darwin", "cpu": "x64" }, "sha512-HOUIquhHVgh/jvxyClpwlpl/oeMqntlteL89YqjuFDiZ091P0vhHccwz+8muu3nTyHWM5FQslt+4Jdcd67+xWQ=="], - "@biomejs/cli-linux-arm64": ["@biomejs/cli-linux-arm64@2.1.2", "", { "os": "linux", "cpu": "arm64" }, "sha512-NWNy2Diocav61HZiv2enTQykbPP/KrA/baS7JsLSojC7Xxh2nl9IczuvE5UID7+ksRy2e7yH7klm/WkA72G1dw=="], + "@biomejs/cli-linux-arm64": ["@biomejs/cli-linux-arm64@2.2.6", "", { "os": "linux", "cpu": "arm64" }, "sha512-BpGtuMJGN+o8pQjvYsUKZ+4JEErxdSmcRD/JG3mXoWc6zrcA7OkuyGFN1mDggO0Q1n7qXxo/PcupHk8gzijt5g=="], - "@biomejs/cli-linux-arm64-musl": ["@biomejs/cli-linux-arm64-musl@2.1.2", "", { "os": "linux", "cpu": "arm64" }, "sha512-qgHvafhjH7Oca114FdOScmIKf1DlXT1LqbOrrbR30kQDLFPEOpBG0uzx6MhmsrmhGiCFCr2obDamu+czk+X0HQ=="], + "@biomejs/cli-linux-arm64-musl": ["@biomejs/cli-linux-arm64-musl@2.2.6", "", { "os": "linux", "cpu": "arm64" }, "sha512-TjCenQq3N6g1C+5UT3jE1bIiJb5MWQvulpUngTIpFsL4StVAUXucWD0SL9MCW89Tm6awWfeXBbZBAhJwjyFbRQ=="], - "@biomejs/cli-linux-x64": ["@biomejs/cli-linux-x64@2.1.2", "", { "os": "linux", "cpu": "x64" }, "sha512-Km/UYeVowygTjpX6sGBzlizjakLoMQkxWbruVZSNE6osuSI63i4uCeIL+6q2AJlD3dxoiBJX70dn1enjQnQqwA=="], + "@biomejs/cli-linux-x64": ["@biomejs/cli-linux-x64@2.2.6", "", { "os": "linux", "cpu": "x64" }, "sha512-1HaM/dpI/1Z68zp8ZdT6EiBq+/O/z97a2AiHMl+VAdv5/ELckFt9EvRb8hDHpk8hUMoz03gXkC7VPXOVtU7faA=="], - "@biomejs/cli-linux-x64-musl": ["@biomejs/cli-linux-x64-musl@2.1.2", "", { "os": "linux", "cpu": "x64" }, "sha512-xlB3mU14ZUa3wzLtXfmk2IMOGL+S0aHFhSix/nssWS/2XlD27q+S6f0dlQ8WOCbYoXcuz8BCM7rCn2lxdTrlQA=="], + "@biomejs/cli-linux-x64-musl": ["@biomejs/cli-linux-x64-musl@2.2.6", "", { "os": "linux", "cpu": "x64" }, "sha512-1ZcBux8zVM3JhWN2ZCPaYf0+ogxXG316uaoXJdgoPZcdK/rmRcRY7PqHdAos2ExzvjIdvhQp72UcveI98hgOog=="], - "@biomejs/cli-win32-arm64": ["@biomejs/cli-win32-arm64@2.1.2", "", { "os": "win32", "cpu": "arm64" }, "sha512-G8KWZli5ASOXA3yUQgx+M4pZRv3ND16h77UsdunUL17uYpcL/UC7RkWTdkfvMQvogVsAuz5JUcBDjgZHXxlKoA=="], + "@biomejs/cli-win32-arm64": ["@biomejs/cli-win32-arm64@2.2.6", "", { "os": "win32", "cpu": "arm64" }, "sha512-h3A88G8PGM1ryTeZyLlSdfC/gz3e95EJw9BZmA6Po412DRqwqPBa2Y9U+4ZSGUAXCsnSQE00jLV8Pyrh0d+jQw=="], - "@biomejs/cli-win32-x64": ["@biomejs/cli-win32-x64@2.1.2", "", { "os": "win32", "cpu": "x64" }, "sha512-9zajnk59PMpjBkty3bK2IrjUsUHvqe9HWwyAWQBjGLE7MIBjbX2vwv1XPEhmO2RRuGoTkVx3WCanHrjAytICLA=="], + "@biomejs/cli-win32-x64": ["@biomejs/cli-win32-x64@2.2.6", "", { "os": "win32", "cpu": "x64" }, "sha512-yx0CqeOhPjYQ5ZXgPfu8QYkgBhVJyvWe36as7jRuPrKPO5ylVDfwVtPQ+K/mooNTADW0IhxOZm3aPu16dP8yNQ=="], - "@hono/zod-validator": ["@hono/zod-validator@0.7.2", "", { "peerDependencies": { "hono": ">=3.9.0", "zod": "^3.25.0 || ^4.0.0" } }, "sha512-ub5eL/NeZ4eLZawu78JpW/J+dugDAYhwqUIdp9KYScI6PZECij4Hx4UsrthlEUutqDDhPwRI0MscUfNkvn/mqQ=="], + "@hono/zod-validator": ["@hono/zod-validator@0.7.4", "", { "peerDependencies": { "hono": ">=3.9.0", "zod": "^3.25.0 || ^4.0.0" } }, "sha512-biKGn3BRJVaftZlIPMyK+HCe/UHAjJ6sH0UyXe3+v0OcgVr9xfImDROTJFLtn9e3XEEAHGZIM9U6evu85abm8Q=="], - "@redis/bloom": ["@redis/bloom@5.6.1", "", { "peerDependencies": { "@redis/client": "^5.6.1" } }, "sha512-5/22U76IMEfn6TeZ+uvjXspHw+ykBF0kpBa8xouzeHaQMXs/auqBUOEYzU2VKYDvnd2RSpPTyIg82oB7PpUgLg=="], + "@redis/bloom": ["@redis/bloom@5.8.3", "", { "peerDependencies": { "@redis/client": "^5.8.3" } }, "sha512-1eldTzHvdW3Oi0TReb8m1yiFt8ZwyF6rv1NpZyG5R4TpCwuAdKQetBKoCw7D96tNFgsVVd6eL+NaGZZCqhRg4g=="], - "@redis/client": ["@redis/client@5.6.1", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-bWHmSFIJ5w1Y4aHsYs46XMDHKQsBHFRhNcllYaBxz2Zl+lu+gbm5yI9BqxvKh48bLTs/Wx1Kns0gN2WIasE8MA=="], + "@redis/client": ["@redis/client@5.8.3", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-MZVUE+l7LmMIYlIjubPosruJ9ltSLGFmJqsXApTqPLyHLjsJUSAbAJb/A3N34fEqean4ddiDkdWzNu4ZKPvRUg=="], - "@redis/json": ["@redis/json@5.6.1", "", { "peerDependencies": { "@redis/client": "^5.6.1" } }, "sha512-cTggVzPIVuiFeXcEcnTRiUzV7rmUvM9KUYxWiHyjsAVACTEUe4ifKkvzrij0H/z3ammU5tfGACffDB3olBwtVA=="], + "@redis/json": ["@redis/json@5.8.3", "", { "peerDependencies": { "@redis/client": "^5.8.3" } }, "sha512-DRR09fy/u8gynHGJ4gzXYeM7D8nlS6EMv5o+h20ndTJiAc7RGR01fdk2FNjnn1Nz5PjgGGownF+s72bYG4nZKQ=="], - "@redis/search": ["@redis/search@5.6.1", "", { "peerDependencies": { "@redis/client": "^5.6.1" } }, "sha512-+eOjx8O2YoKygjqkLpTHqcAq0zKLjior+ee2tRBx/3RSf1+OHxiC9Y6NstshQpvB1XHqTw9n7+f0+MsRJZrp0g=="], + "@redis/search": ["@redis/search@5.8.3", "", { "peerDependencies": { "@redis/client": "^5.8.3" } }, "sha512-EMIvEeGRR2I0BJEz4PV88DyCuPmMT1rDtznlsHY3cKSDcc9vj0Q411jUnX0iU2vVowUgWn/cpySKjpXdZ8m+5g=="], - "@redis/time-series": ["@redis/time-series@5.6.1", "", { "peerDependencies": { "@redis/client": "^5.6.1" } }, "sha512-sd3q4jMJdoSO2akw1L9NrdFI1JJ6zeMgMUoTh4a34p9sY3AnOI4aDLCecy8L2IcPAP1oNR3TbLFJiCJDQ35QTA=="], + "@redis/time-series": ["@redis/time-series@5.8.3", "", { "peerDependencies": { "@redis/client": "^5.8.3" } }, "sha512-5Jwy3ilsUYQjzpE7WZ1lEeG1RkqQ5kHtwV1p8yxXHSEmyUbC/T/AVgyjMcm52Olj/Ov/mhDKjx6ndYUi14bXsw=="], - "@types/bun": ["@types/bun@1.2.19", "", { "dependencies": { "bun-types": "1.2.19" } }, "sha512-d9ZCmrH3CJ2uYKXQIUuZ/pUnTqIvLDS0SK7pFmbx8ma+ziH/FRMoAq5bYpRG7y+w1gl+HgyNZbtqgMq4W4e2Lg=="], + "@types/bun": ["@types/bun@1.3.0", "", { "dependencies": { "bun-types": "1.3.0" } }, "sha512-+lAGCYjXjip2qY375xX/scJeVRmZ5cY0wyHYyCYxNcdEXrQ4AOe3gACgd4iQ8ksOslJtW4VNxBJ8llUwc3a6AA=="], - "@types/node": ["@types/node@24.1.0", "", { "dependencies": { "undici-types": "~7.8.0" } }, "sha512-ut5FthK5moxFKH2T1CUOC6ctR67rQRvvHdFLCD2Ql6KXmMuCrjsSsRI9UsLCm9M18BMwClv4pn327UvB7eeO1w=="], + "@types/node": ["@types/node@24.9.1", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-QoiaXANRkSXK6p0Duvt56W208du4P9Uye9hWLWgGMDTEoKPhuenzNcC4vGUmrNkiOKTlIrBoyNQYNpSwfEZXSg=="], - "@types/react": ["@types/react@19.1.8", "", { "dependencies": { "csstype": "^3.0.2" } }, "sha512-AwAfQ2Wa5bCx9WP8nZL2uMZWod7J7/JSplxbTmBQ5ms6QpqNYm672H0Vu9ZVKVngQ+ii4R/byguVEUZQyeg44g=="], + "@types/react": ["@types/react@19.2.2", "", { "dependencies": { "csstype": "^3.0.2" } }, "sha512-6mDvHUFSjyT2B2yeNx2nUgMxh9LtOWvkhIU3uePn2I2oyNymUAX1NIsdgviM4CH+JSrp2D2hsMvJOkxY+0wNRA=="], - "bun-types": ["bun-types@1.2.19", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-uAOTaZSPuYsWIXRpj7o56Let0g/wjihKCkeRqUBhlLVM/Bt+Fj9xTo+LhC1OV1XDaGkz4hNC80et5xgy+9KTHQ=="], + "bun-types": ["bun-types@1.3.0", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-u8X0thhx+yJ0KmkxuEo9HAtdfgCBaM/aI9K90VQcQioAmkVp3SG3FkwWGibUFz3WdXAdcsqOcbU40lK7tbHdkQ=="], "cluster-key-slot": ["cluster-key-slot@1.1.2", "", {}, "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA=="], "csstype": ["csstype@3.1.3", "", {}, "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw=="], - "hono": ["hono@4.8.5", "", {}, "sha512-Up2cQbtNz1s111qpnnECdTGqSIUIhZJMLikdKkshebQSEBcoUKq6XJayLGqSZWidiH0zfHRCJqFu062Mz5UuRA=="], + "hono": ["hono@4.10.1", "", {}, "sha512-rpGNOfacO4WEPClfkEt1yfl8cbu10uB1lNpiI33AKoiAHwOS8lV748JiLx4b5ozO/u4qLjIvfpFsPXdY5Qjkmg=="], - "redis": ["redis@5.6.1", "", { "dependencies": { "@redis/bloom": "5.6.1", "@redis/client": "5.6.1", "@redis/json": "5.6.1", "@redis/search": "5.6.1", "@redis/time-series": "5.6.1" } }, "sha512-O9DwAvcBm/lrlkGE0A6gNBtUdA8J9oD9njeLYlLzmm+MGTR7nd7VkpspfXqeXFg3gm89zldDqckyaHhXfhY80g=="], + "redis": ["redis@5.8.3", "", { "dependencies": { "@redis/bloom": "5.8.3", "@redis/client": "5.8.3", "@redis/json": "5.8.3", "@redis/search": "5.8.3", "@redis/time-series": "5.8.3" } }, "sha512-MfSrfV6+tEfTw8c4W0yFp6XWX8Il4laGU7Bx4kvW4uiYM1AuZ3KGqEGt1LdQHeD1nEyLpIWetZ/SpY3kkbgrYw=="], - "redis-monorepo": ["redis-monorepo@github:redis/node-redis#e96db0d", {}, "redis-node-redis-e96db0d"], + "redis-monorepo": ["redis-monorepo@github:redis/node-redis#e6025b1", {}, "redis-node-redis-e6025b1"], - "undici-types": ["undici-types@7.8.0", "", {}, "sha512-9UJ2xGDvQ43tYyVMpuHlsgApydB8ZKfVYTsLDhXkFL/6gfkp+U8xTGdh8pMJv1SpZna0zxG1DwsKZsreLbXBxw=="], + "undici-types": ["undici-types@7.16.0", "", {}, "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="], - "zod": ["zod@4.0.8", "", {}, "sha512-+MSh9cZU9r3QKlHqrgHMTSr3QwMGv4PLfR0M4N/sYWV5/x67HgXEhIGObdBkpnX8G78pTgWnIrBL2lZcNJOtfg=="], + "zod": ["zod@4.1.12", "", {}, "sha512-JInaHOamG8pt5+Ey8kGmdcAcg3OL9reK8ltczgHTAwNhMys/6ThXHityHxVV2p3fkw/c+MAvBHFVYHFZDmjMCQ=="], } } diff --git a/package.json b/package.json index 69886e6..3cb3cff 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "dependencies": { "@hono/zod-validator": "^0.7.2", "hono": "^4.8.5", - "redis-monorepo": "github:redis/node-redis", + "redis-monorepo": "github:redis/node-redis#master", "zod": "^4.0.8" }, "devDependencies": { diff --git a/src/app.test.ts b/src/app.test.ts index 03985bd..7624b9f 100644 --- a/src/app.test.ts +++ b/src/app.test.ts @@ -7,15 +7,19 @@ import { type RedisProxy, } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; import { createApp } from "./app"; +import { makeId } from "./proxy-store"; + +const TARGET_HOST = "127.0.0.1"; describe("Redis Proxy API", () => { let app: any; let proxy: RedisProxy; let mockRedisServer: any; + let targetPort: number; beforeAll(async () => { const freePort = await getFreePortNumber(); - const targetPort = await getFreePortNumber(); + targetPort = await getFreePortNumber(); mockRedisServer = Bun.listen({ hostname: "127.0.0.1", @@ -85,7 +89,7 @@ describe("Redis Proxy API", () => { const testConfig = { listenPort: freePort, listenHost: "127.0.0.1", - targetHost: "127.0.0.1", + targetHost: TARGET_HOST, targetPort: targetPort, timeout: 30000, enableLogging: true, @@ -113,7 +117,7 @@ describe("Redis Proxy API", () => { const res = await app.request("/stats"); expect(res.status).toBe(200); - const stats = await res.json(); + const stats = (await res.json())[makeId(TARGET_HOST, targetPort)]; expect(stats).toHaveProperty("activeConnections"); expect(stats).toHaveProperty("totalConnections"); expect(stats).toHaveProperty("connections"); @@ -125,10 +129,9 @@ describe("Redis Proxy API", () => { const res = await app.request("/connections"); expect(res.status).toBe(200); - const result = await res.json(); - expect(result).toHaveProperty("connectionIds"); - expect(Array.isArray(result.connectionIds)).toBe(true); - expect(result.connectionIds.length).toBe(0); + const result = (await res.json())[makeId(TARGET_HOST, targetPort)]; + expect(result).toBeArray(); + expect(result.length).toBe(0); }); test("POST /send-to-client with invalid connection", async () => { @@ -217,16 +220,16 @@ describe("Redis Proxy API", () => { const statsRes = await app.request("/stats"); expect(statsRes.status).toBe(200); - const stats = await statsRes.json(); + const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort)]; expect(stats.activeConnections).toBe(1); expect(stats.totalConnections).toBeGreaterThanOrEqual(1); expect(stats.connections.length).toBe(1); const connectionsRes = await app.request("/connections"); expect(connectionsRes.status).toBe(200); - const connectionsResult = await connectionsRes.json(); - expect(connectionsResult.connectionIds.length).toBe(1); - const connectionId = connectionsResult.connectionIds[0]; + const connectionsResult = (await connectionsRes.json())[makeId(TARGET_HOST, targetPort)]; + expect(connectionsResult.length).toBe(1); + const connectionId = connectionsResult[0]; expect(typeof connectionId).toBe("string"); expect(connectionId.length).toBeGreaterThan(0); @@ -255,7 +258,7 @@ describe("Redis Proxy API", () => { await new Promise((resolve) => setTimeout(resolve, 100)); const finalStatsRes = await app.request("/stats"); - const finalStats = await finalStatsRes.json(); + const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort)]; expect(finalStats.activeConnections).toBe(0); resolve(); @@ -289,15 +292,15 @@ describe("Redis Proxy API", () => { await new Promise((resolve) => setTimeout(resolve, 100)); const statsRes = await app.request("/stats"); - const stats = await statsRes.json(); + const stats = (await statsRes.json())[makeId(TARGET_HOST, targetPort)]; expect(stats.activeConnections).toBe(1); expect(stats.totalConnections).toBeGreaterThanOrEqual(1); expect(stats.connections.length).toBe(1); const connectionsRes = await app.request("/connections"); - const connectionsResult = await connectionsRes.json(); - expect(connectionsResult.connectionIds.length).toBe(1); - const connectionId = connectionsResult.connectionIds[0]; + const connectionsResult = (await connectionsRes.json())[makeId(TARGET_HOST, targetPort)]; + expect(connectionsResult.length).toBe(1); + const connectionId = connectionsResult[0]; const result = await client.sendCommand(["FOO"]); expect(result).toBe("BAR" as unknown as SimpleStringReply); @@ -319,7 +322,7 @@ describe("Redis Proxy API", () => { await new Promise((resolve) => setTimeout(resolve, 100)); const finalStatsRes = await app.request("/stats"); - const finalStats = await finalStatsRes.json(); + const finalStats = (await finalStatsRes.json())[makeId(TARGET_HOST, targetPort)]; expect(finalStats.activeConnections).toBe(0); }); }); diff --git a/src/app.ts b/src/app.ts index 46d18b3..7dad8b8 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,30 +1,132 @@ import { zValidator } from "@hono/zod-validator"; import { Hono } from "hono"; import { logger } from "hono/logger"; -import type { ProxyConfig } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; +import type { + Interceptor, + ProxyConfig, + ProxyStats, + SendResult, +} from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; import { RedisProxy } from "redis-monorepo/packages/test-utils/lib/redis-proxy.ts"; - +import ProxyStore, { makeId } from "./proxy-store.ts"; import { connectionIdsQuerySchema, encodingSchema, getConfig, paramSchema, parseBuffer, + proxyConfigSchema, } from "./util.ts"; +const startNewProxy = (config: ProxyConfig) => { + const proxy = new RedisProxy(config); + proxy.start().catch(console.error); + return proxy; +}; + +interface Mapping { + from: { + host: string; + port: number; + }; + to: { + host: string; + port: number; + }; +} + +const addressMapping = new Map(); + +const setTransformers = (addressMapping: Map, proxyStore: ProxyStore) => { + const interceptors = []; + for (const mapping of addressMapping.values()) { + interceptors.push(async (data: Buffer, next: Interceptor) => { + const response = await next(data); + // for example $9\r\n127.0.0.1\r\n:3000 + const from = `$${mapping.from.host.length}\r\n${mapping.from.host}\r\n:${mapping.from.port}`; + if (response.includes(from)) { + const to = `$${mapping.to.host.length}\r\n${mapping.to.host}\r\n:${mapping.to.port}`; + return Buffer.from(response.toString().replaceAll(from, to)); + } + return response; + }); + } + for (const proxy of proxyStore.proxies) { + proxy.setInterceptors(interceptors); + } +}; + export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number }) { const config = testConfig || getConfig(); const app = new Hono(); app.use(logger()); - const proxy = new RedisProxy(config); - proxy.start().catch(console.error); + const proxyStore = new ProxyStore(); + const nodeId = makeId(config.targetHost, config.targetPort); + proxyStore.add(nodeId, startNewProxy(config)); + addressMapping.set(nodeId, { + from: { + host: config.targetHost, + port: config.targetPort, + }, + to: { + host: config.listenHost ?? "127.0.0.1", + port: config.listenPort, + }, + }); + setTransformers(addressMapping, proxyStore); + + app.post("/nodes", zValidator("json", proxyConfigSchema), async (c) => { + const data = await c.req.json(); + const cfg: ProxyConfig = { ...config, ...data }; + const nodeId = makeId(cfg.targetHost, cfg.targetPort); + proxyStore.add(nodeId, startNewProxy(cfg)); + addressMapping.set(nodeId, { + from: { + host: cfg.targetHost, + port: cfg.targetPort, + }, + to: { + host: cfg.listenHost ?? "127.0.0.1", + port: cfg.listenPort, + }, + }); + setTransformers(addressMapping, proxyStore); + return c.json({ success: true, cfg }); + }); + + app.delete("/nodes/:id", async (c) => { + const nodeId = c.req.param("id"); + const success = await proxyStore.delete(nodeId); + addressMapping.delete(nodeId); + setTransformers(addressMapping, proxyStore); + return c.json({ success }); + }); + + app.get("/nodes", (c) => { + return c.json({ ids: proxyStore.nodeIds }); + }); + app.get("/stats", (c) => { - return c.json(proxy.getStats()); + const response = proxyStore.entries.reduce( + (acc, [id, proxy]) => { + acc[id] = proxy.getStats(); + return acc; + }, + {} as Record, + ); + return c.json(response); }); app.get("/connections", (c) => { - return c.json({ connectionIds: proxy.getActiveConnectionIds() }); + const response = proxyStore.entries.reduce( + (acc, [id, proxy]) => { + acc[id] = proxy.getActiveConnectionIds(); + return acc; + }, + {} as Record, + ); + return c.json(response); }); app.post( @@ -37,6 +139,15 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number const data = await c.req.text(); const buffer = parseBuffer(data, encoding); + + const proxy = proxyStore.getProxyByConnectionId(connectionId); + if (!proxy) + return c.json({ + success: false, + error: "Connection not found", + connectionId, + }); + const result = proxy.sendToClient(connectionId, buffer); return c.json(result); }, @@ -47,24 +158,36 @@ export function createApp(testConfig?: ProxyConfig & { readonly apiPort?: number const data = await c.req.text(); const buffer = parseBuffer(data, encoding); - const results = proxy.sendToClients(connectionIds, buffer); + + const results: SendResult[] = []; + for (const [proxy, matchingConIds] of proxyStore.getProxiesByConnectionIds(connectionIds)) { + results.push(...proxy.sendToClients(matchingConIds, buffer)); + } return c.json({ results }); }); app.post("/send-to-all-clients", zValidator("query", encodingSchema), async (c) => { const { encoding } = c.req.valid("query"); const data = await c.req.text(); - const buffer = parseBuffer(data, encoding); - const results = proxy.sendToAllClients(buffer); + const results: SendResult[] = []; + for (const proxy of proxyStore.proxies) { + results.push(...proxy.sendToAllClients(buffer)); + } return c.json({ results }); }); app.delete("/connections/:id", (c) => { const connectionId = c.req.param("id"); + const proxy = proxyStore.getProxyByConnectionId(connectionId); + if (!proxy) + return c.json({ + success: false, + connectionId, + }); const success = proxy.closeConnection(connectionId); return c.json({ success, connectionId }); }); - return { app, proxy, config }; + return { app, proxy: proxyStore.proxies[0], config }; } diff --git a/src/proxy-store.ts b/src/proxy-store.ts new file mode 100644 index 0000000..4244c2e --- /dev/null +++ b/src/proxy-store.ts @@ -0,0 +1,50 @@ +import type { RedisProxy } from "redis-monorepo/packages/test-utils/lib/redis-proxy"; + +export const makeId = (host: string, port: number) => `${host}:${port}`; + +export default class ProxyStore { + #proxies = new Map(); + + add(id: string, proxy: RedisProxy) { + this.#proxies.set(id, proxy); + } + + async delete(id: string) { + const proxy = this.#proxies.get(id); + if (!proxy) return false; + await proxy.stop(); + this.#proxies.delete(id); + } + + get nodeIds() { + return Array.from(this.#proxies.keys()); + } + + get proxies() { + return Array.from(this.#proxies.values()); + } + + get entries() { + return Array.from(this.#proxies.entries()); + } + + getProxyByConnectionId(connectionId: string) { + for (const proxy of this.#proxies.values()) { + if (proxy.getActiveConnectionIds().includes(connectionId)) { + return proxy; + } + } + } + + getProxiesByConnectionIds(connectionIds: string[]) { + const result: [RedisProxy, string[]][] = []; + for (const proxy of this.#proxies.values()) { + const activeIds = proxy.getActiveConnectionIds(); + const matchingIds = connectionIds.filter((id) => activeIds.includes(id)); + if (matchingIds.length > 0) { + result.push([proxy, matchingIds]); + } + } + return result; + } +} diff --git a/src/util.ts b/src/util.ts index 75956fe..0a05b98 100644 --- a/src/util.ts +++ b/src/util.ts @@ -33,7 +33,7 @@ export const DEFAULT_LISTEN_HOST = "127.0.0.1"; export const DEFAULT_ENABLE_LOGGING = false; export const DEFAULT_API_PORT = 3000; -const proxyConfigSchema = z.object({ +export const proxyConfigSchema = z.object({ listenPort: z.coerce.number().default(DEFAULT_LISTEN_PORT), listenHost: z.string().optional().default(DEFAULT_LISTEN_HOST), targetHost: z.string(),