Connector won't restart after I stop Docker directly #743
Describe the bug
The very first time, or after I run $ curl -X DELETE http://localhost:8083/connectors/connect_file_pulse, I am able to start the connector and it processes the files. But if I stop Docker Desktop directly, without going through the /stop API endpoint first, the connector won't start again. I get this error:
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: connect_file_pulse
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$41(DistributedHerder.java:2045)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:360)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:144)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:122)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector connect_file_pulse to state STARTED
... 9 more
Caused by: java.lang.IllegalStateException: Cannot init again.
at io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore.start(KafkaStateBackingStore.java:84)
at io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.start(KafkaFileObjectStateBackingStore.java:91)
at io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor.<init>(DefaultFileSystemMonitor.java:164)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.createFileSystemMonitor(FilePulseSourceConnector.java:129)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:102)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:192)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:217)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:376)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:357)
... 8 more
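The FAILED state can also be checked through the worker's REST API (same localhost:8083 as in the DELETE call above); the status response should carry the same trace:
$ curl -s http://localhost:8083/connectors/connect_file_pulse/status
# expected (roughly): "connector": { "state": "FAILED", "trace": "org.apache.kafka.connect.errors.ConnectException: Failed to start connector: connect_file_pulse ..." }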
To Reproduce
I am doing a POC using connect-file-pulse-quickstart-csv.json with slight changes; see the config below (the command sequence I run is sketched after it). I have downloaded the latest streamthoughts-kafka-connect-file-pulse-2.16.0.zip connector.
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "filters.ParseCSVLine.separator": ",",
  "skip.headers": "1",
  "tasks.max": "1",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "offset.attributes.string": "name",
  "tasks.file.status.storage.topic.replication.factor": "1",
  "tasks.file.status.storage.security.protocol": "SSL",
  "fs.listing.interval.ms": "10000",
  "filters.ParseCSVLine.trim.column": "true",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "filters": "ParseCSVLine",
  "fs.listing.directory.path": "/tmp/kafka-connect/examples/",
  "tasks.file.status.storage.bootstrap.servers": "b-1.dev-managed-kafka-cdc........com:9094",
  "tasks.file.status.storage.topic.partitions": "2",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on": "COMMITTED",
  "name": "connect_file_pulse",
  "filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "topic": "event.s3.source",
  "filters.ParseCSVLine.extract.column.name": "headers",
  "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "internal.kafka.reporter.id": "newId22",
  "file.filter.regex.pattern": ".*\\.csv$",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore"
}
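The rough sequence I run is below (the config file name and container name are placeholders for my local setup; I register the connector through the Connect REST API):
$ curl -X PUT -H "Content-Type: application/json" \
       --data @file-pulse.json \
       http://localhost:8083/connectors/connect_file_pulse/config   # register the connector; the CSV files in /tmp/kafka-connect/examples/ get processed
$ docker stop <connect-container>                                   # stop/kill the Connect container directly, no /stop call
$ docker start <connect-container>                                  # connector now fails to start with the "Cannot init again" error above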
Expected behavior
It should simply resume from the last offset, like other Confluent connectors do.
Additional context
I saw an older issue where it was said this only happens in dev, but in my case I think it will be a problem even in prod, since our CI/CD deployments don't call /stop either: they just kill the container and start a new one.
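Right now the only workaround I have is to delete and re-register the connector after every restart, roughly as below (same placeholder file name as above), which is not something I want to bake into the deployment:
$ curl -X DELETE http://localhost:8083/connectors/connect_file_pulse
$ curl -X PUT -H "Content-Type: application/json" \
       --data @file-pulse.json \
       http://localhost:8083/connectors/connect_file_pulse/config   # after this the connector starts and processes files again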