Skip to content

Commit bb93957

Browse files
authored
[Kernel][Test]Make DomainMetadataSuite work with post commit hook and do some clean up (delta-io#4336)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR makes no functional change, it makes DomainMetadataSuite work with post commit hook and do some clean up ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Existing test ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 2011ffd commit bb93957

File tree

2 files changed

+61
-44
lines changed

2 files changed

+61
-44
lines changed

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,8 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
300300
schema: StructType = null,
301301
partCols: Seq[String] = null,
302302
tableProperties: Map[String, String] = null,
303-
clock: Clock = () => System.currentTimeMillis): Transaction = {
303+
clock: Clock = () => System.currentTimeMillis,
304+
withDomainMetadataSupported: Boolean = false): Transaction = {
304305

305306
var txnBuilder = createWriteTxnBuilder(
306307
TableImpl.forPath(engine, tablePath, clock))
@@ -314,6 +315,10 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
314315
txnBuilder = txnBuilder.withTableProperties(engine, tableProperties.asJava)
315316
}
316317

318+
if (withDomainMetadataSupported) {
319+
txnBuilder = txnBuilder.withDomainMetadataSupported()
320+
}
321+
317322
txnBuilder.build(engine)
318323
}
319324

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,20 @@
1515
*/
1616
package io.delta.kernel.defaults
1717

18-
import java.util.Collections
19-
2018
import scala.collection.JavaConverters._
2119
import scala.collection.immutable.Seq
2220

2321
import io.delta.kernel._
22+
import io.delta.kernel.data.Row
2423
import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase
2524
import io.delta.kernel.engine.Engine
2625
import io.delta.kernel.exceptions._
2726
import io.delta.kernel.expressions.Literal
28-
import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl, TransactionBuilderImpl, TransactionImpl}
29-
import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction}
27+
import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionImpl}
28+
import io.delta.kernel.internal.actions.DomainMetadata
3029
import io.delta.kernel.internal.rowtracking.RowTrackingMetadataDomain
31-
import io.delta.kernel.internal.tablefeatures.TableFeatures
32-
import io.delta.kernel.internal.util.Utils.toCloseableIterator
33-
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
30+
import io.delta.kernel.utils.CloseableIterable
31+
import io.delta.kernel.utils.CloseableIterable.emptyIterable
3432

3533
import org.apache.spark.sql.delta.DeltaLog
3634
import org.apache.spark.sql.delta.RowId.{RowTrackingMetadataDomain => SparkRowTrackingMetadataDomain}
@@ -104,34 +102,25 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
104102
useInternalApi: Boolean = false): Unit = {
105103
// Create the transaction with domain metadata and commit
106104
val txn = createTxnWithDomainMetadatas(engine, tablePath, domainMetadatas, useInternalApi)
107-
txn.commit(engine, emptyIterable())
105+
commitTransaction(txn, engine, emptyIterable())
108106

109107
// Verify the final state includes the expected domain metadata
110108
val table = Table.forPath(engine, tablePath)
111109
assertDomainMetadata(table, engine, expectedValue)
112110
}
113111

114-
// TODO we probably don't always need this since domain metadata is now automatically enabled
115-
private def setDomainMetadataSupport(engine: Engine, tablePath: String): Unit = {
116-
val protocol = new Protocol(
117-
3, // minReaderVersion
118-
7, // minWriterVersion
119-
Collections.emptySet(), // readerFeatures
120-
Set("domainMetadata").asJava // writerFeatures
121-
)
122-
123-
val protocolAction = SingleAction.createProtocolSingleAction(protocol.toRow)
124-
val txn = createTxn(engine, tablePath, isNewTable = false, testSchema, Seq.empty)
125-
txn.commit(engine, inMemoryIterable(toCloseableIterator(Seq(protocolAction).asJava.iterator())))
126-
}
127-
128112
private def createTableWithDomainMetadataSupported(engine: Engine, tablePath: String): Unit = {
129113
// Create an empty table
130-
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty)
131-
.commit(engine, emptyIterable())
132-
133-
// Set writer version and writer feature to support domain metadata
134-
setDomainMetadataSupport(engine, tablePath)
114+
commitTransaction(
115+
createTxn(
116+
engine,
117+
tablePath,
118+
isNewTable = true,
119+
testSchema,
120+
Seq.empty,
121+
withDomainMetadataSupported = true),
122+
engine,
123+
emptyIterable())
135124
}
136125

137126
private def validateDomainMetadataConflictResolution(
@@ -162,22 +151,22 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
162151
val txn1 = createTxnWithDomainMetadatas(engine, tablePath, currentTxn1DomainMetadatas)
163152

164153
val txn2 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn2DomainMetadatas)
165-
txn2.commit(engine, emptyIterable())
154+
commitTransaction(txn2, engine, emptyIterable())
166155

167156
val txn3 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn3DomainMetadatas)
168-
txn3.commit(engine, emptyIterable())
157+
commitTransaction(txn3, engine, emptyIterable())
169158

170159
if (expectedConflict) {
171160
// We expect the commit of txn1 to fail because of the conflicting DM actions
172161
val ex = intercept[KernelException] {
173-
txn1.commit(engine, emptyIterable())
162+
commitTransaction(txn1, engine, emptyIterable())
174163
}
175164
assert(
176165
ex.getMessage.contains(
177166
"A concurrent writer added a domainMetadata action for the same domain"))
178167
} else {
179168
// We expect the commit of txn1 to succeed
180-
txn1.commit(engine, emptyIterable())
169+
commitTransaction(txn1, engine, emptyIterable())
181170
// Verify the final state includes merged domain metadata
182171
val expectedMetadata =
183172
(winningTxn2DomainMetadatas ++ winningTxn3DomainMetadatas ++ currentTxn1DomainMetadatas)
@@ -187,13 +176,26 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
187176
}
188177
}
189178

179+
override def commitTransaction(
180+
txn: Transaction,
181+
engine: Engine,
182+
dataActions: CloseableIterable[Row]): TransactionCommitResult = {
183+
val result = txn.commit(engine, dataActions)
184+
result.getPostCommitHooks
185+
.stream()
186+
.forEach(hook => hook.threadSafeInvoke(engine))
187+
result
188+
}
189+
190190
test("create table w/o domain metadata") {
191191
withTempDirAndEngine { (tablePath, engine) =>
192192
val table = Table.forPath(engine, tablePath)
193193

194194
// Create an empty table
195-
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty)
196-
.commit(engine, emptyIterable())
195+
commitTransaction(
196+
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty),
197+
engine,
198+
emptyIterable())
197199

198200
// Verify that the table doesn't have any domain metadata
199201
assertDomainMetadata(table, engine, Map.empty)
@@ -204,26 +206,30 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
204206
withTempDirAndEngine { (tablePath, engine) =>
205207
// Create an empty table
206208
// Its minWriterVersion is 2 and doesn't have 'domainMetadata' in its writerFeatures
207-
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty)
208-
.commit(engine, emptyIterable())
209+
commitTransaction(
210+
createTxn(
211+
engine,
212+
tablePath,
213+
isNewTable = true,
214+
testSchema,
215+
Seq.empty),
216+
engine,
217+
emptyIterable())
209218

210219
val dm1 = new DomainMetadata("domain1", "", false)
211220
// We use the internal API because our public API will automatically upgrade the protocol
212221
val txn1 = createTxnWithDomainMetadatas(engine, tablePath, List(dm1), useInternalApi = true)
213222

214223
// We expect the commit to fail because the table doesn't support domain metadata
215224
val e = intercept[KernelException] {
216-
txn1.commit(engine, emptyIterable())
225+
commitTransaction(txn1, engine, emptyIterable())
217226
}
218227
assert(
219228
e.getMessage
220229
.contains(
221230
"Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' "
222231
+ "is not supported on this table."))
223232

224-
// Set writer version and writer feature to support domain metadata
225-
setDomainMetadataSupport(engine, tablePath)
226-
227233
// Commit domain metadata again and expect success
228234
commitDomainMetadataAndVerify(
229235
engine,
@@ -642,7 +648,7 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
642648
test("updating domain metadata fails after transaction committed") {
643649
withTempDirAndEngine { (tablePath, engine) =>
644650
val txn = createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty)
645-
txn.commit(engine, emptyIterable())
651+
commitTransaction(txn, engine, emptyIterable())
646652

647653
intercept[IllegalStateException] {
648654
txn.addDomainMetadata("domain", "config")
@@ -698,7 +704,7 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
698704
val txn = createWriteTxnBuilder(Table.forPath(defaultEngine, tablePath))
699705
.build(defaultEngine)
700706
txn.removeDomainMetadata("foo")
701-
txn.commit(defaultEngine, emptyIterable());
707+
commitTransaction(txn, defaultEngine, emptyIterable());
702708
}
703709
}
704710

@@ -791,8 +797,14 @@ class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase
791797
test("removing a domain on a table without DomainMetadata support") {
792798
withTempDirAndEngine { (tablePath, engine) =>
793799
// Create table with legacy protocol
794-
createTxn(tablePath = tablePath, isNewTable = true, schema = testSchema, partCols = Seq())
795-
.commit(engine, emptyIterable())
800+
commitTransaction(
801+
createTxn(
802+
tablePath = tablePath,
803+
isNewTable = true,
804+
schema = testSchema,
805+
partCols = Seq()),
806+
engine,
807+
emptyIterable())
796808
intercept[IllegalStateException] {
797809
val txn = createWriteTxnBuilder(Table.forPath(engine, tablePath))
798810
.build(engine)

0 commit comments

Comments
 (0)