Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -55,6 +59,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
Expand Down Expand Up @@ -102,6 +107,7 @@
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -197,6 +203,25 @@ public PCollectionRowTuple expand(PCollection<String> input) {
OUTPUT_TAG, snapshots, ERROR_TAG, dataFiles.get(ERRORS).setRowSchema(ERROR_SCHEMA));
}

/**
* Reads incoming file paths, extracts Iceberg metadata, and converts them into {@link SerializableDataFile} objects.
*
* <p><b>Asynchronous Bundle Processing:</b>
* Because file I/O, catalog lookups, and metadata inference can be highly latency-bound,
* this DoFn implements an asynchronous processing pattern to maximize throughput. By default,
* Beam processes elements in a bundle sequentially. To avoid bottlenecking the pipeline,
* we use an internal {@link ExecutorService} to process multiple files concurrently within a single DoFn instance.
*
* <p><b>Lifecycle & Thread Safety:</b>
* <ul>
* <li><b>{@link ProcessElement}:</b> Submits the heavy lifting (format inference,
* metrics collection, and partition resolution) to a background thread pool and stores
* the resulting {@link Future}.
 * <li><b>{@link FinishBundle}:</b> Blocks until every future submitted in the current
 * bundle completes, then emits the successfully parsed {@link SerializableDataFile}s (or
 * error rows) on the main bundle thread, since Beam output receivers are not thread-safe.
* </ul>
*/
static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
private final IcebergCatalogConfig catalogConfig;
private final String identifier;
Expand All @@ -205,10 +230,15 @@ static class ConvertToDataFile extends DoFn<String, SerializableDataFile> {
private final @Nullable String prefix;
private final @Nullable List<String> partitionFields;
private final @Nullable Map<String, String> tableProps;
private transient @MonotonicNonNull ExecutorService executor;
private transient @MonotonicNonNull List<Future<ProcessResult>> activeTasks;
private transient @MonotonicNonNull Table table;

// Limit open readers to avoid blowing up memory on one worker
private static final int MAX_READERS = 10;
private static final Semaphore ACTIVE_READERS = new Semaphore(MAX_READERS);
// Number of parallel threads processing incoming files
private static final int THREAD_POOL_SIZE = 10;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we should do some experimentation to pick a reasonable default here or base this on similar optimizations done for other I/Os.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing some experiments now


public ConvertToDataFile(
IcebergCatalogConfig catalogConfig,
Expand All @@ -227,101 +257,170 @@ public ConvertToDataFile(
private static final String UNKNOWN_FORMAT_ERROR = "Could not determine the file's format";
static final String UNKNOWN_PARTITION_ERROR = "Could not determine the file's partition: ";

@ProcessElement
public void process(@Element String filePath, MultiOutputReceiver output)
throws IOException, InterruptedException {
FileFormat format;
try {
format = inferFormat(filePath);
} catch (UnknownFormatException e) {
output
.get(ERRORS)
.output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build());
numErrorFiles.inc();
return;
/**
 * Immutable result of processing a single file path on a background worker thread.
 *
 * <p>Exactly one of {@link #dataFile} (success) or {@link #errorRow} (failure) is expected to
 * be non-null. The element's original timestamp and window are carried along so that
 * {@code @FinishBundle} can emit the result into the correct window on the bundle thread.
 */
private static class ProcessResult {
  final @Nullable SerializableDataFile dataFile;
  final @Nullable Row errorRow;
  final Instant timestamp;
  final BoundedWindow window;

  ProcessResult(
      @Nullable SerializableDataFile dataFile,
      @Nullable Row errorRow,
      Instant timestamp,
      BoundedWindow window) {
    // Preconditions-style lenient formatting substitutes only %s placeholders; %n is NOT
    // expanded, so use a literal \n to get an actual multi-line message.
    Preconditions.checkState(
        dataFile == null || errorRow == null,
        "Expected only one of dataFile or errorRow, but got both:\n\tfile: %s\n\terror: %s",
        dataFile != null ? dataFile.getPath() : null,
        errorRow);
    this.dataFile = dataFile;
    this.errorRow = errorRow;
    this.timestamp = timestamp;
    this.window = window;
  }
}

if (table == null) {
try {
table = getOrCreateTable(filePath, format);
} catch (FileNotFoundException e) {
output
.get(ERRORS)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, checkStateNotNull(e.getMessage()))
.build());
numErrorFiles.inc();
return;
}
}
/**
 * Creates the fixed-size worker pool used to process files concurrently within a bundle.
 *
 * <p>Threads are named for debuggability and marked as daemons so that a pool missed by
 * {@code @Teardown} (e.g. on abnormal worker shutdown) cannot keep the JVM alive.
 */
@Setup
public void setup() {
  executor =
      Executors.newFixedThreadPool(
          THREAD_POOL_SIZE,
          runnable -> {
            Thread thread = new Thread(runnable, "iceberg-convert-to-datafile");
            thread.setDaemon(true);
            return thread;
          });
}

// Check if the file path contains the provided prefix
if (table.spec().isPartitioned()
&& !Strings.isNullOrEmpty(prefix)
&& !filePath.startsWith(checkStateNotNull(prefix))) {
output
.get(ERRORS)
.output(Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build());
numErrorFiles.inc();
return;
/** Shuts down the worker pool, interrupting any tasks still in flight. */
@Teardown
public void teardown() {
  ExecutorService pool = executor;
  if (pool != null) {
    pool.shutdownNow();
  }
}

InputFile inputFile = table.io().newInputFile(filePath);
/** Starts each bundle with a fresh, empty list of pending per-file futures. */
@StartBundle
public void startBundle() {
  this.activeTasks = new ArrayList<Future<ProcessResult>>();
}

Metrics metrics;
try {
metrics =
getFileMetrics(
inputFile,
format,
MetricsConfig.forTable(table),
MappingUtil.create(table.schema()));
} catch (FileNotFoundException e) {
output
.get(ERRORS)
.output(
private Callable<ProcessResult> createProcessTask(
String filePath, Instant timestamp, BoundedWindow window) {

return () -> {
FileFormat format;
try {
format = inferFormat(filePath);
} catch (UnknownFormatException e) {
return new ProcessResult(
null,
Row.withSchema(ERROR_SCHEMA).addValues(filePath, UNKNOWN_FORMAT_ERROR).build(),
timestamp,
window);
}

// Synchronize table initialization
if (table == null) {
try {
table = getOrCreateTable(filePath, format);
} catch (FileNotFoundException e) {
return new ProcessResult(
null,
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, checkStateNotNull(e.getMessage()))
.build());
numErrorFiles.inc();
return;
}
.build(),
timestamp,
window);
}
}

// Figure out which partition this DataFile should go to
String partitionPath;
if (table.spec().isUnpartitioned()) {
partitionPath = "";
} else if (!Strings.isNullOrEmpty(prefix)) {
// option 1: use directory structure to determine partition
// Note: we don't validate the DataFile content here
partitionPath = getPartitionFromFilePath(filePath);
} else {
// Check if the file path contains the provided prefix
if (table.spec().isPartitioned()
&& !Strings.isNullOrEmpty(prefix)
&& !filePath.startsWith(checkStateNotNull(prefix))) {
return new ProcessResult(
null,
Row.withSchema(ERROR_SCHEMA).addValues(filePath, PREFIX_ERROR).build(),
timestamp,
window);
}

InputFile inputFile = table.io().newInputFile(filePath);

Metrics metrics;
try {
// option 2: examine DataFile min/max statistics to determine partition
partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
} catch (UnknownPartitionException e) {
output
.get(ERRORS)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, UNKNOWN_PARTITION_ERROR + e.getMessage())
.build());
numErrorFiles.inc();
return;
metrics =
getFileMetrics(
inputFile,
format,
MetricsConfig.forTable(table),
MappingUtil.create(table.schema()));
} catch (FileNotFoundException e) {
return new ProcessResult(
null,
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, checkStateNotNull(e.getMessage()))
.build(),
timestamp,
window);
}
}

DataFile df =
DataFiles.builder(table.spec())
.withPath(filePath)
.withFormat(format)
.withMetrics(metrics)
.withFileSizeInBytes(inputFile.getLength())
.withPartitionPath(partitionPath)
.build();
// Figure out which partition this DataFile should go to
String partitionPath;
if (table.spec().isUnpartitioned()) {
partitionPath = "";
} else if (!Strings.isNullOrEmpty(prefix)) {
// option 1: use directory structure to determine partition
// Note: we don't validate the DataFile content here
partitionPath = getPartitionFromFilePath(filePath);
} else {
try {
// option 2: examine DataFile min/max statistics to determine partition
partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
} catch (UnknownPartitionException e) {
return new ProcessResult(
null,
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, UNKNOWN_PARTITION_ERROR + e.getMessage())
.build(),
timestamp,
window);
}
}

DataFile df =
DataFiles.builder(table.spec())
.withPath(filePath)
.withFormat(format)
.withMetrics(metrics)
.withFileSizeInBytes(inputFile.getLength())
.withPartitionPath(partitionPath)
.build();

return new ProcessResult(
SerializableDataFile.from(df, partitionPath), null, timestamp, window);
};
}

/**
 * Submits the per-file conversion work to the background executor instead of processing it
 * inline, capturing the element's timestamp and window for later emission. The resulting
 * {@link Future} is tracked in {@code activeTasks} and drained in {@code @FinishBundle},
 * which performs all output emission on the bundle thread.
 *
 * <p>Note: the previous signature declared an unused {@code MultiOutputReceiver} and checked
 * exceptions that are never thrown here; Beam matches {@code @ProcessElement} parameters
 * reflectively, so dropping them is safe.
 */
@ProcessElement
public void process(
    @Element String filePath, @Timestamp Instant timestamp, BoundedWindow window) {
  Callable<ProcessResult> task = createProcessTask(filePath, timestamp, window);
  checkStateNotNull(activeTasks).add(checkStateNotNull(executor).submit(task));
}

/**
 * Blocks until every task submitted during this bundle completes, then emits each result —
 * either a parsed {@link SerializableDataFile} or an error row — into its original window.
 * Emission happens here, on the bundle's main thread, because Beam output receivers are not
 * thread-safe.
 *
 * @throws Exception if any task failed; the failure (wrapped in {@code ExecutionException})
 *     propagates and fails the bundle
 */
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
  for (Future<ProcessResult> future : checkStateNotNull(activeTasks)) {
    // Block and wait for the background threads to finish their work.
    ProcessResult result = future.get();

    // Safely output on the main runner thread.
    if (result.errorRow != null) {
      context.output(ERRORS, result.errorRow, result.timestamp, result.window);
      numErrorFiles.inc();
    } else if (result.dataFile != null) {
      context.output(DATA_FILES, result.dataFile, result.timestamp, result.window);
    }
  }
}

static <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer bytes) {
Expand Down
Loading