Skip to content

Commit 2beedd5

Browse files
committed
fix
1 parent abfda43 commit 2beedd5

File tree

6 files changed

+39
-39
lines changed

6 files changed

+39
-39
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
3030
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3131
import org.apache.spark.sql.delta.stats.DataSkippingReader
3232
import org.apache.spark.sql.delta.stats.DataSkippingReaderConf
33-
import org.apache.spark.sql.delta.stats.DefaultDataSource
33+
import org.apache.spark.sql.delta.stats.DefaultStateProvider
3434
import org.apache.spark.sql.delta.stats.DeltaStatsColumnSpec
3535
import org.apache.spark.sql.delta.stats.StatisticsCollection
3636
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
@@ -228,15 +228,15 @@ class Snapshot(
228228
}
229229

230230
/**
231-
* The [[DefaultDataSource]] that owns the full state reconstruction pipeline:
231+
* The [[DefaultStateProvider]] that owns the full state reconstruction pipeline:
232232
* loadActions -> canonicalize -> repartition -> replay -> cache state
233233
* -> extract add files -> parse stats -> cache stats
234234
*
235235
* V1's `stateDS`, `stateDF`, `allFiles`, and `withStats` all delegate here.
236236
* This replaces the previous `stateReconstruction` + `cachedState` chain.
237237
*/
238-
private[delta] lazy val dataSource: DefaultDataSource = {
239-
new DefaultDataSource(
238+
private[delta] lazy val stateProvider: DefaultStateProvider = {
239+
new DefaultStateProvider(
240240
loadActions = () => loadActions,
241241
numPartitions = getNumPartitions,
242242
canonicalizeUdf = c => deltaLog.getCanonicalPathUdf()(c),
@@ -374,12 +374,12 @@ class Snapshot(
374374

375375
/** The current set of actions in this [[Snapshot]] as plain Rows */
376376
def stateDF: DataFrame = recordFrameProfile("Delta", "stateDF") {
377-
dataSource.stateDF
377+
stateProvider.stateDF
378378
}
379379

380380
/** The current set of actions in this [[Snapshot]] as a typed Dataset. */
381381
def stateDS: Dataset[SingleAction] = recordFrameProfile("Delta", "stateDS") {
382-
dataSource.stateDS
382+
stateProvider.stateDS
383383
}
384384

385385
private[delta] def allFilesViaStateReconstruction: Dataset[AddFile] = {
@@ -478,8 +478,8 @@ class Snapshot(
478478
}
479479
}
480480

481-
// NOTE: stateReconstruction logic has been moved into DefaultDataSource.
482-
// See [[dataSource]] which owns the full pipeline:
481+
// NOTE: stateReconstruction logic has been moved into DefaultStateProvider.
482+
// See [[stateProvider]] which owns the full pipeline:
483483
// loadActions -> canonicalize -> repartition -> replay -> cache
484484

485485
/**

spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,11 +264,11 @@ trait DataSkippingReaderBase
264264

265265
/**
266266
* Creates the Snapshot-level [[DefaultScanPlanner]] backed by
267-
* [[Snapshot.dataSource]] which owns the full pipeline:
267+
* [[Snapshot.stateProvider]] which owns the full pipeline:
268268
* loadActions -> state reconstruction -> extract add -> parse stats -> cache
269269
*/
270270
private[delta] def createPlanner(): DefaultScanPlanner = {
271-
new DefaultScanPlanner(dataSource = snapshotToScan.dataSource)
271+
new DefaultScanPlanner(stateProvider = snapshotToScan.stateProvider)
272272
}
273273

274274
/** All files with the statistics column dropped completely. */

spark/src/main/scala/org/apache/spark/sql/delta/stats/ScanInterfaces.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@ import org.apache.spark.sql.delta.stats.DataFiltersBuilderUtils.ScanPipelineResu
2727
import org.apache.spark.sql.functions.{col, lit}
2828
import org.apache.spark.sql.types.StructType
2929

30-
// ===================== Data Source Interface + Default =====================
30+
// ===================== State Provider Interface + Default =====================
3131

3232
/**
3333
* Provides raw and parsed AddFile DataFrames for scan planning.
3434
*
3535
* Implementations own the full pipeline:
3636
* raw source -> state reconstruction -> extract add files -> parse stats -> cache
3737
*
38-
* [[DefaultDataSource]] is the shared implementation used by both V1 and V2.
38+
* [[DefaultStateProvider]] is the shared implementation used by both V1 and V2.
3939
*/
40-
private[delta] trait ScanDataSource {
40+
private[delta] trait DeltaStateProvider {
4141

4242
/** Flat AddFile rows with stats as a JSON string. */
4343
def allAddFiles(): DataFrame
@@ -50,7 +50,7 @@ private[delta] trait ScanDataSource {
5050
}
5151

5252
/**
53-
* Shared [[ScanDataSource]] implementation used by both V1 and V2.
53+
* Shared [[DeltaStateProvider]] implementation used by both V1 and V2.
5454
*
5555
* Owns the full pipeline from raw log actions to cached AddFile
5656
* DataFrames with parsed statistics:
@@ -68,7 +68,7 @@ private[delta] trait ScanDataSource {
6868
* - V2: passes `DistributedLogReplayHelper.loadActions`,
6969
* `callUDF("canonicalizePath", _)`, no retention, no caching.
7070
*
71-
* V1's `Snapshot` owns a `DefaultDataSource` instance and delegates
71+
* V1's `Snapshot` owns a `DefaultStateProvider` instance and delegates
7272
* `stateDS`, `stateDF`, `allFiles` to it. This eliminates the
7373
* separate `stateReconstruction` / `cachedState` chain in `Snapshot`.
7474
*
@@ -96,7 +96,7 @@ private[delta] trait ScanDataSource {
9696
* `df => cacheDS(df, name).getDS`;
9797
* V2 passes None.
9898
*/
99-
private[delta] class DefaultDataSource(
99+
private[delta] class DefaultStateProvider(
100100
loadActions: () => DataFrame,
101101
numPartitions: Int,
102102
canonicalizeUdf: Column => Column,
@@ -105,7 +105,7 @@ private[delta] class DefaultDataSource(
105105
minSetTransactionRetentionTimestamp: Option[Long] = None,
106106
stateCacheFactory: Option[Dataset[SingleAction] => Dataset[SingleAction]] = None,
107107
statsParseCacheFactory: Option[DataFrame => DataFrame] = None
108-
) extends ScanDataSource {
108+
) extends DeltaStateProvider {
109109

110110
/**
111111
* Full state after log replay, optionally cached.
@@ -214,7 +214,7 @@ private[delta] trait DeltaScanPlanner {
214214

215215
/**
216216
* Returns the AddFile DataFrame with parsed stats (and optionally cached).
217-
* Delegates to the underlying [[ScanDataSource]].
217+
* Delegates to the underlying [[DeltaStateProvider]].
218218
*/
219219
def withParsedStats: DataFrame
220220

@@ -379,20 +379,20 @@ private[delta] class DefaultScanPredicateBuilder(
379379
/**
380380
* Default shared implementation of [[DeltaScanPlanner]].
381381
*
382-
* Works for both V1 and V2 connectors. Delegates to [[ScanDataSource]]
382+
* Works for both V1 and V2 connectors. Delegates to [[DeltaStateProvider]]
383383
* for the data pipeline (state reconstruction, stats parsing, caching).
384384
*
385-
* @param dataSource Provides parsed + cached AddFile DataFrames
385+
* @param stateProvider Provides parsed + cached AddFile DataFrames
386386
*/
387387
private[delta] class DefaultScanPlanner(
388-
dataSource: ScanDataSource
388+
stateProvider: DeltaStateProvider
389389
) extends DeltaScanPlanner {
390390

391391
/**
392392
* Parsed (and optionally cached) AddFile DataFrame.
393-
* Delegates to [[ScanDataSource.withParsedStats]].
393+
* Delegates to [[DeltaStateProvider.withParsedStats]].
394394
*/
395-
override def withParsedStats: DataFrame = dataSource.withParsedStats
395+
override def withParsedStats: DataFrame = stateProvider.withParsedStats
396396

397397
override def plan(
398398
filters: Seq[Expression],

spark/v2/src/main/java/io/delta/spark/internal/v2/read/DistributedLogReplayHelper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
public class DistributedLogReplayHelper {
4545

4646
// Use the same column names as V1 (DeltaLogFileIndex.COMMIT_VERSION_COLUMN,
47-
// Snapshot.ADD_STATS_TO_USE_COL_NAME) so DefaultDataSource works identically.
47+
// Snapshot.ADD_STATS_TO_USE_COL_NAME) so DefaultStateProvider works identically.
4848
private static final String COMMIT_VERSION_COLUMN =
4949
org.apache.spark.sql.delta.DeltaLogFileIndex.COMMIT_VERSION_COLUMN();
5050
private static final String ADD_STATS_TO_USE_COL =
@@ -56,8 +56,8 @@ public class DistributedLogReplayHelper {
5656
* {@code add_stats_to_use} columns (same names as V1).
5757
*
5858
* <p>This is the V2 equivalent of V1's {@code Snapshot.loadActions}. It is consumed by {@link
59-
* org.apache.spark.sql.delta.stats.DefaultDataSource} which handles the full state reconstruction
60-
* pipeline.
59+
* org.apache.spark.sql.delta.stats.DefaultStateProvider} which handles the full state
60+
* reconstruction pipeline.
6161
*
6262
* @param spark SparkSession
6363
* @param snapshot Kernel snapshot (must be SnapshotImpl)
@@ -85,7 +85,7 @@ public static Dataset<Row> loadActionsV2(SparkSession spark, Snapshot snapshot)
8585
* Performs distributed log replay following V1's Snapshot.stateReconstruction algorithm.
8686
*
8787
* <p><b>Note:</b> For batch scans, prefer {@link #loadActionsV2} + {@link
88-
* org.apache.spark.sql.delta.stats.DefaultDataSource} which shares the full pipeline with V1.
88+
* org.apache.spark.sql.delta.stats.DefaultStateProvider} which shares the full pipeline with V1.
8989
* This method is retained for streaming use cases.
9090
*
9191
* @param spark SparkSession

spark/v2/src/main/java/io/delta/spark/internal/v2/read/DistributedScanBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.apache.spark.sql.delta.stats.DataFiltersBuilderV2;
3333
import org.apache.spark.sql.delta.stats.DefaultScanPlanner;
3434
import org.apache.spark.sql.delta.stats.DefaultScanPredicateBuilder;
35-
import org.apache.spark.sql.delta.stats.ScanDataSource;
35+
import org.apache.spark.sql.delta.stats.DeltaStateProvider;
3636
import org.apache.spark.sql.sources.Filter;
3737

3838
/**
@@ -104,8 +104,8 @@ public ScanBuilder withReadSchema(StructType readSchema) {
104104
* Builds the scan using the shared {@link DefaultScanPlanner}:
105105
*
106106
* <ol>
107-
* <li>Create V2 {@link ScanDataSource}: loadActions -> state reconstruction -> extract add ->
108-
* parse stats (full pipeline, same class as V1)
107+
* <li>Create V2 {@link DeltaStateProvider}: loadActions -> state reconstruction -> extract add
108+
* -> parse stats (full pipeline, same class as V1)
109109
 * <li>Create shared {@link DefaultScanPlanner} backed by the state provider
110110
* <li>Create shared {@link DefaultScanPredicateBuilder} with V2-specific stat column paths
111111
* <li>Convert Spark {@code Filter[]} to resolved Catalyst Expressions
@@ -116,14 +116,14 @@ public ScanBuilder withReadSchema(StructType readSchema) {
116116
@Override
117117
public Scan build() {
118118
// Step 1: V2 state provider (full pipeline: loadActions -> reconstruct -> extract -> parse stats)
119-
ScanDataSource v2DataSource =
119+
DeltaStateProvider v2StateProvider =
120120
DataFiltersBuilderV2.createDataSource(
121121
() -> DistributedLogReplayHelper.loadActionsV2(spark, snapshot),
122122
numPartitions,
123123
snapshot);
124124

125125
// Step 2: Shared planner backed by the state provider
126-
DefaultScanPlanner planner = DataFiltersBuilderV2.createPlanner(v2DataSource);
126+
DefaultScanPlanner planner = DataFiltersBuilderV2.createPlanner(v2StateProvider);
127127

128128
// Step 3: Shared predicate builder (V2-specific stat column paths injected)
129129
DefaultScanPredicateBuilder predicateBuilder =

spark/v2/src/main/scala/org/apache/spark/sql/delta/stats/DataFiltersBuilderV2.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ object DataFiltersBuilderV2 {
134134
}
135135

136136
/**
137-
* Creates a [[DefaultDataSource]] for V2.
137+
* Creates a [[DefaultStateProvider]] for V2.
138138
*
139139
 * The state provider owns the full pipeline:
140140
* loadActions -> state reconstruction -> extract add -> parse stats
@@ -143,15 +143,15 @@ object DataFiltersBuilderV2 {
143143
* @param loadActions Supplier for the union of checkpoint + delta files
144144
* @param numPartitions Number of partitions for state reconstruction
145145
* @param snapshot Kernel snapshot (provides table schema for stats)
146-
* @return A [[DefaultDataSource]] ready for V2 use
146+
* @return A [[DefaultStateProvider]] ready for V2 use
147147
*/
148148
def createDataSource(
149149
loadActions: () => DataFrame,
150150
numPartitions: Int,
151-
snapshot: Snapshot): DefaultDataSource = {
151+
snapshot: Snapshot): DefaultStateProvider = {
152152
val tableSchema = getSparkTableSchema(snapshot)
153153
val statsSchema = DataFiltersBuilderUtils.buildStatsSchema(tableSchema)
154-
new DefaultDataSource(
154+
new DefaultStateProvider(
155155
loadActions = loadActions,
156156
numPartitions = numPartitions,
157157
canonicalizeUdf = c => callUDF("canonicalizePath", c),
@@ -163,11 +163,11 @@ object DataFiltersBuilderV2 {
163163
/**
164164
* Creates a [[DefaultScanPlanner]] for V2.
165165
*
166-
* @param dataSource V2 [[ScanDataSource]] (from [[createDataSource]])
166+
* @param stateProvider V2 [[DeltaStateProvider]] (from [[createDataSource]])
167167
* @return A [[DefaultScanPlanner]] ready for V2 use
168168
*/
169-
def createPlanner(dataSource: ScanDataSource): DefaultScanPlanner = {
170-
new DefaultScanPlanner(dataSource = dataSource)
169+
def createPlanner(stateProvider: DeltaStateProvider): DefaultScanPlanner = {
170+
new DefaultScanPlanner(stateProvider = stateProvider)
171171
}
172172

173173
/**

0 commit comments

Comments
 (0)