Skip to content

Commit 7073438

Browse files
committed
[SPARK-52162] Add DELETE/UPDATE and partitioned table tests
### What changes were proposed in this pull request?

This PR aims to improve test coverage of the following.

- `DELETE` statement
- `UPDATE` statement
- Partitioned table tests (Create/Drop/Delete/Update)

### Why are the changes needed?

To prevent a future regression.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #153 from dongjoon-hyun/SPARK-52162.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d1605ab commit 7073438

File tree

1 file changed

+94
-4
lines changed

1 file changed

+94
-4
lines changed

Tests/SparkConnectTests/IcebergTests.swift

Lines changed: 94 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -18,9 +18,8 @@
1818
//
1919

2020
import Foundation
21-
import Testing
22-
2321
import SparkConnect
22+
import Testing
2423

2524
/// A test suite for `Apache Iceberg` integration
2625
@Suite(.serialized)
@@ -31,8 +30,10 @@ struct IcebergTests {
3130
@Test
3231
func test() async throws {
3332
guard icebergEnabled else { return }
34-
let t1 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
35-
let t2 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
33+
let t1 =
34+
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
35+
let t2 =
36+
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
3637

3738
let spark = try await SparkSession.builder.getOrCreate()
3839

@@ -56,14 +57,103 @@ struct IcebergTests {
5657
try await spark.table(t1).writeTo(t2).append()
5758
#expect(try await spark.table(t2).count() == 6)
5859

60+
try await spark.sql("INSERT INTO \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
61+
#expect(try await spark.table(t2).count() == 9)
62+
5963
try await spark.table(t1).writeTo(t2).replace()
6064
#expect(try await spark.table(t2).count() == 3)
6165

66+
try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
67+
#expect(try await spark.table(t2).count() == 3)
68+
69+
try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
70+
#expect(try await spark.table(t2).count() == 2)
71+
72+
try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
73+
#expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)
74+
6275
try await spark.table(t1).writeTo(t2).overwrite("true")
6376
#expect(try await spark.table(t2).count() == 3)
6477

6578
try await spark.table(t1).writeTo(t2).overwrite("false")
6679
#expect(try await spark.table(t2).count() == 6)
80+
81+
try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a')").count()
82+
#expect(try await spark.table(t2).count() == 1)
83+
84+
try await spark.table(t1).writeTo(t2).overwrite("id = 1")
85+
#expect(try await spark.table(t2).count() == 3)
86+
})
87+
88+
await spark.stop()
89+
}
90+
91+
@Test
92+
func partition() async throws {
93+
guard icebergEnabled else { return }
94+
let t1 =
95+
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
96+
let t2 =
97+
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
98+
99+
let spark = try await SparkSession.builder.getOrCreate()
100+
101+
try await SQLHelper.withTable(spark, t1, t2)({
102+
try await spark.sql(
103+
"""
104+
CREATE TABLE \(t1) (
105+
id BIGINT,
106+
data STRING,
107+
category STRING)
108+
USING ICEBERG
109+
PARTITIONED BY (category)
110+
"""
111+
).count()
112+
try await spark.sql(
113+
"""
114+
CREATE TABLE \(t2) (
115+
id BIGINT,
116+
data STRING,
117+
category STRING)
118+
USING ICEBERG
119+
PARTITIONED BY (category)
120+
"""
121+
).count()
122+
123+
#expect(try await spark.catalog.tableExists(t1))
124+
#expect(try await spark.catalog.tableExists(t2))
125+
126+
#expect(try await spark.table(t1).count() == 0)
127+
#expect(try await spark.table(t2).count() == 0)
128+
129+
try await spark.sql("INSERT INTO \(t1) VALUES (1, 'a', 'A'), (2, 'b', 'B'), (3, 'c', 'C')")
130+
.count()
131+
#expect(try await spark.table(t1).count() == 3)
132+
#expect(try await spark.table(t2).count() == 0)
133+
134+
try await spark.table(t1).writeTo(t2).append()
135+
#expect(try await spark.table(t2).count() == 3)
136+
137+
try await spark.table(t1).writeTo(t2).replace()
138+
#expect(try await spark.table(t2).count() == 3)
139+
140+
try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
141+
#expect(try await spark.table(t2).count() == 2)
142+
143+
try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
144+
#expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)
145+
146+
try await spark.table(t1).writeTo(t2).overwritePartitions()
147+
#expect(try await spark.table(t2).count() == 3)
148+
149+
try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1)").count()
150+
#expect(try await spark.table(t2).count() == 3)
151+
152+
try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1) WHERE category = 'C'").count()
153+
#expect(try await spark.table(t2).count() == 1)
154+
155+
try await spark.table(t1).writeTo(t2).overwrite("category = 'C'")
156+
#expect(try await spark.table(t2).count() == 3)
67157
})
68158

69159
await spark.stop()

0 commit comments

Comments (0)