build.gradle (7 changes: 3 additions & 4 deletions)
@@ -41,7 +41,8 @@ ext {
 }

 dependencies {
-    compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
+    def kafkaConnectRuntime = "org.apache.kafka:connect-runtime:${kafkaVersion}"
+    compileOnly kafkaConnectRuntime
     compileOnly "org.slf4j:slf4j-api:1.7.36"

     // Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
@@ -65,9 +66,7 @@ dependencies {
     testImplementation 'com.marklogic:marklogic-junit5:1.5.0'

     testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"
-
-    // Can be deleted when the disabled kafka-junit tests are deleted.
-    testImplementation 'net.mguenther.kafka:kafka-junit:3.6.0'
+    testImplementation kafkaConnectRuntime

     testImplementation "org.apache.avro:avro-compiler:1.12.0"

AbstractIntegrationSinkTest.java
@@ -28,9 +28,12 @@
 import static com.marklogic.kafka.connect.sink.MarkLogicSinkConfig.*;

 /**
- * Base class for any test that wishes to connect to the "kafka-test-test-content" app server on port 8019.
- * AbstractSpringMarkLogicTest assumes it can find mlHost/mlTestRestPort/mlUsername/mlPassword properties in
- * gradle.properties and gradle-local.properties. It uses those to construct a DatabaseClient which can be fetched
+ * Base class for any test that wishes to connect to the
+ * "kafka-test-test-content" app server on port 8019.
+ * AbstractSpringMarkLogicTest assumes it can find
+ * mlHost/mlTestRestPort/mlUsername/mlPassword properties in
+ * gradle.properties and gradle-local.properties. It uses those to construct a
+ * DatabaseClient which can be fetched
  * via getDatabaseClient().
  */
 public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTest {
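
As a hypothetical illustration of the contract this javadoc describes (the subclass name, test method, and assertion below are invented for the example, not part of this change), a concrete test might look like:

    import com.marklogic.client.DatabaseClient;
    import org.junit.jupiter.api.Test;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    // Hypothetical subclass; it relies only on getDatabaseClient(), which the
    // javadoc above says the base class provides.
    class ExampleSinkIntegrationTest extends AbstractIntegrationSinkTest {

        @Test
        void canConnectToTestAppServer() {
            // The client is built from mlHost/mlTestRestPort/mlUsername/mlPassword
            // found in gradle.properties or gradle-local.properties.
            DatabaseClient client = getDatabaseClient();
            assertNotNull(client);
        }
    }
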
@@ -39,31 +42,34 @@ public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTest {
     @Autowired
     SimpleTestConfig testConfig;

-    private final static long DEFAULT_RETRY_SLEEP_TIME = 250;
-    private final static int DEFAULT_RETRY_ATTEMPTS = 10;
     private Map<String, Object> taskConfig = new HashMap<>();

     /**
-     * @param configParamNamesAndValues - Configuration values that need to be set for the test.
-     * @return a MarkLogicSinkTask based on the default connection config and any optional config params provided by
-     * the caller
+     * @param configParamNamesAndValues - Configuration values that need to be set
+     *                                  for the test.
+     * @return a MarkLogicSinkTask based on the default connection config and any
+     *         optional config params provided by
+     *         the caller
      */
     protected AbstractSinkTask startSinkTask(String... configParamNamesAndValues) {
         return startSinkTask(null, configParamNamesAndValues);
     }

-    protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> errorReporterMethod, String... configParamNamesAndValues) {
+    protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> errorReporterMethod,
+            String... configParamNamesAndValues) {
         Map<String, String> config = newMarkLogicConfig(testConfig);
         config.put(MarkLogicSinkConfig.DOCUMENT_PERMISSIONS, "rest-reader,read,rest-writer,update");
         for (int i = 0; i < configParamNamesAndValues.length; i += 2) {
             config.put(configParamNamesAndValues[i], configParamNamesAndValues[i + 1]);
         }
         taskConfig.putAll(config);
         if (taskConfig.containsKey(DMSDK_INCLUDE_KAFKA_METADATA)) {
-            taskConfig.put(DMSDK_INCLUDE_KAFKA_METADATA, Boolean.valueOf((String) taskConfig.get(DMSDK_INCLUDE_KAFKA_METADATA)));
+            taskConfig.put(DMSDK_INCLUDE_KAFKA_METADATA,
+                    Boolean.valueOf((String) taskConfig.get(DMSDK_INCLUDE_KAFKA_METADATA)));
         }
         if (taskConfig.containsKey(DOCUMENT_COLLECTIONS_ADD_TOPIC)) {
-            taskConfig.put(DOCUMENT_COLLECTIONS_ADD_TOPIC, Boolean.valueOf((String) taskConfig.get(DOCUMENT_COLLECTIONS_ADD_TOPIC)));
+            taskConfig.put(DOCUMENT_COLLECTIONS_ADD_TOPIC,
+                    Boolean.valueOf((String) taskConfig.get(DOCUMENT_COLLECTIONS_ADD_TOPIC)));
         }

         MarkLogicSinkConnector connector = new MarkLogicSinkConnector();
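
A hedged sketch of how a test might exercise this helper (the method name, topic, flag value, and payload below are invented for illustration): startSinkTask reads the varargs as alternating name/value pairs, and String flag values such as "true" are coerced to Boolean for the two settings handled above.

    // Hypothetical test method (assumes the usual org.apache.kafka.connect.data.Schema,
    // org.apache.kafka.connect.sink.SinkRecord, and org.junit.jupiter.api.Test imports).
    @Test
    void addsTopicNameAsCollection() {
        // "true" arrives as a String and is converted to a Boolean by startSinkTask.
        AbstractSinkTask task = startSinkTask(DOCUMENT_COLLECTIONS_ADD_TOPIC, "true");
        SinkRecord record = new SinkRecord("example-topic", 0,
            null, null, Schema.STRING_SCHEMA, "{\"hello\":\"world\"}", 1);
        putAndFlushRecords(task, record);
        // Assertions against documents in the "example-topic" collection would go here.
    }
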
@@ -92,31 +98,6 @@ protected void putAndFlushRecords(AbstractSinkTask task, SinkRecord... records) {
         task.flush(new HashMap<>());
     }

-    protected final void retryIfNotSuccessful(Runnable r) {
-        retryIfNotSuccessful(r, DEFAULT_RETRY_SLEEP_TIME, DEFAULT_RETRY_ATTEMPTS);
-    }
-
-    @SuppressWarnings("java:S2925") // We're fine with the sleep call here, due to the nature of testing with kafka-junit
-    protected final void retryIfNotSuccessful(Runnable r, long sleepTime, int attempts) {
-        for (int i = 1; i <= attempts; i++) {
-            logger.info("Trying assertion, attempt " + i + " out of " + attempts);
-            try {
-                r.run();
-                return;
-            } catch (Throwable ex) {
-                if (i == attempts) {
-                    throw ex;
-                }
-                logger.info("Assertion failed: " + ex.getMessage() + "; will sleep for " + sleepTime + " ms and try again");
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {
-                    // Ignore, not expected during a test
-                }
-            }
-        }
-    }
-
     protected Map<String, Object> getTaskConfig() {
         return taskConfig;
     }

This file was deleted.

This file was deleted.
