|
13 | 13 | |
14 | 14 | package com.rabbitmq.stream.impl; |
15 | 15 |
|
| 16 | +import static com.rabbitmq.stream.impl.TestUtils.*; |
16 | 17 | import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.*; |
17 | | -import static com.rabbitmq.stream.impl.TestUtils.b; |
18 | | -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; |
19 | | -import static com.rabbitmq.stream.impl.TestUtils.streamName; |
20 | | -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; |
21 | 18 | import static java.util.concurrent.TimeUnit.SECONDS; |
22 | 19 | import static org.assertj.core.api.Assertions.assertThat; |
23 | 20 | import static org.assertj.core.api.Assertions.assertThatThrownBy; |
@@ -925,52 +922,44 @@ void streamStatsShouldReturnErrorWhenStreamDoesNotExist() { |
925 | 922 |
|
926 | 923 | @Test |
927 | 924 | @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) |
928 | | - void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) { |
| 925 | + void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn(TestInfo info) throws Exception { |
929 | 926 | // this test is flaky in some CI environments, so we have to retry it |
930 | | - int attemptCount = 0; |
931 | | - int maxAttempts = 3; |
932 | | - while (attemptCount <= maxAttempts) { |
933 | | - attemptCount++; |
934 | | - int messageCount = 1000; |
935 | | - int payloadSize = 1000; |
936 | | - String s = TestUtils.streamName(info); |
937 | | - Client client = cf.get(); |
938 | | - try { |
939 | | - assertThat( |
940 | | - client |
941 | | - .create( |
942 | | - s, |
943 | | - new Client.StreamParametersBuilder() |
944 | | - .maxLengthBytes(messageCount * payloadSize / 10) |
945 | | - .maxSegmentSizeBytes(messageCount * payloadSize / 20) |
946 | | - .build()) |
947 | | - .isOk()) |
948 | | - .isTrue(); |
949 | | - |
950 | | - StreamStatsResponse response = client.streamStats(s); |
951 | | - assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L); |
952 | | - assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L); |
953 | | - |
954 | | - byte[] payload = new byte[payloadSize]; |
955 | | - Function<MessageBuilder, Message> messageCreation = mb -> mb.addData(payload).build(); |
956 | | - |
957 | | - TestUtils.publishAndWaitForConfirms(cf, messageCreation, messageCount, s); |
958 | | - // publishing again, to make sure new segments trigger retention strategy |
959 | | - TestUtils.publishAndWaitForConfirms(cf, messageCreation, messageCount, s); |
960 | | - response = client.streamStats(s); |
961 | | - assertThat(response.getInfo().get("first_chunk_id")).isPositive(); |
962 | | - assertThat(response.getInfo().get("committed_chunk_id")).isPositive(); |
963 | | - |
964 | | - attemptCount = Integer.MAX_VALUE; |
965 | | - } catch (AssertionError e) { |
966 | | - // if too many attempts, fail the test, otherwise, try again |
967 | | - if (attemptCount > maxAttempts) { |
968 | | - throw e; |
969 | | - } |
970 | | - } finally { |
971 | | - assertThat(client.delete(s).isOk()).isTrue(); |
972 | | - } |
973 | | - } |
| 927 | + repeatIfFailure( |
| 928 | + () -> { |
| 929 | + int messageCount = 1000; |
| 930 | + int payloadSize = 1000; |
| 931 | + String s = TestUtils.streamName(info); |
| 932 | + Client client = cf.get(); |
| 933 | + try { |
| 934 | + assertThat( |
| 935 | + client |
| 936 | + .create( |
| 937 | + s, |
| 938 | + new Client.StreamParametersBuilder() |
| 939 | + .maxLengthBytes(messageCount * payloadSize / 10) |
| 940 | + .maxSegmentSizeBytes(messageCount * payloadSize / 20) |
| 941 | + .build()) |
| 942 | + .isOk()) |
| 943 | + .isTrue(); |
| 944 | + |
| 945 | + StreamStatsResponse response = client.streamStats(s); |
| 946 | + assertThat(response.getInfo()).containsEntry("first_chunk_id", -1L); |
| 947 | + assertThat(response.getInfo()).containsEntry("committed_chunk_id", -1L); |
| 948 | + |
| 949 | + byte[] payload = new byte[payloadSize]; |
| 950 | + Function<MessageBuilder, Message> messageCreation = mb -> mb.addData(payload).build(); |
| 951 | + |
| 952 | + TestUtils.publishAndWaitForConfirms(cf, messageCreation, messageCount, s); |
| 953 | + // publishing again, to make sure new segments trigger retention strategy |
| 954 | + TestUtils.publishAndWaitForConfirms(cf, messageCreation, messageCount, s); |
| 955 | + response = client.streamStats(s); |
| 956 | + assertThat(response.getInfo().get("first_chunk_id")).isPositive(); |
| 957 | + assertThat(response.getInfo().get("committed_chunk_id")).isPositive(); |
| 958 | + |
| 959 | + } finally { |
| 960 | + assertThat(client.delete(s).isOk()).isTrue(); |
| 961 | + } |
| 962 | + }); |
974 | 963 | } |
975 | 964 |
|
976 | 965 | @Test |
|
0 commit comments