From de77b4e730dd86cab39e4af2501e3111ec4ed7c7 Mon Sep 17 00:00:00 2001
From: Dylan Nguyen
Date: Thu, 17 Oct 2024 22:58:14 -0700
Subject: [PATCH 1/4] Simulating out of memory error on compaction job

---
 .../actions/TestRewriteDataFilesAction.java   | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b67ee87c7d3e..a5db54ca8cc6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -164,6 +164,31 @@ public void testEmptyTable() {
     assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
   }
 
+  @Test
+  public void testLargeFileCompaction() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+    int totalFiles = 0;
+
+    for (int i = 0; i < 10000; i++) {
+      // Append records to the table in each iteration
+      writeRecords(10000, SCALE * 3); // Appending large files in each iteration
+      totalFiles += 10000;
+      System.out.println(totalFiles * 10000);
+      shouldHaveFiles(table, totalFiles);
+      try {
+        basicRewrite(table).execute();
+      } catch (OutOfMemoryError e) {
+        // Catch the OutOfMemoryError and validate that it was triggered as expected
+        System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+      } catch (Exception e) {
+        // If any other exception occurs, fail the test
+        System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+      }
+    }
+  }
+
   @Test
   public void testBinPackUnpartitionedTable() {
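A note on the exception handling in this first version: the `catch (Exception e)` branch prints the same "Caught OutOfMemoryError as expected" message even though its comment says any other exception should fail the test. A minimal sketch of that branch actually failing, assuming a static import of AssertJ's `org.assertj.core.api.Assertions.fail` next to the `assertThat` the class already uses:

      } catch (OutOfMemoryError e) {
        // Expected path: the rewrite exhausted the heap
        System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
      } catch (Exception e) {
        // Anything else points at a broken test setup, so fail with the real cause
        fail("Unexpected exception during compaction", e);
      }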
From 5bb5843bbc8ad40d0962b4a93c9cd4308e6e2d47 Mon Sep 17 00:00:00 2001
From: Dylan Nguyen
Date: Mon, 21 Oct 2024 12:17:05 -0700
Subject: [PATCH 2/4] Revised test for OOM

---
 .../actions/TestRewriteDataFilesAction.java   | 23 +++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a5db54ca8cc6..3c1e49a3b654 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -165,27 +165,36 @@ public void testEmptyTable() {
     assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
   }
 
   @Test
-  public void testLargeFileCompaction() {
+  public void testCompactionMemoryUsage() {
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
     int totalFiles = 0;
 
+    // Write records and perform compaction job
     for (int i = 0; i < 10000; i++) {
       // Append records to the table in each iteration
+      long memoryBefore = getMemoryUsage(); // Memory prior to compaction
       writeRecords(10000, SCALE * 3); // Appending large files in each iteration
       totalFiles += 10000;
-      System.out.println(totalFiles * 10000);
-      shouldHaveFiles(table, totalFiles);
+      // Check if table has files
+      System.out.println(totalFiles);
+      // shouldHaveFiles(table, totalFiles);
       try {
-        basicRewrite(table).execute();
+        basicRewrite(table).execute(); // Try to execute with this many files
+        long memoryAfter = getMemoryUsage(); // Memory after compaction run
+        long memoryConsumed = memoryAfter - memoryBefore; // Amount of memory consumed for this operation
+        System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
       } catch (OutOfMemoryError e) {
-        // Catch the OutOfMemoryError and validate that it was triggered as expected
+        // Catch the OutOfMemoryError and print the most recent memory usage
         System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+        long memoryAfter = getMemoryUsage(); // Memory after compaction run
+        long memoryConsumed = memoryAfter - memoryBefore; // Amount of memory consumed for this operation
+        System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
       } catch (Exception e) {
-        // If any other exception occurs, fail the test
-        System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
+        // If any other exception occurs, print that the test failed
+        System.out.println("Caught: " + e.getMessage());
       }
     }
   }
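The revised test measures memory with a `getMemoryUsage()` helper that is never defined in this series. A minimal sketch of what it could look like, reading used heap from `Runtime` (the name and placement are assumptions, and in local mode this only reflects the test/driver JVM, not the executors):

    private long getMemoryUsage() {
      Runtime runtime = Runtime.getRuntime();
      // Used heap in bytes: total currently allocated minus the free portion
      return runtime.totalMemory() - runtime.freeMemory();
    }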
From 0f9c4a1ec7d55e49ddd2dbe757cf3b40ef9d6bec Mon Sep 17 00:00:00 2001
From: Dylan Nguyen
Date: Mon, 4 Nov 2024 09:39:41 -0800
Subject: [PATCH 3/4] Unable to run tests...

---
 .../actions/TestRewriteDataFilesAction.java   | 108 ++++++++++++++++++
 1 file changed, 108 insertions(+)

diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3c1e49a3b654..7e4f15c3790f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -199,6 +199,114 @@ public void testCompactionMemoryUsage() {
     }
   }
 
+  @Test
+  public void testCompactionSdataLdelete() {
+    PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+
+    // Create a large amount of records
+    List<Record> records = Lists.newArrayList();
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 1; i <= 100_000; i++) {
+      records.add(record.copy("id", i, "data", "data_" + i));
+    }
+
+    // Create a dataFile with all the records
+    this.dataFile =
+        FileHelpers.writeDataFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            Row.of(0),
+            records);
+    table.newAppend().appendFile(dataFile).commit(); // saving a snapshot of the table
+
+    // Create a lot of deletes
+    PositionDelete dataDelete = PositionDelete.create();
+    List<Record> dataDeletes = Lists.newArrayList();
+    for (int i = 1; i <= 50_000; i++) {
+      dataDeletes.add(dataDelete.copy("data", "data_" + i));
+    }
+
+    // Write several delete files
+    RowDelta r = table.newRowDelta();
+    for (int i = 1; i <= 50_000; i++) {
+      // Create a huge equality delete with all the deletes
+      DeleteFile eqDeletes =
+          FileHelpers.writeDeleteFile(
+              table,
+              Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+              TestHelpers.Row.of(0),
+              dataDeletes,
+              deleteRowSchema);
+      r.addDeletes(eqDeletes);
+    }
+    r.commit();
+
+    // Read and Compact
+    private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
+      curTable.refresh(); // updates table snapshot to newest "version"
+      return actions().rewritePositionDeletes(curTable);
+    }
+
+    basicRewritePositionalDeletes(table).execute();
+  }
+
+  @Test
+  public void testCompactionLdataLdeletes() {
+    PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create empty table
+
+    // Create a large amount of records
+    List<Record> records = Lists.newArrayList();
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 1; i <= 100_000; i++) {
+      records.add(record.copy("id", i, "data", "data_" + i));
+    }
+
+    // Create several dataFiles
+    AppendFiles recAppend = table.newAppend();
+    for (int i = 1; i <= 100_000; i++) {
+      // Create a dataFile with all the records
+      this.dataFile =
+          FileHelpers.writeDataFile(
+              table,
+              Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+              Row.of(0),
+              records);
+      recAppend.appendFile(dataFile);
+    }
+    recAppend.commit(); // saving a snapshot of the table
+
+    // Create a lot of deletes
+
+    List<Record> dataDeletes = Lists.newArrayList();
+    for (int i = 1; i <= 50_000; i++) {
+      dataDeletes.add(dataDelete.copy("data", "data_" + i));
+    }
+
+    // Write several delete files
+    RowDelta r = table.newRowDelta();
+    for (int i = 1; i <= 50_000; i++) {
+      // Create a huge equality delete with all the deletes
+      DeleteFile eqDeletes =
+          FileHelpers.writeDeleteFile(
+              table,
+              Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+              TestHelpers.Row.of(0),
+              dataDeletes,
+              deleteRowSchema);
+      r.addDeletes(eqDeletes);
+    }
+    r.commit();
+
+    // Read and Compact, Table refresh with eqDeletes
+    private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
+      curTable.refresh(); // updates table snapshot to newest "version"
+      return actions().rewritePositionDeletes(curTable);
+    }
+
+    // Compaction job
+    basicRewritePositionalDeletes(table).execute();
+  }
+
   @Test
   public void testBinPackUnpartitionedTable() {
     Table table = createTable(4);
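As the subject line says, neither new test compiles yet: a private method is declared inside each test body (moved out in the next patch), `deleteRowSchema` is never declared, `PositionDelete` keys deletes by file path and row position so `dataDelete.copy("data", "data_" + i)` will not compile, and the second test never declares `dataDelete` at all. A sketch of how equality deletes are usually built in Iceberg tests, assuming the `data` column from SCHEMA (the variable names mirror the patch but are illustrative):

    // Equality deletes are plain Records matched on the columns of the delete schema
    Schema deleteRowSchema = table.schema().select("data");
    Record dataDelete = GenericRecord.create(deleteRowSchema);
    List<Record> dataDeletes = Lists.newArrayList();
    for (int i = 1; i <= 50_000; i++) {
      dataDeletes.add(dataDelete.copy("data", "data_" + i)); // delete rows where data = "data_<i>"
    }

With `dataDeletes` holding `Record`s and `deleteRowSchema` defined, the existing `FileHelpers.writeDeleteFile(..., dataDeletes, deleteRowSchema)` calls can stay as written.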
From c6ddd790b424d0b880df12910fc27608a12cbf56 Mon Sep 17 00:00:00 2001
From: Dylan Nguyen
Date: Mon, 4 Nov 2024 09:57:07 -0800
Subject: [PATCH 4/4] Tried putting private function helper outside but still erroring

---
 .../actions/TestRewriteDataFilesAction.java   | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 7e4f15c3790f..5997a2e0096d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -199,6 +199,12 @@ public void testCompactionMemoryUsage() {
     }
   }
 
+  // Read and Compact Helper Method
+  private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
+    curTable.refresh(); // updates table snapshot to newest "version"
+    return actions().rewritePositionDeletes(curTable);
+  }
+
   @Test
   public void testCompactionSdataLdelete() {
     PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
@@ -241,12 +247,7 @@ public void testCompactionSdataLdelete() {
     }
     r.commit();
 
-    // Read and Compact
-    private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
-      curTable.refresh(); // updates table snapshot to newest "version"
-      return actions().rewritePositionDeletes(curTable);
-    }
-
+    //Compaction Job
     basicRewritePositionalDeletes(table).execute();
   }
 
@@ -297,12 +298,6 @@ public void testCompactionLdataLdeletes() {
     }
     r.commit();
 
-    // Read and Compact, Table refresh with eqDeletes
-    private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
-      curTable.refresh(); // updates table snapshot to newest "version"
-      return actions().rewritePositionDeletes(curTable);
-    }
-
     // Compaction job
     basicRewritePositionalDeletes(table).execute();
   }
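Even with the helper hoisted out of the test bodies, both tests discard the action's outcome, and `rewritePositionDeletes` compacts position delete files, so a table that only carries the equality deletes written above may give it nothing to rewrite. A hedged sketch of inspecting the result rather than calling `execute()` blindly; the count accessors are taken from the public `RewritePositionDeleteFiles.Result` interface, but treat the exact checks as illustrative:

    RewritePositionDeleteFiles.Result result = basicRewritePositionalDeletes(table).execute();
    // With only equality deletes committed, both counts are likely to stay at zero
    System.out.println("Rewritten delete files: " + result.rewrittenDeleteFilesCount());
    System.out.println("Added delete files: " + result.addedDeleteFilesCount());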