diff --git a/.env.example b/.env.example index 51e03c323..8c9c32b45 100644 --- a/.env.example +++ b/.env.example @@ -3,6 +3,8 @@ VITE_UPSTASH_REDIS_REST_TOKEN= VITE_VERCEL_BLOB_READ_WRITE_TOKEN= +VITE_REDIS_URL= + VITE_CLOUDFLARE_ACC_ID= VITE_CLOUDFLARE_KV_NS_ID= VITE_CLOUDFLARE_TOKEN= diff --git a/docs/2.drivers/redis.md b/docs/2.drivers/redis.md index bb4b9bd8e..8b8d67502 100644 --- a/docs/2.drivers/redis.md +++ b/docs/2.drivers/redis.md @@ -73,6 +73,24 @@ const storage = createStorage({ - `scanCount`: How many keys to scan at once ([redis documentation](https://redis.io/docs/latest/commands/scan/#the-count-option)). - `preConnect`: Whether to initialize the redis instance immediately. Otherwise, it will be initialized on the first read/write call. Default: `false`. +## Watch support + +The Redis driver supports `watch`/`unwatch` by subscribing to Redis keyspace notifications. +To receive events, your Redis server must have keyspace notifications enabled. +Unstorage will try to enable them at runtime using `CONFIG SET notify-keyspace-events K$gx`, +but this may be blocked by your Redis configuration or permissions. + +If you manage the Redis server, you can set this in `redis.conf`: + +```conf +notify-keyspace-events K$gx +``` + +Events are mapped as follows: + +- `del`, `unlink`, `expired`, `evicted` => `remove` +- everything else => `update` + See [ioredis](https://github.com/redis/ioredis/blob/master/API.md#new-redisport-host-options) for all available options. **Transaction options:** diff --git a/src/drivers/redis.ts b/src/drivers/redis.ts index 3127bcf5b..a297aa590 100644 --- a/src/drivers/redis.ts +++ b/src/drivers/redis.ts @@ -52,6 +52,8 @@ const DRIVER_NAME = "redis"; export default defineDriver((opts: RedisOptions) => { let redisClient: Redis | Cluster; + let unwatch: (() => Promise | void) | undefined; + let subscribers: Redis[] = []; const getRedisClient = () => { if (redisClient) { return redisClient; @@ -69,6 +71,10 @@ export default defineDriver((opts: RedisOptions) => { const base = (opts.base || "").replace(/:$/, ""); const p = (...keys: string[]) => joinKeys(base, ...keys); // Prefix a key. Uses base for backwards compatibility const d = (key: string) => (base ? key.replace(`${base}:`, "") : key); // Deprefix a key + const dbIndex = opts.db ?? 0; + const channelPrefix = `__keyspace@${dbIndex}__:`; + const keyPattern = base ? `${base}:*` : "*"; + const channelPattern = `${channelPrefix}${keyPattern}`; if (opts.preConnect) { try { @@ -137,7 +143,69 @@ export default defineDriver((opts: RedisOptions) => { await getRedisClient().unlink(keys); }, dispose() { + if (unwatch) { + Promise.resolve(unwatch()).catch(() => {}); + unwatch = undefined; + } return getRedisClient().disconnect(); }, + async watch(callback) { + if (unwatch) { + return unwatch; + } + + const removeEvents = new Set(["del", "unlink", "expired", "evicted"]); + + const handleMessage = (channel: string, event: string) => { + if (!channel.startsWith(channelPrefix)) { + return; + } + const rawKey = channel.slice(channelPrefix.length); + if (base && !rawKey.startsWith(`${base}:`)) { + return; + } + const key = d(rawKey); + const type = removeEvents.has(event) ? "remove" : "update"; + callback(type, key); + }; + + const createSubscriber = (client: Redis) => { + const sub = client.duplicate(); + sub.on("pmessage", (_pattern, channel, message) => { + handleMessage(channel, message); + }); + subscribers.push(sub); + return sub; + }; + + await getRedisClient().config("SET", "notify-keyspace-events", "K$gx"); + + const client = getRedisClient(); + if (client instanceof Cluster) { + const nodes = client.nodes("master"); + await Promise.all( + nodes.map(async (node) => { + const sub = createSubscriber(node); + await sub.psubscribe(channelPattern); + }) + ); + } else { + const sub = createSubscriber(client); + await sub.psubscribe(channelPattern); + } + + unwatch = async () => { + await Promise.all( + subscribers.map(async (sub) => { + await sub.punsubscribe(channelPattern); + await sub.disconnect(); + }) + ); + subscribers = []; + unwatch = undefined; + }; + + return unwatch; + }, }; }); diff --git a/test/drivers/redis.test.ts b/test/drivers/redis.test.ts index 103800770..f0ff4fcba 100644 --- a/test/drivers/redis.test.ts +++ b/test/drivers/redis.test.ts @@ -1,17 +1,51 @@ -import { describe, vi, it, expect } from "vitest"; +import { describe, vi, it, expect, beforeAll } from "vitest"; import * as ioredisMock from "ioredis-mock"; import redisDriver from "../../src/drivers/redis.ts"; import { testDriver } from "./utils.ts"; -vi.mock("ioredis", () => ({ ...ioredisMock, Redis: ioredisMock.default })); +const useRealRedis = vi.hoisted(() => Boolean(process.env.VITE_REDIS_URL)); + +vi.mock("ioredis", async (importOriginal) => { + const actual = await importOriginal(); + if (useRealRedis) { + return actual; + } + return { + ...actual, + ...ioredisMock, + Redis: ioredisMock.default, + Cluster: (ioredisMock as any).Cluster || actual.Cluster, + }; +}); describe("drivers: redis", () => { + let RedisClient: any; + beforeAll(async () => { + if (useRealRedis) { + const mod = await import("ioredis"); + RedisClient = mod.Redis; + } else { + RedisClient = (ioredisMock as any).default; + } + }); + + const redisUrl = process.env.VITE_REDIS_URL || "redis://localhost:6379/0"; const driver = redisDriver({ base: "test:", - url: "ioredis://localhost:6379/0", + url: redisUrl, lazyConnect: false, }); + const ensureMockConfig = () => { + if (useRealRedis) { + return; + } + const client = driver.getInstance?.() as any; + if (client && typeof client.config === "function") { + client.config = () => Promise.resolve("OK"); + } + }; + testDriver({ driver, additionalTests(ctx) { @@ -20,10 +54,8 @@ describe("drivers: redis", () => { await ctx.storage.setItem("s2:a", "test_data"); await ctx.storage.setItem("s3:a?q=1", "test_data"); - const client = new (ioredisMock as any).default( - "ioredis://localhost:6379/0" - ); - const keys = await client.keys("*"); + const client = new RedisClient(redisUrl); + const keys = (await client.keys("*")).sort(); expect(keys).toMatchInlineSnapshot(` [ "test:s1:a", @@ -35,9 +67,48 @@ describe("drivers: redis", () => { }); it("exposes instance", () => { - expect(driver.getInstance?.()).toBeInstanceOf( - (ioredisMock as any).default - ); + expect(driver.getInstance?.()).toBeInstanceOf(RedisClient); + }); + + it("watch redis", async () => { + ensureMockConfig(); + const watcher = vi.fn(); + const unwatch = await ctx.storage.watch(watcher); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const publisher = new RedisClient(redisUrl); + await publisher.publish("__keyspace@0__:test:s1:a", "set"); + await publisher.publish("__keyspace@0__:test:s2:a", "del"); + await publisher.publish("__keyspace@0__:other:s3:a", "set"); + await publisher.disconnect(); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(watcher).toHaveBeenCalledWith("update", "s1:a"); + expect(watcher).toHaveBeenCalledWith("remove", "s2:a"); + expect(watcher).toHaveBeenCalledTimes(2); + await unwatch(); + }); + + it("unwatch redis", async () => { + ensureMockConfig(); + const watcher = vi.fn(); + const unwatch = await ctx.storage.watch(watcher); + await new Promise((resolve) => setTimeout(resolve, 10)); + + const publisher = new RedisClient(redisUrl); + await publisher.publish("__keyspace@0__:test:s1:a", "set"); + await new Promise((resolve) => setTimeout(resolve, 0)); + + await unwatch(); + + await publisher.publish("__keyspace@0__:test:s1:b", "set"); + await publisher.disconnect(); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(watcher).toHaveBeenCalledTimes(1); + expect(watcher).toHaveBeenCalledWith("update", "s1:a"); }); }, });