|
16 | 16 | package com.mongodb.kafka.connect.mongodb; |
17 | 17 |
|
18 | 18 | import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.ChangeStreamOperation; |
19 | | -import static com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.createChangeStreamOperation; |
20 | 19 | import static java.lang.String.format; |
21 | 20 | import static java.util.Collections.singletonList; |
22 | 21 | import static org.apache.kafka.common.utils.Utils.sleep; |
23 | | -import static org.junit.jupiter.api.Assertions.assertEquals; |
24 | 22 | import static org.junit.jupiter.api.Assertions.assertIterableEquals; |
25 | 23 |
|
26 | 24 | import java.time.Duration; |
27 | 25 | import java.util.ArrayList; |
28 | 26 | import java.util.List; |
29 | 27 | import java.util.Properties; |
30 | 28 | import java.util.concurrent.atomic.AtomicInteger; |
| 29 | +import java.util.function.Function; |
31 | 30 | import java.util.stream.Collectors; |
| 31 | +import java.util.stream.IntStream; |
32 | 32 |
|
33 | 33 | import io.confluent.connect.avro.AvroConverter; |
34 | 34 |
|
@@ -57,6 +57,8 @@ public class MongoKafkaTestCase { |
57 | 57 | protected static final Logger LOGGER = LoggerFactory.getLogger(MongoKafkaTestCase.class); |
58 | 58 | protected static final AtomicInteger POSTFIX = new AtomicInteger(); |
59 | 59 |
|
| 60 | + private static final int DEFAULT_MAX_RETRIES = 15; |
| 61 | + |
60 | 62 | @RegisterExtension public static final EmbeddedKafka KAFKA = new EmbeddedKafka(); |
61 | 63 | @RegisterExtension public static final MongoDBHelper MONGODB = new MongoDBHelper(); |
62 | 64 |
|
@@ -108,88 +110,90 @@ public boolean isGreaterThanThreeDotSix() { |
108 | 110 | } |
109 | 111 |
|
110 | 112 | public void assertProduced(final String topicName, final int expectedCount) { |
111 | | - assertEquals(expectedCount, getProduced(topicName, expectedCount).size()); |
| 113 | + List<Integer> expected = IntStream.range(1, expectedCount).boxed().collect(Collectors.toList()); |
| 114 | + AtomicInteger counter = new AtomicInteger(); |
| 115 | + List<Integer> produced = |
| 116 | + getProduced(topicName, b -> counter.addAndGet(1), expected, DEFAULT_MAX_RETRIES); |
| 117 | + assertIterableEquals(expected, produced); |
112 | 118 | } |
113 | 119 |
|
114 | 120 | public void assertProduced( |
115 | 121 | final List<ChangeStreamOperation> operationTypes, final MongoCollection<?> coll) { |
116 | | - assertProduced(operationTypes, coll.getNamespace().getFullName()); |
| 122 | + assertProduced(operationTypes, coll, DEFAULT_MAX_RETRIES); |
117 | 123 | } |
118 | 124 |
|
119 | 125 | public void assertProduced( |
120 | | - final List<ChangeStreamOperation> operationTypes, final String topicName) { |
121 | | - List<ChangeStreamOperation> produced = |
122 | | - getProduced(topicName, operationTypes.size()).stream() |
123 | | - .map((b) -> createChangeStreamOperation(b.toString())) |
124 | | - .collect(Collectors.toList()); |
125 | | - assertIterableEquals(operationTypes, produced); |
| 126 | + final List<ChangeStreamOperation> operationTypes, |
| 127 | + final MongoCollection<?> coll, |
| 128 | + final int maxRetryCount) { |
| 129 | + assertProduced(operationTypes, coll.getNamespace().getFullName(), maxRetryCount); |
126 | 130 | } |
127 | 131 |
|
128 | | - public void assertEventuallyProduces( |
129 | | - final List<ChangeStreamOperation> operationTypes, final MongoCollection<?> coll) { |
130 | | - assertEventuallyProduces(operationTypes, coll.getNamespace().getFullName()); |
| 132 | + public void assertProduced( |
| 133 | + final List<ChangeStreamOperation> operationTypes, final String topicName) { |
| 134 | + assertProduced(operationTypes, topicName, DEFAULT_MAX_RETRIES); |
131 | 135 | } |
132 | 136 |
|
133 | | - public void assertEventuallyProduces( |
134 | | - final List<ChangeStreamOperation> operationTypes, final String topicName) { |
| 137 | + public void assertProduced( |
| 138 | + final List<ChangeStreamOperation> operationTypes, |
| 139 | + final String topicName, |
| 140 | + final int maxRetryCount) { |
135 | 141 | List<ChangeStreamOperation> produced = |
136 | | - getProduced(topicName, Integer.MAX_VALUE).stream() |
137 | | - .map((b) -> createChangeStreamOperation(b.toString())) |
138 | | - .collect(Collectors.toList()); |
139 | | - |
140 | | - if (produced.size() > operationTypes.size()) { |
141 | | - boolean startsWith = |
142 | | - produced |
143 | | - .get(operationTypes.size() - 1) |
144 | | - .equals(operationTypes.get(operationTypes.size() - 1)); |
145 | | - if (startsWith) { |
146 | | - assertIterableEquals(operationTypes, produced.subList(0, operationTypes.size())); |
147 | | - } else { |
148 | | - assertIterableEquals( |
| 142 | + getProduced( |
| 143 | + topicName, |
| 144 | + ChangeStreamOperations::createChangeStreamOperation, |
149 | 145 | operationTypes, |
150 | | - produced.subList(produced.lastIndexOf(operationTypes.get(0)), produced.size())); |
151 | | - } |
152 | | - } else { |
153 | | - assertIterableEquals(operationTypes, produced); |
154 | | - } |
| 146 | + maxRetryCount); |
| 147 | + assertIterableEquals(operationTypes, produced); |
155 | 148 | } |
156 | 149 |
|
157 | 150 | public void assertProducedDocs(final List<Document> docs, final MongoCollection<?> coll) { |
158 | | - assertEquals( |
159 | | - docs, |
160 | | - getProduced(coll.getNamespace().getFullName(), docs.size()).stream() |
161 | | - .map((b) -> Document.parse(b.toString())) |
162 | | - .collect(Collectors.toList())); |
| 151 | + List<Document> produced = |
| 152 | + getProduced( |
| 153 | + coll.getNamespace().getFullName(), |
| 154 | + b -> Document.parse(b.toString()), |
| 155 | + docs, |
| 156 | + DEFAULT_MAX_RETRIES); |
| 157 | + assertIterableEquals(docs, produced); |
163 | 158 | } |
164 | 159 |
|
165 | | - public List<Bytes> getProduced(final String topicName, final int expectedCount) { |
166 | | - if (expectedCount != Integer.MAX_VALUE) { |
167 | | - LOGGER.info("Subscribing to {} expecting to see #{}", topicName, expectedCount); |
168 | | - } else { |
169 | | - LOGGER.info("Subscribing to {} getting all messages", topicName); |
170 | | - } |
| 160 | + public <T> List<T> getProduced( |
| 161 | + final String topicName, |
| 162 | + final Function<Bytes, T> mapper, |
| 163 | + final List<T> expected, |
| 164 | + final int maxRetryCount) { |
| 165 | + LOGGER.info("Subscribing to {}", topicName); |
171 | 166 |
|
172 | 167 | try (KafkaConsumer<?, ?> consumer = createConsumer()) { |
173 | 168 | consumer.subscribe(singletonList(topicName)); |
174 | | - List<Bytes> data = new ArrayList<>(); |
| 169 | + List<T> data = new ArrayList<>(); |
| 170 | + T firstExpected = expected.isEmpty() ? null : expected.get(0); |
| 171 | + T lastExpected = expected.isEmpty() ? null : expected.get(expected.size() - 1); |
175 | 172 | int counter = 0; |
176 | 173 | int retryCount = 0; |
177 | 174 | int previousDataSize; |
178 | | - while (data.size() < expectedCount && retryCount < 30) { |
| 175 | + |
| 176 | + while (retryCount < maxRetryCount) { |
179 | 177 | counter++; |
180 | | - LOGGER.info("Polling {} ({}) seen: #{}", topicName, counter, data.size()); |
181 | 178 | previousDataSize = data.size(); |
182 | | - |
183 | 179 | consumer |
184 | 180 | .poll(Duration.ofSeconds(2)) |
185 | 181 | .records(topicName) |
186 | | - .forEach((r) -> data.add((Bytes) r.value())); |
| 182 | + .forEach((r) -> data.add(mapper.apply((Bytes) r.value()))); |
| 183 | + |
| 184 | + int firstExpectedIndex = data.lastIndexOf(firstExpected); |
| 185 | + int lastExpectedIndex = data.lastIndexOf(lastExpected); |
| 186 | + int dataSize = lastExpectedIndex - firstExpectedIndex + 1; |
| 187 | + if (firstExpectedIndex > -1 && lastExpectedIndex > -1 && dataSize == expected.size()) { |
| 188 | + return data.subList(firstExpectedIndex, lastExpectedIndex + 1); |
| 189 | + } |
187 | 190 |
|
188 | 191 | // Wait at least 3 minutes for the first set of data to arrive |
189 | | - if (data.size() > 0 || counter > 90) { |
190 | | - retryCount = data.size() == previousDataSize ? retryCount + 1 : 0; |
| 192 | + if (expected.size() == 0 || data.size() > 0 || counter > 90) { |
| 193 | + retryCount += previousDataSize == data.size() ? 1 : 0; |
191 | 194 | } |
192 | 195 | } |
| 196 | + |
193 | 197 | return data; |
194 | 198 | } |
195 | 199 | } |
|
0 commit comments