Skip to content

Commit 4cfa2f7

Browse files
authored
[GOBBLIN-2203] Add Support for Delete Manifests in Iceberg Full Table Replication (#4112)
* Added support for delete manifests in Iceberg full-table replication. * Removed unused imports.
1 parent 0548766 commit 4cfa2f7

File tree

2 files changed

+175
-43
lines changed

2 files changed

+175
-43
lines changed

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import org.apache.hadoop.fs.FileSystem;
3232
import org.apache.iceberg.DataFile;
33+
import org.apache.iceberg.DeleteFile;
34+
import org.apache.iceberg.ManifestContent;
3335
import org.apache.iceberg.ManifestFile;
3436
import org.apache.iceberg.ManifestFiles;
3537
import org.apache.iceberg.ManifestReader;
@@ -200,6 +202,9 @@ protected static List<IcebergSnapshotInfo.ManifestFileInfo> calcAllManifestFileI
200202
}
201203

202204
protected static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
205+
if (manifest.content() == ManifestContent.DELETES) {
206+
return new ManifestFileInfo(manifest.path(), discoverDeleteFilePaths(manifest, io));
207+
}
203208
return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
204209
}
205210

@@ -209,6 +214,13 @@ protected static List<String> discoverDataFilePaths(ManifestFile manifest, FileI
209214
}
210215
}
211216

217+
protected static List<String> discoverDeleteFilePaths(ManifestFile manifest, FileIO io) throws IOException {
218+
try (ManifestReader<DeleteFile> deleteFileManifestReader = ManifestFiles.readDeleteManifest(manifest, io, null);
219+
CloseableIterator<DeleteFile> deleteFiles = deleteFileManifestReader.iterator()) {
220+
return Lists.newArrayList(Iterators.transform(deleteFiles, (deleteFile) -> deleteFile.path().toString()));
221+
}
222+
}
223+
212224
public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
213225
DatasetDescriptor descriptor = new DatasetDescriptor(
214226
datasetDescriptorPlatform,

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java

Lines changed: 163 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,35 +24,50 @@
2424
import java.util.ArrayList;
2525
import java.util.Arrays;
2626
import java.util.Collection;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Optional;
3031
import java.util.function.Function;
3132
import java.util.function.Predicate;
3233
import java.util.stream.Collectors;
34+
import java.util.stream.Stream;
3335

3436
import org.apache.iceberg.AppendFiles;
3537
import org.apache.iceberg.CatalogProperties;
3638
import org.apache.iceberg.DataFile;
3739
import org.apache.iceberg.DataFiles;
40+
import org.apache.iceberg.DeleteFile;
3841
import org.apache.iceberg.FileFormat;
3942
import org.apache.iceberg.PartitionData;
4043
import org.apache.iceberg.PartitionSpec;
44+
import org.apache.iceberg.RowDelta;
4145
import org.apache.iceberg.Schema;
4246
import org.apache.iceberg.StructLike;
4347
import org.apache.iceberg.Table;
4448
import org.apache.iceberg.TableMetadata;
4549
import org.apache.iceberg.avro.AvroSchemaUtil;
4650
import org.apache.iceberg.catalog.Namespace;
4751
import org.apache.iceberg.catalog.TableIdentifier;
52+
import org.apache.iceberg.data.GenericAppenderFactory;
53+
import org.apache.iceberg.data.GenericRecord;
54+
import org.apache.iceberg.data.Record;
55+
import org.apache.iceberg.deletes.EqualityDeleteWriter;
56+
import org.apache.iceberg.deletes.PositionDelete;
57+
import org.apache.iceberg.deletes.PositionDeleteWriter;
58+
import org.apache.iceberg.encryption.EncryptedOutputFile;
59+
import org.apache.iceberg.encryption.EncryptionManager;
4860
import org.apache.iceberg.hive.HiveMetastoreTest;
61+
import org.apache.iceberg.io.FileIO;
4962
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
5063
import org.testng.Assert;
5164
import org.testng.annotations.AfterMethod;
5265
import org.testng.annotations.BeforeClass;
5366
import org.testng.annotations.BeforeMethod;
67+
import org.testng.annotations.DataProvider;
5468
import org.testng.annotations.Test;
5569

70+
import com.google.common.collect.ImmutableMap;
5671
import com.google.common.collect.Lists;
5772
import com.google.common.collect.Maps;
5873
import com.google.common.collect.Sets;
@@ -73,6 +88,18 @@ public class IcebergTableTest extends HiveMetastoreTest {
7388
protected static final PartitionSpec icebergPartitionSpec = PartitionSpec.builderFor(icebergSchema)
7489
.identity("id")
7590
.build();
91+
protected static final List<List<String>> perSnapshotDataFilesets = Lists.newArrayList(
92+
Lists.newArrayList("path/to/data-a0.orc"),
93+
Lists.newArrayList("path/to/data-b0.orc", "path/to/data-b1.orc"),
94+
Lists.newArrayList("path/to/data-c0.orc", "path/to/data-c1.orc", "path/to/data-c2.orc"),
95+
Lists.newArrayList("path/to/data-d0.orc")
96+
);
97+
protected static final List<List<String>> perSnapshotDeleteFilesets = Lists.newArrayList(
98+
Lists.newArrayList("path/to/delete-a0.orc"),
99+
Lists.newArrayList("path/to/delete-b0.orc", "path/to/delete-b1.orc"),
100+
Lists.newArrayList("path/to/delete-c0.orc", "path/to/delete-c1.orc", "path/to/delete-c2.orc"),
101+
Lists.newArrayList("path/to/delete-d0.orc")
102+
);
76103

77104
private final String dbName = "myicebergdb";
78105
private final String tableName = "justtesting";
@@ -91,7 +118,7 @@ public void setUp() throws Exception {
91118
@BeforeMethod
92119
public void setUpEachTest() {
93120
tableId = TableIdentifier.of(dbName, tableName);
94-
table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec);
121+
table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec, Collections.singletonMap("format-version", "2"));
95122
catalogUri = catalog.getConf().get(CatalogProperties.URI);
96123
metadataBasePath = calcMetadataBasePath(tableId);
97124
}
@@ -101,17 +128,44 @@ public void cleanUpEachTest() {
101128
catalog.dropTable(tableId);
102129
}
103130

104-
/** Verify info about the current snapshot only */
131+
/** Test to verify getCurrentSnapshotInfo, getAllSnapshotInfosIterator, getIncrementalSnapshotInfosIterator for iceberg table containing only data files.*/
105132
@Test
106-
public void testGetCurrentSnapshotInfo() throws IOException {
107-
List<List<String>> perSnapshotFilesets = Lists.newArrayList(
108-
Lists.newArrayList("/path/to/data-a0.orc"),
109-
Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
110-
Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
111-
Lists.newArrayList("/path/to/data-d0.orc")
112-
);
133+
public void testGetSnapshotInfosForDataFilesOnlyTable() throws IOException {
134+
initializeSnapshots(table, perSnapshotDataFilesets);
113135

114-
initializeSnapshots(table, perSnapshotFilesets);
136+
IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri,
137+
catalog.loadTable(tableId)).getCurrentSnapshotInfo();
138+
verifySnapshotInfo(snapshotInfo, perSnapshotDataFilesets, perSnapshotDataFilesets.size());
139+
140+
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
141+
catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
142+
Assert.assertEquals(snapshotInfos.size(), perSnapshotDataFilesets.size(), "num snapshots");
143+
for (int i = 0; i < perSnapshotDataFilesets.size(); ++i) {
144+
System.err.println("verifying snapshotInfo[" + i + "]");
145+
verifySnapshotInfo(snapshotInfos.get(i), perSnapshotDataFilesets.subList(0, i + 1), snapshotInfos.size());
146+
}
147+
148+
List<IcebergSnapshotInfo> incrementalSnapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
149+
catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
150+
Assert.assertEquals(incrementalSnapshotInfos.size(), perSnapshotDataFilesets.size(), "num snapshots");
151+
for (int i = 0; i < incrementalSnapshotInfos.size(); ++i) {
152+
System.err.println("verifying snapshotInfo[" + i + "]");
153+
verifySnapshotInfo(incrementalSnapshotInfos.get(i), perSnapshotDataFilesets.subList(i, i + 1), incrementalSnapshotInfos.size());
154+
}
155+
}
156+
157+
@DataProvider(name = "isPosDeleteProvider")
158+
public Object[][] isPosDeleteProvider() {
159+
return new Object[][] {{true}, {false}};
160+
}
161+
162+
/** Verify info about the current snapshot only */
163+
@Test(dataProvider = "isPosDeleteProvider")
164+
public void testGetCurrentSnapshotInfo(boolean isPosDelete) throws IOException {
165+
initializeSnapshots(table, perSnapshotDataFilesets);
166+
initializeSnapshotsWithDeleteFiles(table, catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
167+
List<List<String>> perSnapshotFilesets = Stream.concat(perSnapshotDeleteFilesets.stream(), perSnapshotDataFilesets.stream())
168+
.collect(Collectors.toList());
115169
IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri,
116170
catalog.loadTable(tableId)).getCurrentSnapshotInfo();
117171
verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
@@ -127,37 +181,37 @@ public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
127181
}
128182

129183
/** Verify info about all (full) snapshots */
130-
@Test
131-
public void testGetAllSnapshotInfosIterator() throws IOException {
132-
List<List<String>> perSnapshotFilesets = Lists.newArrayList(
133-
Lists.newArrayList("/path/to/data-a0.orc"),
134-
Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
135-
Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
136-
Lists.newArrayList("/path/to/data-d0.orc")
137-
);
184+
@Test(dataProvider = "isPosDeleteProvider")
185+
public void testGetAllSnapshotInfosIterator(boolean isPosDelete) throws IOException {
186+
int numDataSnapshots = perSnapshotDataFilesets.size();
187+
int numDeleteSnapshots = perSnapshotDeleteFilesets.size();
138188

139-
initializeSnapshots(table, perSnapshotFilesets);
189+
initializeSnapshots(table, perSnapshotDataFilesets);
190+
initializeSnapshotsWithDeleteFiles(table, catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
140191
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
141192
catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
142-
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
193+
Assert.assertEquals(snapshotInfos.size(), numDataSnapshots + numDeleteSnapshots, "num snapshots");
143194

144-
for (int i = 0; i < snapshotInfos.size(); ++i) {
195+
for (int i = 0; i < numDataSnapshots; ++i) {
145196
System.err.println("verifying snapshotInfo[" + i + "]");
146-
verifySnapshotInfo(snapshotInfos.get(i), perSnapshotFilesets.subList(0, i + 1), snapshotInfos.size());
197+
verifySnapshotInfo(snapshotInfos.get(i), perSnapshotDataFilesets.subList(0, i + 1), snapshotInfos.size());
198+
}
199+
200+
for (int i = 0 ; i < numDeleteSnapshots ; i++) {
201+
System.err.println("verifying snapshotInfo[" + i + "]");
202+
List<List<String>> curSnapshotFileSets = Stream.concat(perSnapshotDeleteFilesets.subList(0, i + 1).stream(), perSnapshotDataFilesets.stream())
203+
.collect(Collectors.toList());
204+
verifySnapshotInfo(snapshotInfos.get(i + numDataSnapshots), curSnapshotFileSets, snapshotInfos.size());
147205
}
148206
}
149207

150208
/** Verify info about all snapshots (incremental deltas) */
151-
@Test
152-
public void testGetIncrementalSnapshotInfosIterator() throws IOException {
153-
List<List<String>> perSnapshotFilesets = Lists.newArrayList(
154-
Lists.newArrayList("/path/to/data-a0.orc"),
155-
Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
156-
Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
157-
Lists.newArrayList("/path/to/data-d0.orc")
158-
);
159-
160-
initializeSnapshots(table, perSnapshotFilesets);
209+
@Test(dataProvider = "isPosDeleteProvider")
210+
public void testGetIncrementalSnapshotInfosIterator(boolean isPosDelete) throws IOException {
211+
initializeSnapshots(table, perSnapshotDataFilesets);
212+
initializeSnapshotsWithDeleteFiles(table, catalog.newTableOps(tableId).io(), perSnapshotDeleteFilesets, isPosDelete);
213+
List<List<String>> perSnapshotFilesets = Stream.concat(perSnapshotDataFilesets.stream(), perSnapshotDeleteFilesets.stream())
214+
.collect(Collectors.toList());
161215
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
162216
catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
163217
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
@@ -169,26 +223,38 @@ public void testGetIncrementalSnapshotInfosIterator() throws IOException {
169223
}
170224

171225
/** Verify info about all snapshots (incremental deltas) correctly eliminates repeated data files */
172-
@Test
173-
public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOException {
174-
List<List<String>> perSnapshotFilesets = Lists.newArrayList(
175-
Lists.newArrayList("/path/to/data-a0.orc"),
176-
Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc", "/path/to/data-a0.orc"),
177-
Lists.newArrayList("/path/to/data-a0.orc","/path/to/data-c0.orc", "/path/to/data-b1.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
178-
Lists.newArrayList("/path/to/data-d0.orc")
226+
@Test(dataProvider = "isPosDeleteProvider")
227+
public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles(boolean isPosDelete) throws IOException {
228+
List<List<String>> perSnapshotFilesets1 = Lists.newArrayList(
229+
Lists.newArrayList("path/to/data-a0.orc"),
230+
Lists.newArrayList("path/to/data-b0.orc", "path/to/data-b1.orc", "path/to/data-a0.orc"),
231+
Lists.newArrayList("path/to/data-a0.orc","path/to/data-c0.orc", "path/to/data-b1.orc", "path/to/data-c1.orc", "path/to/data-c2.orc"),
232+
Lists.newArrayList("path/to/data-d0.orc")
179233
);
180234

181-
initializeSnapshots(table, perSnapshotFilesets);
235+
// Note : Keeping the name as data- only to test the functionality without changing below validation code
236+
List<List<String>> perSnapshotFilesets2 = Lists.newArrayList(
237+
Lists.newArrayList("path/to/data-e0.orc"),
238+
Lists.newArrayList("path/to/data-f0.orc", "path/to/data-f1.orc", "path/to/data-e0.orc"),
239+
Lists.newArrayList("path/to/data-e0.orc","path/to/data-g0.orc", "path/to/data-f1.orc", "path/to/data-g1.orc", "path/to/data-g2.orc"),
240+
Lists.newArrayList("path/to/data-h0.orc")
241+
);
242+
243+
List<List<String>> perSnapshotFileSets = new ArrayList<>(perSnapshotFilesets1);
244+
perSnapshotFileSets.addAll(perSnapshotFilesets2);
245+
246+
initializeSnapshots(table, perSnapshotFilesets1);
247+
initializeSnapshotsWithDeleteFiles(table, catalog.newTableOps(tableId).io(), perSnapshotFilesets2, isPosDelete);
182248
List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId),
183249
catalogUri, catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
184-
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
250+
Assert.assertEquals(snapshotInfos.size(), perSnapshotFileSets.size(), "num snapshots");
185251

186252
for (int i = 0; i < snapshotInfos.size(); ++i) {
187253
System.err.println("verifying snapshotInfo[" + i + "] - " + snapshotInfos.get(i));
188254
char initialChar = (char) ((int) 'a' + i);
189255
// adjust expectations to eliminate duplicate entries (i.e. those bearing letter not aligned with ordinal fileset)
190-
List<String> fileset = perSnapshotFilesets.get(i).stream().filter(name -> {
191-
String uniquePortion = name.substring("/path/to/data-".length());
256+
List<String> fileset = perSnapshotFileSets.get(i).stream().filter(name -> {
257+
String uniquePortion = name.substring("path/to/data-".length());
192258
return uniquePortion.startsWith(Character.toString(initialChar));
193259
}).collect(Collectors.toList());
194260
verifySnapshotInfo(snapshotInfos.get(i), Arrays.asList(fileset), snapshotInfos.size());
@@ -353,6 +419,36 @@ protected static void initializeSnapshots(Table table, List<List<String>> perSna
353419
}
354420
}
355421

422+
protected static void initializeSnapshotsWithDeleteFiles(Table table, FileIO fileIO,
423+
List<List<String>> perSnapshotFilesets, boolean isPosDelete) {
424+
Schema deleteSchema = icebergSchema.select("id");
425+
int[] equalityFieldIds = {0};
426+
GenericAppenderFactory appenderFactory =
427+
new GenericAppenderFactory(
428+
icebergSchema,
429+
icebergPartitionSpec,
430+
equalityFieldIds,
431+
deleteSchema,
432+
deleteSchema);
433+
EncryptionManager encryptionManager = table.encryption();
434+
Record deleteRecord = GenericRecord.create(deleteSchema).copy(ImmutableMap.of("id", "testVal"));
435+
PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
436+
partitionData.set(0, "testVal");
437+
438+
for (List<String> snapshotFileset : perSnapshotFilesets) {
439+
RowDelta rowDelta = table.newRowDelta();
440+
for (String filePath : snapshotFileset) {
441+
EncryptedOutputFile encryptedOutputFile = encryptionManager.encrypt(fileIO.newOutputFile(filePath));
442+
if (isPosDelete) {
443+
rowDelta.addDeletes(createPosDeleteFile(appenderFactory, encryptedOutputFile, partitionData, deleteRecord));
444+
} else {
445+
rowDelta.addDeletes(createEqDeleteFile(appenderFactory, encryptedOutputFile, partitionData, deleteRecord));
446+
}
447+
}
448+
rowDelta.commit();
449+
}
450+
}
451+
356452
/** Extract whatever kind of iceberg metadata file, iff recognized by `doesResemble` */
357453
protected static Optional<File> extractSomeMetadataFilepath(String candidatePath, String basePath, Predicate<String> doesResemble) {
358454
try {
@@ -416,6 +512,30 @@ protected static DataFile createDataFile(String path, long sizeBytes, long numRe
416512
.build();
417513
}
418514

515+
protected static DeleteFile createEqDeleteFile(GenericAppenderFactory appenderFactory,
516+
EncryptedOutputFile encryptedOutputFile, StructLike partitionData, Record record) {
517+
EqualityDeleteWriter<Record> eqDeleteWriter = appenderFactory.newEqDeleteWriter(encryptedOutputFile, FileFormat.ORC, partitionData);
518+
try (EqualityDeleteWriter<Record> clsEqDeleteWriter = eqDeleteWriter) {
519+
clsEqDeleteWriter.write(record);
520+
} catch (IOException e) {
521+
throw new RuntimeException(e);
522+
}
523+
return eqDeleteWriter.toDeleteFile();
524+
}
525+
526+
protected static DeleteFile createPosDeleteFile(GenericAppenderFactory appenderFactory,
527+
EncryptedOutputFile encryptedOutputFile, StructLike partitionData, Record record) {
528+
PositionDelete<Record> posDelRecord = PositionDelete.create();
529+
posDelRecord.set("dummyFilePath", 0, record);
530+
PositionDeleteWriter<Record> posDeleteWriter = appenderFactory.newPosDeleteWriter(encryptedOutputFile, FileFormat.ORC, partitionData);
531+
try (PositionDeleteWriter<Record> clsPosDeleteWriter = posDeleteWriter) {
532+
clsPosDeleteWriter.write(posDelRecord);
533+
} catch (IOException e) {
534+
throw new RuntimeException(e);
535+
}
536+
return posDeleteWriter.toDeleteFile();
537+
}
538+
419539
/** general utility: order-independent/set equality between collections */
420540
protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> expected, String message) {
421541
Assert.assertEquals(Sets.newHashSet(actual), Sets.newHashSet(expected), message);

0 commit comments

Comments
 (0)