-
-
Notifications
You must be signed in to change notification settings - Fork 638
Add Concurrency Tests #1890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Concurrency Tests #1890
Changes from 3 commits
55acf47
e4ae8b4
8e857ac
ec23294
aa3d0c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| import * as EventEmitter from 'node:events'; | ||
|
|
||
| class AsyncQueue<T> { | ||
| private eventEmitter = new EventEmitter<{ data: any, end: any }>(); | ||
|
Check failure on line 4 in packages/react-on-rails-pro/tests/AsyncQueue.ts
|
||
| private buffer: T[] = []; | ||
| private isEnded = false; | ||
|
|
||
| enqueue(value: T) { | ||
| if (this.isEnded) { | ||
| throw new Error("Queue Ended"); | ||
| } | ||
|
|
||
| if (this.eventEmitter.listenerCount('data') > 0) { | ||
| this.eventEmitter.emit('data', value); | ||
| } else { | ||
| this.buffer.push(value); | ||
| } | ||
| } | ||
|
|
||
| end() { | ||
| this.isEnded = true; | ||
| this.eventEmitter.emit('end'); | ||
| } | ||
|
|
||
| dequeue() { | ||
| return new Promise<T>((resolve, reject) => { | ||
| const bufferValueIfExist = this.buffer.shift(); | ||
| if (bufferValueIfExist) { | ||
| resolve(bufferValueIfExist); | ||
| } else if (this.isEnded) { | ||
| reject(new Error("Queue Ended")); | ||
| } else { | ||
| let teardown = () => {} | ||
| const onData = (value: T) => { | ||
| resolve(value); | ||
| teardown(); | ||
| } | ||
|
|
||
| const onEnd = () => { | ||
| reject(new Error("Queue Ended")); | ||
| teardown(); | ||
| } | ||
|
|
||
| this.eventEmitter.on('data', onData); | ||
| this.eventEmitter.on('end', onEnd); | ||
| teardown = () => { | ||
| this.eventEmitter.off('data', onData); | ||
| this.eventEmitter.off('end', onEnd); | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| toString() { | ||
| return "" | ||
| } | ||
| } | ||
|
|
||
| export default AsyncQueue; | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,33 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { PassThrough, Readable } from 'node:stream'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import AsyncQueue from './AsyncQueue'; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class StreamReader { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private asyncQueue: AsyncQueue<string>; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| constructor(pipeableStream: Pick<Readable, 'pipe'>) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.asyncQueue = new AsyncQueue(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const decoder = new TextDecoder(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const readableStream = new PassThrough(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pipeableStream.pipe(readableStream); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| readableStream.on('data', (chunk) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const decodedChunk = decoder.decode(chunk); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.asyncQueue.enqueue(decodedChunk); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (readableStream.closed) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.asyncQueue.end(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| readableStream.on('end', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.asyncQueue.end(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| readableStream.on('data', (chunk) => { | |
| const decodedChunk = decoder.decode(chunk); | |
| this.asyncQueue.enqueue(decodedChunk); | |
| }); | |
| if (readableStream.closed) { | |
| this.asyncQueue.end(); | |
| } else { | |
| readableStream.on('end', () => { | |
| this.asyncQueue.end(); | |
| }); | |
| } | |
| readableStream.on('data', (chunk) => { | |
| const decodedChunk = decoder.decode(chunk, { stream: true }); | |
| if (decodedChunk.length > 0) { | |
| this.asyncQueue.enqueue(decodedChunk); | |
| } | |
| }); | |
| readableStream.on('end', () => { | |
| const flushed = decoder.decode(); | |
| if (flushed.length > 0) { | |
| this.asyncQueue.enqueue(flushed); | |
| } | |
| this.asyncQueue.end(); | |
| }); |
🤖 Prompt for AI Agents
In packages/react-on-rails-pro/tests/StreamReader.ts around lines 14–25, the
code decodes each chunk with decoder.decode(chunk) which resets decoder state
and breaks multi-byte characters; change to decoder.decode(chunk, { stream: true
}) for each 'data' chunk, and on both the immediate closed path and the 'end'
handler call decoder.decode() (or decoder.decode(undefined)) once to flush any
buffered bytes, enqueue the flushed string if non-empty, then call
this.asyncQueue.end().
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| /** | ||
| * @jest-environment node | ||
| */ | ||
| /// <reference types="react/experimental" /> | ||
|
|
||
| import * as React from 'react'; | ||
| import { PassThrough, Readable, Transform } from 'node:stream'; | ||
| import { text } from 'node:stream/consumers'; | ||
| import { Suspense, PropsWithChildren } from 'react'; | ||
|
|
||
| import * as path from 'path'; | ||
| import * as mock from 'mock-fs'; | ||
|
|
||
| import ReactOnRails, { RailsContextWithServerStreamingCapabilities } from '../src/ReactOnRailsRSC'; | ||
| import AsyncQueue from './AsyncQueue'; | ||
| import StreamReader from './StreamReader'; | ||
| import removeRSCChunkStack from './utils/removeRSCChunkStack'; | ||
|
|
||
| const manifestFileDirectory = path.resolve(__dirname, '../src') | ||
| const clientManifestPath = path.join(manifestFileDirectory, 'react-client-manifest.json'); | ||
|
|
||
| mock({ | ||
| [clientManifestPath]: JSON.stringify({ | ||
| filePathToModuleMetadata: {}, | ||
| moduleLoading: { prefix: '', crossOrigin: null }, | ||
| }), | ||
| }); | ||
|
|
||
| afterAll(() => mock.restore()); | ||
|
|
||
| const AsyncQueueItem = async ({ asyncQueue, children }: PropsWithChildren<{asyncQueue: AsyncQueue<string>}>) => { | ||
| const value = await asyncQueue.dequeue(); | ||
|
|
||
| return ( | ||
| <> | ||
| <p>Data: {value}</p> | ||
| {children} | ||
| </> | ||
| ) | ||
| } | ||
|
|
||
| const AsyncQueueContainer = ({ asyncQueue }: { asyncQueue: AsyncQueue<string> }) => { | ||
| return ( | ||
| <div> | ||
| <h1>Async Queue</h1> | ||
| <Suspense fallback={<p>Loading Item1</p>}> | ||
| <AsyncQueueItem asyncQueue={asyncQueue}> | ||
| <Suspense fallback={<p>Loading Item2</p>}> | ||
| <AsyncQueueItem asyncQueue={asyncQueue}> | ||
| <Suspense fallback={<p>Loading Item3</p>}> | ||
| <AsyncQueueItem asyncQueue={asyncQueue} /> | ||
| </Suspense> | ||
| </AsyncQueueItem> | ||
| </Suspense> | ||
| </AsyncQueueItem> | ||
| </Suspense> | ||
| </div> | ||
| ) | ||
| } | ||
|
|
||
| ReactOnRails.register({ AsyncQueueContainer }); | ||
|
|
||
| const renderComponent = (props: Record<string, unknown>) => { | ||
| return ReactOnRails.serverRenderRSCReactComponent({ | ||
| railsContext: { | ||
| reactClientManifestFileName: 'react-client-manifest.json', | ||
| reactServerClientManifestFileName: 'react-server-client-manifest.json', | ||
| } as unknown as RailsContextWithServerStreamingCapabilities, | ||
| name: 'AsyncQueueContainer', | ||
| renderingReturnsPromises: true, | ||
| throwJsErrors: true, | ||
| domNodeId: 'dom-id', | ||
| props, | ||
| }); | ||
| } | ||
|
||
|
|
||
| const createParallelRenders = (size: number) => { | ||
| const asyncQueues = new Array(size).fill(null).map(() => new AsyncQueue<string>()); | ||
| const streams = asyncQueues.map((asyncQueue) => { | ||
| return renderComponent({ asyncQueue }); | ||
| }); | ||
| const readers = streams.map(stream => new StreamReader(stream)); | ||
|
|
||
| const enqueue = (value: string) => asyncQueues.forEach(asyncQueues => asyncQueues.enqueue(value)); | ||
|
|
||
| const expectNextChunk = (nextChunk: string) => Promise.all( | ||
| readers.map(async (reader) => { | ||
| const chunk = await reader.nextChunk(); | ||
| expect(removeRSCChunkStack(chunk)).toEqual(removeRSCChunkStack(nextChunk)); | ||
| }) | ||
| ); | ||
|
|
||
| const expectEndOfStream = () => Promise.all( | ||
| readers.map(reader => expect(reader.nextChunk()).rejects.toThrow(/Queue Ended/)) | ||
| ); | ||
|
|
||
| return { enqueue, expectNextChunk, expectEndOfStream }; | ||
| } | ||
|
|
||
| test('Renders concurrent rsc streams as single rsc stream', async () => { | ||
| expect.assertions(258); | ||
| const asyncQueue = new AsyncQueue<string>(); | ||
| const stream = renderComponent({ asyncQueue }); | ||
| const reader = new StreamReader(stream); | ||
|
|
||
| const chunks: string[] = []; | ||
| let chunk = await reader.nextChunk() | ||
| chunks.push(chunk); | ||
| expect(chunk).toContain("Async Queue"); | ||
| expect(chunk).toContain("Loading Item2"); | ||
| expect(chunk).not.toContain("Random Value"); | ||
|
|
||
| asyncQueue.enqueue("Random Value1"); | ||
| chunk = await reader.nextChunk(); | ||
| chunks.push(chunk); | ||
| expect(chunk).toContain("Random Value1"); | ||
|
|
||
| asyncQueue.enqueue("Random Value2"); | ||
| chunk = await reader.nextChunk(); | ||
| chunks.push(chunk); | ||
| expect(chunk).toContain("Random Value2"); | ||
|
|
||
| asyncQueue.enqueue("Random Value3"); | ||
| chunk = await reader.nextChunk(); | ||
| chunks.push(chunk); | ||
| expect(chunk).toContain("Random Value3"); | ||
|
|
||
| await expect(reader.nextChunk()).rejects.toThrow(/Queue Ended/); | ||
|
|
||
| const { enqueue, expectNextChunk, expectEndOfStream } = createParallelRenders(50); | ||
|
|
||
| expect(chunks).toHaveLength(4); | ||
| await expectNextChunk(chunks[0]!); | ||
| enqueue("Random Value1"); | ||
| await expectNextChunk(chunks[1]!); | ||
| enqueue("Random Value2"); | ||
| await expectNextChunk(chunks[2]!); | ||
| enqueue("Random Value3"); | ||
| await expectNextChunk(chunks[3]!); | ||
| await expectEndOfStream(); | ||
| }); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dequeue must honor falsy buffered values.
buffer.shift()can legitimately return'',0, orfalse, but the current truthiness check falls through to the “queue ended” branch and rejects even though data is buffered. Compare againstundefinedinstead so every enqueued value is delivered.📝 Committable suggestion
🤖 Prompt for AI Agents