Skip to content

Commit 9c8a2ca

Browse files
conker84jexp
authored andcommitted
fixes #241: Temporary disable DLQ from the connect plugin (#242)
1 parent 5646428 commit 9c8a2ca

File tree

2 files changed

+19
-15
lines changed

2 files changed

+19
-15
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkTask.kt

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class Neo4jSinkTask : SinkTask() {
1717
private val log: Logger = LoggerFactory.getLogger(Neo4jSinkTask::class.java)
1818
private lateinit var config: Neo4jSinkConnectorConfig
1919
private lateinit var neo4jService: Neo4jService
20-
private lateinit var errorService: ErrorService
20+
// private lateinit var errorService: ErrorService
2121

2222
override fun version(): String {
2323
return VersionUtil.version(this.javaClass as Class<*>)
@@ -28,27 +28,27 @@ class Neo4jSinkTask : SinkTask() {
2828

2929
val kafkaConfig = Properties()
3030
kafkaConfig.putAll(map)
31-
this.errorService = KafkaErrorService(kafkaConfig, ErrorService.ErrorConfig.from(kafkaConfig), log::error)
31+
// this.errorService = KafkaErrorService(kafkaConfig, ErrorService.ErrorConfig.from(kafkaConfig), log::error)
3232
this.neo4jService = Neo4jService(this.config)
3333
}
3434

3535
override fun put(collection: Collection<SinkRecord>) = runBlocking(Dispatchers.IO) {
3636
if (collection.isEmpty()) {
3737
return@runBlocking
3838
}
39-
try {
40-
val data = EventBuilder()
41-
.withBatchSize(config.batchSize)
42-
.withTopics(config.topics.allTopics())
43-
.withSinkRecords(collection)
44-
.build()
45-
46-
neo4jService.writeData(data)
47-
} catch(e:Exception) {
48-
errorService.report(collection.map {
49-
ErrorData(it.topic(), it.timestamp(),it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java,e)
50-
})
51-
}
39+
// try {
40+
val data = EventBuilder()
41+
.withBatchSize(config.batchSize)
42+
.withTopics(config.topics.allTopics())
43+
.withSinkRecords(collection)
44+
.build()
45+
46+
neo4jService.writeData(data)
47+
// } catch(e:Exception) {
48+
// errorService.report(collection.map {
49+
// ErrorData(it.topic(), it.timestamp(),it.key(), it.value(), it.kafkaPartition(), it.kafkaOffset(), this::class.java,e)
50+
// })
51+
// }
5252
}
5353

5454
override fun stop() {

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.apache.kafka.connect.sink.SinkTask
99
import org.apache.kafka.connect.sink.SinkTaskContext
1010
import org.junit.After
1111
import org.junit.Before
12+
import org.junit.Ignore
1213
import org.junit.Test
1314
import org.mockito.Mockito.mock
1415
import org.neo4j.graphdb.Label
@@ -388,6 +389,7 @@ class Neo4jSinkTaskTest {
388389
}
389390

390391
@Test
392+
@Ignore("temporary disabled the DLQ")
391393
fun `should report but not fail parsing data`() {
392394
val topic = "neotopic"
393395
val props = mutableMapOf<String, String>()
@@ -410,6 +412,7 @@ class Neo4jSinkTaskTest {
410412
}
411413

412414
@Test
415+
@Ignore("temporary disabled the DLQ")
413416
fun `should report but not fail invalid schema`() {
414417
val topic = "neotopic"
415418
val props = mutableMapOf<String, String>()
@@ -432,6 +435,7 @@ class Neo4jSinkTaskTest {
432435
}
433436

434437
@Test
438+
@Ignore("temporary disabled the DLQ")
435439
fun `should fail running invalid cypher`() {
436440
val topic = "neotopic"
437441
val props = mutableMapOf<String, String>()

0 commit comments

Comments
 (0)