|
22 | 22 | import java.util.HashMap; |
23 | 23 | import java.util.List; |
24 | 24 | import java.util.Map; |
| 25 | +import java.util.Set; |
25 | 26 | import java.util.UUID; |
| 27 | +import java.util.stream.Collectors; |
26 | 28 | import org.apache.beam.sdk.coders.KvCoder; |
27 | 29 | import org.apache.beam.sdk.coders.StringUtf8Coder; |
28 | 30 | import org.apache.beam.sdk.metrics.Counter; |
|
38 | 40 | import org.apache.beam.sdk.util.Preconditions; |
39 | 41 | import org.apache.beam.sdk.values.KV; |
40 | 42 | import org.apache.beam.sdk.values.PCollection; |
| 43 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; |
| 44 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; |
41 | 45 | import org.apache.iceberg.AppendFiles; |
42 | 46 | import org.apache.iceberg.DataFile; |
43 | 47 | import org.apache.iceberg.FileFormat; |
@@ -128,13 +132,12 @@ public void processElement( |
128 | 132 | BoundedWindow window) |
129 | 133 | throws IOException { |
130 | 134 | String tableStringIdentifier = element.getKey(); |
| 135 | + Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); |
131 | 136 | Iterable<FileWriteResult> fileWriteResults = element.getValue(); |
132 | | - if (!fileWriteResults.iterator().hasNext()) { |
| 137 | + if (shouldSkip(table, fileWriteResults)) { |
133 | 138 | return; |
134 | 139 | } |
135 | 140 |
|
136 | | - Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); |
137 | | - |
138 | 141 | // vast majority of the time, we will simply append data files. |
139 | 142 | // in the rare case we get a batch that contains multiple partition specs, we will group |
140 | 143 | // data into manifest files and append. |
@@ -211,5 +214,36 @@ private ManifestWriter<DataFile> createManifestWriter( |
211 | 214 | tableLocation, manifestFilePrefix, uuid, spec.specId())); |
212 | 215 | return ManifestFiles.write(spec, io.newOutputFile(location)); |
213 | 216 | } |
| 217 | + |
| 218 | + // If the process call fails immediately after a successful commit, it gets retried with |
| 219 | + // the same data, possibly leading to data duplication. |
| 220 | + // To mitigate, we skip the current batch of files if it matches the most recently committed |
| 221 | + // batch. |
| 222 | + // |
| 223 | + // TODO(ahmedabu98): This does not cover concurrent writes from other pipelines, where the |
| 224 | + // "last successful snapshot" might reflect commits from other sources. Ideally, we would make |
| 225 | + // this stateful, but that is update incompatible. |
| 226 | + // TODO(ahmedabu98): add load test pipelines with intentional periodic crashing |
| 227 | + private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResults) { |
| 228 | + if (table.currentSnapshot() == null) { |
| 229 | + return false; |
| 230 | + } |
| 231 | + if (!fileWriteResults.iterator().hasNext()) { |
| 232 | + return true; |
| 233 | + } |
| 234 | + |
| 235 | + Set<String> filesCommittedLastSnapshot = |
| 236 | + Streams.stream(table.currentSnapshot().addedDataFiles(table.io())) |
| 237 | + .map(DataFile::path) |
| 238 | + .map(CharSequence::toString) |
| 239 | + .collect(Collectors.toSet()); |
| 240 | + |
| 241 | + // Check if the current batch is identical to the most recently committed batch. |
| 242 | + // Upstream GBK means we always get the same batch of files on retry, |
| 243 | + // so a single overlapping file means the whole batch is identical. |
| 244 | + return Iterables.size(fileWriteResults) == filesCommittedLastSnapshot.size() |
| 245 | + && filesCommittedLastSnapshot.contains( |
| 246 | + fileWriteResults.iterator().next().getSerializableDataFile().getPath()); |
| 247 | + } |
214 | 248 | } |
215 | 249 | } |
0 commit comments