Skip to content

Commit 4ec4590

Browse files
committed
Adding support for optional and schemaless keys/values
1 parent 5dbeeeb commit 4ec4590

File tree

9 files changed

+141
-92
lines changed

9 files changed

+141
-92
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/IcebergRemoteStorageManager.java

Lines changed: 56 additions & 68 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,6 @@
3030
import java.util.Set;
3131

3232
import org.apache.kafka.common.header.Header;
33-
import org.apache.kafka.common.record.FileLogInputStream;
3433
import org.apache.kafka.common.record.Record;
3534
import org.apache.kafka.common.record.RecordBatch;
3635
import org.apache.kafka.common.utils.ByteBufferInputStream;
@@ -61,8 +60,6 @@
6160
import io.aiven.kafka.tieredstorage.storage.BytesRange;
6261
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
6362

64-
import io.confluent.kafka.schemaregistry.ParsedSchema;
65-
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
6663
import org.apache.avro.Schema;
6764
import org.apache.avro.generic.GenericData;
6865
import org.apache.iceberg.AppendFiles;
@@ -82,6 +79,8 @@
8279
import org.slf4j.Logger;
8380
import org.slf4j.LoggerFactory;
8481

82+
import static io.aiven.kafka.tieredstorage.iceberg.StructureProvider.SchemaAndId;
83+
8584
public class IcebergRemoteStorageManager extends InternalRemoteStorageManager {
8685
private static final Logger LOG = LoggerFactory.getLogger(IcebergRemoteStorageManager.class);
8786

@@ -198,10 +197,10 @@ private LogSegmentProcessingResult processLogSegment(
198197

199198
for (final var batch : segment.log().batches()) {
200199
for (final Record record : batch) {
201-
final ParsedRecord parsedRecord = extractRecordData(batch, record, topicName);
200+
final ParsedRecord parsedRecord = extractRecordData(record, topicName);
202201

203-
keySchemaId = parsedRecord.keySchemaId();
204-
valueSchemaId = parsedRecord.valueSchemaId();
202+
keySchemaId = parsedRecord.key().schemaId();
203+
valueSchemaId = parsedRecord.value().schemaId();
205204

206205
if (writer == null) {
207206
final TableIdentifier tableIdentifier = TableIdentifier.of(icebergNamespace, topicName);
@@ -226,71 +225,67 @@ private LogSegmentProcessingResult processLogSegment(
226225
}
227226

228227
private ParsedRecord extractRecordData(
229-
final FileLogInputStream.FileChannelRecordBatch batch,
230228
final Record record,
231229
final String topicName) throws Exception {
232230

233-
final byte[] rawKey = new byte[record.keySize()];
234-
record.key().get(rawKey);
235-
final byte[] rawValue = new byte[record.valueSize()];
236-
record.value().get(rawValue);
237-
238-
final SchemaAndId keySchema = getSchema(rawKey);
239-
final SchemaAndId valueSchema = getSchema(rawValue);
240-
241-
final Object keyRecord = structureProvider.deserializeKey(topicName, null, rawKey);
242-
final Object valueRecord = structureProvider.deserializeValue(topicName, null, rawValue);
231+
final Deserialized deserializedKey = getDeserializedKey(record, topicName);
232+
final Deserialized deserializedValue = getDeserializedValue(record, topicName);
243233

244-
final Schema recordSchema = RowSchema.createRowSchema(keySchema.schema, valueSchema.schema);
234+
final Schema recordSchema = RowSchema.createRowSchema(
235+
deserializedKey.schema,
236+
deserializedValue.schema);
245237

246238
return new ParsedRecord(
247239
record.offset(),
248240
record.timestamp(),
249-
keySchema.schemaId,
250-
rawKey,
251-
keyRecord,
252-
valueSchema.schemaId,
253-
rawValue,
254-
valueRecord,
241+
deserializedKey,
242+
deserializedValue,
255243
recordSchema,
256244
record.headers()
257245
);
258246
}
259247

248+
private Deserialized getDeserializedKey(final Record record, final String topicName) throws IOException {
249+
if (record.hasKey()) {
250+
final byte[] rawKey = new byte[record.keySize()];
251+
record.key().get(rawKey);
252+
final Integer schemaId = getSchemaId(rawKey);
253+
final SchemaAndId<Schema> schema = structureProvider.getSchemaById(schemaId);
254+
final Object keyRecord = structureProvider.deserializeKey(topicName, null, rawKey);
255+
return new Deserialized(rawKey, keyRecord, schema.schemaId(), schema.schema());
256+
} else {
257+
final SchemaAndId<Schema> schema = structureProvider.getSchemaById(null);
258+
return new Deserialized(null, null, null, schema.schema());
259+
}
260+
}
261+
262+
private Deserialized getDeserializedValue(final Record record, final String topicName) throws IOException {
263+
if (record.hasValue()) {
264+
final byte[] rawValue = new byte[record.valueSize()];
265+
record.value().get(rawValue);
266+
final Integer schemaId = getSchemaId(rawValue);
267+
final SchemaAndId<Schema> schema = structureProvider.getSchemaById(schemaId);
268+
final Object valueRecord = structureProvider.deserializeValue(topicName, null, rawValue);
269+
return new Deserialized(rawValue, valueRecord, schema.schemaId(), schema.schema());
270+
} else {
271+
final SchemaAndId<Schema> schema = structureProvider.getSchemaById(null);
272+
return new Deserialized(null, null, null, schema.schema());
273+
}
274+
}
275+
276+
private record Deserialized(byte[] raw, Object record, Integer schemaId, Schema schema) {
277+
}
278+
260279
private record ParsedRecord(
261280
long recordOffset,
262281
long recordTimestamp,
263-
Integer keySchemaId,
264-
byte[] rawKey,
265-
Object keyRecord,
266-
Integer valueSchemaId,
267-
byte[] rawValue,
268-
Object valueRecord,
282+
Deserialized key,
283+
Deserialized value,
269284
Schema recordSchema,
270285
Header[] headers
271286
) {
272287
}
273288

274-
private SchemaAndId getSchema(final byte[] value) throws IOException {
275-
final int schemaId = getSchemaId(value);
276-
ParsedSchema schema = null;
277-
try {
278-
schema = structureProvider.getSchemaById(schemaId);
279-
} catch (final RestClientException ignore) {
280-
//writing raw value if schema is not found
281-
}
282-
283-
if (schema == null) {
284-
return new SchemaAndId(Schema.createUnion(Schema.create(Schema.Type.BYTES),
285-
Schema.create(Schema.Type.NULL)), schemaId);
286-
} else {
287-
return new SchemaAndId(Schema.createUnion((Schema) schema.rawSchema(), Schema.create(Schema.Type.NULL)),
288-
schemaId);
289-
}
290-
}
291-
292-
private record SchemaAndId(Schema schema, Integer schemaId) { }
293-
294289
private void writeRecordToIceberg(
295290
final IcebergWriter writer,
296291
final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
@@ -316,15 +311,15 @@ private void writeRecordToIceberg(
316311

317312
final GenericData.Record finalRecord = new GenericData.Record(parsedRecord.recordSchema());
318313
finalRecord.put("kafka", kafkaPart);
319-
if (parsedRecord.keyRecord() != null) {
320-
finalRecord.put("key", parsedRecord.keyRecord());
314+
if (parsedRecord.key().record() != null) {
315+
finalRecord.put("key", parsedRecord.key().record());
321316
} else {
322-
finalRecord.put("key_raw", parsedRecord.rawKey);
317+
finalRecord.put("key_raw", parsedRecord.key().raw());
323318
}
324-
if (parsedRecord.keyRecord() != null) {
325-
finalRecord.put("value", parsedRecord.valueRecord());
319+
if (parsedRecord.value().record() != null) {
320+
finalRecord.put("value", parsedRecord.value().record());
326321
} else {
327-
finalRecord.put("value_raw", parsedRecord.rawValue);
322+
finalRecord.put("value_raw", parsedRecord.value().raw());
328323
}
329324
finalRecord.put("headers", Arrays.asList(parsedRecord.headers()));
330325

@@ -374,19 +369,12 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
374369
new RecordBatchGrouper(new MultiFileReader(remoteFilePaths, dataFileMetadata -> {
375370
final FileIO io = table.io();
376371

377-
final Schema valueSchema;
378-
final Schema keySchema;
379-
try {
380-
keySchema =
381-
(Schema) structureProvider.getSchemaById(dataFileMetadata.keySchemaId()).rawSchema();
382-
valueSchema =
383-
(Schema) structureProvider.getSchemaById(dataFileMetadata.valueSchemaId()).rawSchema();
384-
385-
} catch (final RestClientException e) {
386-
throw new RuntimeException(e);
387-
}
372+
final SchemaAndId<Schema> keySchema =
373+
structureProvider.getSchemaById(dataFileMetadata.keySchemaId());
374+
final SchemaAndId<Schema> valueSchema =
375+
structureProvider.getSchemaById(dataFileMetadata.valueSchemaId());
388376

389-
final Schema recordSchema = RowSchema.createRowSchema(keySchema, valueSchema);
377+
final Schema recordSchema = RowSchema.createRowSchema(keySchema.schema(), valueSchema.schema());
390378

391379
return Parquet.read(io.newInputFile(dataFileMetadata.location()))
392380
.project(table.schema())

core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/AvroSchemaRegistryStructureProvider.java

Lines changed: 37 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -24,13 +24,17 @@
2424

2525
import org.apache.kafka.common.header.Headers;
2626

27-
import io.confluent.kafka.schemaregistry.ParsedSchema;
2827
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
2928
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
3029
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
3130
import io.confluent.kafka.serializers.KafkaAvroSerializer;
31+
import org.apache.avro.Schema;
3232

3333
public class AvroSchemaRegistryStructureProvider implements StructureProvider {
34+
private static final int SUBJECT_NOT_FOUND_ERROR_CODE = 40401;
35+
private static final int SCHEMA_NOT_FOUND_ERROR_CODE = 40403;
36+
private static final int JSON_PARSE_ERROR_CODE = 50005;
37+
3438
private final KafkaAvroSerializer keySerializer;
3539
private final KafkaAvroDeserializer keyDeserializer;
3640
private final KafkaAvroSerializer valueSerializer;
@@ -66,8 +70,26 @@ public void configure(final Map<String, ?> configs) {
6670
valueDeserializer.configure(serdeConfig, false);
6771
}
6872

69-
public ParsedSchema getSchemaById(final int schemaId) throws RestClientException, IOException {
70-
return valueDeserializer.getSchemaRegistryClient().getSchemaById(schemaId);
73+
@Override
74+
public SchemaAndId<Schema> getSchemaById(final Integer schemaId) throws IOException {
75+
if (schemaId == null) {
76+
return new SchemaAndId<>(Schema.createUnion(Schema.create(Schema.Type.BYTES),
77+
Schema.create(Schema.Type.NULL)), null);
78+
}
79+
try {
80+
return new SchemaAndId<>(Schema.createUnion(Schema.create(Schema.Type.NULL),
81+
(Schema) valueDeserializer.getSchemaRegistryClient().getSchemaById(schemaId).rawSchema()),
82+
schemaId);
83+
} catch (final RestClientException e) {
84+
if (e.getErrorCode() == SCHEMA_NOT_FOUND_ERROR_CODE
85+
|| e.getErrorCode() == SUBJECT_NOT_FOUND_ERROR_CODE
86+
|| e.getErrorCode() == JSON_PARSE_ERROR_CODE) {
87+
return new SchemaAndId<>(Schema.createUnion(Schema.create(Schema.Type.BYTES),
88+
Schema.create(Schema.Type.NULL)), null);
89+
} else {
90+
throw new RuntimeException("Failed to fetch schema with id " + schemaId + " from schema registry", e);
91+
}
92+
}
7193
}
7294

7395
@Override
@@ -77,7 +99,12 @@ public ByteBuffer serializeKey(final String topic, final Headers headers, final
7799

78100
@Override
79101
public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
80-
return keyDeserializer.deserialize(topic, headers, data);
102+
try {
103+
return keyDeserializer.deserialize(topic, headers, data);
104+
} catch (final Throwable e) {
105+
// If deserialization fails, return null; the raw bytes are then written to the Iceberg table instead.
106+
return null;
107+
}
81108
}
82109

83110
@Override
@@ -87,6 +114,11 @@ public ByteBuffer serializeValue(final String topic, final Headers headers, fina
87114

88115
@Override
89116
public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
90-
return valueDeserializer.deserialize(topic, headers, data);
117+
try {
118+
return valueDeserializer.deserialize(topic, headers, data);
119+
} catch (final Throwable e) {
120+
// If deserialization fails, return null. Further the raw bytes will be passed to Iceberg table
121+
return null;
122+
}
91123
}
92124
}

core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/BatchEnumeration.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -212,7 +212,7 @@ private ByteBuffer getKafkaKey(final GenericData.Record record) {
212212
} else if (key != null) {
213213
return structureProvider.serializeKey(topic, null, key);
214214
} else {
215-
return ByteBuffer.wrap((byte[]) rawKey);
215+
return (ByteBuffer) rawKey;
216216
}
217217
}
218218

@@ -224,7 +224,7 @@ private ByteBuffer getKafkaValue(final GenericData.Record record) {
224224
} else if (value != null) {
225225
return structureProvider.serializeValue(topic, null, value);
226226
} else {
227-
return ByteBuffer.wrap((byte[]) rawValue);
227+
return (ByteBuffer) rawValue;
228228
}
229229
}
230230

core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/StructureProvider.java

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -22,11 +22,8 @@
2222
import org.apache.kafka.common.Configurable;
2323
import org.apache.kafka.common.header.Headers;
2424

25-
import io.confluent.kafka.schemaregistry.ParsedSchema;
26-
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
27-
2825
public interface StructureProvider extends Configurable {
29-
ParsedSchema getSchemaById(final int schemaId) throws RestClientException, IOException;
26+
<T> SchemaAndId<T> getSchemaById(final Integer schemaId) throws IOException;
3027

3128
ByteBuffer serializeKey(final String topic, final Headers headers, final Object value);
3229

@@ -35,4 +32,6 @@ public interface StructureProvider extends Configurable {
3532
Object deserializeKey(final String topic, final Headers headers, final byte[] data);
3633

3734
Object deserializeValue(final String topic, final Headers headers, final byte[] data);
35+
36+
record SchemaAndId<T>(T schema, Integer schemaId) { }
3837
}

core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/data/ParquetAvroValueReaders.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -175,7 +175,7 @@ public ParquetValueReader<?> struct(
175175
avroSchema = providedAvroSchema;
176176
} else {
177177
final int fieldId = struct.getId().intValue();
178-
avroSchema = avroSchemasByFieldId.get(fieldId);
178+
avroSchema = avroSchemasByFieldId.getOrDefault(fieldId, null);
179179
}
180180

181181
final Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
@@ -223,8 +223,8 @@ public ParquetValueReader<?> list(
223223
return new ListReader<>(
224224
repeatedD,
225225
repeatedR,
226-
ParquetValueReaders.option(elementType, elementD, elementReader),
227-
avroSchemasByFieldId.get(array.getId().intValue()));
226+
ParquetValueReaders.option(elementType, elementD, elementReader),
227+
avroSchemasByFieldId.getOrDefault(array.getId().intValue(), null));
228228
}
229229

230230
static class ListReader<E> extends ParquetValueReaders.ListReader<E> {
@@ -276,6 +276,9 @@ public ParquetValueReader<?> primitive(
276276

277277
final ColumnDescriptor desc = type.getColumnDescription(currentPath());
278278

279+
if (!avroSchemasByFieldId.containsKey(primitive.getId().intValue())) {
280+
return ParquetValueReaders.nulls();
281+
}
279282
if (primitive.getOriginalType() != null) {
280283
switch (primitive.getOriginalType()) {
281284
case ENUM:

core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,6 @@
3131
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
3232
import io.aiven.kafka.tieredstorage.storage.StorageBackend;
3333

34-
import io.confluent.kafka.schemaregistry.ParsedSchema;
3534
import org.apache.iceberg.Table;
3635
import org.apache.iceberg.catalog.Catalog;
3736
import org.apache.iceberg.catalog.Namespace;
@@ -425,7 +424,7 @@ public void configure(final Map<String, ?> configs) {
425424
}
426425

427426
@Override
428-
public ParsedSchema getSchemaById(final int schemaId) {
427+
public SchemaAndId getSchemaById(final Integer schemaId) {
429428
return null;
430429
}
431430

0 commit comments

Comments
 (0)