diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/IcebergRemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/IcebergRemoteStorageManager.java index c3cd5a3e9..880be9d68 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/IcebergRemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/IcebergRemoteStorageManager.java @@ -102,7 +102,7 @@ public class IcebergRemoteStorageManager extends InternalRemoteStorageManager { this.objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), config.keyPrefixMask()); this.catalog = config.icebergCatalog(); this.structureProvider = config.structureProvider(); - this.icebergTableManager = new IcebergTableManager(catalog); + this.icebergTableManager = new IcebergTableManager(catalog, config.icebergPartitionSpec()); this.icebergNamespace = config.icebergNamespace(); LOG.info("IcebergRemoteStorageManager initialized successfully"); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java index cbc168a85..15b1e3cbe 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java @@ -115,6 +115,12 @@ public class RemoteStorageManagerConfig extends AbstractConfig { private static final String ICEBERG_CATALOG_CLASS_CONFIG = ICEBERG_CATALOG_PREFIX + "class"; private static final String ICEBERG_CATALOG_CLASS_DOC = "The Iceberg catalog implementation class"; + private static final String ICEBERG_PARTITION_SPEC_CONFIG = ICEBERG_PREFIX + "partition.spec"; + private static final String ICEBERG_PARTITION_SPEC_DOC = "The Iceberg partition specification. " + + "Defines custom partitioning scheme for the Iceberg table. Format: (col1, col2, ...) or (transform(col), ...). " + + "Supported transforms: year, month, day, hour, bucket, truncate. " + + "Example: (year(timestamp), user_id) or (day(created_at))"; + public static ConfigDef configDef() { final ConfigDef configDef = new ConfigDef(); @@ -252,6 +258,14 @@ public static ConfigDef configDef() { ICEBERG_CATALOG_CLASS_DOC ); + configDef.define( + ICEBERG_PARTITION_SPEC_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + ICEBERG_PARTITION_SPEC_DOC + ); + return configDef; } @@ -484,4 +498,23 @@ public Catalog icebergCatalog() { catalog.initialize("catalog", configs); return catalog; } + + public List icebergPartitionSpec() { + final String partitionSpec = getString(ICEBERG_PARTITION_SPEC_CONFIG); + if (partitionSpec == null || partitionSpec.trim().isEmpty()) { + return List.of(); + } + // Parse partition spec format: (col1, col2, ...) or (transform(col), ...) + String spec = partitionSpec.trim(); + if (spec.startsWith("(") && spec.endsWith(")")) { + spec = spec.substring(1, spec.length() - 1); + } + if (spec.isEmpty()) { + return List.of(); + } + return Arrays.stream(spec.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManager.java index 8c3899278..cd8f6af1a 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManager.java @@ -17,6 +17,7 @@ package io.aiven.kafka.tieredstorage.iceberg; import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.avro.Schema; @@ -32,21 +33,30 @@ import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.util.Tasks; +import io.aiven.kafka.tieredstorage.iceberg.data.SchemaUtils; + /** * The structure, definitions and idea of these defines was taken from * IcebergWriterFactory */ public class IcebergTableManager { private final Catalog catalog; + private final List partitionSpec; public IcebergTableManager(final Catalog catalog) { + this(catalog, List.of()); + } + + public IcebergTableManager(final Catalog catalog, final List partitionSpec) { this.catalog = catalog; + this.partitionSpec = partitionSpec != null ? partitionSpec : List.of(); } public Table getOrCreateTable(final TableIdentifier identifier, final Schema schema) { createNamespaceIfNotExist(catalog, identifier.namespace()); final org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema); + final PartitionSpec partitionSpecToUse = SchemaUtils.createPartitionSpec(icebergSchema, partitionSpec); final AtomicReference result = new AtomicReference<>(); Tasks.range(1) @@ -55,7 +65,7 @@ public Table getOrCreateTable(final TableIdentifier identifier, final Schema sch notUsed -> { try { final Table table = - catalog.createTable(identifier, icebergSchema, PartitionSpec.unpartitioned()); + catalog.createTable(identifier, icebergSchema, partitionSpecToUse); //Initial commit to get snapshotId and sequence number addInitialCommit(table); result.set(table); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/data/SchemaUtils.java b/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/data/SchemaUtils.java index 4affc0fe7..1d1d453f3 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/data/SchemaUtils.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/iceberg/data/SchemaUtils.java @@ -70,7 +70,7 @@ * The structure, definitions and idea of these defines was taken from * SchemaUtils */ -class SchemaUtils { +public class SchemaUtils { private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); @@ -155,7 +155,7 @@ private static boolean isOptional(final org.apache.iceberg.Schema schema, final return field.isOptional(); } - static PartitionSpec createPartitionSpec( + public static PartitionSpec createPartitionSpec( final org.apache.iceberg.Schema schema, final List partitionBy) { if (partitionBy.isEmpty()) { return PartitionSpec.unpartitioned(); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java index 719d550aa..6effa47c7 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java @@ -551,5 +551,101 @@ static Stream catalogNonStringConfigsAreNotAllowed() { Arguments.of(12.3) ); } + + @Test + void partitionSpecNotDefined() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123" + ) + ); + assertThat(config.icebergPartitionSpec()).isEmpty(); + } + + @Test + void partitionSpecWithSingleColumn() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "(user_id)" + ) + ); + assertThat(config.icebergPartitionSpec()).containsExactly("user_id"); + } + + @Test + void partitionSpecWithMultipleColumns() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "(col1, col2, col3)" + ) + ); + assertThat(config.icebergPartitionSpec()).containsExactly("col1", "col2", "col3"); + } + + @Test + void partitionSpecWithTransforms() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "(year(timestamp), user_id)" + ) + ); + assertThat(config.icebergPartitionSpec()).containsExactly("year(timestamp)", "user_id"); + } + + @Test + void partitionSpecWithComplexTransforms() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "(day(created_at), bucket(user_id, 10), region)" + ) + ); + assertThat(config.icebergPartitionSpec()) + .containsExactly("day(created_at)", "bucket(user_id, 10)", "region"); + } + + @Test + void partitionSpecWithoutParentheses() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "col1, col2" + ) + ); + assertThat(config.icebergPartitionSpec()).containsExactly("col1", "col2"); + } + + @Test + void partitionSpecWithEmptyString() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "" + ) + ); + assertThat(config.icebergPartitionSpec()).isEmpty(); + } + + @Test + void partitionSpecWithEmptyParentheses() { + final var config = new RemoteStorageManagerConfig( + Map.of( + "storage.backend.class", NoopStorageBackend.class.getCanonicalName(), + "chunk.size", "123", + "iceberg.partition.spec", "()" + ) + ); + assertThat(config.icebergPartitionSpec()).isEmpty(); + } } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManagerTest.java index 35e0a50e5..881909625 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/iceberg/IcebergTableManagerTest.java @@ -72,7 +72,7 @@ void setUp() { @Test void createNewTable() { when(catalog.createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned()))) + any(PartitionSpec.class))) .thenReturn(table); final AppendFiles tableAppend = mock(AppendFiles.class); @@ -82,14 +82,14 @@ void createNewTable() { assertThat(result).isEqualTo(table); verify(catalog).createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned())); + any(PartitionSpec.class)); verify(catalog, never()).loadTable(any()); } @Test void loadExistingTable() { when(catalog.createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned()))) + any(PartitionSpec.class))) .thenThrow(new AlreadyExistsException("Table already exists")); when(catalog.loadTable(tableIdentifier)).thenReturn(table); @@ -100,14 +100,14 @@ void loadExistingTable() { assertThat(result).isEqualTo(table); verify(catalog).createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned())); + any(PartitionSpec.class)); verify(catalog).loadTable(tableIdentifier); } @Test void createAfterRetry() { when(catalog.createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned()))) + any(PartitionSpec.class))) .thenThrow(new RuntimeException("Temporary failure")) .thenReturn(table); @@ -118,13 +118,13 @@ void createAfterRetry() { assertThat(result).isEqualTo(table); verify(catalog, times(2)).createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned())); + any(PartitionSpec.class)); } @Test void failWhenExceedsRetries() { when(catalog.createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned()))) + any(PartitionSpec.class))) .thenThrow(new RuntimeException("Permanent failure")); assertThatThrownBy(() -> tableManager.getOrCreateTable(tableIdentifier, avroSchema)) @@ -132,7 +132,7 @@ void failWhenExceedsRetries() { .hasMessage("Permanent failure"); verify(catalog, times(4)).createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), - eq(PartitionSpec.unpartitioned())); + any(PartitionSpec.class)); } @Test @@ -165,4 +165,32 @@ void ignoresForbiddenException() { verify((SupportsNamespaces) catalog).createNamespace(namespace); } + + @Test + void createTableWithCustomPartitionSpec() { + // Create a table manager with custom partition spec + final IcebergTableManager customTableManager = new IcebergTableManager( + catalog, java.util.List.of("id")); + + // Add timestamp field to schema for more complex partitioning test + final Schema schemaWithTimestamp = Schema.createRecord("TestRecord", "Test record", "test.namespace", false); + schemaWithTimestamp.setFields(java.util.List.of( + new Schema.Field("id", Schema.create(Schema.Type.LONG), "Record ID", null), + new Schema.Field("name", Schema.create(Schema.Type.STRING), "Record name", null), + new Schema.Field("timestamp", Schema.create(Schema.Type.LONG), "Timestamp", null) + )); + + when(catalog.createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), + any(PartitionSpec.class))) + .thenReturn(table); + + final AppendFiles tableAppend = mock(AppendFiles.class); + when(table.newAppend()).thenReturn(tableAppend); + + final Table result = customTableManager.getOrCreateTable(tableIdentifier, schemaWithTimestamp); + + assertThat(result).isEqualTo(table); + verify(catalog).createTable(eq(tableIdentifier), any(org.apache.iceberg.Schema.class), + any(PartitionSpec.class)); + } } diff --git a/docs/configs.rst b/docs/configs.rst index 2284b9b38..87579d542 100644 --- a/docs/configs.rst +++ b/docs/configs.rst @@ -70,6 +70,13 @@ RemoteStorageManagerConfig * Valid Values: [kafka, iceberg] * Importance: medium +``iceberg.partition.spec`` + The Iceberg partition specification. Defines custom partitioning scheme for the Iceberg table. Format: (col1, col2, ...) or (transform(col), ...). Supported transforms: year, month, day, hour, bucket, truncate. Example: (year(timestamp), user_id) or (day(created_at)) + + * Type: string + * Default: null + * Importance: low + ``structure.provider.class`` The structure provider implementation class