diff --git a/README.md b/README.md index c7703389c..22f7549ab 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,13 @@ redis.zrange("sortedSet", 0, 2, "WITHSCORES").then((elements) => { // The format is: redis[SOME_REDIS_COMMAND_IN_LOWERCASE](ARGUMENTS_ARE_JOINED_INTO_COMMAND_STRING) // so the following statement is equivalent to the CLI: `redis> SET mykey hello EX 10` redis.set("mykey", "hello", "EX", 10); + +// Read the value of a redis key in chunks using nodejs streams +// Retuns a nodejs readable stream +const readStream = redis.getStream("mykey", { chunkSize: 100 * 1000 /* In Bytes */ }); + + + ``` See the `examples/` folder for more examples. For example: diff --git a/examples/get-stream.js b/examples/get-stream.js new file mode 100644 index 000000000..319647508 --- /dev/null +++ b/examples/get-stream.js @@ -0,0 +1,36 @@ +const http = require('node:http'); +const ioredis = require('ioredis'); + +const client = new ioredis(); + +async function streamFromRedis(key, response) { + const dataStream = client.getStream(key, { + chunkSize: 100 * 10, + pipeline: false, + }); + + for await (const data of dataStream) { + response.write(data); + } + + response.end(); + +} + +async function sendFromRedis(key, response) { + const reply = await client.get(key); + response.end(reply); +} + +const server = http.createServer(); + +server.on('request', (request, response) => { + if (request.url === '/stream') { + streamFromRedis('test', response).catch(console.error); + } else { + sendFromRedis('test', response).catch(console.error); + } +}); + +server.listen(3000); + diff --git a/lib/utils/Commander.ts b/lib/utils/Commander.ts index 0bb64869d..dd5c5b59b 100644 --- a/lib/utils/Commander.ts +++ b/lib/utils/Commander.ts @@ -1,3 +1,4 @@ +import { Readable } from "node:stream"; import { list } from "@ioredis/commands"; import { executeWithAutoPipelining, @@ -6,7 +7,8 @@ import { import Command, { ArgumentType } from "../Command"; import Script from "../Script"; import { Callback, WriteableStream } from "../types"; -import RedisCommander, { ClientContext } from "./RedisCommander"; +import RedisCommander, { ClientContext, GetStreamOptions, RedisKey } from "./RedisCommander"; +import { createGetStream } from "./nodeStreams"; export interface CommanderOptions { keyPrefix?: string; @@ -115,6 +117,10 @@ Commander.prototype.callBuffer = generateFunction("callBuffer", null); // @ts-expect-error Commander.prototype.send_command = Commander.prototype.call; +Commander.prototype.getStream = function getStream(key: RedisKey, opts: GetStreamOptions = {}) { + return Readable.from(createGetStream(this, key, opts)); +} + function generateFunction(functionName: string | null, _encoding: string); function generateFunction( functionName: string | null, diff --git a/lib/utils/RedisCommander.ts b/lib/utils/RedisCommander.ts index 49388d8ed..26f64232c 100644 --- a/lib/utils/RedisCommander.ts +++ b/lib/utils/RedisCommander.ts @@ -4,10 +4,12 @@ * this file. */ +import { Readable } from "stream"; import { Callback } from "../types"; export type RedisKey = string | Buffer; export type RedisValue = string | Buffer | number; +export type GetStreamOptions = { chunkSize?: number, pipeline?: boolean } // Inspired by https://github.com/mmkal/handy-redis/blob/main/src/generated/interface.ts. // Should be fixed with https://github.com/Microsoft/TypeScript/issues/1213 @@ -3666,6 +3668,10 @@ interface RedisCommander { key: RedisKey, callback?: Callback ): Result; + getStream( + key: RedisKey, + opts: GetStreamOptions + ): Readable; getBuffer( key: RedisKey, callback?: Callback diff --git a/lib/utils/nodeStreams.ts b/lib/utils/nodeStreams.ts new file mode 100644 index 000000000..38ffb0f88 --- /dev/null +++ b/lib/utils/nodeStreams.ts @@ -0,0 +1,25 @@ +import { executeWithAutoPipelining } from "../autoPipelining"; +import Commander from "./Commander"; +import { GetStreamOptions, RedisKey } from "./RedisCommander"; + +export async function* createGetStream(client: Commander, key: RedisKey, opts: GetStreamOptions = {}) { + const size = opts.chunkSize || 10 * 1000; + let cursor = 0; + let isReadable = true; + const isPipelineMode = opts.pipeline !== false; + + while (isReadable) { + let chunk; + if (isPipelineMode) { + chunk = await executeWithAutoPipelining(client, 'getrange', 'range', [key, cursor, cursor + size - 1], null) + } else { + chunk = await client.getrange(key, cursor, cursor + size - 1); + } + if (!chunk || typeof chunk !== 'string' || chunk?.length === 0) { + isReadable = false; + } else { + cursor += chunk.length; + yield chunk; + } + } + }; \ No newline at end of file