Commit 17198be

fix: prevent NoSuchElementException in AppendFilesToTables (#37217)
1 parent c489cb7 commit 17198be
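
Summary: the retry-dedupe check in AppendFilesToTables#shouldSkip sampled the first added data file of the table's current snapshot via iterator().next(). When the latest snapshot added no data files (for example, after a delete-only commit), that call threw NoSuchElementException. This commit guards the sample with hasNext(), migrates the deprecated DataFile#path() accessor to location(), and adds a regression test.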

File tree

3 files changed: +121, -12 lines changed

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 22 additions & 11 deletions
@@ -135,7 +135,8 @@ public void processElement(
       }
 
       // vast majority of the time, we will simply append data files.
-      // in the rare case we get a batch that contains multiple partition specs, we will group
+      // in the rare case we get a batch that contains multiple partition specs, we
+      // will group
       // data into manifest files and append.
       // note: either way, we must use a single commit operation for atomicity.
       if (containsMultiplePartitionSpecs(fileWriteResults)) {
@@ -163,11 +164,14 @@ private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteRes
       update.commit();
     }
 
-    // When a user updates their table partition spec during runtime, we can end up with
-    // a batch of files where some are written with the old spec and some are written with the new
+    // When a user updates their table partition spec during runtime, we can end up
+    // with
+    // a batch of files where some are written with the old spec and some are
+    // written with the new
     // spec.
     // A table commit is limited to a single partition spec.
-    // To handle this, we create a manifest file for each partition spec, and group data files
+    // To handle this, we create a manifest file for each partition spec, and group
+    // data files
     // accordingly.
     // Afterward, we append all manifests using a single commit operation.
     private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
@@ -211,14 +215,18 @@ private ManifestWriter<DataFile> createManifestWriter(
       return ManifestFiles.write(spec, io.newOutputFile(location));
     }
 
-    // If the process call fails immediately after a successful commit, it gets retried with
+    // If the process call fails immediately after a successful commit, it gets
+    // retried with
     // the same data, possibly leading to data duplication.
-    // To mitigate, we skip the current batch of files if it matches the most recently committed
+    // To mitigate, we skip the current batch of files if it matches the most
+    // recently committed
     // batch.
     //
-    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the
-    // "last successful snapshot" might reflect commits from other sources. Ideally, we would make
-    // this stateful, but that is update incompatible.
+    // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines,
+    // where the
+    // "last successful snapshot" might reflect commits from other sources. Ideally,
+    // we would make
+    // this stateful, but that is update incompatible.
     // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing
     private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) {
       if (table.currentSnapshot() == null) {
@@ -231,8 +239,11 @@ private boolean shouldSkip(Table table, Iterable<FileWriteResul
       // Check if the current batch is identical to the most recently committed batch.
       // Upstream GBK means we always get the same batch of files on retry,
       // so a single overlapping file means the whole batch is identical.
-      String sampleCommittedDataFilePath =
-          table.currentSnapshot().addedDataFiles(table.io()).iterator().next().path().toString();
+      Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
+      if (!addedDataFiles.iterator().hasNext()) {
+        return false;
+      }
+      String sampleCommittedDataFilePath = addedDataFiles.iterator().next().location().toString();
       for (FileWriteResult result : fileWriteResults) {
         if (result.getSerializableDataFile().getPath().equals(sampleCommittedDataFilePath)) {
           return true;
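
Note on the last hunk: an Iceberg snapshot is not guaranteed to add any data files. A delete-only commit (such as one made through DeleteFiles) still advances the current snapshot, but its addedDataFiles iterable is empty, so the old iterator().next() call threw NoSuchElementException before the dedupe comparison could run. A minimal standalone sketch of the failure mode and the guard (illustrative only, not code from this commit):

import java.util.Collections;
import java.util.Iterator;

public class EmptyIteratorSketch {
  public static void main(String[] args) {
    // Stand-in for table.currentSnapshot().addedDataFiles(table.io())
    // after a delete-only snapshot: nothing was added.
    Iterator<String> addedFiles = Collections.emptyIterator();

    // Old pattern: calling addedFiles.next() here would throw
    // NoSuchElementException. New pattern: probe with hasNext() first
    // and skip the dedupe check entirely when nothing was added.
    if (!addedFiles.hasNext()) {
      System.out.println("no added data files; proceed with the append");
      return;
    }
    String sample = addedFiles.next();
    System.out.println("compare incoming batch against: " + sample);
  }
}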

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java

Lines changed: 1 addition & 1 deletion
@@ -141,7 +141,7 @@ abstract static class Builder {
   static SerializableDataFile from(DataFile f, String partitionPath) {
 
     return SerializableDataFile.builder()
-        .setPath(f.path().toString())
+        .setPath(f.location().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
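
The one-line change above tracks the Iceberg API: recent Iceberg releases deprecate ContentFile#path(), which returns a CharSequence, in favor of ContentFile#location(), which returns a String. It also keeps the stored path textually consistent with the sampleCommittedDataFilePath comparison in AppendFilesToTables. A hedged sketch of the two accessors (assumes an Iceberg version that ships location()):

import org.apache.iceberg.DataFile;

class LocationAccessorSketch {
  static String filePath(DataFile f) {
    // Deprecated accessor: path() returns CharSequence, so callers converted:
    //   String path = f.path().toString();
    // Preferred accessor: location() already returns a String, which makes the
    // trailing toString() kept in this commit harmless but redundant.
    return f.location();
  }
}
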
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTablesTest.java

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AppendFilesToTablesTest implements Serializable {
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Test
+  public void testAppendAfterDelete() throws Exception {
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
+
+    IcebergCatalogConfig catalog =
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
+
+    // 1. Create table and write some data using first pipeline
+    Pipeline p1 = Pipeline.create(PipelineOptionsFactory.create());
+    p1.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    p1.run().waitUntilFinish();
+
+    // 2. Delete the data
+    Table table = warehouse.loadTable(tableId);
+    DeleteFiles delete = table.newDelete();
+    // Delete all data files in the current snapshot
+    table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
+    delete.commit();
+
+    // 3. Write more data using a fresh second pipeline
+    Pipeline p2 = Pipeline.create(PipelineOptionsFactory.create());
+    p2.apply("More Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT2)))
+        .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        .apply("Append More To Table", IcebergIO.writeRows(catalog).to(tableId));
+
+    p2.run().waitUntilFinish();
+
+    // Verify data: after the delete and append, only FILE1SNAPSHOT2 should be present
+    table.refresh();
+    List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
+    assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT2.toArray()));
+  }
+}
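
The test reproduces the crash end to end: step 2's delete-only commit leaves a current snapshot with zero added data files, exactly the state in which shouldSkip previously threw NoSuchElementException, and step 3 verifies that a subsequent append now succeeds. A condensed sketch of the precondition the test sets up (illustrative, not code from this commit; it reuses the Iceberg table API shown above):

import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.Table;

class DeleteOnlySnapshotSketch {
  // Deletes every data file in the current snapshot via a single commit.
  // Afterward, the new current snapshot adds no data files, so this returns
  // false: the exact state the hasNext() guard in shouldSkip now handles.
  static boolean hasAddedDataFiles(Table table) {
    DeleteFiles delete = table.newDelete();
    table.currentSnapshot().addedDataFiles(table.io()).forEach(delete::deleteFile);
    delete.commit();
    return table.currentSnapshot().addedDataFiles(table.io()).iterator().hasNext();
  }
}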
