11package com .marklogic .kafka .connect .sink ;
22
3- import com .google . gson . JsonObject ;
4- import com .google . gson . JsonParser ;
3+ import com .fasterxml . jackson . databind . JsonNode ;
4+ import com .fasterxml . jackson . databind . ObjectMapper ;
55import com .marklogic .client .document .DocumentWriteOperation ;
66import com .marklogic .client .io .BytesHandle ;
77import com .marklogic .client .io .DocumentMetadataHandle ;
2020 */
2121public class ConvertSinkRecordTest {
2222
23- DefaultSinkRecordConverter converter ;
24- MarkLogicSinkTask markLogicSinkTask = new MarkLogicSinkTask ();
25- private JsonObject doc1 , doc2 , doc3 ;
23+ private DefaultSinkRecordConverter converter ;
24+ private final MarkLogicSinkTask markLogicSinkTask = new MarkLogicSinkTask ();
2625
2726 @ Test
28- public void allPropertiesSet () throws IOException {
27+ public void allPropertiesSet () {
2928 Map <String , Object > config = new HashMap <>();
3029 config .put ("ml.document.collections" , "one,two" );
3130 config .put ("ml.document.format" , "json" );
@@ -57,10 +56,9 @@ public void allPropertiesSet() throws IOException {
5756 }
5857
5958 @ Test
60- public void noPropertiesSet () throws IOException {
59+ public void noPropertiesSet () {
6160 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
6261 converter = new DefaultSinkRecordConverter (kafkaConfig );
63- converter .getDocumentWriteOperationBuilder ();
6462
6563 DocumentWriteOperation op = converter .convert (newSinkRecord ("doesn't matter" ));
6664
@@ -73,11 +71,10 @@ public void noPropertiesSet() throws IOException {
7371 }
7472
7573 @ Test
76- public void UriWithUUIDStrategy () throws IOException {
74+ public void uriWithUUIDStrategy () {
7775 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
7876 kafkaConfig .put ("ml.id.strategy" , "UUID" );
7977 converter = new DefaultSinkRecordConverter (kafkaConfig );
80- converter .getDocumentWriteOperationBuilder ();
8178
8279 DocumentWriteOperation op = converter .convert (newSinkRecord ("doesn't matter" ));
8380
@@ -90,10 +87,9 @@ public void UriWithUUIDStrategy() throws IOException {
9087 }
9188
9289 @ Test
93- public void UriWithDefaultStrategy () throws IOException {
90+ public void uriWithDefaultStrategy () {
9491 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
9592 converter = new DefaultSinkRecordConverter (kafkaConfig );
96- converter .getDocumentWriteOperationBuilder ();
9793
9894 DocumentWriteOperation op = converter .convert (newSinkRecord ("doesn't matter" ));
9995
@@ -106,11 +102,10 @@ public void UriWithDefaultStrategy() throws IOException {
106102 }
107103
108104 @ Test
109- public void UriWithKafkaMetaData () throws IOException {
105+ public void uriWithKafkaMetaData () {
110106 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
111107 kafkaConfig .put ("ml.id.strategy" , "KAFKA_META_WITH_SLASH" );
112108 converter = new DefaultSinkRecordConverter (kafkaConfig );
113- converter .getDocumentWriteOperationBuilder ();
114109
115110 DocumentWriteOperation op = converter .convert (newSinkRecord ("doesn't matter" ));
116111
@@ -122,14 +117,12 @@ public void UriWithKafkaMetaData() throws IOException {
122117 }
123118
124119 @ Test
125- public void UriWithJsonPath () throws IOException {
126- JsonParser parser = new JsonParser ();
127- doc1 = parser .parse ("{\" f1\" :\" 100\" }" ).getAsJsonObject ();
120+ public void uriWithJsonPath () throws IOException {
121+ JsonNode doc1 = new ObjectMapper ().readTree ("{\" f1\" :\" 100\" }" );
128122 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
129123 kafkaConfig .put ("ml.id.strategy" , "JSONPATH" );
130124 kafkaConfig .put ("ml.id.strategy.paths" , "/f1" );
131125 converter = new DefaultSinkRecordConverter (kafkaConfig );
132- converter .getDocumentWriteOperationBuilder ();
133126
134127 DocumentWriteOperation op = converter .convert (newSinkRecord (doc1 ));
135128
@@ -142,14 +135,12 @@ public void UriWithJsonPath() throws IOException {
142135 }
143136
144137 @ Test
145- public void UriWithHashedJsonPaths () throws IOException {
146- JsonParser parser = new JsonParser ();
147- doc1 = parser .parse ("{\" f1\" :\" 100\" ,\" f2\" :\" 200\" }" ).getAsJsonObject ();
138+ public void uriWithHashedJsonPaths () throws IOException {
139+ JsonNode doc1 = new ObjectMapper ().readTree ("{\" f1\" :\" 100\" ,\" f2\" :\" 200\" }" );
148140 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
149141 kafkaConfig .put ("ml.id.strategy" , "HASH" );
150142 kafkaConfig .put ("ml.id.strategy.paths" , "/f1,/f2" );
151143 converter = new DefaultSinkRecordConverter (kafkaConfig );
152- converter .getDocumentWriteOperationBuilder ();
153144
154145 DocumentWriteOperation op = converter .convert (newSinkRecord (doc1 ));
155146
@@ -162,11 +153,10 @@ public void UriWithHashedJsonPaths() throws IOException {
162153 }
163154
164155 @ Test
165- public void UriWithHashedKafkaMeta () throws IOException {
156+ public void uriWithHashedKafkaMeta () {
166157 Map <String , Object > kafkaConfig = new HashMap <String , Object >();
167158 kafkaConfig .put ("ml.id.strategy" , "KAFKA_META_HASHED" );
168159 converter = new DefaultSinkRecordConverter (kafkaConfig );
169- converter .getDocumentWriteOperationBuilder ();
170160
171161 DocumentWriteOperation op = converter .convert (newSinkRecord ("doesn't matter" ));
172162
@@ -179,7 +169,7 @@ public void UriWithHashedKafkaMeta() throws IOException {
179169 }
180170
181171 @ Test
182- public void binaryContent () throws IOException {
172+ public void binaryContent () {
183173 converter = new DefaultSinkRecordConverter (new HashMap <>());
184174
185175 DocumentWriteOperation op = converter .convert (newSinkRecord ("hello world" .getBytes ()));
0 commit comments