Skip to content

Commit ed6d3f1

Browse files
[test][rest] add test case for two sessions with cache for rest commitTable (#6438)
1 parent befee90 commit ed6d3f1

File tree

2 files changed

+86
-1
lines changed

2 files changed

+86
-1
lines changed

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class PaimonSparkTestWithRestCatalogBase extends PaimonSparkTestBase {
3434

3535
private var restCatalogServer: RESTCatalogServer = _
3636
private var serverUrl: String = _
37-
private var warehouse: String = _
37+
protected var warehouse: String = _
3838
private val initToken = "init_token"
3939

4040
override protected def beforeAll(): Unit = {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark
20+
21+
import org.apache.paimon.catalog.Catalog.TableNotExistException
22+
23+
import org.apache.spark.SparkConf
24+
import org.apache.spark.sql.Row
25+
26+
/**
27+
* Two-session reproduction of stale cache: session1 caches table, session2 drops & recreates, then
28+
* session1 commits with stale tableId and fails.
29+
*/
30+
class PaimonSparkTwoSessionCacheTest extends PaimonSparkTestWithRestCatalogBase {
31+
32+
test("Two sessions: stale cache commit fails") {
33+
val db = "sku"
34+
val tbl = "sku_detail_twosession"
35+
36+
// s1 and s2 are independent SparkSessions sharing SparkContext but with separate Catalogs
37+
val s1 = spark
38+
val s2 = spark.newSession()
39+
40+
// Ensure both sessions can see the database
41+
s1.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$db")
42+
s2.sql(s"CREATE DATABASE IF NOT EXISTS paimon.$db")
43+
44+
withTable(s"paimon.$db.$tbl") {
45+
s1.sql(s"""
46+
CREATE TABLE paimon.%s.%s (
47+
id INT,
48+
name STRING
49+
) USING PAIMON
50+
""".format(db, tbl))
51+
52+
// Session 1 caches table by reading/writing
53+
s1.sql(s"INSERT INTO paimon.%s.%s VALUES (1, 'a'), (2, 'b')".format(db, tbl))
54+
checkAnswer(
55+
s1.sql(s"SELECT * FROM paimon.%s.%s ORDER BY id".format(db, tbl)),
56+
Seq(Row(1, "a"), Row(2, "b")))
57+
58+
// Session 2 drops and recreates the table (new tableId on server)
59+
s2.sql(s"DROP TABLE IF EXISTS paimon.%s.%s".format(db, tbl))
60+
s2.sql(s"""
61+
CREATE TABLE paimon.%s.%s (
62+
id INT,
63+
name STRING
64+
) USING PAIMON
65+
""".format(db, tbl))
66+
s2.sql(s"INSERT INTO paimon.%s.%s VALUES (3, 'c')".format(db, tbl))
67+
68+
// Session 1 attempts another write using stale cache (before fix this should fail)
69+
val thrown = intercept[Exception] {
70+
s1.sql(s"INSERT INTO paimon.%s.%s VALUES (4, 'd')".format(db, tbl))
71+
}
72+
assert(thrown.getMessage.contains("Commit failed"))
73+
}
74+
}
75+
76+
override protected def sparkConf: SparkConf = {
77+
super.sparkConf
78+
.set("spark.sql.catalog.paimon.cache-enabled", "true")
79+
.set("spark.sql.catalog.paimon.cache-expire-after-access", "5m")
80+
.set("spark.sql.catalog.paimon.cache-expire-after-write", "10m")
81+
.set("spark.sql.catalog.paimon.codegen-enabled", "false")
82+
.set("spark.sql.catalog.paimon.plugin-enabled", "false")
83+
.set("spark.sql.catalog.paimon.plugin-dir", warehouse)
84+
}
85+
}

0 commit comments

Comments
 (0)