diff --git a/.github/workflows/pro-integration-tests.yml b/.github/workflows/pro-integration-tests.yml index b99ee3871c..47d802a767 100644 --- a/.github/workflows/pro-integration-tests.yml +++ b/.github/workflows/pro-integration-tests.yml @@ -207,7 +207,7 @@ jobs: - name: Run Rails server in background run: | cd spec/dummy - RAILS_ENV=test rails server & + RAILS_ENV="test" rails server & - name: Wait for Rails server to start run: | @@ -392,7 +392,7 @@ jobs: - name: Run Rails server in background run: | cd spec/dummy - RAILS_ENV=test rails server & + RAILS_ENV="test" rails server & - name: Wait for Rails server to start run: | diff --git a/.github/workflows/pro-package-tests.yml b/.github/workflows/pro-package-tests.yml index 720166085b..8d94c49603 100644 --- a/.github/workflows/pro-package-tests.yml +++ b/.github/workflows/pro-package-tests.yml @@ -98,6 +98,17 @@ jobs: package-js-tests: needs: build-dummy-app-webpack-test-bundles runs-on: ubuntu-22.04 + # Redis service container + services: + redis: + image: cimg/redis:6.2.6 + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 env: REACT_ON_RAILS_PRO_LICENSE: ${{ secrets.REACT_ON_RAILS_PRO_LICENSE }} steps: diff --git a/react_on_rails_pro/packages/node-renderer/tests/redisClient.test.ts b/react_on_rails_pro/packages/node-renderer/tests/redisClient.test.ts new file mode 100644 index 0000000000..bd0652d3f7 --- /dev/null +++ b/react_on_rails_pro/packages/node-renderer/tests/redisClient.test.ts @@ -0,0 +1,119 @@ +// This test in only for documenting Redis client usage + +import { createClient } from 'redis'; + +const redisClient = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' }); + +interface RedisStreamMessage { + id: string; + message: Record; +} +interface RedisStreamResult { + name: string; + messages: RedisStreamMessage[]; +} + +test('Redis client connects successfully', async () => { + await redisClient.connect(); + expect(redisClient.isOpen).toBe(true); + await redisClient.quit(); +}); + +test('calls connect after quit', async () => { + await redisClient.connect(); + expect(redisClient.isOpen).toBe(true); + await redisClient.quit(); + + await redisClient.connect(); + expect(redisClient.isOpen).toBe(true); + await redisClient.quit(); +}); + +test('calls quit before connect is resolved', async () => { + const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' }); + const connectPromise = client.connect(); + await client.quit(); + await connectPromise; + expect(client.isOpen).toBe(false); +}); + +test('multiple connect calls', async () => { + const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' }); + const connectPromise1 = client.connect(); + const connectPromise2 = client.connect(); + await expect(connectPromise2).rejects.toThrow('Socket already opened'); + await expect(connectPromise1).resolves.toMatchObject({}); + expect(client.isOpen).toBe(true); + await client.quit(); +}); + +test('write to stream and read back', async () => { + const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' }); + await client.connect(); + + const streamKey = 'test-stream'; + await client.del(streamKey); + const messageId = await client.xAdd(streamKey, '*', { field1: 'value1' }); + + const result = (await client.xRead({ key: streamKey, id: '0-0' }, { COUNT: 1, BLOCK: 2000 })) as + | RedisStreamResult[] + | null; + expect(result).not.toBeNull(); + expect(result).toBeDefined(); + + const [stream] = result!; + expect(stream).toBeDefined(); + expect(stream?.messages.length).toBe(1); + const [message] = stream!.messages; + expect(message!.id).toBe(messageId); + expect(message!.message).toEqual({ field1: 'value1' }); + + await client.quit(); +}); + +test('quit while reading from stream', async () => { + const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' }); + await client.connect(); + + const streamKey = 'test-stream-quit'; + + const readPromise = client.xRead({ key: streamKey, id: '$' }, { BLOCK: 0 }); + + // Wait a moment to ensure xRead is blocking + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + + client.destroy(); + + await expect(readPromise).rejects.toThrow(); +}); + +it('expire sets TTL on stream', async () => { + const client = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' }); + await client.connect(); + + const streamKey = 'test-stream-expire'; + await client.del(streamKey); + await client.xAdd(streamKey, '*', { field1: 'value1' }); + + const expireResult = await client.expire(streamKey, 1); // 1 second + expect(expireResult).toBe(1); // 1 means the key existed and TTL was set + + const ttl1 = await client.ttl(streamKey); + expect(ttl1).toBeLessThanOrEqual(1); + expect(ttl1).toBeGreaterThan(0); + + const existsBeforeTimeout = await client.exists(streamKey); + expect(existsBeforeTimeout).toBe(1); // Key should exist before timeout + + // Wait for 1.1 seconds + await new Promise((resolve) => { + setTimeout(resolve, 1100); + }); + + const existsAfterTimeout = await client.exists(streamKey); + expect(existsAfterTimeout).toBe(0); // Key should have expired + + await client.quit(); +}); diff --git a/react_on_rails_pro/spec/dummy/client/app/components/ErrorComponent.tsx b/react_on_rails_pro/spec/dummy/client/app/components/ErrorComponent.tsx index 860f5aafb8..6f13e18b2b 100644 --- a/react_on_rails_pro/spec/dummy/client/app/components/ErrorComponent.tsx +++ b/react_on_rails_pro/spec/dummy/client/app/components/ErrorComponent.tsx @@ -7,6 +7,7 @@ const ErrorComponent = ({ error }: { error: Error }) => {

Error happened while rendering RSC Page

{error.message}

+

{error.stack}

); }; diff --git a/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RSCPostsPageOverRedis.jsx b/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RSCPostsPageOverRedis.jsx index bb8f1a10ac..b2f276bece 100644 --- a/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RSCPostsPageOverRedis.jsx +++ b/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RSCPostsPageOverRedis.jsx @@ -3,14 +3,14 @@ import RSCPostsPage from '../components/RSCPostsPage/Main'; import { listenToRequestData } from '../utils/redisReceiver'; const RSCPostsPageOverRedis = ({ requestId, ...props }, railsContext) => { - const { getValue, close } = listenToRequestData(requestId); + const { getValue, destroy } = listenToRequestData(requestId); const fetchPosts = () => getValue('posts'); const fetchComments = (postId) => getValue(`comments:${postId}`); const fetchUser = (userId) => getValue(`user:${userId}`); if ('addPostSSRHook' in railsContext) { - railsContext.addPostSSRHook(close); + railsContext.addPostSSRHook(destroy); } return () => ( diff --git a/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RedisReceiver.jsx b/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RedisReceiver.jsx index f871a73c3d..117dfe9920 100644 --- a/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RedisReceiver.jsx +++ b/react_on_rails_pro/spec/dummy/client/app/ror-auto-load-components/RedisReceiver.jsx @@ -34,10 +34,10 @@ const AsyncToggleContainer = async ({ children, childrenTitle, getValue }) => { }; const RedisReceiver = ({ requestId, asyncToggleContainer }, railsContext) => { - const { getValue, close } = listenToRequestData(requestId); + const { getValue, destroy } = listenToRequestData(requestId); if ('addPostSSRHook' in railsContext) { - railsContext.addPostSSRHook(close); + railsContext.addPostSSRHook(destroy); } const UsedToggleContainer = asyncToggleContainer ? AsyncToggleContainer : ToggleContainer; diff --git a/react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts b/react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts index bf8b84369b..6ed88a06b9 100644 --- a/react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts +++ b/react_on_rails_pro/spec/dummy/client/app/utils/redisReceiver.ts @@ -1,6 +1,6 @@ import { createClient, RedisClientType } from 'redis'; -const REDIS_READ_TIMEOUT = 10000; +const REDIS_LISTENER_TIMEOUT = 15000; // 15 seconds /** * Redis xRead result message structure @@ -23,321 +23,174 @@ interface RedisStreamResult { */ interface RequestListener { getValue: (key: string) => Promise; - close: () => Promise; -} - -interface PendingPromise { - promise: Promise; - resolve: (value: unknown) => void; - reject: (reason: unknown) => void; - timer: NodeJS.Timeout; - resolved?: boolean; -} - -// Shared Redis client -let sharedRedisClient: RedisClientType | null = null; -let isClientConnected = false; - -// Store active listeners by requestId -const activeListeners: Record = {}; - -// Store pending promises -const pendingPromises: Record = {}; - -/** - * Gets or creates the shared Redis client - */ -async function getRedisClient() { - if (!sharedRedisClient) { - const url = process.env.REDIS_URL || 'redis://localhost:6379'; - sharedRedisClient = createClient({ url }); - } - - if (!isClientConnected) { - await sharedRedisClient.connect(); - isClientConnected = true; - } - - return sharedRedisClient; -} - -/** - * Closes the shared Redis client - */ -async function closeRedisClient() { - if (sharedRedisClient && isClientConnected) { - await sharedRedisClient.quit(); - isClientConnected = false; - } + destroy: () => void; } /** * Listens to a Redis stream for data based on a requestId * @param requestId - The stream key to listen on - * @returns An object with a getValue function to get values by key + * @returns An object with a getValue function to get values by key and a close function */ export function listenToRequestData(requestId: string): RequestListener { - // If a listener for this requestId already exists, return it - const existingListener = activeListeners[requestId]; - if (existingListener) { - return existingListener; - } - - const receivedKeys: string[] = []; - - // Stream key for this request + // State - all local to this listener instance + const valuesMap = new Map(); + const valuePromises = new Map>(); const streamKey = `stream:${requestId}`; - - // IDs of messages that need to be deleted - const messagesToDelete: string[] = []; - - // Track if this listener is active - let isActive = true; - // Track if we've received the end message - let isEnded = false; - - /** - * Process a message from the Redis stream - */ - function processMessage(message: Record, messageId: string) { - // Add message to delete queue - messagesToDelete.push(messageId); - - // Check for end message - if ('end' in message) { - isEnded = true; - - // Reject any pending promises that haven't been resolved yet - Object.entries(pendingPromises).forEach(([key, pendingPromise]) => { - if (pendingPromise && !pendingPromise.resolved) { - clearTimeout(pendingPromise.timer); - pendingPromise.reject(new Error(`Key ${key} not found before stream ended`)); - pendingPromises[key] = undefined; - } - }); - - return; - } - - // Process each key-value pair in the message - Object.entries(message).forEach(([key, value]) => { - const parsedValue = JSON.parse(value) as unknown; - - // Remove colon prefix if it exists - const normalizedKey = key.startsWith(':') ? key.substring(1) : key; - receivedKeys.push(normalizedKey); - - // Resolve any pending promises for this key - const pendingPromise = pendingPromises[normalizedKey]; - if (pendingPromise) { - clearTimeout(pendingPromise.timer); - pendingPromise.resolve(parsedValue); - pendingPromise.resolved = true; // Mark as resolved - } else { - pendingPromises[normalizedKey] = { - promise: Promise.resolve(parsedValue), - resolve: () => {}, - reject: () => {}, - timer: setTimeout(() => {}, 0), - resolved: true, // Mark as resolved immediately - }; - } - }); - } + let listenToStreamPromise: Promise | null = null; + let lastId = '0'; // Start from beginning of stream + // True when streams ends and the connection is closed + let isClosed = false; + // True when user explictly calls destroy, it makes any call to getValue throw immediately + let isDestroyed = false; + + // Redis client + const url = process.env.REDIS_URL || 'redis://localhost:6379'; + const redisClient: RedisClientType = createClient({ url }); + let isConnected = false; /** - * Delete processed messages from the stream + * Closes the Redis connection and rejects all pending promises */ - async function deleteProcessedMessages() { - if (messagesToDelete.length === 0 || !isActive) { - return; - } + function close() { + if (isClosed) return; + isClosed = true; + // Close client - this will cause xRead to throw, which rejects pending promises try { - const client = await getRedisClient(); - await client.xDel(streamKey, messagesToDelete); - messagesToDelete.length = 0; // Clear the array - } catch (error) { - console.error('Error deleting messages from stream:', error); + redisClient.destroy(); + } finally { + isConnected = false; } } /** - * Check for existing messages in the stream + * Listens to the stream for the next batch of messages + * Blocks until at least one message arrives + * Multiple concurrent calls return the same promise */ - async function checkExistingMessages() { - if (!isActive) { - return; + function listenToStream(): Promise { + // Return existing promise if already listening + if (listenToStreamPromise) { + return listenToStreamPromise; } - try { - const client = await getRedisClient(); - - // Read all messages from the beginning of the stream - const results = (await client.xRead({ key: streamKey, id: '0' }, { COUNT: 100 })) as - | RedisStreamResult[] - | null; + // Create new listening promise + const promise = (async (): Promise => { + if (isClosed) { + throw new Error('Redis Connection is closed'); + } - if (results && Array.isArray(results) && results.length > 0) { - const [{ messages }] = results; + // redisClient.connect(); is called only here + // And `listenToStream` runs only one promise at a time, so no fear of race condition + if (!isConnected) { + await redisClient.connect(); + await redisClient.expire(streamKey, REDIS_LISTENER_TIMEOUT / 1000); // Set TTL to avoid stale streams + isConnected = true; + } - // Process each message - for (const { id, message } of messages) { - processMessage(message, id); - } + // xRead blocks indefinitely until message arrives + const result = (await redisClient.xRead( + { key: streamKey, id: lastId }, + { BLOCK: 0 }, // Block indefinitely + )) as RedisStreamResult[] | null; - // Delete processed messages - await deleteProcessedMessages(); + if (!result || result.length === 0) { + return; } - } catch (error) { - console.error('Error checking existing messages:', error); - } - } - /** - * Setup a listener for new messages in the stream - */ - async function setupStreamListener() { - if (!isActive) { - return; - } - - try { - const client = await getRedisClient(); + const [{ messages }] = result; - // Use $ as the ID to read only new messages - let lastId = '$'; + let receivedEndMessage = false; + for (const { id, message } of messages) { + lastId = id; - // Start reading from the stream - const readStream = async () => { - if (!isActive || isEnded) { - return; + // Check for end message + if ('end' in message) { + receivedEndMessage = true; } - try { - const results = (await client.xRead( - { key: streamKey, id: lastId }, - { COUNT: 100, BLOCK: 1000 }, - )) as RedisStreamResult[] | null; - - if (results && Array.isArray(results) && results.length > 0) { - const [{ messages }] = results; + // Process key-value pairs + Object.entries(message).forEach(([key, value]) => { + const normalizedKey = key.startsWith(':') ? key.substring(1) : key; + const parsedValue = JSON.parse(value) as unknown; + valuesMap.set(normalizedKey, parsedValue); + }); + } - // Process each message from the stream - for (const { id, message } of messages) { - lastId = id; // Update the last ID for subsequent reads - processMessage(message, id); - } + // If end message received, close the connection + if (receivedEndMessage) { + close(); + } + })(); - // Delete processed messages - await deleteProcessedMessages(); - } - } catch (error) { - console.error('Error reading from stream:', error); - } finally { - void readStream(); - } - }; + listenToStreamPromise = promise.finally(() => { + // Reset so next call creates new promise + listenToStreamPromise = null; + }); - void readStream(); - } catch (error) { - console.error('Error setting up stream listener:', error); - } + return listenToStreamPromise; } - // Start listening to existing and new messages immediately - (async () => { - try { - await checkExistingMessages(); - await setupStreamListener(); - } catch (error) { - console.error('Error initializing Redis listener:', error); + /** + * Gets a value for a specific key from the Redis stream + * Returns the same promise for multiple calls with the same key + * @param key - The key to look for in the stream + * @returns A promise that resolves when the key is found + */ + async function getValue(key: string): Promise { + if (isDestroyed) { + throw new Error(`Can't get value for key "${key}" - Redis Connection is destroyed`); } - })().catch((error: unknown) => { - console.error('Error initializing Redis listener:', error); - }); - // Create the listener object - const listener: RequestListener = { - /** - * Gets a value for a specific key from the Redis stream - * @param key - The key to look for in the stream - * @returns A promise that resolves when the key is found - */ - getValue: async (key: string) => { - // If we already have a promise for this key, return it - const existingPromise = pendingPromises[key]; - if (existingPromise) { - return existingPromise.promise; - } - - // If we've received the end message and don't have this key, reject immediately - if (isEnded) { - return Promise.reject(new Error(`Key ${key} not available, stream has ended`)); - } - - // Create a new promise for this key - let resolvePromise: ((value: unknown) => void) | undefined; - let rejectPromise: ((reason: unknown) => void) | undefined; + // Return existing promise if already requested + const valuePromise = valuePromises.get(key); + if (valuePromise) { + return valuePromise; + } - const promise = new Promise((resolve, reject) => { - resolvePromise = resolve; - rejectPromise = reject; - }); + // Create new promise that loops until value is found + const promise = (async () => { + try { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + // Check if value already available + if (valuesMap.has(key)) { + return valuesMap.get(key); + } - // Create a timeout that will reject the promise after 8 seconds - const timer = setTimeout(() => { - const pendingPromise = pendingPromises[key]; - if (pendingPromise) { - pendingPromise.reject( - new Error(`Timeout waiting for key: ${key}, available keys: ${receivedKeys.join(', ')}`), - ); - // Keep the pending promise in the dictionary with the error state + // Wait for next batch of messages + // eslint-disable-next-line no-await-in-loop + await listenToStream(); } - }, REDIS_READ_TIMEOUT); - - // Store the promise and its controllers - if (resolvePromise && rejectPromise) { - pendingPromises[key] = { - promise, - resolve: resolvePromise, - reject: rejectPromise, - timer, - resolved: false, // Mark as not resolved initially - }; + } catch (error) { + throw new Error( + `Error getting value for key "${key}": ${(error as Error).message}, stack: ${(error as Error).stack}`, + ); } + })(); - return promise; - }, - - /** - * Closes the Redis client connection - */ - close: async () => { - isActive = false; + valuePromises.set(key, promise); + return promise; + } - // Delete this listener from active listeners - activeListeners[requestId] = undefined; + let globalTimeout: NodeJS.Timeout; + /** + * Destroys the listener, closing the connection and preventing further getValue calls + */ + function destroy() { + if (isDestroyed) return; + isDestroyed = true; - // Reject any pending promises - Object.entries(pendingPromises).forEach(([key, pendingPromise]) => { - if (pendingPromise) { - clearTimeout(pendingPromise.timer); - pendingPromise.reject(new Error('Redis connection closed')); - pendingPromises[key] = undefined; - } - }); + // Clear global timeout + clearTimeout(globalTimeout); - // Only close the Redis client if no other listeners are active - const hasActiveListeners = Object.values(activeListeners).some(Boolean); - if (!hasActiveListeners) { - await closeRedisClient(); - } - }, - }; + close(); + } - // Store the listener in active listeners - activeListeners[requestId] = listener; + // Global timeout - destroys listener after 15 seconds + globalTimeout = setTimeout(() => { + destroy(); + }, REDIS_LISTENER_TIMEOUT); - return listener; + return { getValue, destroy }; }