What version of the Stream Reactor are you reporting this issue for?
AWS S3 Sink and Source 8.1.35
Are you running the correct version of Kafka/Confluent for the Stream Reactor release?
We are running this on Amazon MSK Connect with Kafka Connect 3.7.x against an MSK Kafka cluster <redacted-msk-cluster>.
Do you have a supported version of the data source/sink?
Yes.
- AWS S3
- Amazon MSK / MSK Connect
Have you read the docs?
Yes.
We are following the documented full-message envelope path with:
- store.envelope=true
- ByteArrayConverter for key and value
- AVRO / PARQUET as the archive format
What is the expected behaviour?
We expect to be able to back up and restore a bytes topic using the full Kafka envelope:
- key
- value
- headers
- metadata
At minimum, we expect this to work for one of these documented subsets:
- headers=true, metadata=false
- headers=false, metadata=true
For restore, we expect the S3 source connector to replay the archived records into a Kafka topic when the target topic exists and the connector has topic write permission.
What was observed?
We tested on bytes topic <bytes-topic>.
1. Full-envelope sink fails on a real hyphenated header name
The topic contains a real Kafka header named:
timestamp-unix-milliseconds
The following sink variants all fail in the same general way:
- full-envelope AVRO with metadata enabled
- full-envelope PARQUET with headers and metadata enabled
- bounded sink isolation test with headers=true, metadata=false
Observed error:
org.apache.avro.SchemaParseException: Illegal character in: timestamp-unix-milliseconds
So the bytes envelope path appears to fail whenever a propagated header or generated field name contains a hyphen (-).
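For context, our reading of the Avro specification is that record and field names must match [A-Za-z_][A-Za-z0-9_]*, so a hyphen can never appear in a schema name, which is consistent with the SchemaParseException above. A minimal sketch of that rule (the helper name is ours, not part of the connector):

```python
import re

# Avro spec name grammar: a name starts with a letter or underscore
# and continues with letters, digits, or underscores. '-' is illegal.
AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def is_legal_avro_name(name: str) -> bool:
    """Return True if `name` is a legal Avro record/field name."""
    return AVRO_NAME.match(name) is not None

# The real header name from our topic fails the rule:
#   is_legal_avro_name("timestamp-unix-milliseconds")  -> False
# while the underscore variant passes:
#   is_legal_avro_name("timestamp_unix_milliseconds")  -> True
```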
2. Metadata-only sink works
The bounded sink isolation test with headers=false, metadata=true did succeed as a sink.
Observed result:
- the metadata-only sink test connector became RUNNING
- it wrote 3 AVRO objects to S3
- the sampled archive schema contains the metadata fields: timestamp, topic, partition, offset
So metadata-only archival works in this environment.
3. Metadata-only source restore is still broken
We then tested source restore from the metadata-only archive.
Restore drill 1
Connector: <source-restore-test-2>
Observed behavior:
- the connector materialized and tasks started
- the destination topic <restore-topic-2> did not exist when the tasks started
- the tasks hit UNKNOWN_TOPIC_OR_PARTITION
- after we created the topic manually, the running tasks did not recover
- the tasks later failed unrecoverably
- <restore-topic-2> still has 0 records on all partitions
Restore drill 2
Connector: <source-restore-test-3>
This run was cleaner:
- we pre-created <restore-topic-3>
- then created the source connector
Observed behavior:
- the connector initially stayed in CREATING
- worker logs again showed repeated: GET /connectors/<source-restore-test-3>/config HTTP/1.1" 404
- later the worker did materialize the connector config
- then the tasks failed to send records to <restore-topic-3>
- both tasks eventually died unrecoverably
Observed log pattern:
WorkerSourceTask{id=<source-restore-test-3>-0} failed to send record to <restore-topic-3>
WorkerSourceTask{id=<source-restore-test-3>-1} failed to send record to <restore-topic-3>
Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
So in our environment:
- header-bearing envelope backup is broken
- metadata-only backup works
- metadata-only source restore is still not working end to end
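A hypothetical producer-side workaround we are considering (not something the connector docs prescribe; sanitize_header_name is our own helper, and we would prefer the connector to handle this) would be to map header names to Avro-legal identifiers before records are produced:

```python
import re

def sanitize_header_name(name: str) -> str:
    """Map an arbitrary Kafka header name to an Avro-legal identifier:
    replace every illegal character with '_', and prefix '_' if the
    first character is not a letter or underscore."""
    sanitized = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if not sanitized:
        return "_"
    if not re.match(r"[A-Za-z_]", sanitized[0]):
        sanitized = "_" + sanitized
    return sanitized

# sanitize_header_name("timestamp-unix-milliseconds")
#   -> "timestamp_unix_milliseconds"
```

The obvious downside is that the restored topic would carry the rewritten header names rather than the originals, which is why we would rather understand the intended behavior of the headers=true envelope path.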
Please share the connector configuration
Working baseline sink
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "topics": "<bytes-topic>",
  "connect.s3.aws.auth.mode": "Default",
  "connect.s3.aws.region": "<aws-region>",
  "connect.s3.error.policy": "THROW",
  "connect.s3.kcql": "INSERT INTO <bucket>:retention_class_7d/format_lenses-avro-envelope/... SELECT * FROM <bytes-topic> STOREAS `AVRO` PROPERTIES('store.envelope'=true,'store.envelope.fields.key'=true,'store.envelope.fields.value'=true,'store.envelope.fields.headers'=false,'store.envelope.fields.metadata'=false)",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
Failing headers-only sink variant
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "topics": "<bytes-topic>",
  "connect.s3.kcql": "INSERT INTO <bucket>:retention_class_7d/format_lenses-avro-envelope-hdr/... SELECT * FROM <bytes-topic> STOREAS `AVRO` PROPERTIES('store.envelope'=true,'store.envelope.fields.key'=true,'store.envelope.fields.value'=true,'store.envelope.fields.headers'=true,'store.envelope.fields.metadata'=false)",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
Working metadata-only sink variant
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "topics": "<bytes-topic>",
  "connect.s3.kcql": "INSERT INTO <bucket>:retention_class_7d/format_lenses-avro-envelope-md/... SELECT * FROM <bytes-topic> STOREAS `AVRO` PROPERTIES('store.envelope'=true,'store.envelope.fields.key'=true,'store.envelope.fields.value'=true,'store.envelope.fields.headers'=false,'store.envelope.fields.metadata'=true)",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
Failing metadata-only source variant
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
  "tasks.max": "2",
  "connect.s3.aws.auth.mode": "Default",
  "connect.s3.aws.region": "<aws-region>",
  "connect.s3.error.policy": "THROW",
  "connect.s3.source.partition.search.continuous": "true",
  "connect.s3.source.partition.search.recurse.levels": "1",
  "connect.s3.source.partition.extractor.type": "hierarchical",
  "connect.s3.kcql": "INSERT INTO <restore-topic-3> SELECT * FROM <bucket>:retention_class_7d/format_lenses-avro-envelope-md/.../<bytes-topic> STOREAS `AVRO` PROPERTIES('store.envelope'=true)",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
Please provide full log files (redact sensitive information)
Key excerpts:
org.apache.avro.SchemaParseException: Illegal character in: timestamp-unix-milliseconds
GET /connectors/<source-restore-test-3>/config HTTP/1.1" 404
WorkerSourceTask{id=<source-restore-test-3>-0} failed to send record to <restore-topic-3>
WorkerSourceTask{id=<source-restore-test-3>-1} failed to send record to <restore-topic-3>
Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
Questions
- Is the Illegal character in: timestamp-unix-milliseconds failure in the headers path an expected limitation or a defect?
- Is headers=true, metadata=false for bytes topics supposed to work when the original Kafka header name contains a hyphen (-)?
- Is metadata-only archive plus source restore expected to work on 8.1.35?
- Is the source connector expected to auto-create the destination topic for this restore pattern, or must the topic already exist?
- If the topic exists before connector creation and the source tasks still fail to send records, is that a known defect?
- Is there a recommended compatible version for this on MSK Connect / Kafka 3.7.x or 3.9.x?