Skip to content

Commit 89b794b

Browse files
committed
[V2 Streaming] Add SerializableReadOnlySnapshot, ScanFileRDD, and DataFrameSnapshotCache
1 parent f438bf0 commit 89b794b

File tree

5 files changed

+576
-0
lines changed

5 files changed

+576
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.read;
17+
18+
import org.apache.spark.sql.Dataset;
19+
import org.apache.spark.sql.Row;
20+
21+
/** Cached sorted AddFile DataFrame, keyed by version. Callers synchronize externally. */
22+
public class DataFrameSnapshotCache implements AutoCloseable {
23+
24+
private final long version;
25+
private Dataset<Row> sortedAddFiles;
26+
27+
public DataFrameSnapshotCache(long version, Dataset<Row> sortedAddFiles) {
28+
this.version = version;
29+
this.sortedAddFiles = sortedAddFiles;
30+
}
31+
32+
public long getVersion() {
33+
return version;
34+
}
35+
36+
public Dataset<Row> getSortedAddFiles() {
37+
return sortedAddFiles;
38+
}
39+
40+
@Override
41+
public void close() {
42+
Dataset<Row> df = sortedAddFiles;
43+
if (df != null) {
44+
df.unpersist();
45+
sortedAddFiles = null;
46+
}
47+
}
48+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.read;
17+
18+
import io.delta.kernel.Scan;
19+
import io.delta.kernel.data.FilteredColumnarBatch;
20+
import io.delta.kernel.defaults.engine.DefaultEngine;
21+
import io.delta.kernel.engine.Engine;
22+
import io.delta.kernel.internal.actions.AddFile;
23+
import io.delta.kernel.utils.CloseableIterator;
24+
import io.delta.spark.internal.v2.utils.KernelRowToSparkRow;
25+
import io.delta.spark.internal.v2.utils.SchemaUtils;
26+
import io.delta.spark.internal.v2.utils.SerializableReadOnlySnapshot;
27+
import io.delta.spark.internal.v2.utils.StreamingHelper;
28+
import java.io.IOException;
29+
import java.io.Serializable;
30+
import java.util.NoSuchElementException;
31+
import java.util.Optional;
32+
import org.apache.spark.Partition;
33+
import org.apache.spark.SparkContext;
34+
import org.apache.spark.TaskContext;
35+
import org.apache.spark.rdd.RDD;
36+
import org.apache.spark.sql.Row;
37+
import org.apache.spark.sql.types.StructType;
38+
import scala.collection.mutable.ArrayBuffer;
39+
import scala.reflect.ClassTag$;
40+
41+
/**
42+
* A single-partition Spark RDD that reconstructs a Kernel {@link Scan} on the executor from a
43+
* {@link SerializableReadOnlySnapshot} and lazily streams {@link AddFile} records as Spark {@link
44+
* Row}s. The RDD output schema matches {@link AddFile#SCHEMA_WITHOUT_STATS}.
45+
*
46+
* <p>Single partition is an intentional limitation; a future version will use Kernel's plan API for
47+
* multi-partition replay. The downstream sort is still distributed.
48+
*/
49+
public class ScanFileRDD extends RDD<Row> {
50+
51+
public static final StructType SPARK_SCHEMA =
52+
SchemaUtils.convertKernelSchemaToSparkSchema(AddFile.SCHEMA_WITHOUT_STATS);
53+
54+
private final SerializableReadOnlySnapshot serializableSnapshot;
55+
56+
public ScanFileRDD(SparkContext sc, SerializableReadOnlySnapshot serializableSnapshot) {
57+
super(sc, new ArrayBuffer<>(), ClassTag$.MODULE$.apply(Row.class));
58+
this.serializableSnapshot = serializableSnapshot;
59+
}
60+
61+
private static final class SinglePartition implements Partition, Serializable {
62+
private static final long serialVersionUID = 1L;
63+
64+
@Override
65+
public int index() {
66+
return 0;
67+
}
68+
}
69+
70+
@Override
71+
public Partition[] getPartitions() {
72+
return new Partition[] {new SinglePartition()};
73+
}
74+
75+
@Override
76+
public scala.collection.Iterator<Row> compute(Partition split, TaskContext context) {
77+
Engine engine = DefaultEngine.create(serializableSnapshot.getHadoopConf());
78+
Scan scan = serializableSnapshot.toScan();
79+
80+
CloseableIterator<FilteredColumnarBatch> batchIter;
81+
try {
82+
batchIter = scan.getScanFiles(engine);
83+
} catch (Exception e) {
84+
throw new RuntimeException("Failed to open scan files on executor", e);
85+
}
86+
87+
AddFileLazyIterator lazyIter = new AddFileLazyIterator(batchIter);
88+
89+
if (context != null) {
90+
context.addTaskCompletionListener(
91+
ctx -> {
92+
try {
93+
batchIter.close();
94+
} catch (IOException e) {
95+
// best effort cleanup
96+
}
97+
});
98+
}
99+
100+
return lazyIter;
101+
}
102+
103+
/**
104+
* Lazy Scala iterator that streams AddFile rows one at a time from the underlying Kernel batch
105+
* iterator. No eager materialization into a list.
106+
*/
107+
private static final class AddFileLazyIterator implements scala.collection.Iterator<Row> {
108+
109+
private final CloseableIterator<FilteredColumnarBatch> batchIter;
110+
111+
private FilteredColumnarBatch currentBatch;
112+
private int currentRowId;
113+
private int currentBatchSize;
114+
private Row nextRow;
115+
116+
AddFileLazyIterator(CloseableIterator<FilteredColumnarBatch> batchIter) {
117+
this.batchIter = batchIter;
118+
this.currentBatch = null;
119+
this.currentRowId = 0;
120+
this.currentBatchSize = 0;
121+
this.nextRow = null;
122+
}
123+
124+
@Override
125+
public boolean hasNext() {
126+
if (nextRow != null) {
127+
return true;
128+
}
129+
nextRow = advance();
130+
return nextRow != null;
131+
}
132+
133+
@Override
134+
public Row next() {
135+
if (!hasNext()) {
136+
throw new NoSuchElementException();
137+
}
138+
Row result = nextRow;
139+
nextRow = null;
140+
return result;
141+
}
142+
143+
private Row advance() {
144+
while (true) {
145+
while (currentRowId < currentBatchSize) {
146+
int rowId = currentRowId++;
147+
Optional<AddFile> addOpt = StreamingHelper.getAddFile(currentBatch, rowId);
148+
if (addOpt.isPresent()) {
149+
return new KernelRowToSparkRow(addOpt.get().toRow(), SPARK_SCHEMA);
150+
}
151+
}
152+
if (!batchIter.hasNext()) {
153+
return null;
154+
}
155+
currentBatch = batchIter.next();
156+
currentRowId = 0;
157+
currentBatchSize = currentBatch.getData().getSize();
158+
}
159+
}
160+
}
161+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.utils;
17+
18+
import io.delta.kernel.Scan;
19+
import io.delta.kernel.defaults.engine.DefaultEngine;
20+
import io.delta.kernel.engine.Engine;
21+
import io.delta.kernel.internal.ScanImpl;
22+
import io.delta.kernel.internal.SnapshotImpl;
23+
import io.delta.kernel.internal.actions.Metadata;
24+
import io.delta.kernel.internal.actions.Protocol;
25+
import io.delta.kernel.internal.checksum.CRCInfo;
26+
import io.delta.kernel.internal.lang.Lazy;
27+
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
28+
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
29+
import io.delta.kernel.internal.replay.LogReplay;
30+
import io.delta.kernel.internal.snapshot.LogSegment;
31+
import io.delta.kernel.metrics.SnapshotReport;
32+
import java.io.Serializable;
33+
import java.util.ArrayList;
34+
import java.util.Optional;
35+
import org.apache.hadoop.conf.Configuration;
36+
import org.apache.spark.util.SerializableConfiguration;
37+
38+
/**
39+
* Serializable carrier for a Delta snapshot's state. Created on the driver from an existing {@link
40+
* SnapshotImpl} (zero I/O), and reconstructed on the executor as a read-only {@link Scan} via
41+
* {@link #toScan(Configuration)}. The returned {@code Scan} interface exposes only read operations,
42+
* preventing accidental misuse of write-path APIs (e.g. {@code Committer}).
43+
*/
44+
public class SerializableReadOnlySnapshot implements Serializable {
45+
46+
private static final long serialVersionUID = 1L;
47+
48+
private final String dataPath;
49+
private final String logPath;
50+
private final long version;
51+
52+
private final ArrayList<SerializableFileStatus> deltas;
53+
private final ArrayList<SerializableFileStatus> compactions;
54+
private final ArrayList<SerializableFileStatus> checkpoints;
55+
private final SerializableFileStatus deltaAtEndVersion;
56+
private final SerializableFileStatus lastSeenChecksum; // nullable
57+
private final Long maxPublishedDeltaVersion; // nullable
58+
59+
private final Protocol protocol;
60+
private final Metadata metadata;
61+
private final SerializableConfiguration hadoopConf;
62+
63+
private SerializableReadOnlySnapshot(
64+
String dataPath,
65+
String logPath,
66+
long version,
67+
ArrayList<SerializableFileStatus> deltas,
68+
ArrayList<SerializableFileStatus> compactions,
69+
ArrayList<SerializableFileStatus> checkpoints,
70+
SerializableFileStatus deltaAtEndVersion,
71+
SerializableFileStatus lastSeenChecksum,
72+
Long maxPublishedDeltaVersion,
73+
Protocol protocol,
74+
Metadata metadata,
75+
SerializableConfiguration hadoopConf) {
76+
this.dataPath = dataPath;
77+
this.logPath = logPath;
78+
this.version = version;
79+
this.deltas = deltas;
80+
this.compactions = compactions;
81+
this.checkpoints = checkpoints;
82+
this.deltaAtEndVersion = deltaAtEndVersion;
83+
this.lastSeenChecksum = lastSeenChecksum;
84+
this.maxPublishedDeltaVersion = maxPublishedDeltaVersion;
85+
this.protocol = protocol;
86+
this.metadata = metadata;
87+
this.hadoopConf = hadoopConf;
88+
}
89+
90+
/**
91+
* Driver-side: extract the snapshot state from an existing Kernel {@link SnapshotImpl}. This
92+
* performs zero I/O — all data is already in memory on the driver.
93+
*/
94+
public static SerializableReadOnlySnapshot fromSnapshot(
95+
SnapshotImpl snapshot, Configuration hadoopConf) {
96+
LogSegment logSegment = snapshot.getLogSegment();
97+
return new SerializableReadOnlySnapshot(
98+
snapshot.getDataPath().toString(),
99+
logSegment.getLogPath().toString(),
100+
snapshot.getVersion(null /* engine, unused for SnapshotImpl */),
101+
new ArrayList<>(SerializableFileStatus.fromList(logSegment.getDeltas())),
102+
new ArrayList<>(SerializableFileStatus.fromList(logSegment.getCompactions())),
103+
new ArrayList<>(SerializableFileStatus.fromList(logSegment.getCheckpoints())),
104+
SerializableFileStatus.from(logSegment.getDeltaFileAtEndVersion()),
105+
logSegment.getLastSeenChecksum().map(SerializableFileStatus::from).orElse(null),
106+
logSegment.getMaxPublishedDeltaVersion().orElse(null),
107+
snapshot.getProtocol(),
108+
snapshot.getMetadata(),
109+
new SerializableConfiguration(hadoopConf));
110+
}
111+
112+
/**
113+
* Executor-side: reconstruct a read-only {@link Scan} from the serialized snapshot state. The
114+
* returned {@code Scan} only exposes {@code getScanFiles()} — no write-path or commit APIs.
115+
*/
116+
public Scan toScan(Configuration hadoopConfOverride) {
117+
return buildScan(hadoopConfOverride);
118+
}
119+
120+
/**
121+
* Executor-side: reconstruct using the serialized Hadoop configuration. Convenience overload when
122+
* no conf override is needed.
123+
*/
124+
public Scan toScan() {
125+
return buildScan(hadoopConf.value());
126+
}
127+
128+
public long getVersion() {
129+
return version;
130+
}
131+
132+
/** Returns the serialized Hadoop configuration for creating an Engine on the executor. */
133+
public Configuration getHadoopConf() {
134+
return hadoopConf.value();
135+
}
136+
137+
// ---- internal reconstruction ----
138+
139+
private Scan buildScan(Configuration conf) {
140+
Engine engine = DefaultEngine.create(conf);
141+
io.delta.kernel.utils.Path kernelDataPath = new io.delta.kernel.utils.Path(dataPath);
142+
io.delta.kernel.utils.Path kernelLogPath = new io.delta.kernel.utils.Path(logPath);
143+
144+
LogSegment logSegment =
145+
new LogSegment(
146+
kernelLogPath,
147+
version,
148+
SerializableFileStatus.toFileStatusList(deltas),
149+
SerializableFileStatus.toFileStatusList(compactions),
150+
SerializableFileStatus.toFileStatusList(checkpoints),
151+
deltaAtEndVersion.toFileStatus(),
152+
Optional.ofNullable(lastSeenChecksum).map(SerializableFileStatus::toFileStatus),
153+
Optional.ofNullable(maxPublishedDeltaVersion));
154+
155+
Lazy<LogSegment> lazyLogSegment = new Lazy<>(() -> logSegment);
156+
Lazy<Optional<CRCInfo>> lazyCrcInfo = new Lazy<>(Optional::empty);
157+
158+
LogReplay logReplay = new LogReplay(engine, kernelDataPath, lazyLogSegment, lazyCrcInfo);
159+
160+
SnapshotQueryContext queryContext = SnapshotQueryContext.forVersionSnapshot(dataPath, version);
161+
queryContext.setResolvedVersion(version);
162+
SnapshotReport snapshotReport = SnapshotReportImpl.forSuccess(queryContext);
163+
164+
return new ScanImpl(
165+
metadata.getSchema(),
166+
metadata.getSchema(),
167+
protocol,
168+
metadata,
169+
logReplay,
170+
Optional.empty(),
171+
kernelDataPath,
172+
snapshotReport);
173+
}
174+
}

0 commit comments

Comments
 (0)