Skip to content

Commit 944c6de

Browse files
authored
[Kernel] [Refactor] Refactor TestUtils to provide reading APIs using both legacy (Snapshot) and new (ResolvedTable) APIs (delta-io#4676)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/4676/files) to review incremental changes. - [**stack/kernel_default_test_read_adapter**](delta-io#4676) [[Files changed](https://github.com/delta-io/delta/pull/4676/files)] --------- #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description As part of the catalogManaged initiative, we are introducing new APIs to read and write Delta tables: `TableManager` and `ResolvedTable` instead of `Table` and `Snapshot`. While this initiative is ongoing, we want to test both APIs until we are ready and confident in deprecating the legacy API. This PR refactors `TestUtils` so that we can have both `New` and `Legacy` implementations of it. We also update `DeltaTableReadsSuite` to show an example of it working. ## How was this patch tested? Existing UTs. ## Does this PR introduce _any_ user-facing changes? No.
1 parent 600d614 commit 944c6de

File tree

4 files changed

+231
-20
lines changed

4 files changed

+231
-20
lines changed

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
2424

2525
import io.delta.golden.GoldenTableUtils.goldenTablePath
2626
import io.delta.kernel.Table
27-
import io.delta.kernel.defaults.utils.{TestRow, TestUtils}
27+
import io.delta.kernel.defaults.utils.{AbstractTestUtils, TestRow, TestUtils, TestUtilsWithLegacyKernelAPIs}
2828
import io.delta.kernel.exceptions.{InvalidTableException, KernelException, TableNotFoundException}
2929
import io.delta.kernel.internal.TableImpl
3030
import io.delta.kernel.internal.fs.Path
@@ -33,15 +33,22 @@ import io.delta.kernel.internal.util.InternalUtils.daysSinceEpoch
3333
import io.delta.kernel.types.{LongType, StructType}
3434

3535
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations}
36-
import org.apache.spark.sql.delta.actions.{AddFile, Metadata}
37-
import org.apache.spark.sql.delta.implicits.stringEncoder
36+
import org.apache.spark.sql.delta.actions.AddFile
3837

3938
import org.apache.hadoop.shaded.org.apache.commons.io.FileUtils
40-
import org.apache.spark.sql.functions.{col, current_timestamp, to_timestamp}
4139
import org.apache.spark.sql.functions.col
4240
import org.scalatest.funsuite.AnyFunSuite
4341

44-
class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
42+
class LegacyDeltaTableReadsSuite
43+
extends AbstractDeltaTableReadsSuite
44+
with TestUtilsWithLegacyKernelAPIs
45+
46+
// TODO: uncomment when the new Kernel APIs support ScanBuilder
47+
// class DeltaTableReadsSuite extends
48+
// AbstractDeltaTableReadsSuite
49+
// with TestUtilsWithTableManagerAPIs
50+
51+
trait AbstractDeltaTableReadsSuite extends AnyFunSuite { self: AbstractTestUtils =>
4552

4653
//////////////////////////////////////////////////////////////////////////////////
4754
// Timestamp type tests
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.defaults.test
18+
19+
import io.delta.kernel.{ResolvedTable, ScanBuilder, Snapshot}
20+
import io.delta.kernel.internal.SnapshotImpl
21+
import io.delta.kernel.types.StructType
22+
23+
/** Implicit conversions that enable easy and succinct adapter creation in tests. */
24+
object ResolvedTableAdapterImplicits {
25+
26+
/** Converts a [[Snapshot]] to a [[LegacyResolvedTableAdapter]]. */
27+
implicit class AdapterFromSnapshot(private val snapshot: Snapshot) extends AnyVal {
28+
def toTestAdapter: LegacyResolvedTableAdapter = snapshot match {
29+
case impl: SnapshotImpl => new LegacyResolvedTableAdapter(impl)
30+
case _ => throw new IllegalArgumentException("Snapshot must be an instance of SnapshotImpl")
31+
}
32+
}
33+
34+
/** Converts a [[ResolvedTable]] to a [[ResolvedTableAdapter]]. */
35+
implicit class AdapterFromResolvedTable(private val resolvedTable: ResolvedTable) extends AnyVal {
36+
def toTestAdapter: ResolvedTableAdapter = new ResolvedTableAdapter(resolvedTable)
37+
}
38+
}
39+
40+
/**
41+
* Test framework adapter that provides a unified interface for **accessing** Delta tables.
42+
*
43+
* This trait abstracts over the differences between [[Snapshot]] (legacy API) and [[ResolvedTable]]
44+
* (current API) via the [[LegacyResolvedTableAdapter]] and [[ResolvedTableAdapter]] child classes.
45+
*/
46+
trait AbstractResolvedTableAdapter {
47+
def getPath(): String
48+
def getVersion(): Long
49+
def getSchema(): StructType
50+
def getScanBuilder(): ScanBuilder
51+
}
52+
53+
class LegacyResolvedTableAdapter(snapshot: SnapshotImpl) extends AbstractResolvedTableAdapter {
54+
override def getPath(): String = snapshot.getDataPath.toString
55+
override def getVersion(): Long = snapshot.getVersion
56+
override def getSchema(): StructType = snapshot.getSchema
57+
override def getScanBuilder(): ScanBuilder = snapshot.getScanBuilder
58+
}
59+
60+
// TODO: Use ResolvedTableInternal
61+
class ResolvedTableAdapter(rt: ResolvedTable) extends AbstractResolvedTableAdapter {
62+
override def getPath(): String = rt.getPath
63+
override def getVersion(): Long = rt.getVersion
64+
override def getSchema(): StructType = rt.getSchema
65+
override def getScanBuilder(): ScanBuilder = rt.getScanBuilder
66+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.defaults.test
18+
19+
import io.delta.kernel.{Table, TableManager}
20+
import io.delta.kernel.defaults.test.ResolvedTableAdapterImplicits._
21+
import io.delta.kernel.engine.Engine
22+
23+
/**
24+
* Test framework adapter that provides a unified interface for **loading** Delta tables.
25+
*
26+
* This trait enables test suites to be parameterized over different Kernel APIs via the
27+
* [[LegacyTableManagerAdapter]] and [[TableManagerAdapter]] child classes.
28+
*
29+
* By using this adapter pattern, the same test suite can verify both APIs work correctly, without
30+
* duplicating test logic.
31+
*
32+
* Tests can switch between implementations by mixing in either
33+
* [[io.delta.kernel.defaults.utils.TestUtilsWithLegacyKernelAPIs]] or
34+
* [[io.delta.kernel.defaults.utils.TestUtilsWithTableManagerAPIs]].
35+
*/
36+
trait AbstractTableManagerAdapter {
37+
def getResolvedTableAdapterAtLatest(engine: Engine, path: String): AbstractResolvedTableAdapter
38+
39+
def getResolvedTableAdapterAtVersion(
40+
engine: Engine,
41+
path: String,
42+
version: Long): AbstractResolvedTableAdapter
43+
44+
def getResolvedTableAdapterAtTimestamp(
45+
engine: Engine,
46+
path: String,
47+
timestamp: Long): AbstractResolvedTableAdapter
48+
}
49+
50+
/**
51+
* Legacy implementation using the [[Table.forPath]] API which then wraps a [[Snapshot]].
52+
*/
53+
class LegacyTableManagerAdapter extends AbstractTableManagerAdapter {
54+
override def getResolvedTableAdapterAtLatest(
55+
engine: Engine,
56+
path: String): AbstractResolvedTableAdapter = {
57+
Table.forPath(engine, path).getLatestSnapshot(engine).toTestAdapter
58+
}
59+
60+
override def getResolvedTableAdapterAtVersion(
61+
engine: Engine,
62+
path: String,
63+
version: Long): AbstractResolvedTableAdapter = {
64+
Table.forPath(engine, path).getSnapshotAsOfVersion(engine, version).toTestAdapter
65+
}
66+
67+
override def getResolvedTableAdapterAtTimestamp(
68+
engine: Engine,
69+
path: String,
70+
timestamp: Long): AbstractResolvedTableAdapter = {
71+
Table.forPath(engine, path).getSnapshotAsOfTimestamp(engine, timestamp).toTestAdapter
72+
}
73+
}
74+
75+
/**
76+
* Current implementation using the [[TableManager.loadTable]] API, which then wraps a
77+
* [[ResolvedTable]].
78+
*/
79+
class TableManagerAdapter extends AbstractTableManagerAdapter {
80+
override def getResolvedTableAdapterAtLatest(
81+
engine: Engine,
82+
path: String): AbstractResolvedTableAdapter = {
83+
TableManager.loadTable(path).build(engine).toTestAdapter
84+
}
85+
86+
override def getResolvedTableAdapterAtVersion(
87+
engine: Engine,
88+
path: String,
89+
version: Long): AbstractResolvedTableAdapter = {
90+
TableManager.loadTable(path).atVersion(version).build(engine).toTestAdapter
91+
}
92+
93+
override def getResolvedTableAdapterAtTimestamp(
94+
engine: Engine,
95+
path: String,
96+
timestamp: Long): AbstractResolvedTableAdapter = {
97+
throw new UnsupportedOperationException("not implemented")
98+
}
99+
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ import io.delta.kernel.{Scan, Snapshot, Table, TransactionCommitResult}
2828
import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, MapValue, Row}
2929
import io.delta.kernel.defaults.engine.DefaultEngine
3030
import io.delta.kernel.defaults.internal.data.vector.{DefaultGenericVector, DefaultStructVector}
31+
import io.delta.kernel.defaults.test.{AbstractResolvedTableAdapter, AbstractTableManagerAdapter, LegacyTableManagerAdapter, TableManagerAdapter}
3132
import io.delta.kernel.engine.Engine
3233
import io.delta.kernel.expressions.{Column, Predicate}
3334
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
34-
import io.delta.kernel.internal.{InternalScanFileUtils, SnapshotImpl, TableImpl}
35+
import io.delta.kernel.internal.{InternalScanFileUtils, SnapshotImpl}
3536
import io.delta.kernel.internal.actions.DomainMetadata
3637
import io.delta.kernel.internal.checksum.{ChecksumReader, ChecksumWriter, CRCInfo}
3738
import io.delta.kernel.internal.clustering.ClusteringMetadataDomain
@@ -55,7 +56,25 @@ import org.apache.spark.sql.SparkSession
5556
import org.apache.spark.sql.catalyst.plans.SQLHelper
5657
import org.scalatest.Assertions
5758

58-
trait TestUtils extends Assertions with SQLHelper {
59+
trait TestUtils extends AbstractTestUtils {
60+
override def getTableManagerAdapter: AbstractTableManagerAdapter = new LegacyTableManagerAdapter()
61+
}
62+
63+
/**
64+
* DO NOT MODIFY this trait -- this is just syntactic sugar to clearly indicate we are extending the
65+
* "default" TestUtils which happens to use the legacy Kernel APIs
66+
*/
67+
trait TestUtilsWithLegacyKernelAPIs extends TestUtils
68+
69+
trait TestUtilsWithTableManagerAPIs extends AbstractTestUtils {
70+
override def getTableManagerAdapter: AbstractTableManagerAdapter = new TableManagerAdapter()
71+
}
72+
73+
trait AbstractTestUtils extends Assertions with SQLHelper {
74+
75+
import io.delta.kernel.defaults.test.ResolvedTableAdapterImplicits._
76+
77+
def getTableManagerAdapter: AbstractTableManagerAdapter
5978

6079
lazy val configuration = new Configuration()
6180
lazy val defaultEngine = DefaultEngine.create(configuration)
@@ -210,10 +229,24 @@ trait TestUtils extends Assertions with SQLHelper {
210229
filter: Predicate = null,
211230
expectedRemainingFilter: Predicate = null,
212231
engine: Engine = defaultEngine): Seq[Row] = {
232+
readResolvedTableAdapter(
233+
snapshot.toTestAdapter,
234+
readSchema,
235+
filter,
236+
expectedRemainingFilter,
237+
engine)
238+
}
239+
240+
def readResolvedTableAdapter(
241+
resolvedTableAdapter: AbstractResolvedTableAdapter,
242+
readSchema: StructType = null,
243+
filter: Predicate = null,
244+
expectedRemainingFilter: Predicate = null,
245+
engine: Engine = defaultEngine): Seq[Row] = {
213246

214247
val result = ArrayBuffer[Row]()
215248

216-
var scanBuilder = snapshot.getScanBuilder()
249+
var scanBuilder = resolvedTableAdapter.getScanBuilder()
217250

218251
if (readSchema != null) {
219252
scanBuilder = scanBuilder.withReadSchema(readSchema)
@@ -375,40 +408,46 @@ trait TestUtils extends Assertions with SQLHelper {
375408
expectedVersion: Option[Long] = None): Unit = {
376409
assert(version.isEmpty || timestamp.isEmpty, "Cannot provide both a version and timestamp")
377410

378-
val snapshot = if (version.isDefined) {
379-
Table.forPath(engine, path)
380-
.getSnapshotAsOfVersion(engine, version.get)
411+
val resolvedTableAdapter = if (version.isDefined) {
412+
getTableManagerAdapter.getResolvedTableAdapterAtVersion(engine, path, version.get)
381413
} else if (timestamp.isDefined) {
382-
Table.forPath(engine, path)
383-
.getSnapshotAsOfTimestamp(engine, timestamp.get)
414+
getTableManagerAdapter.getResolvedTableAdapterAtTimestamp(engine, path, timestamp.get)
384415
} else {
385-
latestSnapshot(path, engine)
416+
getTableManagerAdapter.getResolvedTableAdapterAtLatest(engine, path)
386417
}
387418

388419
val readSchema = if (readCols == null) {
389420
null
390421
} else {
391-
val schema = snapshot.getSchema()
422+
val schema = resolvedTableAdapter.getSchema()
392423
new StructType(readCols.map(schema.get(_)).asJava)
393424
}
394425

395426
if (expectedSchema != null) {
396427
assert(
397-
expectedSchema == snapshot.getSchema(),
428+
expectedSchema == resolvedTableAdapter.getSchema(),
398429
s"""
399430
|Expected schema does not match actual schema:
400431
|Expected schema: $expectedSchema
401-
|Actual schema: ${snapshot.getSchema()}
432+
|Actual schema: ${resolvedTableAdapter.getSchema()}
402433
|""".stripMargin)
403434
}
404435

436+
val actualVersion = resolvedTableAdapter.getVersion()
437+
405438
expectedVersion.foreach { version =>
406439
assert(
407-
version == snapshot.getVersion(),
408-
s"Expected version $version does not match actual version ${snapshot.getVersion()}")
440+
version == actualVersion,
441+
s"Expected version $version does not match actual version $actualVersion")
409442
}
410443

411-
val result = readSnapshot(snapshot, readSchema, filter, expectedRemainingFilter, engine)
444+
val result =
445+
readResolvedTableAdapter(
446+
resolvedTableAdapter,
447+
readSchema,
448+
filter,
449+
expectedRemainingFilter,
450+
engine)
412451
checkAnswer(result, expectedAnswer)
413452
}
414453

0 commit comments

Comments
 (0)