diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java index 2f3e831..f95a5c1 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java @@ -42,18 +42,22 @@ public R apply(R r) { } private R applyWithSchema(R r) { - final Schema schema = SchemaBuilder.struct() + SchemaBuilder schemaBuilder = SchemaBuilder.struct() .name("com.github.jcustenborder.kafka.connect.archive.Storage") - .field("key", r.keySchema()) .field("value", r.valueSchema()) .field("topic", Schema.STRING_SCHEMA) .field("timestamp", Schema.INT64_SCHEMA); - Struct value = new Struct(schema) + //In case a key doen't exist + if (r.keySchema() != null){ + schemaBuilder.field("key", r.keySchame); + } + + Struct value = new Struct(schemaBuilder) .put("key", r.key()) .put("value", r.value()) .put("topic", r.topic()) .put("timestamp", r.timestamp()); - return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp()); + return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schemaBuilder, value, r.timestamp()); } @SuppressWarnings("unchecked")