@@ -159,14 +159,18 @@ public void testProgressListenerExceptionsAreCaught() throws Exception {
159159 }
160160 }
161161
162+ /**
163+ * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
164+ */
162165 public void testBatchedEstimateSizeTooBig () throws Exception {
163166 SearchRequest searchRequest = new SearchRequest ("index" );
164167 searchRequest .source (new SearchSourceBuilder ().aggregation (new SumAggregationBuilder ("sum" )));
165168
169+ var aggCount = randomIntBetween (1 , 10 );
166170 var circuitBreakerLimit = ByteSizeValue .ofMb (256 );
167171 var circuitBreaker = newLimitedBreaker (circuitBreakerLimit );
168172 // More than what the CircuitBreaker should allow
169- long aggregationEstimatedSize = (long ) (circuitBreakerLimit .getBytes () * 1.1 );
173+ long aggregationEstimatedSize = (long ) (circuitBreakerLimit .getBytes () * 1.05 / aggCount );
170174
171175 try (
172176 QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer (
@@ -181,29 +185,41 @@ public void testBatchedEstimateSizeTooBig() throws Exception {
181185 e -> {}
182186 )
183187 ) {
184- var mergeResult = new QueryPhaseResultConsumer .MergeResult (List .of (), null , new DelegatingDelayableWriteable <>(() -> {
185- fail ("This shouldn't be called" );
186- return null ;
187- }), aggregationEstimatedSize );
188- queryPhaseResultConsumer .addBatchedPartialResult (new SearchPhaseController .TopDocsStats (0 ), mergeResult );
188+ for (int i = 0 ; i < aggCount ; i ++) {
189+ // Add a dummy merge result with a high estimated size
190+ var mergeResult = new QueryPhaseResultConsumer .MergeResult (List .of (), null , new DelegatingDelayableWriteable <>(() -> {
191+ fail ("This shouldn't be called" );
192+ return null ;
193+ }), aggregationEstimatedSize );
194+ queryPhaseResultConsumer .addBatchedPartialResult (new SearchPhaseController .TopDocsStats (0 ), mergeResult );
195+ }
189196
190197 try {
191198 queryPhaseResultConsumer .reduce ();
192199 fail ("Expecting a circuit breaking exception to be thrown" );
193200 } catch (CircuitBreakingException e ) {
201+ // The last merge result estimate should break
194202 assertThat (e .getBytesWanted (), equalTo (aggregationEstimatedSize ));
195203 }
196204 }
197205 }
198206
207+ /**
208+ * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
209+ * <p>
210+ * Similar to {@link #testBatchedEstimateSizeTooBig()}, but this tests the extra size
211+ * </p>
212+ */
199213 public void testBatchedEstimateSizeTooBigAfterDeserialization () throws Exception {
200214 SearchRequest searchRequest = new SearchRequest ("index" );
201215 searchRequest .source (new SearchSourceBuilder ().aggregation (new SumAggregationBuilder ("sum" )));
202216
217+ var aggCount = randomIntBetween (1 , 10 );
203218 var circuitBreakerLimit = ByteSizeValue .ofMb (256 );
204219 var circuitBreaker = newLimitedBreaker (circuitBreakerLimit );
205220 // Less than the CB, but more after the 1.5x
206- long aggregationEstimatedSize = (long ) (circuitBreakerLimit .getBytes () * 0.75 );
221+ long aggregationEstimatedSize = (long ) (circuitBreakerLimit .getBytes () * 0.75 / aggCount );
222+ long totalAggregationsEstimatedSize = aggregationEstimatedSize * aggCount ;
207223
208224 try (
209225 QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer (
@@ -218,18 +234,22 @@ public void testBatchedEstimateSizeTooBigAfterDeserialization() throws Exception
218234 e -> {}
219235 )
220236 ) {
221- var mergeResult = new QueryPhaseResultConsumer .MergeResult (List .of (), null , new DelegatingDelayableWriteable <>(() -> {
222- fail ("This shouldn't be called" );
223- return null ;
224- }), aggregationEstimatedSize );
225- queryPhaseResultConsumer .addBatchedPartialResult (new SearchPhaseController .TopDocsStats (0 ), mergeResult );
237+ for (int i = 0 ; i < aggCount ; i ++) {
238+ // Add a dummy merge result with a high estimated size
239+ var mergeResult = new QueryPhaseResultConsumer .MergeResult (List .of (), null , new DelegatingDelayableWriteable <>(() -> {
240+ fail ("This shouldn't be called" );
241+ return null ;
242+ }), aggregationEstimatedSize );
243+ queryPhaseResultConsumer .addBatchedPartialResult (new SearchPhaseController .TopDocsStats (0 ), mergeResult );
244+ }
226245
227246 try {
228247 queryPhaseResultConsumer .reduce ();
229248 fail ("Expecting a circuit breaking exception to be thrown" );
230249 } catch (CircuitBreakingException e ) {
231250 assertThat (circuitBreaker .getUsed (), greaterThanOrEqualTo (aggregationEstimatedSize ));
232- assertThat (e .getBytesWanted (), equalTo ((long ) (aggregationEstimatedSize * 0.5 )));
251+ // A final +0.5x is added to account for the serialized->deserialized extra size
252+ assertThat (e .getBytesWanted (), equalTo (Math .round (totalAggregationsEstimatedSize * 0.5 )));
233253 }
234254 }
235255 }
0 commit comments