Skip to content

Commit 685a553

Browse files
committed
Start work on v1.9
Update the version. Cleanup a logging smell. Remove a deprecated function in several places.
1 parent 105506c commit 685a553

File tree

11 files changed

+10
-30
lines changed

11 files changed

+10
-30
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ dependencies {
4040
compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
4141
compileOnly "org.slf4j:slf4j-api:1.7.36"
4242

43-
implementation 'com.marklogic:ml-javaclient-util:4.6.0'
43+
implementation 'com.marklogic:ml-javaclient-util:4.7.0'
4444
// Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
4545
implementation "com.marklogic:ml-app-deployer:4.6.0"
4646

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ name: marklogic-kafka
44
services:
55

66
marklogic:
7-
image: "marklogicdb/marklogic-db:11.1.0-centos-1.1.0"
7+
image: "marklogicdb/marklogic-db:11.2.0-centos-1.1.2"
88
platform: linux/amd64
99
environment:
1010
- INSTALL_CONVERTERS=true

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group=com.marklogic
2-
version=1.8.1
2+
version=1.9-SNAPSHOT
33

44
# For the Confluent Connector Archive
55
componentOwner=marklogic

src/main/java/com/marklogic/kafka/connect/sink/WriteBatcherSinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public void processFailure(WriteBatch batch, Throwable throwable) {
314314

315315
private void logFailedWriteEvent(WriteEvent writeEvent, DocumentMetadataHandle metadata) {
316316
DocumentMetadataHandle.DocumentMetadataValues values = metadata.getMetadataValues();
317-
if (values != null) {
317+
if ((values != null) && logger.isErrorEnabled()) {
318318
logger.error("URI: {}; key: {}; partition: {}; offset: {}; timestamp: {}; topic: {}",
319319
writeEvent.getTargetUri(), values.get("kafka-key"), values.get("kafka-partition"),
320320
values.get("kafka-offset"), values.get("kafka-timestamp"), values.get("kafka-topic"));

src/main/java/com/marklogic/kafka/connect/source/DocumentWriteOperationBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.marklogic.kafka.connect.source;
1717

1818
import com.marklogic.client.document.DocumentWriteOperation;
19-
import com.marklogic.client.ext.util.DefaultDocumentPermissionsParser;
2019
import com.marklogic.client.impl.DocumentWriteOperationImpl;
2120
import com.marklogic.client.io.DocumentMetadataHandle;
2221
import com.marklogic.client.io.marker.AbstractWriteHandle;
@@ -43,7 +42,7 @@ public DocumentWriteOperation build(RecordContent recordContent) {
4342
metadata.getCollections().addAll(collections.trim().split(","));
4443
}
4544
if (StringUtils.hasText(permissions)) {
46-
new DefaultDocumentPermissionsParser().parsePermissions(permissions.trim(), metadata.getPermissions());
45+
metadata.getPermissions().addFromDelimitedString(permissions.trim());
4746
}
4847

4948
if (StringUtils.hasText(uriPrefix)) {

src/main/java/com/marklogic/kafka/connect/source/MarkLogicConstraintValueStore.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.marklogic.client.DatabaseClient;
1919
import com.marklogic.client.document.DocumentDescriptor;
2020
import com.marklogic.client.document.JSONDocumentManager;
21-
import com.marklogic.client.ext.util.DefaultDocumentPermissionsParser;
2221
import com.marklogic.client.io.DocumentMetadataHandle;
2322
import com.marklogic.client.io.Format;
2423
import com.marklogic.client.io.JacksonHandle;
@@ -29,7 +28,6 @@
2928
import java.util.Map;
3029

3130
public class MarkLogicConstraintValueStore extends ConstraintValueStore {
32-
private final DefaultDocumentPermissionsParser permissionsParser = new DefaultDocumentPermissionsParser();
3331
private final DatabaseClient databaseClient;
3432
private final String constraintStorageUri;
3533
private String constraintStoragePermissions;
@@ -69,7 +67,7 @@ public void storeConstraintState(String previousMaxConstraintColumnValue, int la
6967

7068
private void insertPermissions(DocumentMetadataHandle metadataHandle) {
7169
if (StringUtils.hasText(constraintStoragePermissions)) {
72-
permissionsParser.parsePermissions(constraintStoragePermissions, metadataHandle.getPermissions());
70+
metadataHandle.getPermissions().addFromDelimitedString(constraintStoragePermissions);
7371
}
7472
}
7573

src/main/java/com/marklogic/kafka/connect/source/MarkLogicSourceConfig.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.marklogic.kafka.connect.source;
1717

18-
import com.marklogic.client.ext.util.DefaultDocumentPermissionsParser;
1918
import com.marklogic.client.io.DocumentMetadataHandle;
2019
import com.marklogic.kafka.connect.MarkLogicConfig;
2120
import org.apache.kafka.common.config.ConfigDef;
@@ -51,7 +50,6 @@ enum OUTPUT_TYPE {JSON, XML, CSV}
5150
public static final String WAIT_TIME = "ml.source.waitTime";
5251

5352
public static final ConfigDef CONFIG_DEF = getConfigDef();
54-
private static final DefaultDocumentPermissionsParser permissionsParser = new DefaultDocumentPermissionsParser();
5553
private static final String GROUP = "MarkLogic Source Settings";
5654

5755
private static ConfigDef getConfigDef() {
@@ -113,7 +111,7 @@ public void ensureValid(String name, Object value) {
113111
if (StringUtils.hasText((String) value)) {
114112
DocumentMetadataHandle metadata = new DocumentMetadataHandle();
115113
try {
116-
permissionsParser.parsePermissions((String) value, metadata.getPermissions());
114+
metadata.getPermissions().addFromDelimitedString((String) value);
117115
} catch (IllegalArgumentException ex) {
118116
throw new ConfigException(ex.getMessage());
119117
}

src/test/avro/avroTestClass-schema.avsc

Lines changed: 0 additions & 15 deletions
This file was deleted.

src/test/java/com/marklogic/kafka/connect/sink/WriteAvroDataTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import static org.junit.jupiter.api.Assertions.assertEquals;
2121
import static org.junit.jupiter.api.Assertions.assertTrue;
2222

23-
public class WriteAvroDataTest extends AbstractIntegrationSinkTest {
23+
class WriteAvroDataTest extends AbstractIntegrationSinkTest {
2424

2525
/**
2626
* Verifies that an instance of an Avro-generated class - AvroTestClass - can be serialized to a byte array and

src/test/java/com/marklogic/kafka/connect/source/CreateTasksTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99

10-
public class CreateTasksTest extends AbstractIntegrationSourceTest {
10+
class CreateTasksTest extends AbstractIntegrationSourceTest {
1111

1212
@Test
1313
void taskCountGreaterThanOne() {

0 commit comments

Comments
 (0)