Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/cluster/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ services:
TARGET_PORT: "3000"
API_PORT: "4000"
ENABLE_LOGGING: true
SIMULATE_CLUSTER: true
DEFAULT_INTERCEPTORS: 'cluster, hitless, logger'
ports:
- "6379:6379"
- "6380:6380"
Expand Down
2 changes: 1 addition & 1 deletion examples/cluster/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ docker run \
-e TARGET_PORT=<redis-port> \
-e TIMEOUT=0 \
-e API_PORT=4000 \
-e SIMULATE_CLUSTER=true \
-e DEFAULT_INTERCEPTORS=cluster, hitless, logger \
redislabs/client-resp-proxy

```
Expand Down
40 changes: 3 additions & 37 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
RedisProxy,
type SendResult,
} from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy.ts";
import applyDefaultInterceptors from "./default_interceptors/index.ts";
import ProxyStore, { makeId } from "./proxy-store.ts";
import {
connectionIdsQuerySchema,
Expand All @@ -29,40 +30,6 @@ const startNewProxy = (config: ProxyConfig) => {
return proxy;
};

const setClusterSimulateInterceptor = (proxyStore: ProxyStore) => {
const interceptor: InterceptorDescription = {
name: `cluster-simulation-interceptor`,
fn: async (data: Buffer, next: Next, state: InterceptorState) => {
state.invokeCount++;

if (data.toString().toLowerCase() !== "*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n") {
return next(data);
}

state.matchCount++;

const proxies = proxyStore.proxies;
const slotLenght = Math.floor(16384 / proxies.length);

let current = -1;
const mapping = proxyStore.proxies.map((proxy, i) => {
const from = current + 1;
const to = i === proxies.length - 1 ? 16383 : current + slotLenght;
current = to;
const id = `proxy-id-${proxy.config.listenPort}`;
return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${proxy.config.listenHost.length}\r\n${proxy.config.listenHost}\r\n:${proxy.config.listenPort}\r\n$${id.length}\r\n${id}\r\n`;
});

const response = `*${proxies.length}\r\n${mapping.join("")}`;
return Buffer.from(response);
},
};

for (const proxy of proxyStore.proxies) {
proxy.setGlobalInterceptors([interceptor]);
}
};

export function createApp(testConfig?: ExtendedProxyConfig) {
const config = testConfig || getConfig();
const app = new Hono();
Expand All @@ -76,21 +43,20 @@ export function createApp(testConfig?: ExtendedProxyConfig) {
proxyStore.add(nodeId, startNewProxy(proxyConfig));
}

config.simulateCluster && setClusterSimulateInterceptor(proxyStore);
config.defaultInterceptors && applyDefaultInterceptors(config.defaultInterceptors, proxyStore);

app.post("/nodes", zValidator("json", proxyConfigSchema), async (c) => {
const data = await c.req.json();
const cfg: ProxyConfig = { ...config, ...data };
const nodeId = makeId(cfg.targetHost, cfg.targetPort, cfg.listenPort);
proxyStore.add(nodeId, startNewProxy(cfg));
config.simulateCluster && setClusterSimulateInterceptor(proxyStore);
config.defaultInterceptors && applyDefaultInterceptors(config.defaultInterceptors, proxyStore);
return c.json({ success: true, cfg });
});

app.delete("/nodes/:id", async (c) => {
const nodeId = c.req.param("id");
const success = await proxyStore.delete(nodeId);
config.simulateCluster && setClusterSimulateInterceptor(proxyStore);
return c.json({ success });
});

Expand Down
36 changes: 36 additions & 0 deletions src/default_interceptors/cluster-interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import type {
InterceptorDescription,
InterceptorState,
Next,
RedisProxy,
} from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy";

// Sets up an interceptor that simulates Redis Cluster behavior by responding to CLUSTER SLOTS command
export default function createClusterInterceptor(proxies: RedisProxy[]): InterceptorDescription {
return {
name: `cluster-simulation-interceptor`,
fn: async (data: Buffer, next: Next, state: InterceptorState) => {
state.invokeCount++;

if (data.toString().toLowerCase() !== "*2\r\n$7\r\ncluster\r\n$5\r\nslots\r\n") {
return next(data);
}

state.matchCount++;

const slotLenght = Math.floor(16384 / proxies.length);

let current = -1;
const mapping = proxies.map((proxy, i) => {
const from = current + 1;
const to = i === proxies.length - 1 ? 16383 : current + slotLenght;
current = to;
const id = `proxy-id-${proxy.config.listenPort}`;
return `*3\r\n:${from}\r\n:${to}\r\n*3\r\n$${proxy.config.listenHost.length}\r\n${proxy.config.listenHost}\r\n:${proxy.config.listenPort}\r\n$${id.length}\r\n${id}\r\n`;
});

const response = `*${proxies.length}\r\n${mapping.join("")}`;
return Buffer.from(response);
},
};
}
25 changes: 25 additions & 0 deletions src/default_interceptors/hitless-interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type {
InterceptorDescription,
InterceptorState,
Next,
} from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy";

export default function createHitlessInterceptor(): InterceptorDescription {
return {
name: `hitless-simulation-interceptor`,
fn: async (data: Buffer, next: Next, state: InterceptorState) => {
state.invokeCount++;

if (
!data
.toString()
.toLowerCase()
.includes("*5\r\n$6\r\nclient\r\n$19\r\nmaint_notifications\r\n$2\r\non\r\n")
) {
return next(data);
}
state.matchCount++;
return Buffer.from("+OK\r\n");
},
};
}
32 changes: 32 additions & 0 deletions src/default_interceptors/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { InterceptorDescription } from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy";
import type ProxyStore from "../proxy-store";
import createClusterInterceptor from "./cluster-interceptor";
import createHitlessInterceptor from "./hitless-interceptor";
import createLoggingInterceptor from "./logging-interceptor";

export default function applyDefaultInterceptors(interceptorNames: string, proxyStore: ProxyStore) {
const interceptors: InterceptorDescription[] = [];
for (const interceptorName of interceptorNames.split(",").map((i) => i.trim())) {
switch (interceptorName) {
case "logger":
interceptors.push(createLoggingInterceptor());
break;
case "cluster":
interceptors.push(createClusterInterceptor(proxyStore.proxies));
break;
case "hitless":
interceptors.push(createHitlessInterceptor());
break;
default:
console.warn(`Unknown default interceptor: ${interceptorName}`);
}
}

if (interceptors.length) {
for (const proxy of proxyStore.proxies) {
for (const interceptor of interceptors) {
proxy.addGlobalInterceptor(interceptor);
}
}
}
}
18 changes: 18 additions & 0 deletions src/default_interceptors/logging-interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type {
InterceptorDescription,
InterceptorState,
Next,
} from "redis-monorepo/packages/test-utils/lib/proxy/redis-proxy";

export default function createLoggingInterceptor(): InterceptorDescription {
return {
name: `logging-interceptor`,
fn: async (data: Buffer, next: Next, state: InterceptorState) => {
state.invokeCount++;
console.log("[REQ]", data.toString().replaceAll("\r\n", " "));
const response = await next(data);
console.log("[RES]", response.toString().replaceAll("\r\n", " "));
return response;
},
};
}
19 changes: 7 additions & 12 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { z } from "zod";
export type ExtendedProxyConfig = Omit<ProxyConfig, "listenPort"> & {
listenPort: number[];
readonly apiPort?: number;
simulateCluster?: boolean;
defaultInterceptors?: string;
};

export const encodingSchema = z.object({
Expand Down Expand Up @@ -51,7 +51,6 @@ export const DEFAULT_LISTEN_PORT = [6379];
export const DEFAULT_LISTEN_HOST = "127.0.0.1";
export const DEFAULT_ENABLE_LOGGING = false;
export const DEFAULT_API_PORT = 3000;
export const DEFAULT_SIMULATE_CLUSTER = false;

const listenPortSchema = z
.preprocess((value) => {
Expand All @@ -69,7 +68,7 @@ export const proxyConfigSchema = z.object({
timeout: z.coerce.number().optional(),
enableLogging: z.boolean().optional().default(DEFAULT_ENABLE_LOGGING),
apiPort: z.number().optional().default(DEFAULT_API_PORT),
simulateCluster: z.boolean().optional().default(DEFAULT_SIMULATE_CLUSTER),
defaultInterceptors: z.string().optional(),
});

const envSchema = z.object({
Expand All @@ -83,11 +82,7 @@ const envSchema = z.object({
.transform((val) => val === "true")
.optional(),
API_PORT: z.coerce.number().optional().default(DEFAULT_API_PORT),
SIMULATE_CLUSTER: z
.enum(["true", "false"])
.transform((val) => val === "true")
.optional()
.default(DEFAULT_SIMULATE_CLUSTER),
DEFAULT_INTERCEPTORS: z.string().optional(),
});

export function parseCliArgs(argv: string[]): Record<string, string | boolean | number | number[]> {
Expand Down Expand Up @@ -138,15 +133,15 @@ Optional options:
--timeout <number> Connection timeout in milliseconds
--enableLogging Enable verbose logging
--apiPort Port to start the http on (default: 3000 )
--simulateCluster Simulate Redis Cluster behavior like \`cluster slots\` (default: false)
--defaultInterceptors Comma-separated list of default interceptors to load ( like cluster, hitless, etc. )

Or configure using environment variables:
LISTEN_PORT, TARGET_HOST, TARGET_PORT (required)
LISTEN_HOST, TIMEOUT, ENABLE_LOGGING, API_PORT, SIMULATE_CLUSTER (optional)
LISTEN_HOST, TIMEOUT, ENABLE_LOGGING, API_PORT, DEFAULT_INTERCEPTORS (optional)

Examples:
bun run proxy --listenPort=6379 --targetHost=localhost --targetPort=6380
bun run proxy --listenPort=6379,6380,6381 --simulateCluster --targetHost=localhost --targetPort=6382
bun run proxy --listenPort=6379,6380,6381 --defaultInterceptors=cluster --targetHost=localhost --targetPort=6382
docker run -p 3000:3000 -p 6379:6379 -e LISTEN_PORT=6379 -e TARGET_HOST=host.docker.internal -e TARGET_PORT=6380 your-image-name
`);
}
Expand All @@ -170,7 +165,7 @@ export function getConfig(): ExtendedProxyConfig {
timeout: parsedEnv.TIMEOUT,
enableLogging: parsedEnv.ENABLE_LOGGING,
apiPort: parsedEnv.API_PORT,
simulateCluster: parsedEnv.SIMULATE_CLUSTER,
defaultInterceptors: parsedEnv.DEFAULT_INTERCEPTORS,
};
}

Expand Down