Skip to content
Open
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
d8a9424
Add streaming search with configurable scoring modes
atris Aug 25, 2025
3fe4f52
add javadocs
atris Aug 28, 2025
0b4d6a2
Fix changelog
atris Aug 28, 2025
f46f224
Fix forbidden APIs
atris Aug 28, 2025
9251736
Intermediate commit
atris Aug 30, 2025
ca8f639
Working intermediate commit
atris Aug 30, 2025
b8be3f6
Get streaming infra working
atris Aug 30, 2025
9cbdcbc
Phase 2
atris Aug 30, 2025
e967492
Intermediate commit
atris Aug 31, 2025
357848d
working commit
atris Sep 1, 2025
a30df2a
Working commit 2
atris Sep 2, 2025
c0d2a06
Cleanup
atris Sep 6, 2025
0e298e0
More cleanup
atris Sep 7, 2025
5061f09
Add streaming search with scoring using Hoeffding bounds
atris Sep 22, 2025
ac28752
Cleanup
atris Sep 24, 2025
3f78994
Add spotless output
atris Sep 24, 2025
9973de3
more cleanup
atris Sep 24, 2025
5edfe3c
Update per comments
atris Sep 27, 2025
6a4d92e
More cleanup
atris Sep 27, 2025
df7ad7b
Fix forbidden API issue
atris Sep 27, 2025
c084b56
Merge branch 'main' into streaming-scoring-clean
atris Oct 9, 2025
ad9c30d
Fix build issues
atris Oct 9, 2025
b4b16b0
More shenanigans
atris Oct 10, 2025
cbf228d
Remove confidence based streaming
atris Oct 11, 2025
dfcbbed
More cleanup
Oct 17, 2025
3d90216
Make spotless changes
Oct 17, 2025
dc5e1e8
Intermittent commit
atris Nov 4, 2025
938951f
4 to go
atris Nov 5, 2025
9233200
use global ordinals; fix per-leaf reset; enable under concurrent sear…
atris Nov 11, 2025
14f81b1
Fix more tests
atris Nov 11, 2025
a6a21b0
More tests fixes and cleanup
atris Nov 11, 2025
d751f73
Fix reindexing tests
atris Nov 20, 2025
cd0f276
Some tests pass
atris Jan 11, 2026
169bc33
Fix FlushModeResolver tests
atris Jan 11, 2026
1da8e37
Yet more fixes
atris Jan 12, 2026
b202193
Cleanup
atris Jan 12, 2026
4ce9a97
Merge remote-tracking branch 'origin/main' into streaming-scoring-clean
atris Jan 12, 2026
b4ef9e8
Fix compilation errors and merge conflicts after upstream merge
atris Jan 12, 2026
f851389
Spotless clean up
atris Jan 13, 2026
ae1b22f
Remove forbidden APIs
atris Jan 13, 2026
c967cbb
Sigh more test fixes
atris Jan 16, 2026
aceb756
More fixes
atris Jan 16, 2026
774588a
Spotless fixes
atris Jan 16, 2026
ae6c911
Yet more fixes
atris Jan 19, 2026
d0510cd
More cleanup
atris Jan 19, 2026
5352427
Miscellaneous refactoring
atris Jan 20, 2026
201110a
More refactor
atris Jan 21, 2026
3e1079a
Explicitly set partial to true
atris Jan 21, 2026
2fbd384
Revert silent drop of partial packets
atris Jan 21, 2026
edeabe5
Spotless changes
atris Jan 22, 2026
c204273
Merge branch 'main' into streaming-scoring-clean
atris Jan 22, 2026
5c095cf
Cleanup
atris Jan 22, 2026
6af9270
Fix serialization issue
atris Jan 22, 2026
87a3712
Streaming multiple partial results at coordinator and test fixes
Jan 28, 2026
b2f1903
Merge branch 'main' into streaming-scoring-clean
atris Jan 28, 2026
6d822bf
Multi shard failing tests
atris Jan 30, 2026
49cbe0f
More misc fixes
atris Feb 1, 2026
a72741d
Remove StreamingPerformanceBenchmarkTests.java to address reviewer fe…
Feb 26, 2026
737d8a4
Cleanup
Mar 3, 2026
0f5c5c2
Revert SearchProgressListener API break for source compatibility
Mar 6, 2026
6fc677b
Cleanup
Mar 13, 2026
b7f3128
More cleanup
Mar 17, 2026
e3fe3ae
Merge origin/main into streaming-scoring-clean
Mar 17, 2026
2e09c6b
Add missing javadoc
Mar 17, 2026
e9c1472
Remove transport and bound-provider changes from streaming PR
Mar 18, 2026
0092c49
More cleanup
Mar 18, 2026
0998491
Remove Streaming Search metrics
Mar 19, 2026
582a4c3
Trim unrelated plugin and reindex test churn from streaming PR
Mar 19, 2026
208eed3
MOre cleanup
Mar 19, 2026
560c347
Yet more cleanup
Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
Expand Down Expand Up @@ -383,6 +384,63 @@ public void testMissingSources() {
assertThat(response, matcher().deleted(0).slices(hasSize(0)));
}

/**
 * Regression test to ensure delete-by-query works correctly even when streaming search is enabled.
 * Since delete-by-query uses scroll searches, it should automatically opt out of the streaming pipeline.
 * <p>
 * Flow: enable streaming via a transient cluster setting, index three documents,
 * run delete-by-query matching two of them, verify hit counts, and finally restore
 * the setting (both transient and persistent) so later tests are not polluted.
 */
public void testDeleteByQueryWithStreamingEnabled() throws Exception {
// Enable streaming search globally (transient so it does not survive a full cluster restart)
Settings streamingSettings = Settings.builder().put(STREAM_SEARCH_ENABLED.getKey(), true).build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(streamingSettings).get());

try {
// Index some test documents (refresh=true so they are immediately searchable)
indexRandom(
true,
client().prepareIndex("test").setId("1").setSource("foo", "delete_me"),
client().prepareIndex("test").setId("2").setSource("foo", "delete_me"),
client().prepareIndex("test").setId("3").setSource("foo", "keep_me")
);

// Sanity check: all three documents are visible before the delete
assertHitCount(client().prepareSearch("test").setSize(0).get(), 3);

// Perform delete-by-query - this should work without "topDocs already consumed" errors
BulkByScrollResponse response = deleteByQuery().source("test").filter(termQuery("foo", "delete_me")).refresh(true).get();

// Verify the delete operation succeeded: exactly the two "delete_me" docs were removed
assertThat(response, matcher().deleted(2));
assertHitCount(client().prepareSearch("test").setSize(0).get(), 1);

// Verify only the expected document remains
assertHitCount(client().prepareSearch("test").setQuery(termQuery("foo", "keep_me")).get(), 1);
} finally {
// Comprehensive cleanup: reset both transient and persistent streaming settings.
// putNull removes the key entirely rather than setting it to a null value.
Settings disableStreamingSettings = Settings.builder().putNull(STREAM_SEARCH_ENABLED.getKey()).build();
try {
// Clear transient settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(disableStreamingSettings).get());
// Clear persistent settings (in case they were set)
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(disableStreamingSettings).get());

// Assert the setting is cleared to prevent test pollution
// (reads the transient settings back from cluster state; default false)
assertFalse(
"Stream search should be disabled after cleanup",
client().admin()
.cluster()
.prepareState()
.get()
.getState()
.getMetadata()
.transientSettings()
.getAsBoolean(STREAM_SEARCH_ENABLED.getKey(), false)
);
} catch (Exception cleanupException) {
// Log cleanup failures but don't fail the test — the test body already
// passed; a cleanup hiccup should not mask that result.
logger.warn("Failed to clean up streaming settings", cleanupException);
}
}
}

/** Enables or disables the cluster disk allocation decider **/
private void setDiskAllocationDeciderEnabled(boolean value) {
Settings settings = value
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.opensearch.arrow.flight.transport;

import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -40,10 +42,13 @@

/**
* Outbound handler for Arrow Flight streaming responses.
* It must invoke messageListener and relay any exception back to the caller and not suppress them
* It must invoke messageListener and relay any exception back to the caller and
* not suppress them
*
* @opensearch.internal
*/
class FlightOutboundHandler extends ProtocolOutboundHandler {
private static final Logger logger = LogManager.getLogger(FlightOutboundHandler.class);
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
private final String nodeName;
private final Version version;
Expand Down Expand Up @@ -157,6 +162,19 @@ private void processBatchTask(BatchTask task) {
try {
try (VectorStreamOutput out = new VectorStreamOutput(flightChannel.getAllocator(), flightChannel.getRoot())) {
task.response().writeTo(out);
if (task.response() instanceof org.opensearch.search.query.QuerySearchResult) {
logger.info(
"QuerySearchResult hasAggs: {}",
((org.opensearch.search.query.QuerySearchResult) task.response()).hasAggs()
);
}
logger.info(
"Sending batch for requestId [{}], action [{}], items [{}], rows [{}]",
task.requestId(),
task.action(),
task.response(),
out.getRoot().getRowCount()
);
flightChannel.sendBatch(getHeaderBuffer(task.requestId(), task.nodeVersion(), task.features()), out);
messageListener.onResponseSent(task.requestId(), task.action(), task.response());
}
Expand Down Expand Up @@ -292,7 +310,8 @@ public void setMessageListener(TransportMessageListener listener) {
}

private ByteBuffer getHeaderBuffer(long requestId, Version nodeVersion, Set<String> features) throws IOException {
// Just a way (probably inefficient) to serialize the header, reusing existing logic present in
// Just a way (probably inefficient) to serialize the header, reusing existing logic
// present in
// NativeOutboundMessage.Response#writeVariableHeader()
NativeOutboundMessage.Response headerMessage = new NativeOutboundMessage.Response(
threadPool.getThreadContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ public Map<String, Supplier<Transport>> getTransports(
return Collections.emptyMap();
}

/**
 * Stream-transport extension point; intentionally contributes nothing here.
 * <p>
 * The FLIGHT transport is already registered by {@code getTransports()}, and the
 * NetworkModule resolves it via {@code getStreamTransportSupplier()} from the
 * shared transport factories, so registering it again here would be redundant.
 *
 * @return an empty map — no additional stream transports are registered by this method
 */
@Override
public Map<String, Supplier<Transport>> getStreamTransports(
Settings settings,
ThreadPool threadPool,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService,
Tracer tracer
) {
// Return empty map since getTransports() already registers the FLIGHT transport
// The NetworkModule will find it via getStreamTransportSupplier() which looks in transportFactories
return Collections.emptyMap();
}

/**
* Gets the auxiliary transports for the FlightStream plugin.
* @param settings The settings for the plugin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@
import static org.opensearch.arrow.flight.transport.ClientHeaderMiddleware.CORRELATION_ID_KEY;

/**
* Arrow Flight implementation of streaming transport responses.
*
* <p>
* Handles streaming responses from Arrow Flight servers with lazy batch
* processing. Headers are extracted when first accessed, and responses are
* deserialized on demand.
*
* <p>
* Manages the Flight stream lifecycle with lazy initialization and
* prefetching support.
*/
Expand Down Expand Up @@ -164,9 +172,13 @@ public void close() {
if (flightStream != null) {
try {
flightStream.close();
} catch (IllegalStateException ignore) {} catch (Exception e) {
} catch (IllegalStateException ignore) {
// this is fine if the allocator is already closed
} catch (Exception e) {
throw new StreamException(StreamErrorCode.INTERNAL, "Error closing flight stream", e);
}
}
closed = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,14 @@
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.StreamSearchAction;
import org.opensearch.action.search.StreamTransportSearchAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.search.TransportStreamSearchAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -741,9 +741,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we remove it?

actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
}
actions.register(StreamSearchAction.INSTANCE, TransportStreamSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
Expand Down
Loading
Loading