Add support for node client based API execution#20619
Add support for node client based API execution#20619rishabhmaurya wants to merge 1 commit intoopensearch-project:mainfrom
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis pull request introduces streaming action support to OpenSearch's transport layer, enabling multiple batched responses for single requests. It adds base classes for streaming actions, a streaming listener interface, NodeClient integration, and example implementations for both basic and node-based streaming patterns. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client<br/>(NodeClient)
participant StreamAction as StreamTransportAction<br/>(Basic)
participant Service as StreamTransportService
participant Handler as StreamTransportResponseHandler
participant Listener as StreamActionListener
Client->>StreamAction: executeStream(request, listener)
activate StreamAction
StreamAction->>Service: sendStreamRequest(target, request, options)
activate Service
Service->>Handler: new StreamTransportResponseHandler()
activate Handler
Handler->>Handler: handleStreamResponse(batch)
Handler->>Handler: iterate responses
Handler->>Listener: onStreamResponse(each response)
Handler->>Listener: onFinalResponse()
deactivate Handler
Service-->>StreamAction: completion
deactivate Service
deactivate StreamAction
sequenceDiagram
participant Client as Client<br/>(NodeClient)
participant NodesAction as TransportStreamNodesAction
participant ClusterState as ClusterState
participant NodeHandler as handleNodeRequest
participant Channel as TransportChannel
participant Listener as StreamActionListener
Client->>NodesAction: executeStream(nodesRequest, listener)
activate NodesAction
NodesAction->>ClusterState: resolveNodes()
loop For each target node
NodesAction->>NodeHandler: dispatch nodeRequest
activate NodeHandler
NodeHandler->>Channel: nodeStreamOperation()
activate Channel
Channel->>Channel: send batch responses
Channel->>Channel: optional delay
Channel->>Channel: completeStream()
deactivate Channel
deactivate NodeHandler
end
NodesAction->>Listener: onStreamResponse (from each node)
NodesAction->>Listener: onFinalResponse (aggregated)
deactivate NodesAction
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested labels
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@plugins/examples/stream-transport-example/README.md`:
- Around line 114-116: Update the README example signature to match the plugin's
actual return type: change the getActions() declaration shown to use
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> so it
aligns with the implementation in StreamTransportExamplePlugin (refer to
getActions and ActionHandler types) and avoids confusion for readers; ensure the
example instantiation (new ActionHandler<>(MyStreamAction.INSTANCE,
TransportMyStreamAction.class)) remains unchanged.
In
`@plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/basic/StreamTransportExampleIT.java`:
- Line 41: The `@AwaitsFix` annotation on the StreamTransportExampleIT test uses a
non-URL bugUrl ("flaky"); update the `@AwaitsFix`(bugUrl = "...") to point to a
real tracked issue URL that documents the flakiness (e.g., the GitHub issue/PR
URL) or remove the annotation if no issue exists; ensure you edit the annotation
declaration on the StreamTransportExampleIT class so bugUrl contains a valid
HTTP(S) link to the issue.
In
`@plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.java`:
- Around line 43-44: The `@AwaitsFix` annotation on StreamNodesTransportExampleIT
(used alongside `@LockFeatureFlag`(STREAM_TRANSPORT)) currently uses a placeholder
bugUrl = "flaky"; create a real GitHub issue describing the flakiness (include
repro steps and suspected cause such as the fail()/latch ordering) and replace
the placeholder with the issue's full URL in the `@AwaitsFix`(bugUrl = "...")
value so the test points to a traceable bug report.
- Around line 66-71: onStreamFailure currently calls fail("...") which throws
AssertionError before latch.countDown() can run; move the latch countdown so it
always executes (e.g., call latch.countDown() before invoking fail) or ensure
it's executed in a finally block around the fail call; update the
onStreamFailure handler (method name onStreamFailure, uses logger.error, fail(),
and latch.countDown()) so the latch is always released when a stream error
occurs.
In
`@plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/TransportStreamNodesDataAction.java`:
- Line 64: The `@Nullable` on the constructor parameter streamTransportService
contradicts the constructor's null-check in TransportStreamNodesDataAction
(which throws IllegalStateException if null); remove the `@Nullable` annotation
from the streamTransportService parameter (and any corresponding import) so the
signature reflects that null is not allowed, and ensure any other occurrences of
`@Nullable` for this parameter are removed or updated to match the non-null
contract.
In
`@server/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.java`:
- Around line 80-92: executeStream currently calls listener.onFinalResponse()
even when all remote node requests fail, so the accumulated failures (the
failures list built in handleException/send-request error paths) are never
surfaced; update executeStream to detect when nodes.length > 0 but every node
resulted in a failure and then invoke listener.onStreamFailure(...) with an
appropriate aggregated Exception (or pass the failures to the listener) instead
of silently calling listener.onFinalResponse(), or alternatively include the
failures in the payload passed to listener.onFinalResponse(); locate the
failure-accumulation logic around the failures list and
handleException/sendRequest code paths and ensure you call
listener.onStreamFailure (or augment onFinalResponse) when the failures list is
non-empty and no onStreamResponse was emitted for any node.
- Around line 109-126: The snapshot of the shared failures list is taken
unsafely in TransportStreamNodesAction.handleStreamResponse via new
ArrayList<>(failures) which can race with synchronized(failures){
failures.add(...) } in other handlers; fix by acquiring the same lock when
making the snapshot (e.g., wrap the creation of the ArrayList snapshot and the
listener.onStreamResponse call in synchronized(failures)) or replace the
failures list with a thread-safe implementation such as CopyOnWriteArrayList and
remove the manual synchronized blocks so that new ArrayList<>(failures) is safe.
- Around line 109-126: The handleStreamResponse method can leak the
StreamTransportResponse because streamResponse.close() is only called on the
happy path; move the close into a finally block to always release the resource.
Specifically, wrap the while-loop and listener.onStreamResponse calls inside a
try and call streamResponse.close() in a finally (or use try-with-resources if
StreamTransportResponse implements AutoCloseable) inside handleStreamResponse so
that close() runs regardless of exceptions; keep the existing catch behavior
that calls streamResponse.cancel(...) and adds a FailedNodeException to
failures, and ensure the counter.decrementAndGet() and
listener.onFinalResponse() logic remains in the outer finally.
In
`@server/src/main/java/org/opensearch/action/support/StreamTransportAction.java`:
- Around line 36-44: doExecute currently rejects streaming requests and
executeStream lacks a Task parameter, so streaming actions cannot be tracked or
canceled; update the abstract method signature in StreamTransportAction from
executeStream(Request request, StreamActionListener<Response> listener) to
include the Task (e.g., executeStream(Task task, Request request,
StreamActionListener<Response> listener), update any callers (including where
doExecute should forward the incoming Task into executeStream) to pass the Task
through, and ensure implementations use the Task for task
registration/cancellation; if omitting Task was intentional, add a comment on
StreamTransportAction.executeStream explaining why streaming actions are not
tracked.
🧹 Nitpick comments (10)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/package-info.java (1)
9-11: Consider expanding the package documentation.The current documentation is accurate but minimal. For an example plugin that demonstrates a specific pattern, consider adding more detail to help developers understand:
- What nodes streaming transport actions are
- How they differ from basic streaming actions
- Key classes in this package and their roles
This would make the example more educational for developers exploring the codebase.
📝 Suggested enhanced documentation
/** - * Nodes streaming transport action example + * Nodes streaming transport action example. + * + * <p>This package demonstrates how to implement a nodes-based streaming transport action + * that coordinates streaming responses across multiple nodes in an OpenSearch cluster. + * Key classes include: + * <ul> + * <li>{`@code` StreamNodesDataAction} - The streaming action definition</li> + * <li>{`@code` TransportStreamNodesDataAction} - Transport layer implementation with node coordination</li> + * <li>{`@code` NodeStreamDataRequest}/{`@code` NodeStreamDataResponse} - Node-level request/response types</li> + * </ul> + * + * <p>Compare with the {`@code` org.opensearch.example.stream.basic} package for simpler + * single-node streaming examples. */plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/StreamDataResponse.java (1)
17-21: Duplicate and unhelpful Javadoc comments throughout the file.Every class member has two consecutive Javadoc blocks (e.g.,
/** Example class */followed by/** Example */). Only the last Javadoc before a declaration is recognized by the Javadoc processor, making the preceding blocks dead documentation. This pattern repeats for every constructor and method in the file.Additionally, the Javadoc at line 48–52 is placed after
@Override(line 47), which is unconventional — annotations typically follow the Javadoc block, not precede it.Consider cleaning up to a single, meaningful Javadoc per element.
server/src/main/java/org/opensearch/action/StreamActionListener.java (1)
19-33: Consider documenting the callback contract.The interface doesn't specify ordering/concurrency guarantees. For example:
- Can
onStreamResponsebe called concurrently from multiple threads?- Is
onStreamFailuremutually exclusive withonFinalResponse(i.e., exactly one of them is called to terminate the stream)?- Can
onStreamResponsebe called afteronFinalResponseoronStreamFailure?Even for an experimental API, documenting these invariants in the interface Javadoc helps implementers write correct listeners. The example implementations in
TransportStreamDataActionsuggest the contract is: multipleonStreamResponse→ exactly one ofonFinalResponseoronStreamFailure, but this isn't stated.plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/StreamDataAction.java (1)
13-21: Duplicate and misleading Javadoc comments.There are three stacked Javadoc blocks on the class (lines 13–16) and the field-level comments say
/** Method */for what are actuallystatic finalfields. Only the last Javadoc block before each element is retained by the compiler, so the earlier blocks are dead documentation. Additionally, labelling fields as "Method" is misleading.Proposed fix
-/** - * Example class - */ -/** Example */ +/** Example streaming action type */ public class StreamDataAction extends ActionType<StreamDataResponse> { - /** Method */ + /** Singleton instance */ public static final StreamDataAction INSTANCE = new StreamDataAction(); - /** Method */ + /** Action name */ public static final String NAME = "cluster:admin/stream_data";plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataRequest.java (2)
17-23: Systematic triple-stacked Javadoc comments throughout the file.Every element in this file has three Javadoc comment blocks stacked (e.g., lines 17–23, 28–36, 43–50, etc.). Only the last
/** ... */block before a declaration is retained by the compiler—the preceding ones are silently discarded. This pattern repeats across all new files in this PR and appears to be a tooling artifact. Please clean up to keep only one meaningful Javadoc block per element.
85-95:@Overrideplaced before the Javadoc block.Convention in Java (and in the OpenSearch codebase) is to place
@Overridedirectly above the method signature, after any Javadoc. Here it appears before the Javadoc comments.Proposed fix
- `@Override` - /** - * Method - * `@return` result - */ - /** Method */ + /** {`@inheritDoc`} */ + `@Override` public void writeTo(StreamOutput out) throws IOException {plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.java (1)
31-36: Use the importedSTREAM_TRANSPORTconstant instead of the hardcoded string.Line 25 imports
STREAM_TRANSPORTand line 43 uses it with@LockFeatureFlag, but line 34 hardcodes"opensearch.experimental.feature.transport.stream.enabled". Use the constant for consistency and to avoid drift if the flag name changes.Proposed fix
protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put("opensearch.experimental.feature.transport.stream.enabled", true) + .put(STREAM_TRANSPORT, true) .build(); }plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/TransportStreamNodesDataAction.java (1)
32-38: Remove duplicate/stacked Javadoc blocks.There are triple Javadoc comments stacked before the class declaration (lines 32–38) and the constructor (lines 48–60). In Java, only the last Javadoc block immediately preceding the element is used; the earlier ones become orphaned comments that confuse readers and tools.
Keep only one Javadoc per declaration.
Proposed cleanup for class-level Javadoc
-/** - * Demonstrates streaming across multiple nodes with aggregation - */ -/** - * Example class - */ -/** Example */ +/** + * Demonstrates streaming across multiple nodes with aggregation + */ public class TransportStreamNodesDataAction extends TransportStreamNodesAction<Proposed cleanup for constructor Javadoc
/** * Constructor * `@param` threadPool the thread pool * `@param` clusterService the cluster service * `@param` streamTransportService the stream transport service * `@param` transportService the transport service * `@param` actionFilters action filters */ `@Inject` - /** - * Constructor - */ - /** Method */ public TransportStreamNodesDataAction(Also applies to: 48-60
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/TransportStreamDataAction.java (2)
36-39: Same stacked Javadoc issue — clean up duplicate blocks.Same pattern as in
TransportStreamNodesDataAction: multiple Javadoc comments stacked before class, constructor, and method declarations. Only the last Javadoc before each element is effective.Also applies to: 51-55, 73-78
91-103:streamResponse.close()should be in afinallyblock.If an exception occurs between
nextResponse()returningnull(end of stream) andclose(), or ifonFinalResponse()throws, theStreamTransportResponseresource may not be properly released. Consider using try-with-resources or afinallyblock forclose().Proposed fix
public void handleStreamResponse(StreamTransportResponse<StreamDataResponse> streamResponse) { try { StreamDataResponse response; while ((response = streamResponse.nextResponse()) != null) { listener.onStreamResponse(response); } - streamResponse.close(); listener.onFinalResponse(); } catch (Exception e) { streamResponse.cancel("Client error", e); listener.onStreamFailure(e); + } finally { + streamResponse.close(); } }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
plugins/examples/stream-transport-example/README.mdplugins/examples/stream-transport-example/build.gradleplugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/StreamTransportExampleIT.javaplugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/basic/StreamTransportExampleIT.javaplugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportStreamDataAction.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/StreamDataAction.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/StreamDataRequest.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/StreamDataResponse.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/TransportStreamDataAction.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/package-info.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/NodeStreamDataRequest.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/NodeStreamDataResponse.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataAction.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataRequest.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataResponse.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/TransportStreamNodesDataAction.javaplugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/package-info.javaserver/src/main/java/org/opensearch/action/StreamActionListener.javaserver/src/main/java/org/opensearch/action/support/StreamTransportAction.javaserver/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.javaserver/src/main/java/org/opensearch/node/Node.javaserver/src/main/java/org/opensearch/transport/StreamTransportService.javaserver/src/main/java/org/opensearch/transport/client/node/NodeClient.java
💤 Files with no reviewable changes (2)
- plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/StreamTransportExampleIT.java
- plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportStreamDataAction.java
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2026-01-13T17:40:27.167Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20411
File: server/src/main/java/org/opensearch/index/codec/CodecService.java:112-133
Timestamp: 2026-01-13T17:40:27.167Z
Learning: Avoid capturing or evaluating a supplier (e.g., this::defaultCodec) upfront when passing it to a registry during object construction. If registries may replace defaults during iteration (as in EnginePlugin.getAdditionalCodecs), pass the supplier itself and only resolve it at use time. This ensures dynamic behavior is preserved during initialization and prevents premature binding of defaults in codecs/registry setup. This pattern should apply to similar initialization paths in Java server code where registries may mutate defaults during construction.
Applied to files:
server/src/main/java/org/opensearch/action/StreamActionListener.javaserver/src/main/java/org/opensearch/node/Node.javaserver/src/main/java/org/opensearch/action/support/StreamTransportAction.javaserver/src/main/java/org/opensearch/transport/StreamTransportService.javaserver/src/main/java/org/opensearch/transport/client/node/NodeClient.javaserver/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.java
📚 Learning: 2026-01-02T19:23:29.698Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: The gRPC search API in OpenSearch is marked as "experimental" in official documentation, so changes to proto schemas that remove previously unsupported fields (those throwing UnsupportedOperationException) are not considered breaking changes.
Applied to files:
server/src/main/java/org/opensearch/transport/StreamTransportService.java
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
server/src/main/java/org/opensearch/transport/StreamTransportService.javaserver/src/main/java/org/opensearch/transport/client/node/NodeClient.java
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.javaplugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/basic/StreamTransportExampleIT.java
🧬 Code graph analysis (7)
server/src/main/java/org/opensearch/action/StreamActionListener.java (2)
server/src/main/java/org/opensearch/action/support/StreamTransportAction.java (1)
ExperimentalApi(24-45)server/src/main/java/org/opensearch/transport/StreamTransportService.java (1)
ExperimentalApi(37-139)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataAction.java (1)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataResponse.java (1)
StreamNodesDataResponse(24-51)
server/src/main/java/org/opensearch/action/support/StreamTransportAction.java (1)
server/src/main/java/org/opensearch/transport/StreamTransportService.java (1)
ExperimentalApi(37-139)
server/src/main/java/org/opensearch/transport/StreamTransportService.java (3)
server/src/main/java/org/opensearch/action/support/StreamTransportAction.java (1)
ExperimentalApi(24-45)server/src/main/java/org/opensearch/cluster/StreamNodeConnectionsService.java (1)
ExperimentalApi(20-26)server/src/main/java/org/opensearch/transport/stream/StreamException.java (1)
ExperimentalApi(24-150)
plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.java (5)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.java (1)
FlightStreamPlugin(64-309)plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java (1)
StreamTransportExamplePlugin(25-39)test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java (1)
Plugin(86-112)server/src/main/java/org/opensearch/common/util/FeatureFlags.java (1)
FeatureFlags(29-337)plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataAction.java (1)
StreamNodesDataAction(20-31)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataResponse.java (1)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/NodeStreamDataResponse.java (1)
NodeStreamDataResponse(22-75)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java (4)
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/StreamDataAction.java (1)
StreamDataAction(17-26)plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/TransportStreamDataAction.java (1)
TransportStreamDataAction(40-154)plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataAction.java (1)
StreamNodesDataAction(20-31)plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/TransportStreamNodesDataAction.java (1)
TransportStreamNodesDataAction(39-131)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (21)
- GitHub Check: gradle-check
- GitHub Check: Analyze (java)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: Mend Security Check
🔇 Additional comments (16)
plugins/examples/stream-transport-example/build.gradle (1)
14-15: LGTM — disabling Javadoc checks is reasonable for an example plugin.Consistent with the existing
testingConventions.enabled = falseon the line above.plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/package-info.java (1)
1-12: LGTM!Standard package-info with license header and package declaration.
server/src/main/java/org/opensearch/transport/StreamTransportService.java (1)
15-37: LGTM!The
@ExperimentalApiannotation and@opensearch.experimentalJavadoc tag are consistent with the other streaming classes in this PR.plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java (1)
34-37: LGTM!Clean two-action registration using
List.of().server/src/main/java/org/opensearch/node/Node.java (1)
1697-1703: LGTM!The
streamTransportService.orElse(null)correctly unwraps theOptional<StreamTransportService>for theNodeClient.initialize()call, maintaining nullable semantics consistent with the feature-flag guard pattern used elsewhere (e.g., lines 1597–1601).plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataResponse.java (1)
24-50: Implementation follows standardBaseNodesResponsepattern — LGTM.The serialization/deserialization via
readNodesFrom/writeNodesTocorrectly delegates toNodeStreamDataResponse::newandout.writeList. Same triple-Javadoc issue as other files applies here too (lines 20–23, 26–29, 34–37).plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/NodeStreamDataResponse.java (1)
22-74: LGTM — serialization is correctly ordered and fields are properly immutable.Read order (
readString,readInt) matches write order (writeString,writeInt). The same triple-Javadoc and@Override-before-Javadoc issues noted in other files apply here as well.plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/NodeStreamDataRequest.java (1)
24-92: LGTM — serialization matches and the conversion constructor correctly copies parameters.Same Javadoc issues as other files apply. No functional concerns.
plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/StreamNodesDataAction.java (1)
20-30: LGTM — standard ActionType singleton pattern.Same duplicate Javadoc issue applies (lines 21–22, 24–25). Implementation is correct.
server/src/main/java/org/opensearch/action/support/StreamTransportAction.java (1)
24-34: Clean experimental base class design — LGTM on the overall structure.The class correctly extends
TransportAction, wires theStreamTransportServicetask manager, and provides a clear extension point viaexecuteStream. The@ExperimentalApiannotation appropriately signals the API's stability guarantees.server/src/main/java/org/opensearch/transport/client/node/NodeClient.java (2)
81-93: Deprecation approach looks clean.The backward-compatible overload correctly delegates to the new initialize with
nullforstreamTransportService. This preserves existing call sites while enabling the new streaming path.
195-212: The review comment's core concern is incorrect.StreamActionListenerdoes not extendActionListenerand is a completely separate interface. It defines its own contract withonStreamResponse(),onFinalResponse(), andonStreamFailure()methods. SinceStreamActionListeneris not a subtype ofActionListener, there is no risk ofActionListener.onFailure()being invoked on implementations ofStreamActionListener. TheexecuteStreammethod correctly passes the listener directly toStreamTransportAction.executeStream()without any contract violation.Likely an incorrect or invalid review comment.
plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/basic/StreamTransportExampleIT.java (1)
42-83: Test structure and async coordination look correct.The
CountDownLatchprovides a proper happens-before guarantee between the streaming callbacks and the assertion thread. The error handling path also counts down the latch, preventing indefinite hangs.plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/TransportStreamNodesDataAction.java (1)
86-111: Node streaming operation looks functionally correct.The loop properly sends batched responses, handles cancellation via
StreamErrorCode.CANCELLED, and forwards other exceptions to the channel. TheThread.sleepis fine for example code.plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/basic/TransportStreamDataAction.java (1)
129-153: Request handler looks correct.The
handleStreamRequestproperly batches responses, handlesStreamExceptionwithCANCELLEDcode check, and sends error responses for general exceptions. TheThread.sleepfor delays is acceptable in example code.server/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.java (1)
199-203: Node resolution logic is standard and correct.The
resolveRequestmethod properly resolves node IDs from cluster state and converts them toDiscoveryNodeinstances, following the establishedTransportNodesActionpattern.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| public List<ActionHandler<?, ?>> getActions() { | ||
| return List.of(new ActionHandler<>(MyStreamAction.INSTANCE, TransportMyStreamAction.class)); | ||
| } |
There was a problem hiding this comment.
Minor: getActions() return type differs from actual plugin code.
The README shows List<ActionHandler<?, ?>> but the actual plugin code in StreamTransportExamplePlugin.java (line 33) uses List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>>. Consider aligning the documentation with the actual signature to avoid confusion for developers using this as a reference.
🤖 Prompt for AI Agents
In `@plugins/examples/stream-transport-example/README.md` around lines 114 - 116,
Update the README example signature to match the plugin's actual return type:
change the getActions() declaration shown to use List<ActionHandler<? extends
ActionRequest, ? extends ActionResponse>> so it aligns with the implementation
in StreamTransportExamplePlugin (refer to getActions and ActionHandler types)
and avoids confusion for readers; ensure the example instantiation (new
ActionHandler<>(MyStreamAction.INSTANCE, TransportMyStreamAction.class)) remains
unchanged.
| } | ||
|
|
||
| @LockFeatureFlag(STREAM_TRANSPORT) | ||
| @AwaitsFix(bugUrl = "flaky") |
There was a problem hiding this comment.
@AwaitsFix bugUrl should reference an actual issue.
"flaky" is not a valid URL. This should link to a tracked GitHub issue describing the flakiness root cause. Without a real issue, it's unclear when or how this will be addressed.
🤖 Prompt for AI Agents
In
`@plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/basic/StreamTransportExampleIT.java`
at line 41, The `@AwaitsFix` annotation on the StreamTransportExampleIT test uses
a non-URL bugUrl ("flaky"); update the `@AwaitsFix`(bugUrl = "...") to point to a
real tracked issue URL that documents the flakiness (e.g., the GitHub issue/PR
URL) or remove the annotation if no issue exists; ensure you edit the annotation
declaration on the StreamTransportExampleIT class so bugUrl contains a valid
HTTP(S) link to the issue.
| @LockFeatureFlag(STREAM_TRANSPORT) | ||
| @AwaitsFix(bugUrl = "flaky") |
There was a problem hiding this comment.
@AwaitsFix should reference an actual bug tracker URL, not a placeholder string.
bugUrl = "flaky" gives no traceability. File a GitHub issue describing the flakiness (which may be caused by the fail()/latch ordering issue above) and link it here.
🤖 Prompt for AI Agents
In
`@plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.java`
around lines 43 - 44, The `@AwaitsFix` annotation on StreamNodesTransportExampleIT
(used alongside `@LockFeatureFlag`(STREAM_TRANSPORT)) currently uses a placeholder
bugUrl = "flaky"; create a real GitHub issue describing the flakiness (include
repro steps and suspected cause such as the fail()/latch ordering) and replace
the placeholder with the issue's full URL in the `@AwaitsFix`(bugUrl = "...")
value so the test points to a traceable bug report.
| @Override | ||
| public void onStreamFailure(Exception e) { | ||
| logger.error("Stream failed", e); | ||
| fail("Stream should not fail: " + e.getMessage()); | ||
| latch.countDown(); | ||
| } |
There was a problem hiding this comment.
latch.countDown() on line 70 is unreachable — fail() throws AssertionError.
fail() on line 69 throws an AssertionError, so latch.countDown() on line 70 is never executed. If a stream failure occurs, the test thread will block for the full 30-second timeout, then fail with the generic "Stream should complete" message instead of the actual error. This also explains why the test is flaky.
Proposed fix
`@Override`
public void onStreamFailure(Exception e) {
logger.error("Stream failed", e);
- fail("Stream should not fail: " + e.getMessage());
latch.countDown();
+ fail("Stream should not fail: " + e.getMessage());
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @Override | |
| public void onStreamFailure(Exception e) { | |
| logger.error("Stream failed", e); | |
| fail("Stream should not fail: " + e.getMessage()); | |
| latch.countDown(); | |
| } | |
| `@Override` | |
| public void onStreamFailure(Exception e) { | |
| logger.error("Stream failed", e); | |
| latch.countDown(); | |
| fail("Stream should not fail: " + e.getMessage()); | |
| } |
🤖 Prompt for AI Agents
In
`@plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/nodes/StreamNodesTransportExampleIT.java`
around lines 66 - 71, onStreamFailure currently calls fail("...") which throws
AssertionError before latch.countDown() can run; move the latch countdown so it
always executes (e.g., call latch.countDown() before invoking fail) or ensure
it's executed in a finally block around the fail call; update the
onStreamFailure handler (method name onStreamFailure, uses logger.error, fail(),
and latch.countDown()) so the latch is always released when a stream error
occurs.
| public TransportStreamNodesDataAction( | ||
| ThreadPool threadPool, | ||
| ClusterService clusterService, | ||
| @Nullable StreamTransportService streamTransportService, |
There was a problem hiding this comment.
@Nullable annotation contradicts the null-check in the constructor.
streamTransportService is annotated @Nullable (line 64) but the constructor throws IllegalStateException if it's null (line 80). Remove @Nullable since null is not actually acceptable here — it misleads callers into believing null is a valid argument.
Proposed fix
- `@Nullable` StreamTransportService streamTransportService,
+ StreamTransportService streamTransportService,Also applies to: 79-81
🤖 Prompt for AI Agents
In
`@plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/nodes/TransportStreamNodesDataAction.java`
at line 64, The `@Nullable` on the constructor parameter streamTransportService
contradicts the constructor's null-check in TransportStreamNodesDataAction
(which throws IllegalStateException if null); remove the `@Nullable` annotation
from the streamTransportService parameter (and any corresponding import) so the
signature reflects that null is not allowed, and ensure any other occurrences of
`@Nullable` for this parameter are removed or updated to match the non-null
contract.
| public void executeStream(NodesRequest request, StreamActionListener<NodesResponse> listener) { | ||
| if (request.concreteNodes() == null) { | ||
| resolveRequest(request, clusterService.state()); | ||
| assert request.concreteNodes() != null; | ||
| } | ||
|
|
||
| DiscoveryNode[] nodes = request.concreteNodes(); | ||
| request.setConcreteNodes(null); | ||
|
|
||
| if (nodes.length == 0) { | ||
| listener.onFinalResponse(); | ||
| return; | ||
| } |
There was a problem hiding this comment.
executeStream silently swallows node failures — no error propagation to listener.
When all nodes fail (via handleException or send-request exceptions), the failures list is populated but listener.onFinalResponse() is called without ever invoking listener.onStreamFailure(). The caller has no way to know that every node failed — they just get a "final response" signal with zero onStreamResponse calls.
Consider calling listener.onStreamFailure(...) if all nodes failed, or passing the accumulated failures to onFinalResponse.
🤖 Prompt for AI Agents
In
`@server/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.java`
around lines 80 - 92, executeStream currently calls listener.onFinalResponse()
even when all remote node requests fail, so the accumulated failures (the
failures list built in handleException/send-request error paths) are never
surfaced; update executeStream to detect when nodes.length > 0 but every node
resulted in a failure and then invoke listener.onStreamFailure(...) with an
appropriate aggregated Exception (or pass the failures to the listener) instead
of silently calling listener.onFinalResponse(), or alternatively include the
failures in the payload passed to listener.onFinalResponse(); locate the
failure-accumulation logic around the failures list and
handleException/sendRequest code paths and ensure you call
listener.onStreamFailure (or augment onFinalResponse) when the failures list is
non-empty and no onStreamResponse was emitted for any node.
| public void handleStreamResponse(StreamTransportResponse<NodeResponse> streamResponse) { | ||
| try { | ||
| NodeResponse response; | ||
| while ((response = streamResponse.nextResponse()) != null) { | ||
| listener.onStreamResponse(newResponse(request, List.of(response), new ArrayList<>(failures))); | ||
| } | ||
| streamResponse.close(); | ||
| } catch (Exception e) { | ||
| streamResponse.cancel("Client error", e); | ||
| synchronized (failures) { | ||
| failures.add(new FailedNodeException(node.getId(), "Stream processing failed", e)); | ||
| } | ||
| } finally { | ||
| if (counter.decrementAndGet() == 0) { | ||
| listener.onFinalResponse(); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Unsynchronized read of failures list on line 113 — data race.
new ArrayList<>(failures) on line 113 iterates over the failures list without holding the synchronized(failures) lock. Concurrent handleException or error paths from other node handlers call synchronized(failures) { failures.add(...) }, which can structurally modify the list mid-iteration, causing ConcurrentModificationException or reading a corrupted state.
Proposed fix — synchronize the snapshot
public void handleStreamResponse(StreamTransportResponse<NodeResponse> streamResponse) {
try {
NodeResponse response;
while ((response = streamResponse.nextResponse()) != null) {
- listener.onStreamResponse(newResponse(request, List.of(response), new ArrayList<>(failures)));
+ List<FailedNodeException> failureSnapshot;
+ synchronized (failures) {
+ failureSnapshot = new ArrayList<>(failures);
+ }
+ listener.onStreamResponse(newResponse(request, List.of(response), failureSnapshot));
}
streamResponse.close();Alternatively, consider using a CopyOnWriteArrayList for failures to avoid manual synchronization entirely.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public void handleStreamResponse(StreamTransportResponse<NodeResponse> streamResponse) { | |
| try { | |
| NodeResponse response; | |
| while ((response = streamResponse.nextResponse()) != null) { | |
| listener.onStreamResponse(newResponse(request, List.of(response), new ArrayList<>(failures))); | |
| } | |
| streamResponse.close(); | |
| } catch (Exception e) { | |
| streamResponse.cancel("Client error", e); | |
| synchronized (failures) { | |
| failures.add(new FailedNodeException(node.getId(), "Stream processing failed", e)); | |
| } | |
| } finally { | |
| if (counter.decrementAndGet() == 0) { | |
| listener.onFinalResponse(); | |
| } | |
| } | |
| } | |
| public void handleStreamResponse(StreamTransportResponse<NodeResponse> streamResponse) { | |
| try { | |
| NodeResponse response; | |
| while ((response = streamResponse.nextResponse()) != null) { | |
| List<FailedNodeException> failureSnapshot; | |
| synchronized (failures) { | |
| failureSnapshot = new ArrayList<>(failures); | |
| } | |
| listener.onStreamResponse(newResponse(request, List.of(response), failureSnapshot)); | |
| } | |
| streamResponse.close(); | |
| } catch (Exception e) { | |
| streamResponse.cancel("Client error", e); | |
| synchronized (failures) { | |
| failures.add(new FailedNodeException(node.getId(), "Stream processing failed", e)); | |
| } | |
| } finally { | |
| if (counter.decrementAndGet() == 0) { | |
| listener.onFinalResponse(); | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
In
`@server/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.java`
around lines 109 - 126, The snapshot of the shared failures list is taken
unsafely in TransportStreamNodesAction.handleStreamResponse via new
ArrayList<>(failures) which can race with synchronized(failures){
failures.add(...) } in other handlers; fix by acquiring the same lock when
making the snapshot (e.g., wrap the creation of the ArrayList snapshot and the
listener.onStreamResponse call in synchronized(failures)) or replace the
failures list with a thread-safe implementation such as CopyOnWriteArrayList and
remove the manual synchronized blocks so that new ArrayList<>(failures) is safe.
🛠️ Refactor suggestion | 🟠 Major
streamResponse.close() should be in a finally block to avoid resource leaks.
Same pattern as noted in TransportStreamDataAction — if an exception is thrown between the while-loop exit and close() (e.g., from listener.onStreamResponse), the stream resource won't be released. The catch block calls cancel() but the happy-path close() on line 115 is not guaranteed.
Proposed fix
public void handleStreamResponse(StreamTransportResponse<NodeResponse> streamResponse) {
try {
NodeResponse response;
while ((response = streamResponse.nextResponse()) != null) {
- listener.onStreamResponse(newResponse(request, List.of(response), new ArrayList<>(failures)));
+ List<FailedNodeException> failureSnapshot;
+ synchronized (failures) {
+ failureSnapshot = new ArrayList<>(failures);
+ }
+ listener.onStreamResponse(newResponse(request, List.of(response), failureSnapshot));
}
- streamResponse.close();
} catch (Exception e) {
streamResponse.cancel("Client error", e);
synchronized (failures) {
failures.add(new FailedNodeException(node.getId(), "Stream processing failed", e));
}
+ } finally {
+ streamResponse.close();
if (counter.decrementAndGet() == 0) {
listener.onFinalResponse();
}
}
}🤖 Prompt for AI Agents
In
`@server/src/main/java/org/opensearch/action/support/nodes/TransportStreamNodesAction.java`
around lines 109 - 126, The handleStreamResponse method can leak the
StreamTransportResponse because streamResponse.close() is only called on the
happy path; move the close into a finally block to always release the resource.
Specifically, wrap the while-loop and listener.onStreamResponse calls inside a
try and call streamResponse.close() in a finally (or use try-with-resources if
StreamTransportResponse implements AutoCloseable) inside handleStreamResponse so
that close() runs regardless of exceptions; keep the existing catch behavior
that calls streamResponse.cancel(...) and adds a FailedNodeException to
failures, and ensure the counter.decrementAndGet() and
listener.onFinalResponse() logic remains in the outer finally.
| @Override | ||
| protected final void doExecute(Task task, Request request, ActionListener<Response> listener) { | ||
| listener.onFailure(new UnsupportedOperationException("Use executeStream() for streaming actions")); | ||
| } | ||
|
|
||
| /** | ||
| * Execute streaming request | ||
| */ | ||
| public abstract void executeStream(Request request, StreamActionListener<Response> listener); |
There was a problem hiding this comment.
executeStream lacks a Task parameter — streaming actions won't be trackable via the Task Management API.
doExecute receives a Task from the framework but discards it with an UnsupportedOperationException. The replacement executeStream method doesn't accept a Task, so streaming actions cannot participate in task tracking, cancellation, or resource accounting. If this is intentional for the experimental phase, consider adding a brief comment documenting why. Otherwise, threading a Task through would keep parity with the standard action lifecycle.
#!/bin/bash
# Check how executeStream is called from NodeClient to understand if Task is created upstream
ast-grep --pattern $'executeStream($$$) {
$$$
}'🤖 Prompt for AI Agents
In
`@server/src/main/java/org/opensearch/action/support/StreamTransportAction.java`
around lines 36 - 44, doExecute currently rejects streaming requests and
executeStream lacks a Task parameter, so streaming actions cannot be tracked or
canceled; update the abstract method signature in StreamTransportAction from
executeStream(Request request, StreamActionListener<Response> listener) to
include the Task (e.g., executeStream(Task task, Request request,
StreamActionListener<Response> listener), update any callers (including where
doExecute should forward the incoming Task into executeStream) to pass the Task
through, and ensure implementations use the Task for task
registration/cancellation; if omitting Task was intentional, add a comment on
StreamTransportAction.executeStream explaining why streaming actions are not
tracked.
|
❌ Gradle check result for 36012d9: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
36012d9 to
d7d3f5b
Compare
|
❗ AI-powered Code-Diff-Analyzer found issues on commit d7d3f5b.
The table above displays the top 10 most important findings. Pull Requests Author(s): Please update your Pull Request according to the report above. Repository Maintainer(s): You can Thanks. |
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
d7d3f5b to
1e3ca36
Compare
|
❌ Gradle check result for 1e3ca36: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
This PR is stalled because it has been open for 30 days with no activity. |
Description
Adds
NodeClient.executeStream()API to enable seamless streaming transport action execution, following the same pattern as regularclient.execute(). Includes support for both basic streaming actions and multi-node streaming actions.Key Features:
client.executeStream(action, request, handler)pattern for invoking streaming actionsStreamTransportAction- Base class extendingTransportActionwith streaming supportHandledStreamTransportAction- Convenience class for automatic handler registrationStreamTransportNodesAction- Handles multi-node coordination automaticallyexecuteStream()methodExample Plugin:
Reorganized into two packages demonstrating both patterns:
basic/- Basic streaming action example (StreamTransportDataAction)nodes/- Multi-node streaming action example (StreamTransportNodesDataAction)Implementation Details:
StreamTransportServiceNodeClient.executeStream()retrieves action from registry and callsexecuteStreamRequest()transportService.getLocalNode()to get node with streaming port attributesRelated Issues
Resolves #19439
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.