Skip to content

Commit ec085a8

Browse files
committed
Removed kafka-junit and related tests.
1 parent 6f70d03 commit ec085a8

File tree

6 files changed

+20
-475
lines changed

6 files changed

+20
-475
lines changed

build.gradle

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ ext {
4141
}
4242

4343
dependencies {
44-
compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
44+
def kafkaConnectRuntime = "org.apache.kafka:connect-runtime:${kafkaVersion}"
45+
compileOnly kafkaConnectRuntime
4546
compileOnly "org.slf4j:slf4j-api:1.7.36"
4647

4748
// Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
@@ -65,9 +66,7 @@ dependencies {
6566
testImplementation 'com.marklogic:marklogic-junit5:1.5.0'
6667

6768
testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"
68-
69-
// Can be deleted when the disabled kafka-junit tests are deleted.
70-
testImplementation 'net.mguenther.kafka:kafka-junit:3.6.0'
69+
testImplementation kafkaConnectRuntime
7170

7271
testImplementation "org.apache.avro:avro-compiler:1.12.0"
7372

src/test/java/com/marklogic/kafka/connect/sink/AbstractIntegrationSinkTest.java

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
import static com.marklogic.kafka.connect.sink.MarkLogicSinkConfig.*;
2929

3030
/**
31-
* Base class for any test that wishes to connect to the "kafka-test-test-content" app server on port 8019.
32-
* AbstractSpringMarkLogicTest assumes it can find mlHost/mlTestRestPort/mlUsername/mlPassword properties in
33-
* gradle.properties and gradle-local.properties. It uses those to construct a DatabaseClient which can be fetched
31+
* Base class for any test that wishes to connect to the
32+
* "kafka-test-test-content" app server on port 8019.
33+
* AbstractSpringMarkLogicTest assumes it can find
34+
* mlHost/mlTestRestPort/mlUsername/mlPassword properties in
35+
* gradle.properties and gradle-local.properties. It uses those to construct a
36+
* DatabaseClient which can be fetched
3437
* via getDatabaseClient().
3538
*/
3639
public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTest {
@@ -39,31 +42,34 @@ public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTes
3942
@Autowired
4043
SimpleTestConfig testConfig;
4144

42-
private final static long DEFAULT_RETRY_SLEEP_TIME = 250;
43-
private final static int DEFAULT_RETRY_ATTEMPTS = 10;
4445
private Map<String, Object> taskConfig = new HashMap<>();
4546

4647
/**
47-
* @param configParamNamesAndValues - Configuration values that need to be set for the test.
48-
* @return a MarkLogicSinkTask based on the default connection config and any optional config params provided by
49-
* the caller
48+
* @param configParamNamesAndValues - Configuration values that need to be set
49+
* for the test.
50+
* @return a MarkLogicSinkTask based on the default connection config and any
51+
* optional config params provided by
52+
* the caller
5053
*/
5154
protected AbstractSinkTask startSinkTask(String... configParamNamesAndValues) {
5255
return startSinkTask(null, configParamNamesAndValues);
5356
}
5457

55-
protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> errorReporterMethod, String... configParamNamesAndValues) {
58+
protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> errorReporterMethod,
59+
String... configParamNamesAndValues) {
5660
Map<String, String> config = newMarkLogicConfig(testConfig);
5761
config.put(MarkLogicSinkConfig.DOCUMENT_PERMISSIONS, "rest-reader,read,rest-writer,update");
5862
for (int i = 0; i < configParamNamesAndValues.length; i += 2) {
5963
config.put(configParamNamesAndValues[i], configParamNamesAndValues[i + 1]);
6064
}
6165
taskConfig.putAll(config);
6266
if (taskConfig.containsKey(DMSDK_INCLUDE_KAFKA_METADATA)) {
63-
taskConfig.put(DMSDK_INCLUDE_KAFKA_METADATA, Boolean.valueOf((String) taskConfig.get(DMSDK_INCLUDE_KAFKA_METADATA)));
67+
taskConfig.put(DMSDK_INCLUDE_KAFKA_METADATA,
68+
Boolean.valueOf((String) taskConfig.get(DMSDK_INCLUDE_KAFKA_METADATA)));
6469
}
6570
if (taskConfig.containsKey(DOCUMENT_COLLECTIONS_ADD_TOPIC)) {
66-
taskConfig.put(DOCUMENT_COLLECTIONS_ADD_TOPIC, Boolean.valueOf((String) taskConfig.get(DOCUMENT_COLLECTIONS_ADD_TOPIC)));
71+
taskConfig.put(DOCUMENT_COLLECTIONS_ADD_TOPIC,
72+
Boolean.valueOf((String) taskConfig.get(DOCUMENT_COLLECTIONS_ADD_TOPIC)));
6773
}
6874

6975
MarkLogicSinkConnector connector = new MarkLogicSinkConnector();
@@ -92,31 +98,6 @@ protected void putAndFlushRecords(AbstractSinkTask task, SinkRecord... records)
9298
task.flush(new HashMap<>());
9399
}
94100

95-
protected final void retryIfNotSuccessful(Runnable r) {
96-
retryIfNotSuccessful(r, DEFAULT_RETRY_SLEEP_TIME, DEFAULT_RETRY_ATTEMPTS);
97-
}
98-
99-
@SuppressWarnings("java:S2925") // We're fine with the sleep call here, due to the nature of testing with kafka-junit
100-
protected final void retryIfNotSuccessful(Runnable r, long sleepTime, int attempts) {
101-
for (int i = 1; i <= attempts; i++) {
102-
logger.info("Trying assertion, attempt " + i + " out of " + attempts);
103-
try {
104-
r.run();
105-
return;
106-
} catch (Throwable ex) {
107-
if (i == attempts) {
108-
throw ex;
109-
}
110-
logger.info("Assertion failed: " + ex.getMessage() + "; will sleep for " + sleepTime + " ms and try again");
111-
try {
112-
Thread.sleep(sleepTime);
113-
} catch (InterruptedException e) {
114-
// Ignore, not expected during a test
115-
}
116-
}
117-
}
118-
}
119-
120101
protected Map<String, Object> getTaskConfig() {
121102
return taskConfig;
122103
}

src/test/java/com/marklogic/kafka/connect/sink/SendWriteFailureRecordsToDlqKafkaTest.java

Lines changed: 0 additions & 133 deletions
This file was deleted.

src/test/java/com/marklogic/kafka/connect/sink/WriteFromKafkaTest.java

Lines changed: 0 additions & 106 deletions
This file was deleted.

0 commit comments

Comments
 (0)