diff --git a/build.gradle b/build.gradle
index 178efdb..e259172 100644
--- a/build.gradle
+++ b/build.gradle
@@ -41,7 +41,8 @@ ext {
 }
 
 dependencies {
-    compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
+    def kafkaConnectRuntime = "org.apache.kafka:connect-runtime:${kafkaVersion}"
+    compileOnly kafkaConnectRuntime
     compileOnly "org.slf4j:slf4j-api:1.7.36"
 
     // Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
@@ -65,9 +66,7 @@ dependencies {
 
     testImplementation 'com.marklogic:marklogic-junit5:1.5.0'
     testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"
-
-    // Can be deleted when the disabled kafka-junit tests are deleted.
-    testImplementation 'net.mguenther.kafka:kafka-junit:3.6.0'
+    testImplementation kafkaConnectRuntime
 
     testImplementation "org.apache.avro:avro-compiler:1.12.0"
diff --git a/src/test/java/com/marklogic/kafka/connect/sink/AbstractIntegrationSinkTest.java b/src/test/java/com/marklogic/kafka/connect/sink/AbstractIntegrationSinkTest.java
index 882f6dc..54a6e97 100644
--- a/src/test/java/com/marklogic/kafka/connect/sink/AbstractIntegrationSinkTest.java
+++ b/src/test/java/com/marklogic/kafka/connect/sink/AbstractIntegrationSinkTest.java
@@ -28,9 +28,12 @@ import static com.marklogic.kafka.connect.sink.MarkLogicSinkConfig.*;
 
 /**
- * Base class for any test that wishes to connect to the "kafka-test-test-content" app server on port 8019.
- * AbstractSpringMarkLogicTest assumes it can find mlHost/mlTestRestPort/mlUsername/mlPassword properties in
- * gradle.properties and gradle-local.properties. It uses those to construct a DatabaseClient which can be fetched
+ * Base class for any test that wishes to connect to the
+ * "kafka-test-test-content" app server on port 8019.
+ * AbstractSpringMarkLogicTest assumes it can find
+ * mlHost/mlTestRestPort/mlUsername/mlPassword properties in
+ * gradle.properties and gradle-local.properties. It uses those to construct a
+ * DatabaseClient which can be fetched
  * via getDatabaseClient().
  */
 public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTest {
 
@@ -39,20 +42,21 @@ public abstract class AbstractIntegrationSinkTest extends AbstractIntegrationTes
     @Autowired
     SimpleTestConfig testConfig;
 
-    private final static long DEFAULT_RETRY_SLEEP_TIME = 250;
-    private final static int DEFAULT_RETRY_ATTEMPTS = 10;
     private Map<String, Object> taskConfig = new HashMap<>();
 
     /**
-     * @param configParamNamesAndValues - Configuration values that need to be set for the test.
-     * @return a MarkLogicSinkTask based on the default connection config and any optional config params provided by
-     * the caller
+     * @param configParamNamesAndValues - Configuration values that need to be set
+     *                                  for the test.
+     * @return a MarkLogicSinkTask based on the default connection config and any
+     *         optional config params provided by
+     *         the caller
      */
     protected AbstractSinkTask startSinkTask(String... configParamNamesAndValues) {
         return startSinkTask(null, configParamNamesAndValues);
     }
 
-    protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> errorReporterMethod, String... configParamNamesAndValues) {
+    protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> errorReporterMethod,
+            String... configParamNamesAndValues) {
         Map<String, Object> config = newMarkLogicConfig(testConfig);
         config.put(MarkLogicSinkConfig.DOCUMENT_PERMISSIONS, "rest-reader,read,rest-writer,update");
         for (int i = 0; i < configParamNamesAndValues.length; i += 2) {
@@ -60,10 +64,12 @@ protected AbstractSinkTask startSinkTask(BiConsumer<SinkRecord, Throwable> error
         }
         taskConfig.putAll(config);
         if (taskConfig.containsKey(DMSDK_INCLUDE_KAFKA_METADATA)) {
-            taskConfig.put(DMSDK_INCLUDE_KAFKA_METADATA, Boolean.valueOf((String) taskConfig.get(DMSDK_INCLUDE_KAFKA_METADATA)));
+            taskConfig.put(DMSDK_INCLUDE_KAFKA_METADATA,
+                    Boolean.valueOf((String) taskConfig.get(DMSDK_INCLUDE_KAFKA_METADATA)));
         }
         if (taskConfig.containsKey(DOCUMENT_COLLECTIONS_ADD_TOPIC)) {
-            taskConfig.put(DOCUMENT_COLLECTIONS_ADD_TOPIC, Boolean.valueOf((String) taskConfig.get(DOCUMENT_COLLECTIONS_ADD_TOPIC)));
+            taskConfig.put(DOCUMENT_COLLECTIONS_ADD_TOPIC,
+                    Boolean.valueOf((String) taskConfig.get(DOCUMENT_COLLECTIONS_ADD_TOPIC)));
         }
 
         MarkLogicSinkConnector connector = new MarkLogicSinkConnector();
@@ -92,31 +98,6 @@ protected void putAndFlushRecords(AbstractSinkTask task, SinkRecord... records)
         task.flush(new HashMap<>());
     }
 
-    protected final void retryIfNotSuccessful(Runnable r) {
-        retryIfNotSuccessful(r, DEFAULT_RETRY_SLEEP_TIME, DEFAULT_RETRY_ATTEMPTS);
-    }
-
-    @SuppressWarnings("java:S2925") // We're fine with the sleep call here, due to the nature of testing with kafka-junit
-    protected final void retryIfNotSuccessful(Runnable r, long sleepTime, int attempts) {
-        for (int i = 1; i <= attempts; i++) {
-            logger.info("Trying assertion, attempt " + i + " out of " + attempts);
-            try {
-                r.run();
-                return;
-            } catch (Throwable ex) {
-                if (i == attempts) {
-                    throw ex;
-                }
-                logger.info("Assertion failed: " + ex.getMessage() + "; will sleep for " + sleepTime + " ms and try again");
-                try {
-                    Thread.sleep(sleepTime);
-                } catch (InterruptedException e) {
-                    // Ignore, not expected during a test
-                }
-            }
-        }
-    }
-
     protected Map<String, Object> getTaskConfig() {
         return taskConfig;
     }
diff --git a/src/test/java/com/marklogic/kafka/connect/sink/SendWriteFailureRecordsToDlqKafkaTest.java b/src/test/java/com/marklogic/kafka/connect/sink/SendWriteFailureRecordsToDlqKafkaTest.java
deleted file mode 100644
index 3bc8624..0000000
--- a/src/test/java/com/marklogic/kafka/connect/sink/SendWriteFailureRecordsToDlqKafkaTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright (c) 2023 MarkLogic Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.marklogic.kafka.connect.sink;
-
-import com.marklogic.junit5.spring.SimpleTestConfig;
-import kafka.server.KafkaConfig$;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.ReadKeyValues;
-import net.mguenther.kafka.junit.SendKeyValues;
-import org.apache.kafka.common.header.Headers;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static com.marklogic.kafka.connect.sink.AbstractSinkTask.MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE;
-import static com.marklogic.kafka.connect.sink.AbstractSinkTask.MARKLOGIC_MESSAGE_FAILURE_HEADER;
-import static com.marklogic.kafka.connect.sink.AbstractSinkTask.MARKLOGIC_ORIGINAL_TOPIC;
-import static com.marklogic.kafka.connect.sink.AbstractSinkTask.MARKLOGIC_WRITE_FAILURE;
-import static net.mguenther.kafka.junit.EmbeddedConnectConfig.kafkaConnect;
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
-import static net.mguenther.kafka.junit.ObserveKeyValues.on;
-
-class SendWriteFailureRecordsToDlqKafkaTest extends AbstractIntegrationSinkTest {
-
-    // Declared by AbstractSpringMarkLogicTest
-    @Autowired
-    private SimpleTestConfig testConfig;
-
-    private final String ML_COLLECTION = "kafka-data";
-    private final String TOPIC = "test-topic";
-    private final String DLQ_TOPIC = "test-dlq";
-    private final String KEY = String.format("key-%s", UUID.randomUUID());
-    private final Integer NUM_RECORDS = 1;
-
-    private EmbeddedKafkaCluster kafka;
-
-    @BeforeEach
-    void setupKafka() {
-        provisionKafkaWithConnectAndMarkLogicConnector();
-        kafka.start();
-    }
-
-    @AfterEach
-    void tearDownKafka() {
-        kafka.stop();
-    }
-
-    @Test
-    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
-    void failedBatchesShouldGoToTheDlq() throws InterruptedException {
-        sendSomeJsonMessages(NUM_RECORDS);
-
-        assertMessageOnDlqAndHasExpectedHeaders(DLQ_TOPIC, (NUM_RECORDS));
-    }
-
-    private void provisionKafkaWithConnectAndMarkLogicConnector() {
-        kafka = provisionWith(
-            newClusterConfig()
-                .configure(
-                    kafkaConnect()
-                        .deployConnector(connectorConfig(TOPIC, KEY))
-                        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
-                )
-        );
-    }
-
-    private Properties connectorConfig(final String topic, final String key) {
-        return MarkLogicSinkConnectorConfigBuilder.create()
-            .withTopic(topic)
-            .withKey(key)
-            .with(MarkLogicSinkConfig.CONNECTION_HOST, testConfig.getHost())
-            .with(MarkLogicSinkConfig.CONNECTION_PORT, testConfig.getRestPort())
-            .with(MarkLogicSinkConfig.CONNECTION_USERNAME, "kafka-unprivileged-user")
-            .with(MarkLogicSinkConfig.CONNECTION_PASSWORD, "kafkatest")
-            .with(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS, ML_COLLECTION)
-            .with(MarkLogicSinkConfig.DMSDK_THREAD_COUNT, 1)
-            .with(MarkLogicSinkConfig.DMSDK_BATCH_SIZE, 100)
-            .with("errors.deadletterqueue.topic.name", DLQ_TOPIC)
-            .with("errors.deadletterqueue.topic.replication.factor", 1)
-            .with("value.converter", "org.apache.kafka.connect.json.JsonConverter")
-            .with("errors.tolerance", "all")
-            .with("value.converter.schemas.enable", false)
-            .build();
-    }
-
-    private void sendSomeJsonMessages(Integer numberOfRecords) throws InterruptedException {
-        List<KeyValue<String, String>> records = new ArrayList<>();
-        for (int i = 0; i < numberOfRecords; i++) {
-            records.add(new KeyValue<>("aggregate", "{\"A\": \"" + i + "\"}"));
-        }
-        kafka.send(SendKeyValues.to(TOPIC, records));
-    }
-
-    private void assertMessageOnDlqAndHasExpectedHeaders(String topic, Integer numRecords) throws InterruptedException {
-        kafka.observe(on(topic, numRecords));
-
-        Headers headers = kafka.read(ReadKeyValues.from(topic))
-            .stream()
-            .findFirst()
-            .map(KeyValue::getHeaders)
-            .orElseThrow(() -> new RuntimeException("No records found."));
-
-        Assertions.assertEquals(MARKLOGIC_WRITE_FAILURE,
-            new String(headers.headers(MARKLOGIC_MESSAGE_FAILURE_HEADER).iterator().next().value()),
-            "The failure reason on the DLQ message was not what was expected");
-        Assertions.assertTrue(
-            new String(headers.headers(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE).iterator().next().value())
-                .startsWith("Local message: failed to apply resource at documents: Internal Server Error. Server Message: SEC-COLPERM:"),
-            "The exception message on the DLQ message was not what was expected");
-        Assertions.assertEquals(TOPIC,
-            new String(headers.headers(MARKLOGIC_ORIGINAL_TOPIC).iterator().next().value()),
-            "The original topic on the DLQ message was not what was expected");
-    }
-}
diff --git a/src/test/java/com/marklogic/kafka/connect/sink/WriteFromKafkaTest.java b/src/test/java/com/marklogic/kafka/connect/sink/WriteFromKafkaTest.java
deleted file mode 100644
index e595741..0000000
--- a/src/test/java/com/marklogic/kafka/connect/sink/WriteFromKafkaTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright (c) 2023 MarkLogic Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.marklogic.kafka.connect.sink;
-
-import com.marklogic.client.io.SearchHandle;
-import com.marklogic.client.query.StructuredQueryBuilder;
-import com.marklogic.client.query.StructuredQueryDefinition;
-import kafka.server.KafkaConfig$;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.SendKeyValues;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static net.mguenther.kafka.junit.EmbeddedConnectConfig.kafkaConnect;
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class WriteFromKafkaTest extends AbstractIntegrationSinkTest {
-
-    private final String ML_COLLECTION = "kafka-data";
-    private final String TOPIC = "test-topic";
-    private final String KEY = String.format("key-%s", UUID.randomUUID());
-
-    private EmbeddedKafkaCluster kafka;
-
-    @BeforeEach
-    void setupKafka() {
-        provisionKafkaWithConnectAndMarkLogicConnector();
-        kafka.start();
-    }
-
-    @AfterEach
-    void tearDownKafka() {
-        kafka.stop();
-    }
-
-    @Test
-    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
-    void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
-        Integer NUM_RECORDS = 2;
-        sendSomeJsonMessages(NUM_RECORDS);
-        retryIfNotSuccessful(() -> assertMarkLogicDocumentsExistInCollection(ML_COLLECTION, NUM_RECORDS,
-            format("Expected to find %d records in the ML database", NUM_RECORDS)));
-    }
-
-    private void provisionKafkaWithConnectAndMarkLogicConnector() {
-        kafka = provisionWith(
-            newClusterConfig()
-                .configure(
-                    kafkaConnect()
-                        .deployConnector(connectorConfig(TOPIC, KEY))
-                        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
-                )
-        );
-    }
-
-    private Properties connectorConfig(final String topic, final String key) {
-        return MarkLogicSinkConnectorConfigBuilder.create()
-            .withTopic(topic)
-            .withKey(key)
-            .with(MarkLogicSinkConfig.CONNECTION_HOST, testConfig.getHost())
-            .with(MarkLogicSinkConfig.CONNECTION_PORT, testConfig.getRestPort())
-            .with(MarkLogicSinkConfig.CONNECTION_USERNAME, testConfig.getUsername())
-            .with(MarkLogicSinkConfig.CONNECTION_PASSWORD, testConfig.getPassword())
-            .with(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS, ML_COLLECTION)
-            .with(MarkLogicSinkConfig.DMSDK_BATCH_SIZE, 1)
-            .build();
-    }
-
-    private void sendSomeJsonMessages(Integer numberOfRecords) throws InterruptedException {
-        List<KeyValue<String, String>> records = new ArrayList<>();
-        for (int i = 0; i < numberOfRecords; i++) {
-            records.add(new KeyValue<>("aggregate", "{\"A\": \"" + i + "\"}"));
-        }
-        kafka.send(SendKeyValues.to(TOPIC, records));
-    }
-
-    private void assertMarkLogicDocumentsExistInCollection(String collection, Integer numRecords, String message) {
-        StructuredQueryBuilder qb = new StructuredQueryBuilder();
-        StructuredQueryDefinition queryDefinition = qb.collection(collection);
-        SearchHandle results = getDatabaseClient().newQueryManager().search(queryDefinition, new SearchHandle());
-        assertEquals(numRecords.longValue(), results.getTotalResults(), message);
-    }
-}
diff --git a/src/test/java/com/marklogic/kafka/connect/sink/WriteTransformDocumentTest.java b/src/test/java/com/marklogic/kafka/connect/sink/WriteTransformDocumentTest.java
deleted file mode 100644
index 88cdc8f..0000000
--- a/src/test/java/com/marklogic/kafka/connect/sink/WriteTransformDocumentTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2023 MarkLogic Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.marklogic.kafka.connect.sink;
-
-import com.marklogic.client.io.SearchHandle;
-import com.marklogic.client.query.StructuredQueryBuilder;
-import com.marklogic.client.query.StructuredQueryDefinition;
-import kafka.server.KafkaConfig$;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import net.mguenther.kafka.junit.KeyValue;
-import net.mguenther.kafka.junit.SendKeyValues;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static net.mguenther.kafka.junit.EmbeddedConnectConfig.kafkaConnect;
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class WriteTransformDocumentTest extends AbstractIntegrationSinkTest {
-
-    private final String ML_COLLECTION = "kafka-data";
-    private final String TOPIC = "test-topic";
-    private final String KEY = String.format("key-%s", UUID.randomUUID());
-
-    private EmbeddedKafkaCluster kafka;
-
-    @BeforeEach
-    void setupKafka() {
-        provisionKafkaWithConnectAndMarkLogicConnector();
-        kafka.start();
-    }
-
-    @AfterEach
-    void tearDownKafka() {
-        kafka.stop();
-    }
-
-    @Test
-    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
-    void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
-        Integer NUM_RECORDS = 2;
-        sendSomeJsonMessages(NUM_RECORDS);
-        retryIfNotSuccessful(() -> assertMarkLogicDocumentsExistInCollection(ML_COLLECTION, NUM_RECORDS,
-            format("Expected to find %d records in the ML database", NUM_RECORDS)));
-    }
-
-    private void provisionKafkaWithConnectAndMarkLogicConnector() {
-        kafka = provisionWith(
-            newClusterConfig()
-                .configure(
-                    kafkaConnect()
-                        .deployConnector(connectorConfig(TOPIC, KEY))
-                        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
-                )
-        );
-    }
-
-    private Properties connectorConfig(final String topic, final String key) {
-        return MarkLogicSinkConnectorConfigBuilder.create()
-            .withTopic(topic)
-            .withKey(key)
-            .with(MarkLogicSinkConfig.CONNECTION_HOST, testConfig.getHost())
-            .with(MarkLogicSinkConfig.CONNECTION_PORT, testConfig.getRestPort())
-            .with(MarkLogicSinkConfig.CONNECTION_USERNAME, testConfig.getUsername())
-            .with(MarkLogicSinkConfig.CONNECTION_PASSWORD, testConfig.getPassword())
-            .with(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS, ML_COLLECTION)
-            .with(MarkLogicSinkConfig.DMSDK_BATCH_SIZE, 1)
-            .with(MarkLogicSinkConfig.DMSDK_TRANSFORM, "exampleTransform")
-            .build();
-    }
-
-    private void sendSomeJsonMessages(Integer numberOfRecords) throws InterruptedException {
-        List<KeyValue<String, String>> records = new ArrayList<>();
-        for (int i = 0; i < numberOfRecords; i++) {
-            records.add(new KeyValue<>("aggregate", "{\"A\": \"" + i + "\"}"));
-        }
-        kafka.send(SendKeyValues.to(TOPIC, records));
-    }
-
-    private void assertMarkLogicDocumentsExistInCollection(String collection, Integer numRecords, String message) {
-        StructuredQueryBuilder qb = new StructuredQueryBuilder();
-        StructuredQueryDefinition queryDefinition = qb.collection(collection).withCriteria("Chartreuse");
-        SearchHandle results = getDatabaseClient().newQueryManager().search(queryDefinition, new SearchHandle());
-        assertEquals(numRecords.longValue(), results.getTotalResults(), message);
-    }
-}
diff --git a/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslKafkaTest.java b/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslKafkaTest.java
deleted file mode 100644
index 7ac3da8..0000000
--- a/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslKafkaTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2023 MarkLogic Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.marklogic.kafka.connect.source;
-
-import kafka.server.KafkaConfig$;
-import net.mguenther.kafka.junit.EmbeddedConnectConfig;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import java.util.Properties;
-
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
-import static net.mguenther.kafka.junit.ObserveKeyValues.on;
-import static net.mguenther.kafka.junit.TopicConfig.withName;
-import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
-import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
-
-class ReadRowsViaOpticDslKafkaTest extends AbstractIntegrationSourceTest {
-
-    private EmbeddedKafkaCluster kafka;
-
-    @BeforeEach
-    void setup() {
-        loadFifteenAuthorsIntoMarkLogic();
-        setupKafka();
-    }
-
-    @AfterEach
-    void tearDownKafka() {
-        kafka.stop();
-    }
-
-    @SuppressWarnings("java:S2699") // The assertion happens via kafka.observe
-    @Test
-    @Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
-    void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
-        kafka.observe(on(AUTHORS_TOPIC, 15));
-    }
-
-    void setupKafka() {
-        provisionKafkaWithConnectAndMarkLogicConnector();
-        kafka.start();
-        kafka.createTopic(
-            withName(AUTHORS_TOPIC)
-                .with(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE)
-                .build()
-        );
-    }
-
-    private void provisionKafkaWithConnectAndMarkLogicConnector() {
-        kafka = provisionWith(
-            newClusterConfig()
-                .configure(
-                    EmbeddedConnectConfig.kafkaConnect()
-                        .deployConnector(sourceConnectorConfig(AUTHORS_TOPIC, AUTHORS_OPTIC_DSL))
-                        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
-                )
-        );
-    }
-
-    private Properties sourceConnectorConfig(final String topic, final String opticDsl) {
-        return MarkLogicSourceConnectorConfigBuilder.create()
-            .withTopic(topic)
-            .withDsl(opticDsl)
-            .with(MarkLogicSourceConfig.WAIT_TIME, 0)
-            .with(MarkLogicSourceConfig.CONNECTION_HOST, testConfig.getHost())
-            .with(MarkLogicSourceConfig.CONNECTION_PORT, testConfig.getRestPort())
-            .with(MarkLogicSourceConfig.CONNECTION_USERNAME, testConfig.getUsername())
-            .with(MarkLogicSourceConfig.CONNECTION_PASSWORD, testConfig.getPassword())
-            .build();
-    }
-}