Skip to content

Commit 15d3302

Browse files
authored
[spark] update branch UT and doc (#4169)
1 parent 7babea6 commit 15d3302

File tree

2 files changed

+69
-2
lines changed

2 files changed

+69
-2
lines changed

docs/content/maintenance/manage-branches.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,19 @@ Run the following command:
6868
```
6969
{{< /tab >}}
7070
71+
{{< tab "Spark SQL" >}}
72+
73+
Run the following sql:
74+
75+
```sql
76+
-- create branch named 'branch1' from tag 'tag1'
77+
CALL sys.create_branch('default.T', 'branch1', 'tag1');
78+
79+
-- create empty branch named 'branch1'
80+
CALL sys.create_branch('default.T', 'empty_branch');
81+
```
82+
{{< /tab >}}
83+
7184
{{< /tabs >}}
7285
7386
## Delete Branches
@@ -101,6 +114,15 @@ Run the following command:
101114
```
102115
{{< /tab >}}
103116
117+
{{< tab "Spark SQL" >}}
118+
119+
Run the following sql:
120+
121+
```sql
122+
CALL sys.delete_branch('default.T', 'branch1');
123+
```
124+
{{< /tab >}}
125+
104126
{{< /tabs >}}
105127
106128
## Read / Write With Branch
@@ -122,6 +144,27 @@ INSERT INTO `t$branch_branch1` SELECT ...
122144
123145
{{< /tab >}}
124146
147+
{{< tab "Spark SQL" >}}
148+
149+
```sql
150+
-- read from branch 'branch1'
151+
SELECT * FROM `t$branch_branch1`;
152+
153+
-- write to branch 'branch1'
154+
INSERT INTO `t$branch_branch1` SELECT ...
155+
```
156+
157+
{{< /tab >}}
158+
159+
{{< tab "Spark DataFrame" >}}
160+
161+
```sql
162+
-- read from branch 'branch1'
163+
spark.read.format("paimon").option("branch", "branch1").table("t")
164+
```
165+
166+
{{< /tab >}}
167+
125168
{{< /tabs >}}
126169
127170
## Fast Forward

paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala renamed to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ import org.apache.spark.sql.{Dataset, Row}
2424
import org.apache.spark.sql.execution.streaming.MemoryStream
2525
import org.apache.spark.sql.streaming.StreamTest
2626

27-
class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with StreamTest {
27+
class BranchProcedureTest extends PaimonSparkTestBase with StreamTest {
2828

2929
import testImplicits._
30-
test("Paimon Procedure: create and delete branch") {
30+
test("Paimon Procedure: create, query, write and delete branch") {
3131
failAfter(streamingTimeout) {
3232
withTempDir {
3333
checkpointDir =>
@@ -86,19 +86,43 @@ class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with Stream
8686
val branchManager = table.branchManager()
8787
assert(branchManager.branchExists("test_branch"))
8888

89+
// query from branch
90+
checkAnswer(
91+
spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"),
92+
Row(1, "a") :: Row(2, "b") :: Nil
93+
)
94+
checkAnswer(
95+
spark.read.format("paimon").option("branch", "test_branch").table("T").orderBy("a"),
96+
Row(1, "a") :: Row(2, "b") :: Nil
97+
)
98+
99+
// update branch
100+
spark.sql("INSERT INTO `T$branch_test_branch` VALUES (3, 'c')")
101+
checkAnswer(
102+
spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a"),
103+
Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil
104+
)
105+
89106
// create empty branch
90107
checkAnswer(
91108
spark.sql(
92109
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'empty_branch')"),
93110
Row(true) :: Nil)
94111
assert(branchManager.branchExists("empty_branch"))
112+
checkAnswer(
113+
spark.sql("SELECT * FROM `T$branch_empty_branch` ORDER BY a"),
114+
Nil
115+
)
95116

96117
// delete branch
97118
checkAnswer(
98119
spark.sql(
99120
"CALL paimon.sys.delete_branch(table => 'test.T', branch => 'test_branch')"),
100121
Row(true) :: Nil)
101122
assert(!branchManager.branchExists("test_branch"))
123+
intercept[Exception] {
124+
spark.sql("SELECT * FROM `T$branch_test_branch` ORDER BY a")
125+
}
102126

103127
} finally {
104128
stream.stop()

0 commit comments

Comments
 (0)