Skip to content

Commit de4bf1e

Browse files
authored
feat: DH-21548: Permit switching partition-ordering for Iceberg (deephaven#7652)
1 parent c45734e commit de4bf1e

File tree

3 files changed

+240
-14
lines changed

3 files changed

+240
-14
lines changed

extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import io.deephaven.engine.table.ColumnDefinition;
88
import io.deephaven.engine.table.TableDefinition;
99
import io.deephaven.iceberg.relative.RelativeFileIO;
10+
import io.deephaven.iceberg.util.ColumnInstructions;
11+
import io.deephaven.iceberg.util.Resolver;
1012
import org.apache.iceberg.DataFile;
1113
import org.apache.iceberg.PartitionField;
1214
import org.apache.iceberg.PartitionSpec;
@@ -23,6 +25,7 @@
2325

2426
import java.net.URI;
2527
import java.time.LocalDate;
28+
import java.util.HashSet;
2629
import java.util.List;
2730
import java.util.Set;
2831
import java.util.stream.Collectors;
@@ -93,8 +96,8 @@ public static void verifyRequiredFields(final Schema tableSchema, final TableDef
9396
* Table Definition.
9497
*/
9598
public static void verifyPartitioningColumns(
96-
final PartitionSpec tablePartitionSpec,
97-
final TableDefinition tableDefinition) {
99+
@NotNull final PartitionSpec tablePartitionSpec,
100+
@NotNull final TableDefinition tableDefinition) {
98101
final List<String> partitioningColumnNamesFromDefinition = tableDefinition.getColumnStream()
99102
.filter(ColumnDefinition::isPartitioning)
100103
.peek(columnDefinition -> {
@@ -114,17 +117,17 @@ public static void verifyPartitioningColumns(
114117
" not support writing to iceberg tables with non-identity transforms");
115118
}
116119
});
120+
117121
if (partitionFieldsFromSpec.size() != partitioningColumnNamesFromDefinition.size()) {
118122
throw new IllegalArgumentException("Partition spec contains " + partitionFieldsFromSpec.size() +
119123
" fields, but the table definition contains " + partitioningColumnNamesFromDefinition.size()
120124
+ " fields, partition spec " + tablePartitionSpec + ", table definition " + tableDefinition);
121125
}
122-
for (int colIdx = 0; colIdx < partitionFieldsFromSpec.size(); colIdx += 1) {
123-
final PartitionField partitionField = partitionFieldsFromSpec.get(colIdx);
124-
if (!partitioningColumnNamesFromDefinition.get(colIdx).equals(partitionField.name())) {
126+
127+
for (final PartitionField partitionField : partitionFieldsFromSpec) {
128+
if (!partitioningColumnNamesFromDefinition.contains(partitionField.name())) {
125129
throw new IllegalArgumentException("Partitioning column " + partitionField.name() + " is not present " +
126-
"in the table definition at idx " + colIdx + ", table definition " + tableDefinition +
127-
", partition spec " + tablePartitionSpec);
130+
"in the table definition " + tableDefinition + ", partition spec " + tablePartitionSpec);
128131
}
129132
}
130133
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
//
2+
// Copyright (c) 2016-2026 Deephaven Data Labs and Patent Pending
3+
//
4+
package io.deephaven.iceberg.base;
5+
6+
import io.deephaven.engine.table.ColumnDefinition;
7+
import io.deephaven.engine.table.TableDefinition;
8+
import org.apache.iceberg.PartitionSpec;
9+
import org.apache.iceberg.Schema;
10+
import org.apache.iceberg.types.Types;
11+
import org.junit.jupiter.api.Test;
12+
13+
import static io.deephaven.iceberg.base.IcebergUtils.verifyPartitioningColumns;
14+
import static org.assertj.core.api.Assertions.assertThat;
15+
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
16+
17+
@SuppressWarnings("ResultOfMethodCallIgnored")
18+
public class TestIcebergUtils {
19+
private static final String PART_COL1_NAME = "PartCol1";
20+
private static final String PART_COL2_NAME = "PartCol2";
21+
private static final String NONPART_COL1_NAME = "intCol";
22+
private static final String NONPART_COL2_NAME = "doubleCol";
23+
24+
private static final String RESOLVED_PART_COL_NAME = "ResolvedCol";
25+
26+
/**
27+
* Creates a {@link Schema} and {@link PartitionSpec} with 2 partitioning columns (`PartCol1`, `PartCol2`)
28+
*
29+
* @return a {@link PartitionSpec} which includes 2 partitioning columns (`PartCol1`, `PartCol2`)
30+
*/
31+
private static PartitionSpec getPartitionSpec() {
32+
final Schema schema = new Schema(
33+
Types.NestedField.builder().withId(1).withName(PART_COL1_NAME).isOptional(false)
34+
.ofType(Types.StringType.get()).build(),
35+
Types.NestedField.builder().withId(2).withName(PART_COL2_NAME).isOptional(false)
36+
.ofType(Types.IntegerType.get()).build(),
37+
Types.NestedField.builder().withId(3).withName(NONPART_COL1_NAME).isOptional(true)
38+
.ofType(Types.IntegerType.get()).build(),
39+
Types.NestedField.builder().withId(4).withName(NONPART_COL2_NAME).isOptional(true)
40+
.ofType(Types.DoubleType.get()).build());
41+
42+
return PartitionSpec.builderFor(schema)
43+
.identity(PART_COL1_NAME)
44+
.identity(PART_COL2_NAME)
45+
.build();
46+
}
47+
48+
private static ColumnDefinition<?> partCol1() {
49+
return ColumnDefinition.ofString(PART_COL1_NAME).withPartitioning();
50+
}
51+
52+
private static ColumnDefinition<?> partCol2() {
53+
return ColumnDefinition.ofInt(PART_COL2_NAME).withPartitioning();
54+
}
55+
56+
private static ColumnDefinition<?> resolvedPartCol() {
57+
return ColumnDefinition.ofInt(RESOLVED_PART_COL_NAME).withPartitioning();
58+
}
59+
60+
private static ColumnDefinition<?> nonPartCol1() {
61+
return ColumnDefinition.ofInt(NONPART_COL1_NAME);
62+
}
63+
64+
private static ColumnDefinition<?> nonPartCol2() {
65+
return ColumnDefinition.ofDouble(NONPART_COL2_NAME);
66+
}
67+
68+
@Test
69+
void testVerifyPartitioningColumns() {
70+
final PartitionSpec spec = getPartitionSpec();
71+
72+
// match partitioning-column ordering
73+
final TableDefinition tDef1 = TableDefinition.of(partCol1(), partCol2(), nonPartCol1(), nonPartCol2());
74+
verifyPartitioningColumns(spec, tDef1);
75+
76+
// flip partitioning-column ordering
77+
final TableDefinition tDef2 = TableDefinition.of(partCol2(), partCol1(), nonPartCol1(), nonPartCol2());
78+
verifyPartitioningColumns(spec, tDef2);
79+
}
80+
81+
@Test
82+
void testMissingPartitionColumn() {
83+
final PartitionSpec spec = getPartitionSpec();
84+
85+
// missing "PartCol1"
86+
final TableDefinition tDef1 = TableDefinition.of(partCol2(), nonPartCol1(), nonPartCol2());
87+
try {
88+
verifyPartitioningColumns(spec, tDef1);
89+
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
90+
} catch (final IllegalArgumentException ise) {
91+
assertThat(ise.getMessage()).startsWith(
92+
"Partition spec contains 2 fields, but the table definition contains 1 fields, partition spec");
93+
}
94+
95+
// missing "PartCol2"
96+
final TableDefinition tDef2 = TableDefinition.of(partCol1(), nonPartCol1(), nonPartCol2());
97+
try {
98+
verifyPartitioningColumns(spec, tDef2);
99+
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
100+
} catch (final IllegalArgumentException ise) {
101+
assertThat(ise.getMessage()).startsWith(
102+
"Partition spec contains 2 fields, but the table definition contains 1 fields, partition spec");
103+
}
104+
}
105+
106+
@Test
107+
void testInvalidPartitioningColumns() {
108+
// missing "PartCol2", added "ResolvedCol", but not defining the mapping in resolver. count is correct, but we
109+
// do not match by name
110+
final TableDefinition tDef1 = TableDefinition.of(partCol1(), resolvedPartCol(), nonPartCol1(), nonPartCol2());
111+
try {
112+
verifyPartitioningColumns(getPartitionSpec(), tDef1);
113+
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
114+
} catch (final IllegalArgumentException ise) {
115+
assertThat(ise.getMessage())
116+
.startsWith("Partitioning column " + PART_COL2_NAME
117+
+ " is not present in the table definition TableDefinition");
118+
}
119+
}
120+
}

extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1902,6 +1902,111 @@ void testPartitionedAppendBasicIntegerPartitions() {
19021902
assertTableEquals(expected2, fromIceberg2.select());
19031903
}
19041904

1905+
/**
 * Verifies that a Deephaven {@link TableDefinition} may declare its partitioning columns in a different order
 * than the Iceberg partition spec, for both writing and reading. Writes with one ordering, appends with the
 * flipped ordering, then reads back via an explicit {@code Resolver} using the flipped ordering, and finally
 * reads with no partitioning columns declared at all.
 */
@Test
void testPartitionOrdering() {
    // Three distinct source tables, one per partition path written below.
    final Table part1 = TableTools.emptyTable(6)
            .update("intCol = (int) 2 * i + 10",
                    "doubleCol = (double) 2.5 * i + 10");
    final Table part2 = TableTools.emptyTable(5)
            .update("intCol = (int) 3 * i + 20",
                    "doubleCol = (double) 3.5 * i + 20");
    final Table part3 = TableTools.emptyTable(4)
            .update("intCol = (int) 4 * i + 30",
                    "doubleCol = (double) 4.5 * i + 30");
    final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.PartitionOrderingTest");

    // Definition #1: partitioning columns ordered InternalPartition, then Date.
    final TableDefinition tableDefinition1 = TableDefinition.of(
            ColumnDefinition.ofInt("intCol"),
            ColumnDefinition.ofDouble("doubleCol"),
            ColumnDefinition.ofString("InternalPartition").withPartitioning(),
            ColumnDefinition.ofString("Date").withPartitioning());
    final IcebergTableAdapter tableAdapter1 = catalogAdapter.createTable(tableIdentifier, tableDefinition1);
    final IcebergTableWriter tableWriter1 = tableAdapter1.tableWriter(writerOptionsBuilder()
            .tableDefinition(tableDefinition1)
            .build());

    // Write each source table to its own partition path.
    tableWriter1.append(IcebergWriteInstructions.builder()
            .addTables(part1, part2, part3)
            .addAllPartitionPaths(List.of(
                    "InternalPartition=0/Date=2024-08-01",
                    "InternalPartition=1/Date=2024-08-02",
                    "InternalPartition=2/Date=2024-08-02"))
            .build());
    final Table fromIceberg1 = tableAdapter1.table();
    final Table expected1 = TableTools.merge(
            part1.update("InternalPartition = `0`", "Date = `2024-08-01`"),
            part2.update("InternalPartition = `1`", "Date = `2024-08-02`"),
            part3.update("InternalPartition = `2`", "Date = `2024-08-02`"));
    assertTableEquals(expected1, fromIceberg1);

    // Add another partition for same date, but with the partitions ordered differently
    final TableDefinition tableDefinition2 = TableDefinition.of(
            ColumnDefinition.ofInt("intCol"),
            ColumnDefinition.ofDouble("doubleCol"),
            ColumnDefinition.ofString("Date").withPartitioning(), // NOTE: opposite order here!
            ColumnDefinition.ofString("InternalPartition").withPartitioning());
    final IcebergTableWriter tableWriter2 = tableAdapter1.tableWriter(writerOptionsBuilder()
            .tableDefinition(tableDefinition2)
            .build());

    final Table part4 = TableTools.emptyTable(3)
            .update("intCol = (int) 5 * i + 30",
                    "doubleCol = (double) 5.5 * i + 30");
    tableWriter2.append(IcebergWriteInstructions.builder()
            .addTables(part4)
            .addPartitionPaths("InternalPartition=1/Date=2024-08-02")
            .build());
    final Table fromIceberg2 = tableAdapter1.table();
    final Table expected2 = TableTools.merge(
            expected1,
            part4.update("InternalPartition = `1`", "Date = `2024-08-02`"));
    assertTableEquals(expected2, fromIceberg2);

    // We have successfully written with the alternative partition-ordering. Make sure we can read it that way, too.
    final Schema schema = tableAdapter1.icebergTable().schema();
    final IcebergTableAdapter tableAdapter2 = catalogAdapter.loadTable(LoadTableOptions.builder()
            .id(tableIdentifier)
            .resolver(Resolver.builder()
                    // NOTE: this `TableDefinition` has `Date` before `InternalPartition`
                    .definition(tableDefinition2)
                    .schema(schema)
                    // Spec declares the fields in the original (opposite) order to exercise the reordering path.
                    .spec(PartitionSpec.builderFor(schema)
                            .identity("InternalPartition")
                            .identity("Date")
                            .build())
                    .putColumnInstructions("intCol", schemaField(schema.findField("intCol").fieldId()))
                    .putColumnInstructions("doubleCol", schemaField(schema.findField("doubleCol").fieldId()))
                    .putColumnInstructions("InternalPartition",
                            schemaField(schema.findField("InternalPartition").fieldId()))
                    .putColumnInstructions("Date", schemaField(schema.findField("Date").fieldId()))
                    .build())
            .build());
    final Table fromIceberg3 = tableAdapter2.table();
    // we've defined `Date` to be the first PartitioningColumn in the returned table instead of `InternalPartition`,
    // so we need to re-order for the comparison (or tell it to ignore column-ordering differences)
    assertTableEquals(expected2.view("intCol", "doubleCol", "Date", "InternalPartition"), fromIceberg3);

    // Try reading with no partitioning columns defined
    final IcebergTableAdapter tableAdapter3 = catalogAdapter.loadTable(LoadTableOptions.builder()
            .id(tableIdentifier)
            .resolver(Resolver.builder()
                    // NOTE: this `TableDefinition` has no partitioning columns
                    .definition(TableDefinition.of(ColumnDefinition.ofInt("intCol"),
                            ColumnDefinition.ofDouble("doubleCol"),
                            ColumnDefinition.ofString("InternalPartition"),
                            ColumnDefinition.ofString("Date")))
                    .schema(schema)
                    .putColumnInstructions("intCol", schemaField(schema.findField("intCol").fieldId()))
                    .putColumnInstructions("doubleCol", schemaField(schema.findField("doubleCol").fieldId()))
                    .putColumnInstructions("InternalPartition",
                            schemaField(schema.findField("InternalPartition").fieldId()))
                    .putColumnInstructions("Date", schemaField(schema.findField("Date").fieldId()))
                    .build())
            .build());
    final Table fromIceberg4 = tableAdapter3.table();
    // With no partitioning columns declared, the table should come back in the original column order.
    assertTableEquals(expected2, fromIceberg4);
}
2009+
19052010
@Test
19062011
void testPartitionedAppendWithDeleting() {
19072012
final Table part1 = TableTools.emptyTable(6)
@@ -1933,11 +2038,11 @@ void testPartitionedAppendWithDeleting() {
19332038
"InternalPartition=2/Date=2024-08-02"))
19342039
.build());
19352040
final Table fromIceberg = tableAdapter.table();
1936-
final Table expected = TableTools.merge(
2041+
final Table expected1 = TableTools.merge(
19372042
part1.update("InternalPartition = `0`", "Date = `2024-08-01`"),
19382043
part2.update("InternalPartition = `1`", "Date = `2024-08-02`"),
19392044
part3.update("InternalPartition = `2`", "Date = `2024-08-02`"));
1940-
assertTableEquals(expected, fromIceberg.select());
2045+
assertTableEquals(expected1, fromIceberg);
19412046

19422047
// Add another partition for same date
19432048
final Table part4 = TableTools.emptyTable(3)
@@ -1949,11 +2054,9 @@ void testPartitionedAppendWithDeleting() {
19492054
.build());
19502055
final Table fromIceberg2 = tableAdapter.table();
19512056
final Table expected2 = TableTools.merge(
1952-
part1.update("InternalPartition = `0`", "Date = `2024-08-01`"),
1953-
part2.update("InternalPartition = `1`", "Date = `2024-08-02`"),
1954-
part3.update("InternalPartition = `2`", "Date = `2024-08-02`"),
2057+
expected1,
19552058
part4.update("InternalPartition = `1`", "Date = `2024-08-02`"));
1956-
assertTableEquals(expected2, fromIceberg2.select());
2059+
assertTableEquals(expected2, fromIceberg2);
19572060

19582061
// Now delete the partition for date 2024-08-02
19592062
final Expression delExpr = Expressions.equal("Date", "2024-08-02");
@@ -1964,7 +2067,7 @@ void testPartitionedAppendWithDeleting() {
19642067
final IcebergTableAdapter latestTableAdapter = catalogAdapter.loadTable(tableIdentifier);
19652068
final Table fromIceberg3 = latestTableAdapter.table();
19662069
final Table expected3 = part1.update("InternalPartition = `0`", "Date = `2024-08-01`");
1967-
assertTableEquals(expected3, fromIceberg3.select());
2070+
assertTableEquals(expected3, fromIceberg3);
19682071
}
19692072

19702073
@Test

0 commit comments

Comments
 (0)