Commit bd13cf7

Merge pull request #153 from marklogic/feature/biju-feedback
Little bits of beta feedback
2 parents 16b4ecc + 8925591

4 files changed (+43, -10 lines)

README.md

Lines changed: 7 additions & 1 deletion

@@ -188,10 +188,16 @@ data:
 - `topic` = the topic identified by `ml.source.topic`
 
 The same query can also be executed by defining a serialized version of the plan; this can be useful when the plan has
-been constructed already via the MarkLogic Java Client:
+been constructed in another tool already, such as the MarkLogic Java Client, and can then be exported into its
+serialized form:
 
 ml.source.optic.serialized={"$optic": {"ns": "op", "fn": "operators", "args": [{"ns": "op", "fn": "from-view", "args": ["demo", "purchases"]}]}}
 
+**Warning** - the Kafka `tasks.max` property is ignored by the MarkLogic Kafka source connector. Running 2 or more tasks
+with the same configuration would produce 2 or more copies of the same records and may also lead to inconsistent
+results when using a constraint column. If you have a valid scenario for setting this property to 2 or higher, please
+file an issue with your use case.
+
 ### Selecting an output type
 
 By default, the connector will return each row as JSON. As mentioned above, it is recommended to configure the Kafka
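The updated README wording describes exporting a plan that was first built with the MarkLogic Java Client. Below is a minimal sketch of that export step, assuming the Java Client is on the classpath; the class name, host, port, and credentials are illustrative placeholders, and the plan mirrors the from-view example above.

    import com.marklogic.client.DatabaseClient;
    import com.marklogic.client.DatabaseClientFactory;
    import com.marklogic.client.expression.PlanBuilder;
    import com.marklogic.client.io.StringHandle;
    import com.marklogic.client.row.RowManager;

    // Illustrative class name; connection details are placeholders.
    public class ExportOpticPlan {

        public static void main(String[] args) {
            DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000,
                new DatabaseClientFactory.DigestAuthContext("myUser", "myPassword"));
            try {
                RowManager rowManager = client.newRowManager();
                PlanBuilder op = rowManager.newPlanBuilder();

                // Same plan as the example above: the 'purchases' view in the 'demo' schema
                PlanBuilder.ExportablePlan plan = op.fromView("demo", "purchases");

                // export(...) produces the {"$optic": ...} JSON that can be used as the value
                // of the ml.source.optic.serialized connector property
                String serialized = plan.export(new StringHandle()).get();
                System.out.println(serialized);
            } finally {
                client.release();
            }
        }
    }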

build.gradle

Lines changed: 3 additions & 6 deletions

@@ -5,7 +5,7 @@ plugins {
     id "com.github.jk1.dependency-license-report" version "1.3"
 
     // Only used for testing
-    id 'com.marklogic.ml-gradle' version '4.3.7'
+    id 'com.marklogic.ml-gradle' version '4.5.0'
     id 'jacoco'
     id "org.sonarqube" version "3.5.0.2730"
 }
@@ -47,12 +47,9 @@ dependencies {
     compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
     compileOnly "org.slf4j:slf4j-api:1.7.36"
 
-    implementation ('com.marklogic:ml-javaclient-util:4.5-SNAPSHOT') {
-        changing = true
-    }
-
+    implementation 'com.marklogic:ml-javaclient-util:4.5.0'
     // Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
-    implementation "com.marklogic:ml-app-deployer:4.5-SNAPSHOT"
+    implementation "com.marklogic:ml-app-deployer:4.5.0"
 
     implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.14.1"

src/main/java/com/marklogic/kafka/connect/source/MarkLogicSourceConnector.java

Lines changed: 6 additions & 3 deletions

@@ -71,10 +71,13 @@ public Class<? extends Task> taskClass() {
 
     @Override
     public List<Map<String, String>> taskConfigs(final int taskCount) {
-        final List<Map<String, String>> configs = new ArrayList<>(taskCount);
-        for (int i = 0; i < taskCount; ++i) {
-            configs.add(config);
+        if (taskCount > 1) {
+            logger.warn("As of the 1.8.0 release, the Kafka tasks.max property is ignored and a single source connector " +
+                "task is created. This prevents duplicate records from being created via multiple instances of " +
+                "the task with the exact same config.");
         }
+        final List<Map<String, String>> configs = new ArrayList<>(1);
+        configs.add(config);
         return configs;
     }
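The warn message above relies on how Kafka Connect consumes taskConfigs: the runtime starts one source task per config map returned, so a single-entry list guarantees a single task regardless of tasks.max. The sketch below is a hypothetical, simplified illustration of that contract; it is not the actual Connect worker code, and the class and method names are invented.

    package com.marklogic.kafka.connect.source;

    import org.apache.kafka.connect.source.SourceTask;

    import java.util.Map;

    // Hypothetical illustration of how a Connect worker consumes taskConfigs; not real framework code.
    public class TaskCreationIllustration {

        static void startTasks(MarkLogicSourceConnector connector, int maxTasks) throws Exception {
            // With this change, the returned list always has exactly one entry, so the loop body
            // runs once even when tasks.max is 2 or higher.
            for (Map<String, String> taskConfig : connector.taskConfigs(maxTasks)) {
                SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
                // Two tasks started with the exact same config would each run the same Optic query
                // and thus produce duplicate records - hence the warning logged above.
                task.start(taskConfig);
            }
        }
    }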

src/test/java/com/marklogic/kafka/connect/source/CreateTasksTest.java

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+package com.marklogic.kafka.connect.source;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CreateTasksTest extends AbstractIntegrationSourceTest {
+
+    @Test
+    void taskCountGreaterThanOne() {
+        Map<String, String> config = newMarkLogicConfig(testConfig);
+        config.put(MarkLogicSourceConfig.DSL_QUERY, AUTHORS_OPTIC_DSL);
+        MarkLogicSourceConnector connector = new MarkLogicSourceConnector();
+        connector.start(config);
+
+        List configs = connector.taskConfigs(2);
+        assertEquals(1, configs.size(),
+            "For the 1.8.0 release, no matter what the user sets the Kafka tasks.max property to, we are only " +
+                "supporting one task to prevent the user from a configuration that we do not believe will ever be " +
+                "valid. That is - 2+ tasks with the same Optic query and config will produce 2+ copies of every row, " +
+                "and it will also encounter race conditions if the constraint column config is stored in MarkLogic. " +
+                "This restriction can be relaxed in the future if users identify valid scenarios for having 2+ tasks.");
+    }
+}
