Implement framework around moving fields into Schema.BYTES value #6
Inspired by https://jobs.zalando.com/tech/blog/backing-up-kafka-zookeeper/
Code: https://github.com/imduffy15/kafka-connect-s3
Reasoning: Without writing a whole separate Connector implementation from Confluent's, we wanted a way to back up the complete Kafka (Avro-encoded) payload to S3 for querying in downstream systems, and this transform seemed useful for that. Then we started getting requests to read that S3 data back into Kafka, and we were unsure how to take an Avro file and convert it back into the Schema Registry wire format. I think issues arose with how to strip information from the schema (the connect.metastring information, for example) that would cause a different ID in the registry. Basically, we didn't want to push old schemas over any new ones in the registry; in other words, no new IDs should get pushed onto a subject. So, we want to preserve the Avro schema ID that's already within the bytes, and not have the overhead of the AvroConverter translating formats back and forth and doing lookups against a registry.
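For readers unfamiliar with the wire format mentioned above: a Schema Registry framed value is one magic byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded payload. The sketch below is not the code from this PR; it only illustrates, under that assumption about the framing, how the ID can be read straight from the raw bytes without an AvroConverter or a registry lookup. The class and method names (WireFormat, schemaId, avroPayload) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper (not part of this PR) for reading a Schema Registry framed value. */
public final class WireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4-byte schema ID
    private static final int HEADER_SIZE = 5;

    private WireFormat() {}

    /** Returns the schema ID stored in bytes 1..4 of the framed value. */
    public static int schemaId(byte[] value) {
        if (value == null || value.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Value is too short to carry a schema ID header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the Avro-encoded payload that follows the 5-byte header. */
    public static byte[] avroPayload(byte[] value) {
        schemaId(value); // validates length and magic byte
        return Arrays.copyOfRange(value, HEADER_SIZE, value.length);
    }
}
```

Because the header is left untouched when the value is carried as raw bytes, writing the same bytes back to Kafka keeps the original schema ID and registers nothing new on the subject.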
Unless I am mistaken, the original transform code here + ByteArrayConverter doesn't quite do what we want (thinking back, I don't know if we actually tried that 🤔). Therefore, I am contributing this code and looking for feedback, @jcustenborder. Thanks!
Note: I wrote this code before schemaless support was added, so that feature is missing.