diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 4643836542aa..4d2f56390025 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -271,6 +271,118 @@ public void testEqualityDeleteWithFilter() throws IOException {
     assertThat(actual).as("Table should contain no rows").hasSize(0);
   }
 
+  @TestTemplate
+  public void testMergeEqualityDeletesOOM() throws IOException {
+    String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+
+    // Create an extremely large delete dataset
+    List<Record> dataDeletes = Lists.newArrayList();
+    for (int i = 0; i < 1_000_000_000; i++) { // Adjust this number up if necessary
+      // Create large strings for each record to consume more memory
+      dataDeletes.add(dataDelete.copy("data", "item" + i + "data".repeat(100)));
+    }
+
+    DeleteFile eqDeletes =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0), // not partition specific because key = 0
+            dataDeletes,
+            deleteRowSchema);
+
+    table.newRowDelta().addDeletes(eqDeletes).commit();
+
+    Types.StructType projection = table.schema().select("*").asStruct();
+    Dataset<Row> df =
+        spark
+            .read()
+            .format("iceberg")
+            .load(TableIdentifier.of("default", tableName).toString())
+            .filter("data = 'item0" + "data".repeat(100) + "'") // filter on a specific deleted row
+            .selectExpr("*");
+
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList()
+        .forEach(
+            row -> {
+              SparkStructLike rowWrapper = new SparkStructLike(projection);
+              actual.add(rowWrapper.wrap(row));
+            });
+    assertThat(actual).as("Table should contain no rows").hasSize(0);
+  }
+
+  @TestTemplate
+  public void testMergeEqualityDeletesOOM_v2() throws IOException {
+    // Set up the table name and delete schema
+    String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+
+    // Create and commit multiple equality delete files to simulate memory pressure,
+    // i.e. several smaller equality delete files instead of one very large equality delete file
+    int fileCount = 10; // Number of smaller delete files
+    int recordsPerFile = 1_000_000; // Adjust for an estimated ~120 MB file size
+
+    for (int j = 0; j < fileCount; j++) {
+      List<Record> dataDeletes = Lists.newArrayList();
+
+      for (int i = 0; i < recordsPerFile; i++) {
+        dataDeletes.add(dataDelete.copy("data", "item" + i + "data".repeat(100)));
+      }
+
+      // Write each equality delete file separately
+      File deleteFile = File.createTempFile("junit", null, temp.toFile());
+      DeleteFile eqDeletes =
+          FileHelpers.writeDeleteFile(
+              table,
+              Files.localOutput(deleteFile),
+              TestHelpers.Row.of(0),
+              dataDeletes,
+              deleteRowSchema);
+      table.newRowDelta().addDeletes(eqDeletes).commit();
+
+      // Check and log the file size: deleteFile.length() returns the size in bytes,
+      // divided by (1024 * 1024) to convert bytes to megabytes
+      long fileSizeInBytes = deleteFile.length();
+      System.out.println(
+          "Equality delete file " + j + " size: " + (fileSizeInBytes / (1024 * 1024)) + " MB");
+      System.out.println(deleteFile);
+      System.out.println(eqDeletes);
+    }
+
+    // Prepare the SELECT * query (basic read)
+    Types.StructType projection = table.schema().select("*").asStruct();
+    Dataset<Row> df =
+        spark
+            .read()
+            .format("iceberg")
+            .load(TableIdentifier.of("default", tableName).toString())
+            .filter("data = 'item0" + "data".repeat(100) + "'") // Filter to verify the delete is applied
+            .selectExpr("*");
+
+    // Track memory usage before the read operation
+    long initialMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+    // Execute and process the SELECT * result, ensuring all equality deletes are applied
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList()
+        .forEach(
+            row -> {
+              SparkStructLike rowWrapper = new SparkStructLike(projection);
+              actual.add(rowWrapper.wrap(row));
+            });
+
+    // Track memory usage after the read operation
+    long finalMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+    // Assertions: equality-deleted rows should not appear
+    assertThat(actual).as("Table should contain no rows").hasSize(0);
+    System.out.println(
+        "Memory consumed during read operation: "
+            + (finalMemory - initialMemory)
+            + " bytes. Initial memory consumption (before the read) was "
+            + initialMemory
+            + " and final memory consumption was "
+            + finalMemory);
+  }
+
   @TestTemplate
   public void testReadEqualityDeleteRows() throws IOException {
     Schema deleteSchema1 = table.schema().select("data");
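
For context on what these tests are stressing: applying equality deletes on the read path generally means materializing the delete rows into an in-memory set and checking every data row against it, so heap usage grows with the total number of delete rows across the committed delete files. The sketch below is a minimal, self-contained illustration of that pattern using plain Java collections; the class and method names (EqualityDeleteSketch, loadDeleteKeys, isLive) are hypothetical and this is not the actual Iceberg read path.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Conceptual sketch only (hypothetical names, not Iceberg code): equality deletes
// are applied by holding the delete keys in memory and filtering data rows against them.
public class EqualityDeleteSketch {

  // Build an in-memory set of deleted keys, analogous to loading equality delete files.
  static Set<String> loadDeleteKeys(List<String> deleteRows) {
    return new HashSet<>(deleteRows); // every delete row is held on the heap at once
  }

  // A data row survives the scan only if its key is not in the delete set.
  static Predicate<String> isLive(Set<String> deleteKeys) {
    return row -> !deleteKeys.contains(row);
  }

  public static void main(String[] args) {
    // Same key shape as the tests above; scale the count up to observe the heap pressure.
    List<String> deleteRows =
        IntStream.range(0, 100_000)
            .mapToObj(i -> "item" + i + "data".repeat(100))
            .collect(Collectors.toList());

    Set<String> deleteKeys = loadDeleteKeys(deleteRows);
    List<String> data = List.of("item0" + "data".repeat(100), "other-row");

    long live = data.stream().filter(isLive(deleteKeys)).count();
    System.out.println("Rows surviving the equality deletes: " + live); // prints 1
  }
}

Because the delete keys are fully materialized before filtering, memory consumption is driven by the combined size of all delete rows, whether they come from one very large delete file or from many smaller ones, which is what the two tests above probe from both directions.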