Skip to content

Commit baf7c35

Browse files
committed
[V2 Streaming] Integrate DataFrame-based initial snapshot behind feature flag
1 parent 2fddd1c commit baf7c35

File tree

3 files changed

+168
-0
lines changed

3 files changed

+168
-0
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3130,6 +3130,18 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
31303130
.doc("Maximum number of files allowed in initial snapshot for V2 streaming.")
31313131
.intConf
31323132
.createWithDefault(50000)
3133+
3134+
// Feature flag for the distributed (DataFrame-based) initial snapshot path of the
// V2 streaming connector. Internal and off by default; when enabled, Kernel log
// replay runs on an executor and file ordering uses Spark's distributed sort
// instead of materializing the full file list on the driver.
val DELTA_STREAMING_USE_DATAFRAME_INITIAL_SNAPSHOT =
  buildConf("streaming.distributedInitialSnapshot")
    .internal()
    .doc(
      "When enabled, the V2 streaming connector uses a DataFrame-based approach for " +
      "initial snapshot loading. This avoids driver OOM for large tables by running " +
      "Kernel log replay on an executor and sorting files via Spark's distributed sort " +
      "with MEMORY_AND_DISK persistence."
    )
    .booleanConf
    .createWithDefault(false)
31333145
}
31343146

31353147
object DeltaSQLConf extends DeltaSQLConfBase

spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,11 @@ private static class InitialSnapshotCache {
154154
private final AtomicReference<InitialSnapshotCache> cachedInitialSnapshot =
155155
new AtomicReference<>(null);
156156

157+
private final AtomicReference<DataFrameSnapshotCache> cachedDataFrameSnapshot =
158+
new AtomicReference<>(null);
159+
157160
private final int maxInitialSnapshotFiles;
161+
private final boolean useDataFrameBasedInitialSnapshot;
158162

159163
public SparkMicroBatchStream(
160164
DeltaSnapshotManager snapshotManager,
@@ -204,6 +208,12 @@ public SparkMicroBatchStream(
204208
.sessionState()
205209
.conf()
206210
.getConf(DeltaSQLConf.DELTA_STREAMING_INITIAL_SNAPSHOT_MAX_FILES());
211+
this.useDataFrameBasedInitialSnapshot =
212+
(Boolean)
213+
spark
214+
.sessionState()
215+
.conf()
216+
.getConf(DeltaSQLConf.DELTA_STREAMING_USE_DATAFRAME_INITIAL_SNAPSHOT());
207217

208218
boolean isStreamingFromColumnMappingTable =
209219
ColumnMapping.getColumnMappingMode(
@@ -450,6 +460,14 @@ public void commit(Offset end) {
450460
@Override
public void stop() {
  // Release both snapshot caches so a stopped stream does not pin memory: the
  // list-based cache is dropped by clearing the reference, while the DataFrame-based
  // cache must additionally be closed (via invalidateDataFrameCache) to release the
  // resources held by its cached DataFrame.
  cachedInitialSnapshot.set(null);
  invalidateDataFrameCache();
}
465+
466+
private void invalidateDataFrameCache() {
467+
DataFrameSnapshotCache prev = cachedDataFrameSnapshot.getAndSet(null);
468+
if (prev != null) {
469+
prev.close();
470+
}
453471
}
454472

455473
///////////////////////
@@ -1042,6 +1060,10 @@ private long addIndexedFilesAndReturnNextIndex(
10421060
* @return An iterator of IndexedFile representing the snapshot files
10431061
*/
10441062
private CloseableIterator<IndexedFile> getSnapshotFiles(long version) {
1063+
if (useDataFrameBasedInitialSnapshot) {
1064+
return getSnapshotFilesViaDataFrame(version);
1065+
}
1066+
10451067
InitialSnapshotCache cache = cachedInitialSnapshot.get();
10461068

10471069
if (cache != null && cache.version != null && cache.version == version) {
@@ -1056,6 +1078,78 @@ private CloseableIterator<IndexedFile> getSnapshotFiles(long version) {
10561078
return Utils.toCloseableIterator(indexedFiles.iterator());
10571079
}
10581080

1081+
private CloseableIterator<IndexedFile> getSnapshotFilesViaDataFrame(long version) {
1082+
DataFrameSnapshotCache dfCache = cachedDataFrameSnapshot.get();
1083+
if (dfCache != null && dfCache.getVersion() == version) {
1084+
return dataFrameToIndexedFiles(dfCache.getSortedAddFiles(), version);
1085+
}
1086+
1087+
invalidateDataFrameCache();
1088+
1089+
SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(version);
1090+
io.delta.spark.internal.v2.utils.SerializableReadOnlySnapshot serSnapshot =
1091+
io.delta.spark.internal.v2.utils.SerializableReadOnlySnapshot.fromSnapshot(
1092+
snapshot, hadoopConf);
1093+
1094+
ScanFileRDD rdd = new ScanFileRDD(spark.sparkContext(), serSnapshot);
1095+
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df =
1096+
spark
1097+
.createDataFrame(rdd, ScanFileRDD.SPARK_SCHEMA)
1098+
.orderBy("modificationTime", "path")
1099+
.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK());
1100+
1101+
dfCache = new DataFrameSnapshotCache(version, df);
1102+
cachedDataFrameSnapshot.set(dfCache);
1103+
1104+
return dataFrameToIndexedFiles(df, version);
1105+
}
1106+
1107+
/**
1108+
* Converts a sorted DataFrame of AddFile rows into a lazy CloseableIterator of IndexedFiles,
1109+
* wrapped with BEGIN/END sentinels. Uses toLocalIterator() to stream rows from executors to the
1110+
* driver one at a time, avoiding pulling all data into driver memory.
1111+
*/
1112+
private static CloseableIterator<IndexedFile> dataFrameToIndexedFiles(
1113+
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, long version) {
1114+
1115+
java.util.Iterator<org.apache.spark.sql.Row> localIter = df.toLocalIterator();
1116+
1117+
return new CloseableIterator<IndexedFile>() {
1118+
private boolean sentBegin = false;
1119+
private boolean sentEnd = false;
1120+
private long index = 0;
1121+
1122+
@Override
1123+
public boolean hasNext() {
1124+
return !sentEnd;
1125+
}
1126+
1127+
@Override
1128+
public IndexedFile next() {
1129+
if (!sentBegin) {
1130+
sentBegin = true;
1131+
return new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), null);
1132+
}
1133+
1134+
if (localIter.hasNext()) {
1135+
org.apache.spark.sql.Row sparkRow = localIter.next();
1136+
io.delta.kernel.data.Row kernelRow =
1137+
new io.delta.spark.internal.v2.utils.SparkRowToKernelRow(
1138+
sparkRow, AddFile.SCHEMA_WITHOUT_STATS);
1139+
return new IndexedFile(version, index++, new AddFile(kernelRow));
1140+
}
1141+
1142+
sentEnd = true;
1143+
return new IndexedFile(version, DeltaSourceOffset.END_INDEX(), null);
1144+
}
1145+
1146+
@Override
1147+
public void close() throws IOException {
1148+
// toLocalIterator() resources are managed by Spark
1149+
}
1150+
};
1151+
}
1152+
10591153
/** Loads snapshot files at the specified version. */
10601154
private List<IndexedFile> loadAndValidateSnapshot(long version) {
10611155
Snapshot snapshot = snapshotManager.loadSnapshotAt(version);

spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2866,4 +2866,66 @@ public void testMemoryProtection_initialSnapshotTooLarge(@TempDir File tempDir)
28662866
spark.conf().unset(configKey);
28672867
}
28682868
}
2869+
2870+
@Test
2871+
public void testDataFrameBasedInitialSnapshot_handlesLargeSnapshot(@TempDir File tempDir)
2872+
throws Exception {
2873+
String testTablePath = tempDir.getAbsolutePath();
2874+
String testTableName = "test_df_snapshot_" + System.nanoTime();
2875+
createEmptyTestTable(testTablePath, testTableName);
2876+
2877+
insertVersions(
2878+
testTableName,
2879+
/* numVersions= */ 10,
2880+
/* rowsPerVersion= */ 5,
2881+
/* includeEmptyVersion= */ false);
2882+
2883+
String maxFilesKey = DeltaSQLConf.DELTA_STREAMING_INITIAL_SNAPSHOT_MAX_FILES().key();
2884+
String dfFlagKey = DeltaSQLConf.DELTA_STREAMING_USE_DATAFRAME_INITIAL_SNAPSHOT().key();
2885+
spark.conf().set(maxFilesKey, "5");
2886+
spark.conf().set(dfFlagKey, "true");
2887+
2888+
try {
2889+
Configuration hadoopConf = spark.sessionState().newHadoopConf();
2890+
PathBasedSnapshotManager snapshotManager =
2891+
new PathBasedSnapshotManager(testTablePath, hadoopConf);
2892+
SparkMicroBatchStream stream =
2893+
createTestStreamWithDefaults(snapshotManager, hadoopConf, emptyDeltaOptions());
2894+
2895+
long version = 5L;
2896+
long fromIndex = DeltaSourceOffset.BASE_INDEX();
2897+
boolean isInitialSnapshot = true;
2898+
2899+
List<IndexedFile> files = new ArrayList<>();
2900+
try (CloseableIterator<IndexedFile> iter =
2901+
stream.getFileChanges(version, fromIndex, isInitialSnapshot, Optional.empty())) {
2902+
while (iter.hasNext()) {
2903+
files.add(iter.next());
2904+
}
2905+
}
2906+
2907+
// Should succeed (no exception) and include BEGIN/END sentinels + data files
2908+
assertTrue(files.size() >= 3, "Should have at least BEGIN, one file, and END sentinels");
2909+
assertEquals(DeltaSourceOffset.BASE_INDEX(), files.get(0).index);
2910+
assertEquals(DeltaSourceOffset.END_INDEX(), files.get(files.size() - 1).index);
2911+
2912+
// Verify data files are sorted by (modificationTime, path)
2913+
for (int i = 2; i < files.size() - 1; i++) {
2914+
IndexedFile prev = files.get(i - 1);
2915+
IndexedFile curr = files.get(i);
2916+
if (prev.getAdd() != null && curr.getAdd() != null) {
2917+
long prevTime = prev.getAdd().getModificationTime();
2918+
long currTime = curr.getAdd().getModificationTime();
2919+
assertTrue(
2920+
prevTime < currTime
2921+
|| (prevTime == currTime
2922+
&& prev.getAdd().getPath().compareTo(curr.getAdd().getPath()) <= 0),
2923+
"Files should be sorted by (modificationTime, path)");
2924+
}
2925+
}
2926+
} finally {
2927+
spark.conf().unset(maxFilesKey);
2928+
spark.conf().unset(dfFlagKey);
2929+
}
2930+
}
28692931
}

0 commit comments

Comments (0)