
Conversation

@AbanoubGhadban
Collaborator

Summary

Remove this paragraph and provide a general description of the code changes in your pull
request... were there any bugs you had fixed? If so, mention them. If
these bugs have open GitHub issues, be sure to tag them here as well,
to keep the conversation linked together.

Pull Request checklist

Remove this line after checking all the items here. If an item is not applicable to the PR, check it and wrap it in ~.

  • Add/update test to cover these changes
  • Update documentation
  • Update CHANGELOG file

Add the CHANGELOG entry at the top of the file.

Other Information

Remove this paragraph and mention any other important and relevant information such as benchmarks.

@coderabbitai
Contributor

coderabbitai bot commented Nov 30, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch run-rsc-payload-generation-on-separate-worker

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@claude

claude bot commented Nov 30, 2025

Code Review: Inter-Worker Message Routing for RSC Payload Generation

Thank you for this PR! This introduces a sophisticated inter-worker communication system to enable RSC payload generation on separate workers. Here's my comprehensive review:


🎯 Architecture & Design

Strengths:

  • Well-structured message routing system with clear separation of concerns
  • Smart worker selection algorithm (workersCount - 1 - workerIndex) to distribute load
  • Proper type safety with TypeScript discriminated unions
  • Good use of streams for handling potentially large payloads

Concerns:

  1. Worker Selection Logic May Cause Issues:

    const otherWorker = workers[workersCount - 1 - workerIndex];
    • With 2 workers: Worker 0 → Worker 1, Worker 1 → Worker 0 ✅
    • With 3 workers: Worker 0 → Worker 2, Worker 1 → Worker 1 ❌, Worker 2 → Worker 0
    • Issue: Odd number of workers causes self-assignment (Worker 1 sends to itself)
    • Recommendation: Add validation to prevent self-routing or document that even worker counts are required
  2. Missing Error Handling in Stream Processing:

    } else if (result.stream) {
      result.stream.on('data', (chunk) => {
        reply(chunk.toString());
      });
      result.stream.on('end', () => {
        reply(undefined, true);
      });
    }
    • No 'error' event handler on the stream
    • Recommendation: Add error handling:
    result.stream.on('error', (err) => {
      const errMsg = formatExceptionMessage(renderingRequest, err);
      errorReporter.message(errMsg);
      reply(undefined, true); // Close the stream on error
    });

🐛 Potential Bugs

  1. Memory Leak: Event Listener Not Cleaned Up on Error:
    In sendMessage() (workerMessagesRouter.ts:96-109):

    process.on('message', function messageReceivedCallback(msg) {
      // ... listener logic
    });
    // Only removed on 'end' message
    • If the remote worker crashes or throws before sending 'end', this listener stays attached forever
    • mapRequestIdToWorker entry also persists
    • Recommendation: Add timeout cleanup:
    const timeout = setTimeout(() => {
      reveivedStream.destroy(new Error('Inter-worker request timeout'));
      process.off('message', messageReceivedCallback);
    }, 30000); // Configurable timeout
    
    // Clear timeout on 'end'
    if (messageType === 'end') {
      clearTimeout(timeout);
      // ... existing code
    }
  2. Race Condition: Stream Setup vs. Message Arrival:
    In sendMessage(), the order of listener registration and message dispatch matters:

    process.on('message', function messageReceivedCallback(msg) { ... }); // Line 96
    process.send?.(initiateMessage); // Line 117
    • The listener is attached before process.send is called, so even an immediate reply cannot arrive before the handler exists. No change needed here. ✅

  3. Typo in Variable Name:
    Line 94: reveivedStream → should be receivedStream


🔒 Security Concerns

  1. No Validation of Worker Identity:

    • Any worker can send messages claiming any requestId
    • Malicious or buggy code could hijack requests
    • Recommendation: Add worker identity validation or at minimum, log suspicious activity
  2. Unbounded Map Growth:

    • mapRequestIdToWorker can grow indefinitely if workers crash without sending 'end'
    • Recommendation: Add periodic cleanup of stale entries (requests older than 30s)
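A minimal sketch of that periodic cleanup, assuming each map entry records when the request was routed (the RoutedRequest shape and the 30-second threshold are illustrative, not the PR's actual types):

// Hypothetical sketch: evict stale routing entries so crashed workers cannot leak them.
import type { Worker } from 'cluster';

interface RoutedRequest {
  worker: Worker;
  routedAt: number; // Date.now() when the request was first routed
}

const mapRequestIdToWorker = new Map<string, RoutedRequest>();
const STALE_AFTER_MS = 30_000;

setInterval(() => {
  const now = Date.now();
  for (const [requestId, entry] of mapRequestIdToWorker) {
    if (now - entry.routedAt > STALE_AFTER_MS) {
      mapRequestIdToWorker.delete(requestId);
    }
  }
}, STALE_AFTER_MS).unref(); // unref() so the sweep never keeps the master process alive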

Performance Considerations

  1. Global Process Event Listener Registration:
    In onMessageReceived() (line 68):

    process.on('message', (msg) => { ... });
    • Called during worker initialization, registers ONE global listener per worker ✅
    • However, multiple calls would add duplicate listeners
    • Recommendation: Guard against duplicate registration:
    let isListenerRegistered = false;
    export const onMessageReceived = (callback: (receivedMessage: ReceivedMessage) => void) => {
      if (isListenerRegistered) {
        throw new Error('onMessageReceived already called - only call once per worker');
      }
      isListenerRegistered = true;
      // ... rest of code
    };
  2. Stream Buffer Accumulation:

    • PassThrough stream in sendMessage() has no backpressure handling
    • If consumer is slow, chunks accumulate in memory
    • Recommendation: Document expected payload sizes or add highWaterMark limits
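A sketch of how a bound could look, assuming the PassThrough is created inside sendMessage (the 1 MiB highWaterMark is an illustrative number, not something the PR specifies):

import { PassThrough } from 'stream';

// Sketch: cap internal buffering at ~1 MiB. push() returns false once the readable
// buffer is full, which is the signal to stop forwarding chunks until the consumer reads.
const receivedStream = new PassThrough({ highWaterMark: 1024 * 1024 });

function onChunkFromOtherWorker(chunk: string): void {
  const canAcceptMore = receivedStream.push(chunk);
  if (!canAcceptMore) {
    // Backpressure: pause or buffer the inter-worker source here until the
    // consumer drains the stream, instead of accumulating chunks in memory.
  }
}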

🧪 Test Coverage

CRITICAL MISSING: No tests for workerMessagesRouter.ts

Required test scenarios:

  1. ✅ Basic message send/receive roundtrip
  2. ✅ Stream-based responses with multiple chunks
  3. ❌ Worker crash during message processing (memory leak test)
  4. ❌ Concurrent requests from same worker
  5. ❌ Odd number of workers (self-routing scenario)
  6. ❌ Error handling in stream processing
  7. ❌ Cleanup of event listeners and map entries

Recommendation: Add workerMessagesRouter.test.ts covering these scenarios before merge.


📝 Documentation & Code Quality

  1. Missing JSDoc Comments:

    • routeMessagesFromWorker, onMessageReceived, sendMessage lack documentation
    • Complex inter-process communication needs clear docs explaining:
      • Thread safety guarantees
      • Expected call patterns
      • Lifecycle management
  2. Incomplete PR Description:

    • Template sections not filled out (Summary, Test coverage, CHANGELOG)
    • Per CLAUDE.md: "Update CHANGELOG file" is unchecked
    • Required: Update /CHANGELOG_PRO.md with entry for this Pro feature
  3. requestIdLogLabel Configuration:
    Line 138 in worker.ts:

    requestIdLogLabel: 'trackingId',
    • This seems unrelated to the inter-worker messaging feature
    • Question: Should this be a separate commit, or is it used for tracing inter-worker requests?

Positives

  • Clean separation between master and worker concerns
  • Proper TypeScript typing throughout
  • Good use of type guards (isInterWorkerMessage, isRenderOnOtherBundleRequest)
  • Stream-based API allows for efficient large payload handling
  • Error messages are descriptive

🚀 Recommendations Before Merge

Must Fix:

  1. Add comprehensive tests for workerMessagesRouter.ts
  2. Fix memory leak: Add timeout cleanup in sendMessage()
  3. Add stream error handling in worker.ts:303-309
  4. Update CHANGELOG_PRO.md
  5. Document worker count requirements (even numbers preferred) or fix odd-count issue
  6. Fix typo: reveivedStream → receivedStream

Should Fix:
7. Add JSDoc comments to public functions
8. Add guard against duplicate onMessageReceived() calls
9. Add cleanup mechanism for stale mapRequestIdToWorker entries
10. Fill out PR description template sections

Consider:
11. Add distributed tracing IDs for inter-worker requests
12. Add metrics/logging for inter-worker communication (latency, errors)
13. Document performance characteristics (what happens with N workers)


📋 Testing Checklist

Before claiming this is ready for production:

  • Test with 1, 2, 3, 4+ workers
  • Test worker crash scenarios
  • Test with large streaming payloads (>1MB)
  • Test concurrent requests from multiple workers
  • Load test to verify no memory leaks over time
  • Verify error messages are helpful for debugging

Great work on a complex feature! The architecture is sound but needs polish around edge cases and test coverage. Happy to discuss any of these points further.

cc: @AbanoubGhadban

@claude

claude bot commented Nov 30, 2025

Code Review

Thank you for this PR! This introduces inter-worker messaging for running RSC payload generation on separate workers. Here's my detailed feedback:

Strengths

  1. Clean Architecture: The workerMessagesRouter.ts module provides a well-encapsulated messaging system with clear separation of concerns
  2. Type Safety: Good use of TypeScript type guards (isInterWorkerMessage, isRenderOnOtherBundleRequest)
  3. Graceful Shutdown Enhancement: The integration with graceful shutdown to track active message channels is well thought out
  4. Stream Handling: The use of PassThrough streams for inter-worker communication is appropriate

🔴 Critical Issues

1. Duplicate Null Check in routeMessagesFromWorker (workerMessagesRouter.ts:39-45)

const otherWorker = workers[workersCount - 1 - workerIndex];
if (!otherWorker) {
  throw new Error("Can't find a worker to forward the message to");
}

if (!otherWorker) {  // ❌ Duplicate check - unreachable code
  return;
}

Fix: Remove the second if (!otherWorker) check (lines 44-46) as it's unreachable after the throw.

2. Typo in Variable Name (workerMessagesRouter.ts:119)

const reveivedStream = new PassThrough();  // ❌ Typo: "reveived" should be "received"

Fix: Rename to receivedStream throughout the sendMessage function.

3. Missing File Extension in Import (workerMessagesRouter.ts:4)

import log from './shared/log';  // ❌ Missing .js extension

Fix: Change to import log from './shared/log.js'; for consistency with other imports and ESM compatibility.


⚠️ Potential Issues

4. Weak Worker Selection Algorithm (workerMessagesRouter.ts:37-39)

const workerIndex = workers.findIndex((w) => w?.id === worker.id);
const otherWorker = workers[workersCount - 1 - workerIndex];

Issues:

  • For 2 workers, this creates a ping-pong pattern (worker 0 ↔ worker 1)
  • With filtering on line 28-29, workerIndex might not match expectations if some workers are excluded
  • No load balancing - always picks the same "mirror" worker

Suggestion: Consider:

  • Round-robin selection
  • Random selection for better load distribution
  • Or document why the current algorithm is intentional
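For reference, a round-robin alternative might look like the sketch below (pickTargetWorker and nextTargetIndex are illustrative names, not part of the PR):

import cluster from 'cluster';
import type { Worker } from 'cluster';

// Sketch: rotate through every connected worker other than the sender,
// instead of always pairing a worker with its "mirror".
let nextTargetIndex = 0;

function pickTargetWorker(sender: Worker): Worker | undefined {
  const eligibleWorkers = Object.values(cluster.workers ?? {}).filter(
    (w): w is Worker => !!w && w.id !== sender.id && w.isConnected(),
  );
  if (eligibleWorkers.length === 0) return undefined;
  const target = eligibleWorkers[nextTargetIndex % eligibleWorkers.length];
  nextTargetIndex += 1;
  return target;
}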

5. Potential Memory Leak in Event Listeners (workerMessagesRouter.ts:125-143)

process.on('message', function messageReceivedCallback(msg) {
  // ...
  if (messageType === 'end') {
    reveivedStream.push(null);
    process.off('message', messageReceivedCallback);  // ✅ Cleaned up
  }
});

Risk: If the 'end' message never arrives (network issue, worker crash), the listener leaks.

Suggestions:

  • Add timeout to clean up stale listeners
  • Track active listeners for debugging
  • Consider cleanup on stream destruction

6. Race Condition in Worker List (workerMessagesRouter.ts:28-32)

const workers = Object.values(cluster.workers ?? {}).filter((w) => {
  return worker.id === w?.id || (w?.isConnected() && !w.isScheduledRestart);
});

Workers might disconnect between reading and sending. No error handling if otherWorker.send(msg) fails.

Suggestion: Wrap otherWorker.send(msg) in try-catch and handle failures gracefully.
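A hedged sketch of that guard; forwardToWorker, onSendFailure, and the message shape are illustrative names rather than the PR's API:

import type { Worker } from 'cluster';

// Sketch: a worker can disconnect between selection and send, and send() can also
// throw synchronously if the IPC channel is already closed, so guard both paths.
function forwardToWorker(
  otherWorker: Worker,
  msg: { requestId: string },
  onSendFailure: (requestId: string, err: Error) => void,
): void {
  try {
    otherWorker.send(msg, (err) => {
      if (err) onSendFailure(msg.requestId, err); // asynchronous IPC failure
    });
  } catch (err) {
    onSendFailure(msg.requestId, err as Error); // synchronous failure (channel closed)
  }
}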

7. Missing Error Propagation (worker.ts:303-309)

} else if (result.stream) {
  result.stream.on('data', (chunk) => {
    reply(chunk.toString());
  });
  result.stream.on('end', () => {
    reply(undefined, true);
  });
}

Issue: No error event handler on the stream. If the stream errors, it could crash the process or leave the reply channel open.

Fix: Add error handler:

result.stream.on('error', (err) => {
  const exceptionMessage = formatExceptionMessage(renderingRequest, err, 'Stream error');
  errorReporter.message(exceptionMessage);
  reply(undefined, true); // close the reply channel instead of throwing from the event handler
});

📝 Code Quality & Maintainability

8. Inconsistent Logging Levels (handleGracefulShutdown.ts)

Changed from log.debug to log.info for shutdown messages. This is good for production visibility, but consider:

  • Are these logs too noisy for production?
  • Should they be configurable based on environment?

9. Missing JSDoc Comments

The new workerMessagesRouter.ts module lacks documentation for public functions:

  • routeMessagesFromWorker
  • onMessageReceived
  • sendMessage
  • onMessageInitiated
  • onMessageEnded

Suggestion: Add JSDoc comments explaining parameters, return values, and usage patterns.

10. No Validation of bundleTimestamp (worker.ts:292, vm.ts:200)

The bundleTimestamp from the message is used directly without validation. Consider validating it's a valid timestamp/bundle identifier.
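As a sketch only, assuming a bundle timestamp should be a positive integer or numeric string (that constraint is a guess, not taken from the PR):

// Sketch: reject obviously invalid bundle identifiers before using them to locate a bundle.
function parseBundleTimestamp(value: unknown): number {
  const parsed = typeof value === 'string' ? Number(value) : value;
  if (typeof parsed !== 'number' || !Number.isInteger(parsed) || parsed <= 0) {
    throw new Error(`Invalid bundleTimestamp received in inter-worker message: ${String(value)}`);
  }
  return parsed;
}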


🧪 Testing Concerns

11. No Tests for New Module

The workerMessagesRouter.ts module has no corresponding test file. Given the complexity of inter-worker communication, this needs comprehensive testing:

Required tests:

  • ✅ Message routing between 2 workers
  • ✅ Message routing with 3+ workers
  • ✅ Handling worker disconnection during message routing
  • ✅ Cleanup of mapRequestIdToWorker after 'end' message
  • ✅ Stream handling in sendMessage
  • ✅ Callback invocation for onMessageInitiated/onMessageEnded
  • ✅ Race conditions when workers restart
  • ✅ Error cases (invalid messages, missing workers, etc.)

12. Integration Testing Needed

The interaction between:

  • master.ts → workerMessagesRouter.ts → worker.ts → vm.ts

needs end-to-end testing to ensure RSC payloads are correctly generated on separate workers.


🔒 Security Considerations

13. No Message Payload Validation

Messages are passed directly without size limits or validation. Consider:

  • Max payload size check
  • Sanitization of renderingRequest content
  • Rate limiting per worker to prevent DoS
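A minimal size-check sketch; the 5 MB ceiling and the helper name are illustrative, not limits the PR defines:

// Sketch: bound the serialized payload before forwarding it to another worker.
const MAX_PAYLOAD_BYTES = 5 * 1024 * 1024; // illustrative limit

function assertPayloadWithinLimit(payload: unknown): void {
  const json = JSON.stringify(payload);
  const size = json ? Buffer.byteLength(json) : 0;
  if (size > MAX_PAYLOAD_BYTES) {
    throw new Error(`Inter-worker payload of ${size} bytes exceeds the ${MAX_PAYLOAD_BYTES}-byte limit`);
  }
}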

📋 PR Checklist (from template)

The PR template items are not completed:

  • ❌ Add/update test to cover these changes
  • ❌ Update documentation
  • ❌ Update CHANGELOG file
  • ❌ Summary section still has template text

Required before merging:

  1. Fill out PR description
  2. Add tests for workerMessagesRouter.ts
  3. Update CHANGELOG_PRO.md (this is a Pro package feature)
  4. Add documentation for the new inter-worker messaging system

🎯 Action Items

Must Fix Before Merge:

  1. Remove duplicate null check (workerMessagesRouter.ts:44-46)
  2. Fix typo: reveivedStream → receivedStream
  3. Fix import: add .js extension to log import
  4. Add error handler for stream in worker.ts:303-309
  5. Add comprehensive tests
  6. Complete PR checklist items
  7. Update CHANGELOG_PRO.md

Should Consider:
8. Add timeout/cleanup for orphaned message listeners
9. Improve worker selection algorithm or document current approach
10. Add JSDoc documentation
11. Add error handling for worker.send() failures
12. Validate message payloads

Nice to Have:
13. Add integration tests for full message flow
14. Add metrics/monitoring for inter-worker messages
15. Consider configurability of logging levels


📚 Related to CLAUDE.md Guidelines

Per the repository's guidelines:

  • ⚠️ CRITICAL: Run bundle exec rubocop and pnpm run lint before committing
  • ⚠️ LOCAL TESTING: Since this is marked [WIP], ensure you test the inter-worker communication locally before marking ready for review
  • 📝 CHANGELOG: Update CHANGELOG_PRO.md (not CHANGELOG.md) as this is Pro-only functionality

Overall Assessment: This is a solid foundation for inter-worker messaging, but needs tests, bug fixes, and error handling improvements before merging. The architecture is sound, but the implementation has several edge cases that need addressing.

Let me know if you'd like help with any of these items!

@AbanoubGhadban changed the base branch from master to upcoming-v17 on November 30, 2025 at 11:02
@AbanoubGhadban changed the title from "[WIP] Run RSC payload generation on other worker" to "Run RSC payload generation on other worker" on Nov 30, 2025
@AbanoubGhadban marked this pull request as ready for review on November 30, 2025 at 11:05
@AbanoubGhadban merged commit 066f715 into upcoming-v17 on Nov 30, 2025
23 checks passed
@AbanoubGhadban deleted the run-rsc-payload-generation-on-separate-worker branch on November 30, 2025 at 11:05
@claude

claude bot commented Nov 30, 2025

Code Review - PR #2145: Run RSC payload generation on other worker

Thanks for working on this feature! This PR introduces an inter-worker communication mechanism to handle RSC payload generation on separate workers. I've reviewed the code and have several important findings across code quality, bugs, performance, security, and testing.


🐛 Critical Bugs

1. Duplicate null check in routeMessagesFromWorker (workerMessagesRouter.ts:40-46)

The code checks !otherWorker twice - once throwing an error, then immediately checking again with a return statement. The second check is unreachable.

Fix: Remove lines 44-46.

2. Memory leak: Event listener not removed on worker exit (workerMessagesRouter.ts:18-64)

The routeMessagesFromWorker function registers a message listener on each worker, but these listeners are never cleaned up when workers die/restart. With scheduled worker restarts, this will accumulate listeners and leak memory from the mapRequestIdToWorker Map.

Fix: Add cleanup on worker exit event.
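A sketch of that cleanup on the master side, assuming the map stores the target worker per request (the map shape here is illustrative):

import cluster from 'cluster';
import type { Worker } from 'cluster';

// Sketch: when a worker dies, drop its routing listener and any map entries that
// point at it, so neither can accumulate across scheduled restarts.
const mapRequestIdToWorker = new Map<string, Worker>();

cluster.on('exit', (deadWorker) => {
  deadWorker.removeAllListeners('message');
  for (const [requestId, targetWorker] of mapRequestIdToWorker) {
    if (targetWorker.id === deadWorker.id) {
      mapRequestIdToWorker.delete(requestId);
    }
  }
});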

3. Typo in variable name (workerMessagesRouter.ts:119)

reveivedStream should be receivedStream

4. Potential double initialization in graceful shutdown (handleGracefulShutdown.ts:15-16)

The singleton setup for gracefulShutdownController has no guard against repeated calls. Node.js is single-threaded, so this is not a thread-safety problem, but overlapping async startup paths could call setupGracefulShutdownHandler again before the first initialization completes, creating two controllers.


⚠️ Potential Issues

5. Worker selection algorithm may not distribute load evenly (workerMessagesRouter.ts:36-39)

The mirror selection algorithm (first worker → last worker, second → second-to-last) could create uneven load distribution if most requests come to specific workers.

Consider: Round-robin or least-loaded worker selection.

6. No timeout mechanism for inter-worker requests (workerMessagesRouter.ts:117-154)

If a worker crashes or becomes unresponsive after receiving a message, the requesting worker will wait indefinitely. The stream will never close.

Fix: Add timeout handling with setTimeout and clearTimeout.
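One way such a timeout could be wired around the reply stream, as a sketch (the 30-second default and the onCleanup hook are assumptions):

import { PassThrough } from 'stream';

// Sketch: fail the pending inter-worker request if no 'end' arrives in time, and
// release the timer on every exit path so it cannot leak.
const INTER_WORKER_TIMEOUT_MS = 30_000; // illustrative default

function createReplyStreamWithTimeout(onCleanup: () => void): PassThrough {
  const receivedStream = new PassThrough();
  const timer = setTimeout(() => {
    receivedStream.destroy(new Error('Timed out waiting for reply from other worker'));
    onCleanup(); // e.g. process.off('message', messageReceivedCallback)
  }, INTER_WORKER_TIMEOUT_MS);

  // 'close' fires after both the normal end and destroy paths, so the timer is always cleared.
  receivedStream.once('close', () => clearTimeout(timer));
  return receivedStream;
}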

7. No error propagation from worker to sender (worker.ts:282-325)

When handleRenderRequest throws an error in the receiving worker, the error is logged but never sent back to the requesting worker via reply(). The requesting worker's stream will hang.
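One possible shape for that propagation, sketched under the assumption that sending the formatted error text as a final chunk is acceptable to the requesting side (handleFailure and its parameters are illustrative; reply(chunk, isLast) mirrors the PR's usage):

// Sketch: surface failures through the reply channel so the requesting worker's
// stream terminates instead of hanging forever.
function handleFailure(
  err: unknown,
  renderingRequest: string,
  reply: (chunk?: string, isLast?: boolean) => void,
  reportError: (message: string) => void,
): void {
  const message = `Error generating RSC payload for request: ${renderingRequest}\n${String(err)}`;
  reportError(message);   // keep the existing logging behaviour
  reply(message);         // propagate the error text to the requesting worker
  reply(undefined, true); // then close the channel so the sender's stream can end
}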


🔒 Security Concerns

8. No validation of message payload size

The payload field in InterWorkerMessage is typed as unknown with no size limits. Malicious or buggy code could send extremely large payloads, causing memory exhaustion.

9. No message origin validation

The code trusts that messages with type 'inter-worker-message' are legitimate. Consider more thorough message structure validation.


Performance Considerations

10. Excessive use of setTimeout(..., 0) for callbacks (workerMessagesRouter.ts:104, 111, 122, 140)

This unnecessarily defers execution to the timers phase of the event loop. Unless there's a specific reason to break the call stack, invoke callbacks directly or use process.nextTick().
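The ordering difference in one self-contained example (generic Node.js behaviour, not the PR's code):

// process.nextTick callbacks run before any timers scheduled in the same turn,
// while setTimeout(fn, 0) waits for the next timers phase of the event loop.
setTimeout(() => console.log('timer phase'), 0);
process.nextTick(() => console.log('nextTick: runs first'));
// Output order: "nextTick: runs first", then "timer phase".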

11. Global event listener in sendMessage (workerMessagesRouter.ts:125)

Every call to sendMessage registers a new process.on('message') listener. While removed after the message ends, this pattern can cause performance issues under high load.

Consider: Use a single global listener with a request ID router.
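A sketch of that single-listener pattern (pendingRequests, registerPendingRequest, and the message shape are illustrative names, not the PR's types):

// Sketch: one process-level 'message' listener dispatches replies by requestId,
// instead of each sendMessage() call attaching and removing its own listener.
type ReplyHandler = (chunk?: string, isLast?: boolean) => void;

const pendingRequests = new Map<string, ReplyHandler>();

process.on('message', (raw) => {
  const msg = raw as { requestId?: string; chunk?: string; isLast?: boolean } | null;
  if (!msg || typeof msg.requestId !== 'string') return;
  const handler = pendingRequests.get(msg.requestId);
  if (!handler) return;
  handler(msg.chunk, msg.isLast);
  if (msg.isLast) pendingRequests.delete(msg.requestId);
});

function registerPendingRequest(requestId: string, handler: ReplyHandler): void {
  pendingRequests.set(requestId, handler);
}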


🧪 Test Coverage

13. No tests for new functionality

This PR adds workerMessagesRouter.ts with ~150 lines of critical inter-process communication code, but no corresponding tests.

Required tests:

  • Message routing between workers
  • Graceful shutdown with active inter-worker messages
  • Error handling when target worker dies mid-request
  • Timeout scenarios
  • Edge cases (1 worker, 2 workers, worker restart during message)

14. Integration tests needed

Add tests that spawn multiple workers, trigger runOnOtherBundle, verify correct routing, and test graceful shutdown.


📋 Process & Documentation

15. PR description is incomplete

Please update the PR body with actual description, motivation, and any breaking changes.

16. Missing CHANGELOG entry

Per CLAUDE.md, this should be in /CHANGELOG_PRO.md (Pro package code).

17. Missing tests checkbox

The PR checklist shows "Add/update test" is unchecked.

18. Missing JSDoc comments

The new file lacks documentation explaining what problem it solves, how routing works, and concurrency considerations.


What's Good

  • Clear separation of concerns
  • Good TypeScript type guards
  • Integration with graceful shutdown
  • Stream-based responses

🎯 Priority Actions Before Merging

  1. Fix duplicate null check (Bug 1)
  2. Fix typo "reveivedStream" (Bug 3)
  3. Add worker exit cleanup (Bug 2)
  4. Add timeout mechanism (Issue 6)
  5. Add error propagation (Issue 7)
  6. Add comprehensive tests
  7. Update CHANGELOG_PRO.md
  8. Complete PR description

Let me know if you'd like help with any of these!
