|
33 | 33 | import org.apache.flink.util.TestLogger;
|
34 | 34 |
|
35 | 35 | import com.google.common.base.MoreObjects;
|
36 |
| -import org.apache.kafka.clients.consumer.ConsumerRecord; |
37 | 36 | import org.apache.kafka.clients.producer.Callback;
|
38 | 37 | import org.apache.kafka.clients.producer.KafkaProducer;
|
39 | 38 | import org.apache.kafka.clients.producer.ProducerConfig;
|
|
48 | 47 |
|
49 | 48 | import javax.annotation.Nullable;
|
50 | 49 |
|
51 |
| -import java.nio.ByteBuffer; |
52 | 50 | import java.util.ArrayList;
|
53 | 51 | import java.util.Collection;
|
54 |
| -import java.util.HashSet; |
55 | 52 | import java.util.List;
|
56 | 53 | import java.util.Properties;
|
57 |
| -import java.util.Set; |
58 | 54 | import java.util.concurrent.atomic.AtomicReference;
|
59 | 55 |
|
60 |
| -import static org.assertj.core.api.Assertions.fail; |
61 |
| - |
62 | 56 | /**
|
63 | 57 | * The base for the Kafka tests. It brings up:
|
64 | 58 | *
|
@@ -278,96 +272,6 @@ public static <K, V> void produceToKafka(
|
278 | 272 | }
|
279 | 273 | }
|
280 | 274 |
|
281 |
| - /** |
282 |
| - * We manually handle the timeout instead of using JUnit's timeout to return failure instead of |
283 |
| - * timeout error. After timeout we assume that there are missing records and there is a bug, not |
284 |
| - * that the test has run out of time. |
285 |
| - */ |
286 |
| - public void assertAtLeastOnceForTopic( |
287 |
| - Properties properties, |
288 |
| - String topic, |
289 |
| - int partition, |
290 |
| - Set<Integer> expectedElements, |
291 |
| - long timeoutMillis) |
292 |
| - throws Exception { |
293 |
| - |
294 |
| - long startMillis = System.currentTimeMillis(); |
295 |
| - Set<Integer> actualElements = new HashSet<>(); |
296 |
| - |
297 |
| - // until we timeout... |
298 |
| - while (System.currentTimeMillis() < startMillis + timeoutMillis) { |
299 |
| - properties.put( |
300 |
| - "key.deserializer", |
301 |
| - "org.apache.kafka.common.serialization.IntegerDeserializer"); |
302 |
| - properties.put( |
303 |
| - "value.deserializer", |
304 |
| - "org.apache.kafka.common.serialization.IntegerDeserializer"); |
305 |
| - // We need to set these two properties so that they are lower than request.timeout.ms. |
306 |
| - // This is |
307 |
| - // required for some old KafkaConsumer versions. |
308 |
| - properties.put("session.timeout.ms", "2000"); |
309 |
| - properties.put("heartbeat.interval.ms", "500"); |
310 |
| - |
311 |
| - // query kafka for new records ... |
312 |
| - Collection<ConsumerRecord<Integer, Integer>> records = |
313 |
| - kafkaServer.getAllRecordsFromTopic(properties, topic); |
314 |
| - |
315 |
| - for (ConsumerRecord<Integer, Integer> record : records) { |
316 |
| - actualElements.add(record.value()); |
317 |
| - } |
318 |
| - |
319 |
| - // succeed if we got all expectedElements |
320 |
| - if (actualElements.containsAll(expectedElements)) { |
321 |
| - return; |
322 |
| - } |
323 |
| - } |
324 |
| - |
325 |
| - fail( |
326 |
| - String.format( |
327 |
| - "Expected to contain all of: <%s>, but was: <%s>", |
328 |
| - expectedElements, actualElements)); |
329 |
| - } |
330 |
| - |
331 |
| - public void assertExactlyOnceForTopic( |
332 |
| - Properties properties, String topic, List<Integer> expectedElements) { |
333 |
| - |
334 |
| - List<Integer> actualElements = new ArrayList<>(); |
335 |
| - |
336 |
| - Properties consumerProperties = new Properties(); |
337 |
| - consumerProperties.putAll(properties); |
338 |
| - consumerProperties.put( |
339 |
| - "key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); |
340 |
| - consumerProperties.put( |
341 |
| - "value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); |
342 |
| - consumerProperties.put("isolation.level", "read_committed"); |
343 |
| - |
344 |
| - // query kafka for new records ... |
345 |
| - Collection<ConsumerRecord<byte[], byte[]>> records = |
346 |
| - kafkaServer.getAllRecordsFromTopic(consumerProperties, topic); |
347 |
| - |
348 |
| - for (ConsumerRecord<byte[], byte[]> record : records) { |
349 |
| - actualElements.add(ByteBuffer.wrap(record.value()).getInt()); |
350 |
| - } |
351 |
| - |
352 |
| - // succeed if we got all expectedElements |
353 |
| - if (actualElements.equals(expectedElements)) { |
354 |
| - return; |
355 |
| - } |
356 |
| - |
357 |
| - fail( |
358 |
| - String.format( |
359 |
| - "Expected %s, but was: %s", |
360 |
| - formatElements(expectedElements), formatElements(actualElements))); |
361 |
| - } |
362 |
| - |
363 |
| - private String formatElements(List<Integer> elements) { |
364 |
| - if (elements.size() > 50) { |
365 |
| - return String.format("number of elements: <%s>", elements.size()); |
366 |
| - } else { |
367 |
| - return String.format("elements: <%s>", elements); |
368 |
| - } |
369 |
| - } |
370 |
| - |
    /**
     * Sets the number of Kafka clusters for the test base.
     *
     * <p>Only assigns the static {@code numKafkaClusters} field (declared outside this view);
     * presumably read when the test environment is brought up — TODO confirm against the
     * enclosing class.
     *
     * @param size the desired number of Kafka clusters
     */
    public static void setNumKafkaClusters(int size) {
        numKafkaClusters = size;
    }
|
|
0 commit comments