Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ VITE_UPSTASH_REDIS_REST_TOKEN=

VITE_VERCEL_BLOB_READ_WRITE_TOKEN=

VITE_REDIS_URL=

VITE_CLOUDFLARE_ACC_ID=
VITE_CLOUDFLARE_KV_NS_ID=
VITE_CLOUDFLARE_TOKEN=
Expand Down
18 changes: 18 additions & 0 deletions docs/2.drivers/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ const storage = createStorage({
- `scanCount`: How many keys to scan at once ([redis documentation](https://redis.io/docs/latest/commands/scan/#the-count-option)).
- `preConnect`: Whether to initialize the redis instance immediately. Otherwise, it will be initialized on the first read/write call. Default: `false`.

## Watch support

The Redis driver supports `watch`/`unwatch` by subscribing to Redis keyspace notifications.
To receive events, your Redis server must have keyspace notifications enabled.
Unstorage will try to enable them at runtime using `CONFIG SET notify-keyspace-events K$gx`,
but this may be blocked by your Redis configuration or permissions.

If you manage the Redis server, you can set this in `redis.conf`:

```conf
notify-keyspace-events K$gx
```

Events are mapped as follows:

- `del`, `unlink`, `expired`, `evicted` => `remove`
- everything else => `update`

See [ioredis](https://github.com/redis/ioredis/blob/master/API.md#new-redisport-host-options) for all available options.

**Transaction options:**
Expand Down
68 changes: 68 additions & 0 deletions src/drivers/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const DRIVER_NAME = "redis";

export default defineDriver((opts: RedisOptions) => {
let redisClient: Redis | Cluster;
let unwatch: (() => Promise<void> | void) | undefined;
let subscribers: Redis[] = [];
const getRedisClient = () => {
if (redisClient) {
return redisClient;
Expand All @@ -69,6 +71,10 @@ export default defineDriver((opts: RedisOptions) => {
const base = (opts.base || "").replace(/:$/, "");
const p = (...keys: string[]) => joinKeys(base, ...keys); // Prefix a key. Uses base for backwards compatibility
const d = (key: string) => (base ? key.replace(`${base}:`, "") : key); // Deprefix a key
const dbIndex = opts.db ?? 0;
const channelPrefix = `__keyspace@${dbIndex}__:`;
const keyPattern = base ? `${base}:*` : "*";
const channelPattern = `${channelPrefix}${keyPattern}`;

if (opts.preConnect) {
try {
Expand Down Expand Up @@ -137,7 +143,69 @@ export default defineDriver((opts: RedisOptions) => {
await getRedisClient().unlink(keys);
},
dispose() {
if (unwatch) {
Promise.resolve(unwatch()).catch(() => {});
unwatch = undefined;
}
return getRedisClient().disconnect();
},
async watch(callback) {
if (unwatch) {
return unwatch;
}

const removeEvents = new Set(["del", "unlink", "expired", "evicted"]);

const handleMessage = (channel: string, event: string) => {
if (!channel.startsWith(channelPrefix)) {
return;
}
const rawKey = channel.slice(channelPrefix.length);
if (base && !rawKey.startsWith(`${base}:`)) {
return;
}
const key = d(rawKey);
const type = removeEvents.has(event) ? "remove" : "update";
callback(type, key);
};

const createSubscriber = (client: Redis) => {
const sub = client.duplicate();
sub.on("pmessage", (_pattern, channel, message) => {
handleMessage(channel, message);
});
subscribers.push(sub);
return sub;
};

await getRedisClient().config("SET", "notify-keyspace-events", "K$gx");

const client = getRedisClient();
if (client instanceof Cluster) {
const nodes = client.nodes("master");
await Promise.all(
nodes.map(async (node) => {
const sub = createSubscriber(node);
await sub.psubscribe(channelPattern);
})
);
} else {
const sub = createSubscriber(client);
await sub.psubscribe(channelPattern);
}

unwatch = async () => {
await Promise.all(
subscribers.map(async (sub) => {
await sub.punsubscribe(channelPattern);
await sub.disconnect();
})
);
subscribers = [];
unwatch = undefined;
};

return unwatch;
},
};
});
91 changes: 81 additions & 10 deletions test/drivers/redis.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,51 @@
import { describe, vi, it, expect } from "vitest";
import { describe, vi, it, expect, beforeAll } from "vitest";
import * as ioredisMock from "ioredis-mock";
import redisDriver from "../../src/drivers/redis.ts";
import { testDriver } from "./utils.ts";

vi.mock("ioredis", () => ({ ...ioredisMock, Redis: ioredisMock.default }));
const useRealRedis = vi.hoisted(() => Boolean(process.env.VITE_REDIS_URL));

vi.mock("ioredis", async (importOriginal) => {
const actual = await importOriginal<typeof import("ioredis")>();
if (useRealRedis) {
return actual;
}
return {
...actual,
...ioredisMock,
Redis: ioredisMock.default,
Cluster: (ioredisMock as any).Cluster || actual.Cluster,
};
});

describe("drivers: redis", () => {
let RedisClient: any;
beforeAll(async () => {
if (useRealRedis) {
const mod = await import("ioredis");
RedisClient = mod.Redis;
} else {
RedisClient = (ioredisMock as any).default;
}
});

const redisUrl = process.env.VITE_REDIS_URL || "redis://localhost:6379/0";
const driver = redisDriver({
base: "test:",
url: "ioredis://localhost:6379/0",
url: redisUrl,
lazyConnect: false,
});

const ensureMockConfig = () => {
if (useRealRedis) {
return;
}
const client = driver.getInstance?.() as any;
if (client && typeof client.config === "function") {
client.config = () => Promise.resolve("OK");
}
};

testDriver({
driver,
additionalTests(ctx) {
Expand All @@ -20,10 +54,8 @@ describe("drivers: redis", () => {
await ctx.storage.setItem("s2:a", "test_data");
await ctx.storage.setItem("s3:a?q=1", "test_data");

const client = new (ioredisMock as any).default(
"ioredis://localhost:6379/0"
);
const keys = await client.keys("*");
const client = new RedisClient(redisUrl);
const keys = (await client.keys("*")).sort();
expect(keys).toMatchInlineSnapshot(`
[
"test:s1:a",
Expand All @@ -35,9 +67,48 @@ describe("drivers: redis", () => {
});

it("exposes instance", () => {
expect(driver.getInstance?.()).toBeInstanceOf(
(ioredisMock as any).default
);
expect(driver.getInstance?.()).toBeInstanceOf(RedisClient);
});

it("watch redis", async () => {
ensureMockConfig();
const watcher = vi.fn();
const unwatch = await ctx.storage.watch(watcher);
await new Promise((resolve) => setTimeout(resolve, 10));

const publisher = new RedisClient(redisUrl);
await publisher.publish("__keyspace@0__:test:s1:a", "set");
await publisher.publish("__keyspace@0__:test:s2:a", "del");
await publisher.publish("__keyspace@0__:other:s3:a", "set");
await publisher.disconnect();

await new Promise((resolve) => setTimeout(resolve, 0));

expect(watcher).toHaveBeenCalledWith("update", "s1:a");
expect(watcher).toHaveBeenCalledWith("remove", "s2:a");
expect(watcher).toHaveBeenCalledTimes(2);
await unwatch();
});

it("unwatch redis", async () => {
ensureMockConfig();
const watcher = vi.fn();
const unwatch = await ctx.storage.watch(watcher);
await new Promise((resolve) => setTimeout(resolve, 10));

const publisher = new RedisClient(redisUrl);
await publisher.publish("__keyspace@0__:test:s1:a", "set");
await new Promise((resolve) => setTimeout(resolve, 0));

await unwatch();

await publisher.publish("__keyspace@0__:test:s1:b", "set");
await publisher.disconnect();

await new Promise((resolve) => setTimeout(resolve, 0));

expect(watcher).toHaveBeenCalledTimes(1);
expect(watcher).toHaveBeenCalledWith("update", "s1:a");
});
},
});
Expand Down