TopN selection for streaming terms aggregations #20481
Conversation
📝 Walkthrough

This pull request introduces segment-level top-N bucket filtering for streaming aggregations and refactors the Arrow Flight transport layer to support asynchronous prefetch-based response processing. Key updates include new streaming aggregation settings, changes to stream response handling mechanisms, and comprehensive test coverage for sub-aggregation ordering scenarios.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Actionable comments posted: 10
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java (1)

78-82: `close()` discards buffered data without flushing.

If `close()` is called after `writeByte()` calls but before `getRoot()`, buffered data in `tempBuffer` is silently lost. Consider flushing or at least logging a warning if `tempBufferPos > 0`.

Proposed fix:

```diff
 @Override
 public void close() throws IOException {
+    if (tempBufferPos > 0) {
+        // Log or flush pending data to avoid silent data loss
+        flushTempBuffer();
+    }
     row = 0;
     vector.close();
 }
```

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java (1)

1-15: Duplicate SPDX license header. The license block appears twice (lines 1-7 and 9-15).

Proposed fix: Remove lines 9-15 (the duplicate license block).
🤖 Fix all issues with AI agents
In `@CHANGELOG.md`:
- Line 49: Add a changelog entry documenting the TopN selection feature
introduced in this PR: describe "TopN selection at segment level for streaming
terms aggregations using quickselect", list the new index setting
`index.aggregation.streaming.min_shard_size` (default: 1000), and note benefits
(reduced data transfer and coordinator memory/CPU for high-cardinality
aggregations) plus support for sub-aggregations and metric aggregations with
TopN; also update the PR reference(s) to clearly mention both the base stream
transport fixes (`#20359`) and the TopN feature PR (`#20481`) so readers can follow
both changes.
In
`@plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java`:
- Around line 197-220: The bulk indexing loop uses
client().bulk(orderBulkRequest).actionGet() without inspecting the BulkResponse,
risking silent failures; modify the loop to capture the BulkResponse from
client().bulk(...).actionGet(), check response.hasFailures() and/or iterate
response.getItems() for isFailed(), and fail the test or throw an exception (or
log and assert) when any item failed. Apply this pattern to the BulkRequest
usages in this method (the loop creating 3 segments) and the other similar
locations referenced (around the other BulkRequest blocks) so partial indexing
failures cannot silently break downstream assertions.
- Around line 781-783: The comment in
testNumericOrderByMaxSubAggregationAscending incorrectly says it reuses the
prior index; update the comment to accurately state that the test creates a new
index named "numeric_order_test2" (or remove the reuse wording) next to the
index creation code (look for testNumericOrderByMaxSubAggregationAscending and
the numericIndexSettings/index name variables) so the comment matches the actual
behavior.
- Around line 173-220: The test creates an "order_test" index in setUp() but
never deletes it, causing "index already exists" on subsequent runs; update the
tearDown() method to delete the "order_test" index (like the numeric_order_test
cleanup) by calling the cluster client to delete the index (e.g.,
client().admin().indices().delete(...)) or using the same helper used for
numeric_order_test removal, ensuring deletion is conditional/ignored if the
index doesn't exist to avoid flakiness.
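The bulk-failure check suggested for SubAggregationIT can be sketched in plain Java. `BulkItem` below is a hypothetical, self-contained stand-in for OpenSearch's `BulkItemResponse`, used only to illustrate the "inspect every item before asserting downstream" pattern:

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal sketch of the review's suggestion: never proceed past a bulk call
// without checking per-item failures. BulkItem is an illustrative stand-in.
public class BulkFailureCheck {
    public record BulkItem(boolean failed, String failureMessage) {}

    // Mirrors the intent of BulkResponse.hasFailures(): true if any item failed.
    public static boolean hasFailures(List<BulkItem> items) {
        return items.stream().anyMatch(BulkItem::failed);
    }

    // Collects failure messages so a test can fail with a useful diagnostic
    // instead of silently continuing to downstream assertions.
    public static String describeFailures(List<BulkItem> items) {
        return items.stream()
            .filter(BulkItem::failed)
            .map(BulkItem::failureMessage)
            .collect(Collectors.joining("; "));
    }
}
```

In the actual test, the same shape would be `BulkResponse resp = client().bulk(req).actionGet(); if (resp.hasFailures()) fail(resp.buildFailureMessage());` applied to each bulk call.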
In
`@plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java`:
- Around line 254-291: The task exceptions from openStreamAndInvokeHandler
(e.g., header null or handler.handleStreamResponse throwing) must be propagated
to the response handler instead of being rethrown; update the Runnable so that
any Exception is caught, call cleanupStreamResponse(streamResponse) and then
call handleStreamException(streamResponse, e) (wrapping non-Exception Throwables
as needed, similar to earlier usage) before returning, and ensure the same
behavior occurs whether the task is executed inline (ThreadPool.Names.SAME) or
submitted to threadPool.executor(executor). Use the existing symbols:
openStreamAndInvokeHandler, handleStreamException, cleanupStreamResponse,
handler.handleStreamResponse, StreamException/StreamErrorCode.INTERNAL to locate
and implement the change.
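The control flow asked for above, routing task failures back to the response handler rather than letting them escape the executor, can be sketched with simplified stand-in types (`StreamHandler` here is illustrative, not the real Flight transport interface):

```java
import java.util.concurrent.Executor;

// Sketch: wrap the task body so any failure is cleaned up and propagated to
// the handler, regardless of whether the task runs inline or on a pool.
public class StreamTaskDispatch {
    public interface StreamHandler {
        void handleStreamResponse(String response) throws Exception;
        void handleException(Exception e);
    }

    public static void dispatch(Executor executor, StreamHandler handler, String response, Runnable cleanup) {
        Runnable task = () -> {
            try {
                handler.handleStreamResponse(response);
            } catch (Exception e) {
                // Clean up the stream first, then propagate so the caller's
                // listener is always completed instead of silently hanging.
                cleanup.run();
                handler.handleException(e);
            }
        };
        // Same wrapping applies whether executor is inline (SAME) or a thread pool.
        executor.execute(task);
    }
}
```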
In
`@plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java`:
- Around line 67-68: FlightServerChannel currently calls
Long.parseLong(middleware.getCorrelationId()) which can throw
NullPointerException or NumberFormatException for null or non-numeric headers;
update the FlightServerChannel constructor to defensively read
middleware.getCorrelationId(), check for null/empty, and wrap
Long.parseLong(...) in a try/catch(NumberFormatException) block (or use
Long.parseLong only if StringUtils.isNumeric), logging a warning via logger.warn
including the raw header and stacktrace, and assign a safe fallback value to the
correlationId field (e.g., -1L or a generated id) so parsing failures do not
crash construction.
- Around line 126-146: The computed putNextTime is already in milliseconds
(putNextTime = (System.nanoTime() - batchStartTime) / 1_000_000) but the logger
calls divide it again by 1_000_000 producing near-zero values; update the two
logger.debug invocations in FlightServerChannel to use putNextTime directly
(remove the extra "/ 1_000_000" in the third format argument), and also fix the
else-branch logger format to remove the stray "bytes," placeholder or supply the
correct size argument so the format string and arguments match (references:
putNextTime, batchStartTime, callTracker,
FlightUtils.calculateVectorSchemaRootSize, and the logger.debug calls).
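The defensive correlation-ID parsing suggested for FlightServerChannel can be shown in isolation. Method and fallback names are illustrative, not the actual channel fields:

```java
// Sketch of the review's suggestion: never let a bad header crash construction.
public class CorrelationIds {
    // Returns a safe fallback (-1L) instead of throwing on null/empty/non-numeric headers.
    public static long parseCorrelationId(String header) {
        if (header == null || header.isEmpty()) {
            return -1L;
        }
        try {
            return Long.parseLong(header);
        } catch (NumberFormatException e) {
            // In the real channel this would be logger.warn(...) with the raw header.
            return -1L;
        }
    }
}
```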
In
`@plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java`:
- Around line 413-416: The createEventLoopGroup method currently builds a custom
ThreadFactory producing non-daemon threads; replace the custom factory with
Netty's DefaultThreadFactory to standardize thread configuration but preserve
non-daemon semantics (use new DefaultThreadFactory(name) rather than
DefaultThreadFactory(name, true)), or if you intentionally want daemon threads,
document that decision in the method javadoc and switch to new
DefaultThreadFactory(name, true); update the ThreadFactory in
createEventLoopGroup accordingly so MultiThreadIoEventLoopGroup(threads,
threadFactory, ...) uses the chosen default factory and behavior is explicit.
In
`@plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java`:
- Around line 73-76: The flush() method in VectorStreamOutput currently does
nothing, leaving bytes buffered by writeByte() uncommitted; update
VectorStreamOutput.flush() to flush the temp buffer into the main buffer (commit
any pending single-byte buffer contents to the underlying Byte/Vector buffer)
and reset/clear the temp buffer so subsequent getRoot() sees all written data;
locate the temp buffer/fields used by writeByte() and call the same commit logic
there (or extract it to a private commitTempBuffer() helper) from both
writeByte() and flush() to avoid duplication and ensure flush() guarantees all
buffered bytes are emitted before returning.
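The `commitTempBuffer()` refactor described above can be sketched with a plain byte-array staging buffer standing in for VectorStreamOutput's Arrow-backed storage (all names here are illustrative):

```java
import java.io.ByteArrayOutputStream;

// Sketch: one commit routine shared by writeByte() and flush(), so flush()
// guarantees all staged bytes are emitted before getRoot() reads them.
public class BufferedVectorOutput {
    private final ByteArrayOutputStream committed = new ByteArrayOutputStream();
    private final byte[] tempBuffer = new byte[8];
    private int tempBufferPos = 0;

    public void writeByte(byte b) {
        if (tempBufferPos == tempBuffer.length) {
            commitTempBuffer(); // same commit path flush() uses
        }
        tempBuffer[tempBufferPos++] = b;
    }

    // Single commit routine, extracted so both call sites stay in sync.
    private void commitTempBuffer() {
        committed.write(tempBuffer, 0, tempBufferPos);
        tempBufferPos = 0;
    }

    public void flush() {
        commitTempBuffer();
    }

    public byte[] getRoot() {
        flush(); // ensure nothing staged is lost
        return committed.toByteArray();
    }
}
```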
In
`@server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java`:
- Around line 95-116: ensureOrdinalComparator currently leaves ordinalComparator
null when isKeyOrder(order) is true, causing selection to fall back to doc-count
comparisons; update ensureOrdinalComparator so that when key ordering is
requested it sets ordinalComparator to compare bucket ordinals (respecting
ascending/descending key order) instead of leaving it null (or alternatively
mark to bypass quickselect for key order). Concretely, inside
ensureOrdinalComparator add a branch for isKeyOrder(order) that constructs
tempBucket1/tempBucket2 (or reuses existing ones) with compareKey implemented
and set ordinalComparator to compare leftOrd/rightOrd by assigning
bucketOrd/docCount and returning compareKey (or reverse the result for
descending), ensuring the comparator uses bucketOrd (and not bucketDocCount or
partiallyBuiltBucketComparator) when the ordering is key-based.
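The key-order bug described above comes down to which comparator drives selection. A self-contained sketch (with a `keys` array standing in for the per-ordinal term values the aggregator holds) shows the distinction between doc-count and key comparators over ordinals:

```java
import java.util.Comparator;

// Sketch: when the terms agg is key-ordered, the ordinal comparator must
// compare bucket keys (asc or desc), not fall back to doc counts.
public class OrdinalComparators {
    // Default selection order: highest doc count first.
    public static Comparator<Integer> byDocCount(long[] docCounts) {
        return (l, r) -> Long.compare(docCounts[r], docCounts[l]);
    }

    // Key-order branch the review asks for: compare the terms themselves.
    public static Comparator<Integer> byKey(String[] keys, boolean asc) {
        Comparator<Integer> cmp = (l, r) -> keys[l].compareTo(keys[r]);
        return asc ? cmp : cmp.reversed();
    }
}
```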
🧹 Nitpick comments (5)
plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java (1)
706-996: Ensure numeric test indices are deleted even on assertion failure.

Deletion happens only at the end of each test. If an assertion fails mid-test, the index remains and can affect later tests. Wrapping each test body in `try/finally` improves reliability.

🧹 Example pattern (apply to the other numeric_order_test* tests similarly):

```diff
-        client().admin().indices().create(numericIndexRequest).actionGet();
-        client().admin()
-            .cluster()
-            .prepareHealth("numeric_order_test")
-            .setWaitForGreenStatus()
-            .setTimeout(TimeValue.timeValueSeconds(30))
-            .get();
-
-        // Create 3 segments
-        for (int seg = 0; seg < 3; seg++) {
-            ...
-        }
-
-        TermsAggregationBuilder agg = terms("categories")...
-        SearchResponse resp = client().prepareStreamSearch("numeric_order_test")...
-        ...
-        client().admin()
-            .indices()
-            .delete(new org.opensearch.action.admin.indices.delete.DeleteIndexRequest("numeric_order_test"))
-            .actionGet();
+        client().admin().indices().create(numericIndexRequest).actionGet();
+        client().admin()
+            .cluster()
+            .prepareHealth("numeric_order_test")
+            .setWaitForGreenStatus()
+            .setTimeout(TimeValue.timeValueSeconds(30))
+            .get();
+
+        try {
+            // Create 3 segments
+            for (int seg = 0; seg < 3; seg++) {
+                ...
+            }
+
+            TermsAggregationBuilder agg = terms("categories")...
+            SearchResponse resp = client().prepareStreamSearch("numeric_order_test")...
+            ...
+        } finally {
+            client().admin()
+                .indices()
+                .delete(new org.opensearch.action.admin.indices.delete.DeleteIndexRequest("numeric_order_test"))
+                .actionGet();
+        }
```

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java (1)
199-211: Incomplete settings export — `FLIGHT_THREAD_POOL_MAX_SIZE` and `FLIGHT_THREAD_POOL_KEEP_ALIVE` are missing.

The `getSettings()` method now exports `FLIGHT_THREAD_POOL_MIN_SIZE` but omits `FLIGHT_THREAD_POOL_MAX_SIZE` and `FLIGHT_THREAD_POOL_KEEP_ALIVE`. All three are used in `init()` (lines 143-146). If the min size setting is needed externally, the max size and keep-alive settings likely are too.

Proposed fix:

```diff
     public static List<Setting<?>> getSettings() {
         return new ArrayList<>(
             Arrays.asList(
                 ARROW_ALLOCATION_MANAGER_TYPE,
                 ARROW_ENABLE_NULL_CHECK_FOR_GET,
                 ARROW_ENABLE_DEBUG_ALLOCATOR,
                 ARROW_ENABLE_UNSAFE_MEMORY_ACCESS,
                 ARROW_SSL_ENABLE,
                 FLIGHT_EVENT_LOOP_THREADS,
-                FLIGHT_THREAD_POOL_MIN_SIZE
+                FLIGHT_THREAD_POOL_MIN_SIZE,
+                FLIGHT_THREAD_POOL_MAX_SIZE,
+                FLIGHT_THREAD_POOL_KEEP_ALIVE
             )
         );
     }
```

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (1)
164-170: Silent swallow of `IllegalStateException` in close().

The empty catch block at line 167 silently ignores `IllegalStateException`. Consider adding a brief comment explaining when this is expected (e.g., stream already closed), or log at trace level.

Proposed improvement:

```diff
         if (flightStream != null) {
             try {
                 flightStream.close();
-            } catch (IllegalStateException ignore) {} catch (Exception e) {
+            } catch (IllegalStateException e) {
+                // Expected if stream was already closed or never fully initialized
+                logger.trace("Ignoring IllegalStateException on close", e);
+            } catch (Exception e) {
                 throw new StreamException(StreamErrorCode.INTERNAL, "Error closing flight stream", e);
             }
         }
```

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java (2)
287-303: O(n·m) lookup when building selected buckets can be improved.

After quickselect, the code iterates through all ordinals and for each checks against all selected ordinals (lines 291-300), resulting in O(n·m) complexity where n = total buckets and m = segmentSize. For high-cardinality fields, this could be a performance bottleneck.

The `StreamStringTermsAggregator` uses a more efficient approach by marking selected indices in the `reusableIndices` array and then iterating once through ordinals to check the mark.

♻️ Suggested improvement using marking approach:

```diff
-            // Collect selected ordinals
-            int[] selectedOrdinals = new int[segmentSize];
-            for (int i = 0; i < segmentSize; i++) {
-                selectedOrdinals[i] = reusableIndices.get(i);
-            }
-
-            // Build result by finding values for selected ordinals
-            ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
-            List<B> result = new ArrayList<>(segmentSize);
-            long selectedDocCount = 0;
-            while (ordsEnum.next()) {
-                for (int selectedOrd : selectedOrdinals) {
-                    if (ordsEnum.ord() == selectedOrd) {
-                        long docCount = StreamNumericTermsAggregator.this.bucketDocCount(ordsEnum.ord());
-                        result.add(buildFinalBucket(ordsEnum.ord(), ordsEnum.value(), docCount, owningBucketOrd));
-                        selectedDocCount += docCount;
-                        break;
-                    }
-                }
-            }
+            // Mark selected ordinals in reusableIndices
+            int[] selected = new int[segmentSize];
+            for (int i = 0; i < segmentSize; i++) {
+                selected[i] = reusableIndices.get(i);
+            }
+
+            reusableIndices.fill(0, totalBuckets, 0);
+            for (int i = 0; i < segmentSize; i++) {
+                reusableIndices.set(selected[i], 1);
+            }
+
+            // Build result by iterating once and checking marks - O(n)
+            ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
+            List<B> result = new ArrayList<>(segmentSize);
+            long selectedDocCount = 0;
+            while (ordsEnum.next()) {
+                if (reusableIndices.get((int) ordsEnum.ord()) == 1) {
+                    long docCount = StreamNumericTermsAggregator.this.bucketDocCount(ordsEnum.ord());
+                    result.add(buildFinalBucket(ordsEnum.ord(), ordsEnum.value(), docCount, owningBucketOrd));
+                    selectedDocCount += docCount;
+                }
+            }
```
422-445: Consider extracting duplicate `ensureOrdinalComparator` pattern.

The three `ensureOrdinalComparator` implementations in `LongTermsResults`, `DoubleTermsResults`, and `UnsignedLongTermsResults` follow an identical pattern, differing only in bucket type instantiation. While this duplication is acceptable for type safety, if this pattern grows more complex, consider extracting the common logic to reduce maintenance burden.

Also applies to: 528-551, 640-663
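One possible shape for the extraction suggested above is a generic helper parameterized by a bucket factory. `TempBucket` and `CountBucket` here are hypothetical stand-ins for the aggregator's bucket types, sketching the idea rather than the real API:

```java
import java.util.Comparator;
import java.util.function.Supplier;

// Sketch: one shared ordinal-comparator builder, reusable by LongTermsResults,
// DoubleTermsResults, and UnsignedLongTermsResults via a bucket factory.
public class ComparatorFactory {
    public interface TempBucket {
        void setOrd(long ord);
        long compareTo(TempBucket other);
    }

    public static <B extends TempBucket> Comparator<Long> ordinalComparator(Supplier<B> bucketFactory) {
        // Two reusable temp buckets, as in the existing per-type implementations.
        B left = bucketFactory.get();
        B right = bucketFactory.get();
        return (l, r) -> {
            left.setOrd(l);
            right.setOrd(r);
            return Long.signum(left.compareTo(right));
        };
    }

    // Demo bucket ordering ordinals by descending doc count, for illustration only.
    public static class CountBucket implements TempBucket {
        private final long[] counts;
        private long ord;
        public CountBucket(long[] counts) { this.counts = counts; }
        public void setOrd(long ord) { this.ord = ord; }
        public long compareTo(TempBucket other) {
            return counts[(int) ((CountBucket) other).ord] - counts[(int) this.ord];
        }
    }
}
```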
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
- CHANGELOG.md
- plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/bootstrap/ServerConfig.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ServerHeaderMiddleware.java
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/VectorStreamOutput.java
- plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.java
- plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java
- plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java
- server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
- server/src/main/java/org/opensearch/index/IndexSettings.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java
- test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java
📚 Learning: 2026-01-13T17:40:27.167Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20411
File: server/src/main/java/org/opensearch/index/codec/CodecService.java:112-133
Timestamp: 2026-01-13T17:40:27.167Z
Learning: Avoid capturing or evaluating a supplier (e.g., this::defaultCodec) upfront when passing it to a registry during object construction. If registries may replace defaults during iteration (as in EnginePlugin.getAdditionalCodecs), pass the supplier itself and only resolve it at use time. This ensures dynamic behavior is preserved during initialization and prevents premature binding of defaults in codecs/registry setup. This pattern should apply to similar initialization paths in Java server code where registries may mutate defaults during construction.
Applied to files:
- server/src/main/java/org/opensearch/index/IndexSettings.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java
- server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java
📚 Learning: 2025-12-02T22:44:14.799Z
Learnt from: prudhvigodithi
Repo: opensearch-project/OpenSearch PR: 20112
File: server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java:73-81
Timestamp: 2025-12-02T22:44:14.799Z
Learning: In OpenSearch integration tests extending OpenSearchIntegTestCase, using `LuceneTestCase.SuppressCodecs("*")` triggers special handling that selects a random production codec from the CODECS array, while `SuppressCodecs("Asserting")` or other specific codec suppressions still allow Lucene's default codec randomization which may include the asserting codec. Use `SuppressCodecs("*")` when you need to completely avoid asserting codecs (e.g., for cross-thread StoredFieldsReader usage) while maintaining production codec test coverage.
Applied to files:
server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java
🧬 Code graph analysis (4)
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (3)
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ClientHeaderMiddleware.java (1): ClientHeaderMiddleware (35-127)
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightErrorMapper.java (1): FlightErrorMapper (31-116)
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightUtils.java (1): FlightUtils (13-30)

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightServerChannel.java (1)
- plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightUtils.java (1): FlightUtils (13-30)

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java (1)
- server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1): Names (99-129)

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java (2)
- libs/common/src/main/java/org/opensearch/common/Numbers.java (1): Numbers (43-287)
- server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java (2): ResultStrategy (216-412), SelectionResult (271-279)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: gradle-check
🔇 Additional comments (38)
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/ArrowStreamSerializationTests.java (1)
53-53: LGTM! The change from `Optional.empty()` to `null` correctly aligns with the updated `VectorStreamOutput` constructor signature that now accepts a nullable `VectorSchemaRoot` directly instead of an `Optional<VectorSchemaRoot>`.

plugins/arrow-flight-rpc/src/internalClusterTest/java/org/opensearch/streaming/aggregation/SubAggregationIT.java (1)
619-704: Ordering coverage for string categories looks solid.
The max/cardinality/no-sort scenarios are well targeted and read clearly.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/ServerHeaderMiddleware.java (1)
40-42: LGTM! The new accessor aligns with the correlation ID tracking pattern used across the transport layer (e.g., `CORRELATION_ID_KEY` in `ClientHeaderMiddleware`).

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightOutboundHandler.java (1)
217-219: LGTM! The `completeStream` call now correctly passes the header buffer, aligning with the updated `FlightServerChannel.completeStream(ByteBuffer)` signature.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransportResponse.java (2)
77-108: Async prefetch implementation looks sound. The double-checked locking with `volatile prefetchStarted` is correct. Using virtual threads for the blocking `getStream()` and `next()` calls is appropriate for this I/O-bound operation.
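For reference, the double-checked locking idiom mentioned here can be shown in a self-contained sketch. The "prefetch" below is just a counter standing in for starting the fetch thread; the field names are illustrative, not the actual `FlightTransportResponse` members:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of double-checked locking: the volatile flag lets the fast path skip
// the lock safely, while the second check under the lock guarantees the
// prefetch is started exactly once.
public class PrefetchGate {
    private volatile boolean prefetchStarted = false;
    private final Object lock = new Object();
    final AtomicInteger prefetches = new AtomicInteger();

    public void ensurePrefetchStarted() {
        if (!prefetchStarted) {            // first check, no lock
            synchronized (lock) {
                if (!prefetchStarted) {    // second check under the lock
                    prefetches.incrementAndGet(); // stands in for launching the fetch
                    prefetchStarted = true;
                }
            }
        }
    }
}
```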
114-141: LGTM! The `nextResponse()` method correctly handles the prefetch state via `firstBatchConsumed`, includes slow-log monitoring, and properly maps exceptions.

plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java (1)
558-561: Good: framework-level errors now unblock the test. Capturing the exception and counting down the latch avoids hangs and validates the expected failure path.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightStreamPlugin.java (1)
363-369: LGTM: publish host/port settings are now exposed. This makes the new publish-port configuration available through the plugin settings list.
test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java (1)
502-505: Good: IndexShard now returns indexSettings in tests. This keeps SearchContext mocks aligned with index settings usage in new streaming logic.
server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java (1)
171-176: LGTM: streaming min shard size setting is registered. Keeps index-scoped validation consistent with the new setting.
plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java (1)
107-111: Good: TaskManager now returns a StoredContext for task start. This aligns the mock with thread-context handling used by stream transport tests.
plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightTransport.java (1)
136-138: LGTM: worker ELG sizing is now CPU-aligned. This keeps event-loop sizing predictable and avoids oversubscription.
server/src/main/java/org/opensearch/index/IndexSettings.java (5)
636-647: LGTM: streaming min shard size setting is well-scoped.
1014-1017: LGTM: cached field for streaming min shard size.
1187-1187: LGTM: initialization from scoped settings.
1320-1320: LGTM: dynamic update consumer registered.
2074-2083: LGTM: getter/setter exposure for the new setting.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregator.java (11)
16-19: LGTM: selection-related imports added.
57-59: LGTM: ordinal comparator temp buckets are in place.
85-92: LGTM: reset releases reusable buffers and clears comparator state.
124-128: LGTM: segment size honors shard size minimum.
145-145: LGTM: valueCount captured per-segment.
220-230: LGTM: reusable index buffer is allocated/grown via big arrays.
234-260: LGTM: TopN selection integrated with otherDocCount tracking.
271-279: LGTM: SelectionResult bundles buckets and otherDocCount cleanly.
281-359: LGTM: quickselect path and otherDocCount accounting look solid.
361-364: LGTM: reusableIndices are released on strategy close.
506-509: LGTM: aggregator closes result strategy.

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java (3)
47-48: LGTM: global channel counter for correlation ID seeding.
115-119: LGTM: correlation IDs seeded with timestamp + channel ID.
237-239: LGTM: sendMessage delegates to async stream-open path.

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamStringTermsAggregatorTests.java (4)
23-50: LGTM: test imports cover new settings and cardinality scenarios.
1158-1161: LGTM: updated expected counts for reduce with sub-aggregations.
1356-1357: LGTM: explicit assertion for multi-segment guard.
1511-2058: Great coverage for TopN ordering, cardinality, and minDocCount cases.

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregatorTests.java (1)
1771-2649: LGTM! Comprehensive test coverage for TopN selection.

The new test methods provide excellent coverage of the segment-level TopN filtering feature:
- Various sub-aggregation orderings (max, min, avg, sum, cardinality) in both ascending and descending modes
- Different numeric types (Long, Double, UnsignedLong)
- `minDocCount` filtering behavior
- Correct `otherDocCount` calculation for excluded buckets

The test data setup and assertions are consistent and verify both the bucket selection logic and the key-based sorting at shard level.
server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java (2)
129-137: LGTM! The `getSegmentSize()` method correctly enforces the minimum shard size setting while respecting the user-requested shard size, ensuring accuracy trade-offs are configurable.

85-89: LGTM! Proper resource cleanup in doReset. The `doReset` method now properly closes both `bucketOrds` and `resultStrategy` using `Releasables.close()`, which handles null values and prevents resource leaks across segment boundaries.
❌ Gradle check result for 708675b: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green.

708675b was force-pushed to 7e85515

❌ Gradle check result for 7e85515: FAILURE
Codecov Report

Additional details and impacted files:

```
@@             Coverage Diff              @@
##               main   #20481      +/-   ##
============================================
+ Coverage     73.25%   73.35%    +0.10%
- Complexity    71979    72092      +113
============================================
  Files          5796     5796
  Lines        329287   329539      +252
  Branches      47419    47465       +46
============================================
+ Hits         241203   241725      +522
+ Misses        68759    68502      -257
+ Partials      19325    19312       -13
```

☔ View full report in Codecov by Sentry.
06e3990 was force-pushed to 46cf3f6

❌ Gradle check result for 46cf3f6: FAILURE

46cf3f6 was force-pushed to 7277447
7277447 was force-pushed to b27f4a9

❌ Gradle check result for b27f4a9: FAILURE
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
…for topN logic when no sort_order Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
b27f4a9 was force-pushed to c7ea430

❕ Gradle check result for c7ea430: UNSTABLE. Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.
TopN selection for streaming terms aggregations (#20481)

* topN logic for StreamStringTermsAggregator
* Add index setting for min threshold on shard_size for streaming; fix for topN logic when no sort_order
* fix other doc count
* fix minDocCount issue
* topN for StreamNumericTermsAggregator and tests
* Fix key order issue in streaming aggs with topN

---------
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Signed-off-by: Rakshit Goyal <irakshg@amazon.com>
Description
Problem
Streaming terms aggregators flush results per-segment, sending all buckets to the coordinator. This causes:
Performance degradation: Excessive data transfer and coordinator memory usage when dealing with high-cardinality fields
Memory pressure: Coordinator must process and reduce all buckets from all segments
Solution
Implemented TopN selection at segment level using the quickselect algorithm, handling sub-aggregations and supported metric aggregations, to send only the most relevant buckets to the coordinator.
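The core of this approach can be illustrated with a self-contained quickselect over bucket ordinals. This is a simplified sketch of the technique (ordering only by doc count descending; the real aggregators also plug in sub-aggregation comparators), not the PR's actual implementation:

```java
// Illustrative quickselect-based top-N over bucket ordinals: partition so the
// N "best" ordinals occupy the front of the array in O(n) average time,
// without fully sorting all buckets.
public class TopNSelect {
    // Partially partitions `ords` so its first n entries are the top-n by docCounts.
    public static void select(int[] ords, long[] docCounts, int n) {
        int left = 0, right = ords.length - 1;
        while (left < right) {
            int p = partition(ords, docCounts, left, right);
            if (p == n) return;
            if (p < n) left = p + 1; else right = p - 1;
        }
    }

    // Lomuto partition; '>' puts larger doc counts toward the front.
    private static int partition(int[] ords, long[] docCounts, int left, int right) {
        long pivot = docCounts[ords[right]];
        int store = left;
        for (int i = left; i < right; i++) {
            if (docCounts[ords[i]] > pivot) {
                int tmp = ords[i]; ords[i] = ords[store]; ords[store] = tmp;
                store++;
            }
        }
        int tmp = ords[right]; ords[right] = ords[store]; ords[store] = tmp;
        return store;
    }
}
```

After `select`, only the first n ordinals are built into buckets and streamed; everything else feeds the segment's other-doc-count, which matches the benefit described above.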
Benefits:
Better Performance:
Reduces data transfer and coordinator memory/CPU usage by sending only TOP N buckets per segment
Controlled Accuracy:
TopN selection introduces potential accuracy trade-offs, which can be controlled using:

- `size` parameter: Number of final buckets to return
- `shard_size` parameter: Number of buckets to collect per segment (not per shard in streaming aggregations)
- New `index.aggregation.streaming.min_shard_size` setting (default: 1000): Minimum buckets per segment to ensure accuracy. Ideally, we want to set its default as a function of shard_size and the number of segments; @bowenlan-amzn is currently analyzing this part.

Testing

A. SubAggregationIT: integration tests, including verification of `otherDocCount` calculation.
B. Unit Tests:
- `StreamStringTermsAggregatorTests`: tests for TopN selection with various sub-aggregation orderings + minDocCount filtering
- `StreamNumericTermsAggregatorTests`: 13 tests covering LongTerms, DoubleTerms, UnsignedLongTerms with max/min/avg/sum/cardinality ordering + minDocCount. Verifies correct bucket selection and `otherDocCount` calculation.

I will be opening a public doc issue for the entire streaming aggregations feature.
Note: these changes are built on top of open PR #20359, so this PR contains extra changes.
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
- [ ] API changes companion pull request created, if applicable.
- [ ] Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.