Skip to content

Commit a679e9c

Browse files
feat: DH-18149: Simplified passing s3 instructions for iceberg writing (deephaven#6668)
If the S3 instructions are not passed when constructing the table writer, the writer will automatically inherit the configuration from the catalog properties.
1 parent 6c1f875 commit a679e9c

File tree

13 files changed

+116
-21
lines changed

13 files changed

+116
-21
lines changed

extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
import io.deephaven.engine.table.ColumnDefinition;
77
import io.deephaven.engine.table.TableDefinition;
8+
import io.deephaven.extensions.s3.S3Instructions;
9+
import io.deephaven.parquet.table.CompletedParquetWrite;
810
import io.deephaven.parquet.table.ParquetInstructions;
911
import org.junit.jupiter.api.Test;
1012

@@ -144,8 +146,12 @@ void toParquetInstructionTest() {
144146
ColumnDefinition.ofInt("PC2").withPartitioning(),
145147
ColumnDefinition.ofLong("I"));
146148
final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
149+
final S3Instructions s3Instructions = S3Instructions.builder().regionName("test-region").build();
150+
final ParquetInstructions.OnWriteCompleted onWriteCompleted =
151+
(final CompletedParquetWrite completedParquetWrite) -> {
152+
/* Do nothing */ };
147153
final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions(
148-
null, definition, fieldIdToName);
154+
onWriteCompleted, definition, fieldIdToName, s3Instructions);
149155

150156
assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
151157
assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
@@ -154,7 +160,8 @@ void toParquetInstructionTest() {
154160
assertThat(parquetInstructions.getFieldId("field1")).isEmpty();
155161
assertThat(parquetInstructions.getFieldId("field2")).hasValue(2);
156162
assertThat(parquetInstructions.getFieldId("field3")).hasValue(3);
157-
assertThat(parquetInstructions.onWriteCompleted()).isEmpty();
163+
assertThat(parquetInstructions.onWriteCompleted()).hasValue(onWriteCompleted);
158164
assertThat(parquetInstructions.getTableDefinition()).hasValue(definition);
165+
assertThat(parquetInstructions.getSpecialInstructions()).isEqualTo(s3Instructions);
159166
}
160167
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,15 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterab
131131
});
132132
}
133133

134-
private static String path(String path, FileIO io) {
134+
private static String path(@NotNull final String path, @NotNull final FileIO io) {
135135
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
136136
}
137137

138-
public static URI locationUri(Table table) {
138+
public static URI locationUri(@NotNull final Table table) {
139139
return FileUtils.convertToURI(path(table.location(), table.io()), true);
140140
}
141141

142-
public static URI dataFileUri(Table table, DataFile dataFile) {
142+
public static URI dataFileUri(@NotNull final Table table, @NotNull final DataFile dataFile) {
143143
return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
144144
}
145145

extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.deephaven.iceberg.internal;
55

66
import org.jetbrains.annotations.NotNull;
7+
import org.jetbrains.annotations.Nullable;
78

89
import java.util.ArrayList;
910
import java.util.List;
@@ -82,6 +83,7 @@ private DataInstructionsProviderLoader(
8283
* @param uriScheme The URI scheme
8384
* @return A data instructions object for the given URI scheme or null if one cannot be found
8485
*/
86+
@Nullable
8587
public Object load(@NotNull final String uriScheme) {
8688
for (final DataInstructionsProviderPlugin plugin : providers) {
8789
final Object pluginInstructions = plugin.createInstructions(uriScheme, properties);

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
3333
import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
34-
import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
3534

3635
public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
3736
/**
@@ -71,11 +70,6 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
7170
*/
7271
final TableDefinition tableDef;
7372

74-
/**
75-
* The URI scheme from the Table {@link Table#location() location}.
76-
*/
77-
private final String uriScheme;
78-
7973
/**
8074
* The {@link Snapshot} from which to discover data files.
8175
*/
@@ -123,6 +117,8 @@ protected IcebergTableLocationKey locationKey(
123117
/**
124118
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
125119
* @param instructions The instructions for customizations while reading.
120+
* @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not
121+
* provided in the {@code instructions}.
126122
*/
127123
public IcebergBaseLayout(
128124
@NotNull final IcebergTableAdapter tableAdapter,
@@ -147,7 +143,8 @@ public IcebergBaseLayout(
147143

148144
this.snapshot = tableAdapter.getSnapshot(instructions);
149145
this.tableDef = tableAdapter.definition(instructions);
150-
this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();
146+
147+
final String uriScheme = tableAdapter.locationUri().getScheme();
151148
// Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create
152149
// data instructions from the properties collection and URI scheme.
153150
final Object specialInstructions = instructions.dataInstructions()

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public final class IcebergFlatLayout extends IcebergBaseLayout {
2222
/**
2323
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
2424
* @param instructions The instructions for customizations while reading.
25+
* @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not
26+
* provided in the {@code instructions}.
2527
*/
2628
public IcebergFlatLayout(
2729
@NotNull final IcebergTableAdapter tableAdapter,

extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ private IdentityPartitioningColData(String name, Class<?> type, int index) {
4242
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
4343
* @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table.
4444
* @param instructions The instructions for customizations while reading.
45+
* @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not
46+
* provided in the {@code instructions}.
4547
*/
4648
public IcebergKeyValuePartitionedLayout(
4749
@NotNull final IcebergTableAdapter tableAdapter,

extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public static Builder builder() {
3737

3838
/**
3939
* The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
40-
* provider-specific instructions).
40+
* provider-specific instructions). If not provided, data instructions will be derived from the properties of the
41+
* catalog.
4142
*/
4243
public abstract Optional<Object> dataInstructions();
4344

extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
2020
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
2121
import io.deephaven.engine.util.TableTools;
22+
import io.deephaven.iceberg.base.IcebergUtils;
2223
import io.deephaven.iceberg.base.IcebergUtils.SpecAndSchema;
2324
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
2425
import io.deephaven.iceberg.layout.*;
@@ -38,6 +39,7 @@
3839
import org.jetbrains.annotations.NotNull;
3940
import org.jetbrains.annotations.Nullable;
4041

42+
import java.net.URI;
4143
import java.time.Instant;
4244
import java.util.*;
4345
import java.util.stream.Collectors;
@@ -61,6 +63,8 @@ public class IcebergTableAdapter {
6163
private final TableIdentifier tableIdentifier;
6264
private final DataInstructionsProviderLoader dataInstructionsProviderLoader;
6365

66+
private final URI locationUri;
67+
6468
public IcebergTableAdapter(
6569
final Catalog catalog,
6670
final TableIdentifier tableIdentifier,
@@ -70,6 +74,7 @@ public IcebergTableAdapter(
7074
this.table = table;
7175
this.tableIdentifier = tableIdentifier;
7276
this.dataInstructionsProviderLoader = dataInstructionsProviderLoader;
77+
this.locationUri = IcebergUtils.locationUri(table);
7378
}
7479

7580
/**
@@ -584,6 +589,13 @@ private static TableDefinition fromSchema(
584589
* @return A new instance of {@link IcebergTableWriter} configured with the provided options.
585590
*/
586591
public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) {
587-
return new IcebergTableWriter(tableWriterOptions, this);
592+
return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader);
593+
}
594+
595+
/**
596+
* Get the location URI of the Iceberg table.
597+
*/
598+
public URI locationUri() {
599+
return locationUri;
588600
}
589601
}

extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.deephaven.engine.table.Table;
1313
import io.deephaven.engine.table.TableDefinition;
1414
import io.deephaven.engine.table.impl.locations.TableDataException;
15+
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
1516
import io.deephaven.parquet.table.CompletedParquetWrite;
1617
import io.deephaven.parquet.table.ParquetInstructions;
1718
import io.deephaven.parquet.table.ParquetTools;
@@ -101,15 +102,30 @@ public class IcebergTableWriter {
101102
*/
102103
private final OutputFileFactory outputFileFactory;
103104

105+
/**
106+
* The special instructions to use for writing the Iceberg data files (might be S3Instructions or other cloud
107+
* provider-specific instructions).
108+
*/
109+
private final Object specialInstructions;
110+
104111
/**
105112
* Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}.
106113
*/
107114
private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
108115
private static final int VARIABLE_NAME_LENGTH = 6;
109116

117+
/**
118+
* Create a new Iceberg table writer instance.
119+
*
120+
* @param tableWriterOptions The options to configure the behavior of this writer instance.
121+
* @param tableAdapter The Iceberg table adapter corresponding to the Iceberg table to write to.
122+
* @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not
123+
* provided in the {@code tableWriterOptions}.
124+
*/
110125
IcebergTableWriter(
111126
final TableWriterOptions tableWriterOptions,
112-
final IcebergTableAdapter tableAdapter) {
127+
final IcebergTableAdapter tableAdapter,
128+
final DataInstructionsProviderLoader dataInstructionsProvider) {
113129
this.tableWriterOptions = verifyWriterOptions(tableWriterOptions);
114130
this.table = tableAdapter.icebergTable();
115131

@@ -131,6 +147,11 @@ public class IcebergTableWriter {
131147
outputFileFactory = OutputFileFactory.builderFor(table, 0, 0)
132148
.format(FileFormat.PARQUET)
133149
.build();
150+
151+
final String uriScheme = tableAdapter.locationUri().getScheme();
152+
this.specialInstructions = tableWriterOptions.dataInstructions()
153+
.orElseGet(() -> dataInstructionsProvider.load(uriScheme));
154+
134155
}
135156

136157
private static TableParquetWriterOptions verifyWriterOptions(
@@ -459,7 +480,7 @@ private List<CompletedParquetWrite> writeParquet(
459480
final List<CompletedParquetWrite> parquetFilesWritten = new ArrayList<>(dhTables.size());
460481
final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add;
461482
final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions(
462-
onWriteCompleted, tableDefinition, fieldIdToColumnName);
483+
onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions);
463484

464485
// Write the data to parquet files
465486
for (int idx = 0; idx < dhTables.size(); idx++) {

extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableParquetWriterOptions.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.deephaven.parquet.table.ParquetInstructions;
99
import org.immutables.value.Value;
1010
import org.jetbrains.annotations.NotNull;
11+
import org.jetbrains.annotations.Nullable;
1112

1213
import java.util.Map;
1314

@@ -71,14 +72,18 @@ public int targetPageSize() {
7172
* @param onWriteCompleted The callback to be invoked after writing the parquet file.
7273
* @param tableDefinition The table definition to be populated inside the parquet file's schema
7374
* @param fieldIdToName Mapping of field id to field name, to be populated inside the parquet file's schema
75+
* @param specialInstructions Special instructions to be passed to the parquet writer
7476
*/
7577
ParquetInstructions toParquetInstructions(
7678
@NotNull final ParquetInstructions.OnWriteCompleted onWriteCompleted,
7779
@NotNull final TableDefinition tableDefinition,
78-
@NotNull final Map<Integer, String> fieldIdToName) {
80+
@NotNull final Map<Integer, String> fieldIdToName,
81+
@Nullable final Object specialInstructions) {
7982
final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();
8083

81-
dataInstructions().ifPresent(builder::setSpecialInstructions);
84+
if (specialInstructions != null) {
85+
builder.setSpecialInstructions(specialInstructions);
86+
}
8287

8388
// Add parquet writing specific instructions.
8489
builder.setTableDefinition(tableDefinition);

0 commit comments

Comments (0)