1313
1414import org .elasticsearch .common .breaker .CircuitBreaker ;
1515import org .elasticsearch .common .breaker .CircuitBreakingException ;
16+ import org .elasticsearch .common .io .stream .BytesStreamOutput ;
17+ import org .elasticsearch .common .io .stream .StreamInput ;
1618import org .elasticsearch .common .unit .ByteSizeValue ;
19+ import org .elasticsearch .core .CheckedFunction ;
20+ import org .elasticsearch .indices .CrankyCircuitBreakerService ;
1721import org .elasticsearch .test .ESTestCase ;
1822
23+ import java .io .IOException ;
1924import java .util .Arrays ;
2025
2126import static org .hamcrest .Matchers .equalTo ;
@@ -32,28 +37,84 @@ public TDigestStateReleasingTests(TDigestState.Type digestType) {
3237 this .digestType = digestType ;
3338 }
3439
40+ public void testCreateOfType () {
41+ testCircuitBreakerTrip (circuitBreaker -> TDigestState .createOfType (circuitBreaker , digestType , 100 ));
42+ }
43+
44+ public void testCreateUsingParamsFrom () {
45+ testCircuitBreakerTrip (circuitBreaker -> {
46+ try (TDigestState example = TDigestState .createOfType (newLimitedBreaker (ByteSizeValue .ofMb (100 )), digestType , 100 )) {
47+ return TDigestState .createUsingParamsFrom (example );
48+ }
49+ });
50+ }
51+
52+ /**
53+ * This test doesn't use the {@code digestType} param.
54+ */
55+ public void testCreate () {
56+ testCircuitBreakerTrip (circuitBreaker -> TDigestState .create (circuitBreaker , 100 ));
57+ }
58+
59+ /**
60+ * This test doesn't use the {@code digestType} param.
61+ */
62+ public void testCreateOptimizedForAccuracy () {
63+ testCircuitBreakerTrip (circuitBreaker -> TDigestState .createOptimizedForAccuracy (circuitBreaker , 100 ));
64+ }
65+
66+ public void testRead () throws IOException {
67+ try (
68+ TDigestState state = TDigestState .createOfType (newLimitedBreaker (ByteSizeValue .ofMb (100 )), digestType , 100 );
69+ BytesStreamOutput output = new BytesStreamOutput ()
70+ ) {
71+ TDigestState .write (state , output );
72+
73+ testCircuitBreakerTrip (circuitBreaker -> {
74+ try (StreamInput input = output .bytes ().streamInput ()) {
75+ return TDigestState .read (circuitBreaker , input );
76+ }
77+ });
78+ }
79+ }
80+
81+ public void testReadWithData () throws IOException {
82+ try (
83+ TDigestState state = TDigestState .createOfType (newLimitedBreaker (ByteSizeValue .ofMb (100 )), digestType , 100 );
84+ BytesStreamOutput output = new BytesStreamOutput ()
85+ ) {
86+ for (int i = 0 ; i < 1000 ; i ++) {
87+ state .add (randomDoubleBetween (-Double .MAX_VALUE , Double .MAX_VALUE , true ));
88+ }
89+
90+ TDigestState .write (state , output );
91+
92+ testCircuitBreakerTrip (circuitBreaker -> {
93+ try (StreamInput input = output .bytes ().streamInput ()) {
94+ return TDigestState .read (circuitBreaker , input );
95+ }
96+ });
97+ }
98+ }
99+
35100 /**
36101 * Tests that a circuit breaker trip leaves no unreleased memory.
37102 */
38- public void testCircuitBreakerTrip () {
39- for (int bytes = randomIntBetween (0 , 16 ); bytes < 50_000 ; bytes += 17 ) {
40- CircuitBreaker breaker = newLimitedBreaker (ByteSizeValue .ofBytes (bytes ));
103+ public <E extends Exception > void testCircuitBreakerTrip (CheckedFunction <CircuitBreaker , TDigestState , E > tDigestStateFactory )
104+ throws E {
105+ try (CrankyCircuitBreakerService circuitBreakerService = new CrankyCircuitBreakerService ()) {
106+ CircuitBreaker breaker = circuitBreakerService .getBreaker ("test" );
41107
42- try (TDigestState state = TDigestState . create (breaker , digestType , 100 )) {
108+ try (TDigestState state = tDigestStateFactory . apply (breaker )) {
43109 // Add some data to make it trip. It won't work in all digest types
44- for (int i = 0 ; i < 100 ; i ++) {
110+ for (int i = 0 ; i < 10 ; i ++) {
45111 state .add (randomDoubleBetween (-Double .MAX_VALUE , Double .MAX_VALUE , true ));
46112 }
47-
48- // Testing with more memory shouldn't change anything, we finished the test
49- return ;
50113 } catch (CircuitBreakingException e ) {
51114 // Expected
52115 } finally {
53- assertThat ("unreleased bytes with a " + bytes + " bytes limit " , breaker .getUsed (), equalTo (0L ));
116+ assertThat ("unreleased bytes" , breaker .getUsed (), equalTo (0L ));
54117 }
55118 }
56-
57- fail ("Test case didn't reach a non-tripping breaker limit" );
58119 }
59120}
0 commit comments