Skip to content

Commit c06ba36

Browse files
authored
[Kernel] [CatalogManaged] More Builder validation and tests (delta-io#4664)
## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/4664/files) to review incremental changes.

- [**stack/kernel_catalog_managed_5_builder_validation**](delta-io#4664) [[Files changed](https://github.com/delta-io/delta/pull/4664/files)]

---------

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

This PR adds some pretty straightforward builder validation and tests.

## How was this patch tested?

New unit tests.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent 944c6de commit c06ba36

File tree

4 files changed

+188
-19
lines changed

4 files changed

+188
-19
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/ResolvedTableBuilderImpl.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.delta.kernel.internal.actions.Protocol;
2626
import io.delta.kernel.internal.files.ParsedLogData;
2727
import io.delta.kernel.internal.files.ParsedLogData.ParsedLogType;
28+
import io.delta.kernel.internal.tablefeatures.TableFeatures;
2829
import io.delta.kernel.internal.util.Clock;
2930
import io.delta.kernel.internal.util.Tuple2;
3031
import java.util.Collections;
@@ -90,7 +91,11 @@ public ResolvedTableBuilderImpl withLogData(List<ParsedLogData> logDatas) {
9091

9192
@Override
9293
public ResolvedTableBuilder withProtocolAndMetadata(Protocol protocol, Metadata metadata) {
93-
ctx.protocolAndMetadataOpt = Optional.of(new Tuple2<>(protocol, metadata));
94+
ctx.protocolAndMetadataOpt =
95+
Optional.of(
96+
new Tuple2<>(
97+
requireNonNull(protocol, "protocol is null"),
98+
requireNonNull(metadata, "metadata is null")));
9499
return this;
95100
}
96101

@@ -106,7 +111,41 @@ public ResolvedTableInternal build(Engine engine) {
106111

107112
private void validateInputOnBuild() {
108113
checkArgument(ctx.versionOpt.orElse(0L) >= 0, "version must be >= 0");
109-
// TODO: logData only ratified staged commits
110-
// TODO: logData sorted and contiguous
114+
validateProtocolAndMetadataOnlyIfVersionProvided();
115+
validateProtocolRead();
116+
validateLogDataContainsOnlyRatifiedCommits(); // TODO: delta-io/delta#4765 support other types
117+
validateLogDataIsSortedContiguous();
118+
}
119+
120+
private void validateProtocolAndMetadataOnlyIfVersionProvided() {
121+
checkArgument(
122+
ctx.versionOpt.isPresent() || !ctx.protocolAndMetadataOpt.isPresent(),
123+
"protocol and metadata can only be provided if a version is provided");
124+
}
125+
126+
private void validateProtocolRead() {
127+
ctx.protocolAndMetadataOpt.ifPresent(
128+
x -> TableFeatures.validateKernelCanReadTheTable(x._1, ctx.unresolvedPath));
129+
}
130+
131+
private void validateLogDataContainsOnlyRatifiedCommits() {
132+
for (ParsedLogData logData : ctx.logDatas) {
133+
checkArgument(
134+
logData.type == ParsedLogType.RATIFIED_STAGED_COMMIT,
135+
"Only RATIFIED_STAGED_COMMIT log data is supported, but found: " + logData);
136+
}
137+
}
138+
139+
private void validateLogDataIsSortedContiguous() {
140+
if (ctx.logDatas.size() > 1) {
141+
for (int i = 1; i < ctx.logDatas.size(); i++) {
142+
final ParsedLogData prev = ctx.logDatas.get(i - 1);
143+
final ParsedLogData curr = ctx.logDatas.get(i);
144+
checkArgument(
145+
prev.version + 1 == curr.version,
146+
String.format(
147+
"Log data must be sorted and contiguous, but found: %s and %s", prev, curr));
148+
}
149+
}
111150
}
112151
}

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/catalogManaged/ResolvedTableBuilderSuite.scala

Lines changed: 137 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,157 @@ package io.delta.kernel.internal.catalogManaged
1818

1919
import java.util.Collections
2020

21+
import scala.collection.JavaConverters._
22+
2123
import io.delta.kernel.TableManager
24+
import io.delta.kernel.exceptions.KernelException
2225
import io.delta.kernel.internal.actions.Protocol
26+
import io.delta.kernel.internal.files.ParsedLogData
27+
import io.delta.kernel.internal.files.ParsedLogData.ParsedLogType
2328
import io.delta.kernel.internal.table.ResolvedTableInternal
24-
import io.delta.kernel.test.{ActionUtils, MockFileSystemClientUtils}
29+
import io.delta.kernel.test.{ActionUtils, MockFileSystemClientUtils, VectorTestUtils}
2530
import io.delta.kernel.types.{IntegerType, StructType}
2631

2732
import org.scalatest.funsuite.AnyFunSuite
2833

2934
class ResolvedTableBuilderSuite extends AnyFunSuite
30-
with MockFileSystemClientUtils with ActionUtils {
35+
with MockFileSystemClientUtils
36+
with ActionUtils
37+
with VectorTestUtils {
3138

32-
test("if P & M are provided then LogSegment is not loaded") {
33-
val testSchema = new StructType().add("c1", IntegerType.INTEGER)
34-
val engine = createMockFSListFromEngine(Nil)
39+
private val emptyMockEngine = createMockFSListFromEngine(Nil)
40+
private val protocol = new Protocol(1, 2)
41+
private val metadata = testMetadata(new StructType().add("c1", IntegerType.INTEGER))
42+
43+
///////////////////////////////////////
44+
// Builder Validation Tests -- START //
45+
///////////////////////////////////////
46+
47+
test("loadTable: null path throws NullPointerException") {
48+
assertThrows[NullPointerException] {
49+
TableManager.loadTable(null)
50+
}
51+
}
52+
53+
// ===== Version Tests ===== //
54+
55+
test("atVersion: negative version throws IllegalArgumentException") {
56+
val builder = TableManager.loadTable(dataPath.toString).atVersion(-1)
57+
58+
val exMsg = intercept[IllegalArgumentException] {
59+
builder.build(emptyMockEngine)
60+
}.getMessage
61+
62+
assert(exMsg === "version must be >= 0")
63+
}
64+
65+
// ===== Protocol and Metadata Tests ===== //
66+
67+
test("withProtocolAndMetadata: null protocol throws NullPointerException") {
68+
assertThrows[NullPointerException] {
69+
TableManager.loadTable(dataPath.toString)
70+
.withProtocolAndMetadata(null, metadata)
71+
}
72+
73+
assertThrows[NullPointerException] {
74+
TableManager.loadTable(dataPath.toString)
75+
.withProtocolAndMetadata(protocol, null)
76+
}
77+
}
78+
79+
test("withProtocolAndMetadata: only if version is provided") {
80+
val exMsg = intercept[IllegalArgumentException] {
81+
TableManager.loadTable(dataPath.toString)
82+
.withProtocolAndMetadata(protocol, metadata)
83+
.build(emptyMockEngine)
84+
}.getMessage
85+
86+
assert(exMsg === "protocol and metadata can only be provided if a version is provided")
87+
}
88+
89+
test("withProtocolAndMetadata: invalid readerVersion throws KernelException") {
90+
val exMsg = intercept[KernelException] {
91+
TableManager.loadTable(dataPath.toString)
92+
.atVersion(10)
93+
.withProtocolAndMetadata(new Protocol(999, 2), metadata)
94+
.build(emptyMockEngine)
95+
}.getMessage
96+
97+
assert(exMsg.contains("Unsupported Delta protocol reader version"))
98+
}
99+
100+
test("withProtocolAndMetadata: unknown reader feature throws KernelException") {
101+
val exMsg = intercept[KernelException] {
102+
TableManager.loadTable(dataPath.toString)
103+
.atVersion(10)
104+
.withProtocolAndMetadata(
105+
new Protocol(3, 7, Set("unknownReaderFeature").asJava, Collections.emptySet()),
106+
metadata)
107+
.build(emptyMockEngine)
108+
}.getMessage
109+
110+
assert(exMsg.contains("Unsupported Delta table feature"))
111+
}
112+
113+
// ===== LogData Tests ===== //
35114

115+
test("withLogData: null input throws NullPointerException") {
116+
assertThrows[NullPointerException] {
117+
TableManager.loadTable(dataPath.toString).withLogData(null)
118+
}
119+
}
120+
121+
Seq(
122+
ParsedLogData.forInlineData(1, ParsedLogType.RATIFIED_INLINE_COMMIT, emptyColumnarBatch),
123+
ParsedLogData.forFileStatus(logCompactionStatus(0, 1))).foreach { parsedLogData =>
124+
val suffix = s"- type=${parsedLogData.`type`}"
125+
test(s"withLogData: non-RATIFIED_STAGED_COMMIT throws IllegalArgumentException $suffix") {
126+
val builder = TableManager
127+
.loadTable(dataPath.toString)
128+
.atVersion(1)
129+
.withLogData(Collections.singletonList(parsedLogData))
130+
131+
val exMsg = intercept[IllegalArgumentException] {
132+
builder.build(emptyMockEngine)
133+
}.getMessage
134+
135+
assert(exMsg.contains("Only RATIFIED_STAGED_COMMIT log data is supported"))
136+
}
137+
}
138+
139+
test("withLogData: non-contiguous input throws IllegalArgumentException") {
140+
val exMsg = intercept[IllegalArgumentException] {
141+
TableManager.loadTable(dataPath.toString)
142+
.atVersion(2)
143+
.withLogData(parsedRatifiedStagedCommits(Seq(0, 2)).toList.asJava)
144+
.build(emptyMockEngine)
145+
}.getMessage
146+
147+
assert(exMsg.contains("Log data must be sorted and contiguous"))
148+
}
149+
150+
test("withLogData: non-sorted input throws IllegalArgumentException") {
151+
val exMsg = intercept[IllegalArgumentException] {
152+
TableManager.loadTable(dataPath.toString)
153+
.atVersion(2)
154+
.withLogData(parsedRatifiedStagedCommits(Seq(2, 1, 0)).toList.asJava)
155+
.build(emptyMockEngine)
156+
}.getMessage
157+
158+
assert(exMsg.contains("Log data must be sorted and contiguous"))
159+
}
160+
161+
/////////////////////////////////////
162+
// Builder Validation Tests -- END //
163+
/////////////////////////////////////
164+
165+
test("if P & M are provided then LogSegment is not loaded") {
36166
val resolvedTable = TableManager
37167
.loadTable(dataPath.toString)
38168
.atVersion(13)
39-
.withProtocolAndMetadata(new Protocol(1, 2), testMetadata(testSchema))
169+
.withProtocolAndMetadata(protocol, metadata)
40170
.withLogData(Collections.emptyList())
41-
.build(engine)
171+
.build(emptyMockEngine)
42172
.asInstanceOf[ResolvedTableInternal]
43173

44174
assert(!resolvedTable.getLazyLogSegment.isPresent)

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/ParsedLogDataSuite.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,15 @@ import java.util.Optional
2121
import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
2222
import io.delta.kernel.internal.files.ParsedLogData.{ParsedLogCategory, ParsedLogType}
2323
import io.delta.kernel.internal.util.FileNames
24-
import io.delta.kernel.test.MockFileSystemClientUtils
24+
import io.delta.kernel.test.{MockFileSystemClientUtils, VectorTestUtils}
2525
import io.delta.kernel.types.StructType
2626
import io.delta.kernel.utils.FileStatus
2727

2828
import org.scalatest.funsuite.AnyFunSuite
2929
import org.scalatest.matchers.must.Matchers.be
3030
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
3131

32-
class ParsedLogDataSuite extends AnyFunSuite with MockFileSystemClientUtils {
33-
34-
private val emptyColumnarBatch = new ColumnarBatch {
35-
override def getSchema: StructType = null
36-
override def getColumnVector(ordinal: Int): ColumnVector = null
37-
override def getSize: Int = 0
38-
}
32+
class ParsedLogDataSuite extends AnyFunSuite with MockFileSystemClientUtils with VectorTestUtils {
3933

4034
/////////////
4135
// General //

kernel/kernel-api/src/test/scala/io/delta/kernel/test/VectorTestUtils.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@ import java.lang.{Boolean => BooleanJ, Double => DoubleJ, Float => FloatJ}
1919

2020
import scala.collection.JavaConverters._
2121

22-
import io.delta.kernel.data.{ColumnVector, MapValue}
22+
import io.delta.kernel.data.{ColumnarBatch, ColumnVector, MapValue}
2323
import io.delta.kernel.internal.util.VectorUtils
2424
import io.delta.kernel.types._
2525

2626
trait VectorTestUtils {
2727

28+
protected def emptyColumnarBatch = new ColumnarBatch {
29+
override def getSchema: StructType = null
30+
override def getColumnVector(ordinal: Int): ColumnVector = null
31+
override def getSize: Int = 0
32+
}
33+
2834
protected def booleanVector(values: Seq[BooleanJ]): ColumnVector = {
2935
new ColumnVector {
3036
override def getDataType: DataType = BooleanType.BOOLEAN

0 commit comments

Comments
 (0)