Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Set;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.ByteBufferInputStream;
Expand Down Expand Up @@ -61,8 +60,6 @@
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.AppendFiles;
Expand All @@ -82,6 +79,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.kafka.tieredstorage.iceberg.StructureProvider.SchemaAndId;

public class IcebergRemoteStorageManager extends InternalRemoteStorageManager {
private static final Logger LOG = LoggerFactory.getLogger(IcebergRemoteStorageManager.class);

Expand Down Expand Up @@ -198,10 +197,10 @@ private LogSegmentProcessingResult processLogSegment(

for (final var batch : segment.log().batches()) {
for (final Record record : batch) {
final ParsedRecord parsedRecord = extractRecordData(batch, record, topicName);
final ParsedRecord parsedRecord = extractRecordData(record, topicName);

keySchemaId = parsedRecord.keySchemaId();
valueSchemaId = parsedRecord.valueSchemaId();
keySchemaId = parsedRecord.key().schemaId();
valueSchemaId = parsedRecord.value().schemaId();

if (writer == null) {
final TableIdentifier tableIdentifier = TableIdentifier.of(icebergNamespace, topicName);
Expand All @@ -226,71 +225,67 @@ private LogSegmentProcessingResult processLogSegment(
}

private ParsedRecord extractRecordData(
final FileLogInputStream.FileChannelRecordBatch batch,
final Record record,
final String topicName) throws Exception {

final byte[] rawKey = new byte[record.keySize()];
record.key().get(rawKey);
final byte[] rawValue = new byte[record.valueSize()];
record.value().get(rawValue);

final SchemaAndId keySchema = getSchema(rawKey);
final SchemaAndId valueSchema = getSchema(rawValue);

final Object keyRecord = structureProvider.deserializeKey(topicName, null, rawKey);
final Object valueRecord = structureProvider.deserializeValue(topicName, null, rawValue);
final Deserialized deserializedKey = getDeserializedKey(record, topicName);
final Deserialized deserializedValue = getDeserializedValue(record, topicName);

final Schema recordSchema = RowSchema.createRowSchema(keySchema.schema, valueSchema.schema);
final Schema recordSchema = RowSchema.createRowSchema(
deserializedKey.schema,
deserializedValue.schema);

return new ParsedRecord(
record.offset(),
record.timestamp(),
keySchema.schemaId,
rawKey,
keyRecord,
valueSchema.schemaId,
rawValue,
valueRecord,
deserializedKey,
deserializedValue,
recordSchema,
record.headers()
);
}

/**
 * Deserializes the key side of a Kafka record.
 *
 * <p>When the record carries no key, no bytes are read and the fallback schema
 * (obtained by resolving a {@code null} schema id) is returned with all payload
 * fields set to {@code null}.
 *
 * @param record    the Kafka record whose key is deserialized
 * @param topicName topic used by the deserializer for subject resolution
 * @return raw bytes, deserialized object, schema id and schema for the key
 * @throws IOException if schema lookup fails
 */
private Deserialized getDeserializedKey(final Record record, final String topicName) throws IOException {
    if (!record.hasKey()) {
        // No key present: carry only the fallback schema, no payload.
        return new Deserialized(null, null, null, structureProvider.getSchemaById(null).schema());
    }
    final byte[] keyBytes = new byte[record.keySize()];
    record.key().get(keyBytes);
    final SchemaAndId<Schema> resolved = structureProvider.getSchemaById(getSchemaId(keyBytes));
    final Object deserialized = structureProvider.deserializeKey(topicName, null, keyBytes);
    return new Deserialized(keyBytes, deserialized, resolved.schemaId(), resolved.schema());
}

/**
 * Deserializes the value side of a Kafka record.
 *
 * <p>When the record carries no value, no bytes are read and the fallback schema
 * (obtained by resolving a {@code null} schema id) is returned with all payload
 * fields set to {@code null}.
 *
 * @param record    the Kafka record whose value is deserialized
 * @param topicName topic used by the deserializer for subject resolution
 * @return raw bytes, deserialized object, schema id and schema for the value
 * @throws IOException if schema lookup fails
 */
private Deserialized getDeserializedValue(final Record record, final String topicName) throws IOException {
    if (!record.hasValue()) {
        // No value present: carry only the fallback schema, no payload.
        return new Deserialized(null, null, null, structureProvider.getSchemaById(null).schema());
    }
    final byte[] valueBytes = new byte[record.valueSize()];
    record.value().get(valueBytes);
    final SchemaAndId<Schema> resolved = structureProvider.getSchemaById(getSchemaId(valueBytes));
    final Object deserialized = structureProvider.deserializeValue(topicName, null, valueBytes);
    return new Deserialized(valueBytes, deserialized, resolved.schemaId(), resolved.schema());
}

/**
 * Result of deserializing one side (key or value) of a Kafka record.
 *
 * <p>{@code raw} holds the original serialized bytes ({@code null} when the record
 * has no key/value); {@code record} is the deserialized object ({@code null} when
 * absent — and presumably also when deserialization falls back, TODO confirm against
 * the provider's deserialize implementation); {@code schemaId} is the resolved
 * schema-registry id ({@code null} when there was nothing to resolve); {@code schema}
 * is the Avro schema describing this side.
 */
private record Deserialized(byte[] raw, Object record, Integer schemaId, Schema schema) {
}

private record ParsedRecord(
long recordOffset,
long recordTimestamp,
Integer keySchemaId,
byte[] rawKey,
Object keyRecord,
Integer valueSchemaId,
byte[] rawValue,
Object valueRecord,
Deserialized key,
Deserialized value,
Schema recordSchema,
Header[] headers
) {
}

private SchemaAndId getSchema(final byte[] value) throws IOException {
final int schemaId = getSchemaId(value);
ParsedSchema schema = null;
try {
schema = structureProvider.getSchemaById(schemaId);
} catch (final RestClientException ignore) {
//writing raw value if schema is not found
}

if (schema == null) {
return new SchemaAndId(Schema.createUnion(Schema.create(Schema.Type.BYTES),
Schema.create(Schema.Type.NULL)), schemaId);
} else {
return new SchemaAndId(Schema.createUnion((Schema) schema.rawSchema(), Schema.create(Schema.Type.NULL)),
schemaId);
}
}

private record SchemaAndId(Schema schema, Integer schemaId) { }

private void writeRecordToIceberg(
final IcebergWriter writer,
final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
Expand All @@ -316,15 +311,15 @@ private void writeRecordToIceberg(

final GenericData.Record finalRecord = new GenericData.Record(parsedRecord.recordSchema());
finalRecord.put("kafka", kafkaPart);
if (parsedRecord.keyRecord() != null) {
finalRecord.put("key", parsedRecord.keyRecord());
if (parsedRecord.key().record() != null) {
finalRecord.put("key", parsedRecord.key().record());
} else {
finalRecord.put("key_raw", parsedRecord.rawKey);
finalRecord.put("key_raw", parsedRecord.key().raw());
}
if (parsedRecord.keyRecord() != null) {
finalRecord.put("value", parsedRecord.valueRecord());
if (parsedRecord.value().record() != null) {
finalRecord.put("value", parsedRecord.value().record());
} else {
finalRecord.put("value_raw", parsedRecord.rawValue);
finalRecord.put("value_raw", parsedRecord.value().raw());
}
finalRecord.put("headers", Arrays.asList(parsedRecord.headers()));

Expand Down Expand Up @@ -374,19 +369,12 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
new RecordBatchGrouper(new MultiFileReader(remoteFilePaths, dataFileMetadata -> {
final FileIO io = table.io();

final Schema valueSchema;
final Schema keySchema;
try {
keySchema =
(Schema) structureProvider.getSchemaById(dataFileMetadata.keySchemaId()).rawSchema();
valueSchema =
(Schema) structureProvider.getSchemaById(dataFileMetadata.valueSchemaId()).rawSchema();

} catch (final RestClientException e) {
throw new RuntimeException(e);
}
final SchemaAndId<Schema> keySchema =
structureProvider.getSchemaById(dataFileMetadata.keySchemaId());
final SchemaAndId<Schema> valueSchema =
structureProvider.getSchemaById(dataFileMetadata.valueSchemaId());

final Schema recordSchema = RowSchema.createRowSchema(keySchema, valueSchema);
final Schema recordSchema = RowSchema.createRowSchema(keySchema.schema(), valueSchema.schema());

return Parquet.read(io.newInputFile(dataFileMetadata.location()))
.project(table.schema())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@

import org.apache.kafka.common.header.Headers;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;

public class AvroSchemaRegistryStructureProvider implements StructureProvider {
private static final int SUBJECT_NOT_FOUND_ERROR_CODE = 40401;
private static final int SCHEMA_NOT_FOUND_ERROR_CODE = 40403;
private static final int JSON_PARSE_ERROR_CODE = 50005;

private final KafkaAvroSerializer keySerializer;
private final KafkaAvroDeserializer keyDeserializer;
private final KafkaAvroSerializer valueSerializer;
Expand Down Expand Up @@ -66,8 +70,26 @@ public void configure(final Map<String, ?> configs) {
valueDeserializer.configure(serdeConfig, false);
}

public ParsedSchema getSchemaById(final int schemaId) throws RestClientException, IOException {
return valueDeserializer.getSchemaRegistryClient().getSchemaById(schemaId);
@Override
public SchemaAndId<Schema> getSchemaById(final Integer schemaId) throws IOException {
if (schemaId == null) {
return new SchemaAndId<>(Schema.createUnion(Schema.create(Schema.Type.BYTES),
Schema.create(Schema.Type.NULL)), null);
}
try {
return new SchemaAndId<>(Schema.createUnion(Schema.create(Schema.Type.NULL),
(Schema) valueDeserializer.getSchemaRegistryClient().getSchemaById(schemaId).rawSchema()),
schemaId);
} catch (final RestClientException e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't generic network errors fall into this category?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe yes but AFAIU it will be thrown only after exhausting all the allowed retry attempts. Or am I missing anything?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if a network error happens, the plugin should fail as well. The result of this function cannot depend on network or schema registry availability.

if (e.getErrorCode() == SCHEMA_NOT_FOUND_ERROR_CODE
|| e.getErrorCode() == SUBJECT_NOT_FOUND_ERROR_CODE
|| e.getErrorCode() == JSON_PARSE_ERROR_CODE) {
return new SchemaAndId<>(Schema.createUnion(Schema.create(Schema.Type.BYTES),
Schema.create(Schema.Type.NULL)), null);
} else {
throw new RuntimeException("Failed to fetch schema with id " + schemaId + " from schema registry", e);
}
}
}

@Override
Expand All @@ -77,7 +99,12 @@ public ByteBuffer serializeKey(final String topic, final Headers headers, final

@Override
public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
return keyDeserializer.deserialize(topic, headers, data);
try {
return keyDeserializer.deserialize(topic, headers, data);
} catch (final Throwable e) {
// If deserialization fails, return null. Further the raw bytes will be passed to Iceberg table
return null;
}
}

@Override
Expand All @@ -87,6 +114,11 @@ public ByteBuffer serializeValue(final String topic, final Headers headers, fina

@Override
public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
return valueDeserializer.deserialize(topic, headers, data);
try {
return valueDeserializer.deserialize(topic, headers, data);
} catch (final Throwable e) {
// If deserialization fails, return null. Further the raw bytes will be passed to Iceberg table
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private ByteBuffer getKafkaKey(final GenericData.Record record) {
} else if (key != null) {
return structureProvider.serializeKey(topic, null, key);
} else {
return ByteBuffer.wrap((byte[]) rawKey);
return (ByteBuffer) rawKey;
}
}

Expand All @@ -224,7 +224,7 @@ private ByteBuffer getKafkaValue(final GenericData.Record record) {
} else if (value != null) {
return structureProvider.serializeValue(topic, null, value);
} else {
return ByteBuffer.wrap((byte[]) rawValue);
return (ByteBuffer) rawValue;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.header.Headers;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public interface StructureProvider extends Configurable {
ParsedSchema getSchemaById(final int schemaId) throws RestClientException, IOException;
<T> SchemaAndId<T> getSchemaById(final Integer schemaId) throws IOException;

ByteBuffer serializeKey(final String topic, final Headers headers, final Object value);

Expand All @@ -35,4 +32,6 @@ public interface StructureProvider extends Configurable {
Object deserializeKey(final String topic, final Headers headers, final byte[] data);

Object deserializeValue(final String topic, final Headers headers, final byte[] data);

record SchemaAndId<T>(T schema, Integer schemaId) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public ParquetValueReader<?> struct(
avroSchema = providedAvroSchema;
} else {
final int fieldId = struct.getId().intValue();
avroSchema = avroSchemasByFieldId.get(fieldId);
avroSchema = avroSchemasByFieldId.getOrDefault(fieldId, null);
}

final Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Expand Down Expand Up @@ -223,8 +223,8 @@ public ParquetValueReader<?> list(
return new ListReader<>(
repeatedD,
repeatedR,
ParquetValueReaders.option(elementType, elementD, elementReader),
avroSchemasByFieldId.get(array.getId().intValue()));
ParquetValueReaders.option(elementType, elementD, elementReader),
avroSchemasByFieldId.getOrDefault(array.getId().intValue(), null));
}

static class ListReader<E> extends ParquetValueReaders.ListReader<E> {
Expand Down Expand Up @@ -276,6 +276,9 @@ public ParquetValueReader<?> primitive(

final ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (!avroSchemasByFieldId.containsKey(primitive.getId().intValue())) {
return ParquetValueReaders.nulls();
}
if (primitive.getOriginalType() != null) {
switch (primitive.getOriginalType()) {
case ENUM:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
import io.aiven.kafka.tieredstorage.storage.StorageBackend;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
Expand Down Expand Up @@ -425,7 +424,7 @@ public void configure(final Map<String, ?> configs) {
}

@Override
public ParsedSchema getSchemaById(final int schemaId) {
public SchemaAndId getSchemaById(final Integer schemaId) {
return null;
}

Expand Down
Loading