Commit d1605ab
[SPARK-52150] Add Iceberg integration test GitHub Action job
### What changes were proposed in this pull request?

This PR aims to add an `Apache Iceberg` integration test `GitHub Action` job.

### Why are the changes needed?

To improve test coverage:
- `IcebergTests.swift` is added, guarded by the `SPARK_ICEBERG_TEST_ENABLED` environment variable.
- `DataFrameWriterV2Tests` is revised to test both `Iceberg` and `non-Iceberg` environments.

Since `Apache Spark 4` is not yet supported by `Apache Iceberg`, the GitHub Action job runs the Spark Connect server with the following versions:
- Apache Spark 3.5.5
- Apache Iceberg 1.9.0

In non-Iceberg environments, `IcebergTests` is skipped.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #151 from dongjoon-hyun/SPARK-52150.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a2d6262 commit d1605ab

File tree

4 files changed: +113 −7 lines

.github/workflows/build_and_test.yml

Lines changed: 25 additions & 0 deletions

@@ -135,3 +135,28 @@ jobs:
         ./start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5
         cd -
         swift test --no-parallel -c release
+
+  integration-test-mac-iceberg:
+    runs-on: macos-15
+    env:
+      SPARK_ICEBERG_TEST_ENABLED: "true"
+    steps:
+    - uses: actions/checkout@v4
+    - uses: swift-actions/setup-swift@d10500c1ac8822132eebbd74c48c3372c71d7ff5
+      with:
+        swift-version: "6.1"
+    - name: Install Java
+      uses: actions/setup-java@v4
+      with:
+        distribution: zulu
+        java-version: 17
+    - name: Test
+      run: |
+        curl -LO https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
+        tar xvfz spark-3.5.5-bin-hadoop3.tgz
+        mv spark-3.5.5-bin-hadoop3 /tmp/spark
+        cd /tmp/spark/sbin
+        ./start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0 -c spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog -c spark.sql.catalog.local.type=hadoop -c spark.sql.catalog.local.warehouse=/tmp/spark/warehouse -c spark.sql.defaultCatalog=local
+        cd -
+        swift test --filter DataFrameWriterV2Tests -c release
+        swift test --filter IcebergTest -c release
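The server configuration above registers `local` as a Hadoop-type Iceberg catalog backed by `/tmp/spark/warehouse` and makes it the default catalog, which is why the test suites can address Iceberg tables under `local.db`. As a hedged sketch (not part of this commit), a client-side smoke check against such a server could look like this; the table name is hypothetical and the session uses the client's default Spark Connect endpoint:

```swift
import SparkConnect

// Assumes an Iceberg-enabled Spark Connect server configured like the job above.
let spark = try await SparkSession.builder.getOrCreate()

// `local` is the default catalog, so `local.db.smoke` resolves to an Iceberg table.
try await spark.sql("CREATE TABLE local.db.smoke (id BIGINT) USING ICEBERG").count()
try await spark.sql("INSERT INTO local.db.smoke VALUES (1)").count()

let rows = try await spark.table("local.db.smoke").count()
print("row count:", rows)  // expected: 1

try await spark.sql("DROP TABLE local.db.smoke").count()
await spark.stop()
```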

Sources/SparkConnect/DataFrameWriterV2.swift

Lines changed: 1 addition & 1 deletion

@@ -108,7 +108,7 @@ public actor DataFrameWriterV2: Sendable {
   /// Overwrite rows matching the given filter condition with the contents of the ``DataFrame`` in the
   /// output table.
   /// - Parameter condition: A filter condition.
-  public func overwrite(condition: String) async throws {
+  public func overwrite(_ condition: String) async throws {
     try await executeWriteOperation(.overwrite, condition)
   }
 
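The library change itself is a one-line signature tweak: the `condition` argument label is dropped, so callers pass the filter expression positionally. A minimal before/after call-site sketch, with hypothetical session and table names:

```swift
import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
let df = try await spark.range(10)

// Before this commit (labeled argument):
//   try await df.writeTo("sample_table").overwrite(condition: "id > 5")

// After this commit (positional argument):
try await df.writeTo("sample_table").overwrite("id > 5")

await spark.stop()
```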

Tests/SparkConnectTests/DataFrameWriterV2Tests.swift

Lines changed: 16 additions & 6 deletions

@@ -24,6 +24,7 @@ import Testing
 /// A test suite for `DataFrameWriterV2`
 @Suite(.serialized)
 struct DataFrameWriterV2Tests {
+  let icebergEnabled = ProcessInfo.processInfo.environment["SPARK_ICEBERG_TEST_ENABLED"] != nil
 
   @Test
   func create() async throws {
@@ -48,9 +49,12 @@
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
-      try await #require(throws: Error.self) {
+      if icebergEnabled {
         try await write.createOrReplace()
+      } else {
+        try await #require(throws: Error.self) {
+          try await write.createOrReplace()
+        }
       }
     })
     await spark.stop()
@@ -64,9 +68,12 @@
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
-      try await #require(throws: Error.self) {
+      if icebergEnabled {
         try await write.replace()
+      } else {
+        try await #require(throws: Error.self) {
+          try await write.replace()
+        }
       }
     })
     await spark.stop()
@@ -80,9 +87,12 @@
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
-      try await #require(throws: Error.self) {
+      if icebergEnabled {
         try await write.append()
+      } else {
+        try await #require(throws: Error.self) {
+          try await write.append()
+        }
       }
     })
     await spark.stop()
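The revised suite now asserts the success path when `SPARK_ICEBERG_TEST_ENABLED` is set and keeps asserting the failure path otherwise, since the default ORC-backed tables reject `createOrReplace`, `replace`, and `append` through the v2 writer. The `#require(throws:)` pattern used for the failure path is standard Swift Testing; a self-contained sketch with a hypothetical throwing operation:

```swift
import Testing

enum DemoError: Error { case unsupported }  // hypothetical error type

// Stands in for an unsupported writer operation such as write.replace().
func unsupportedOperation() async throws {
  throw DemoError.unsupported
}

@Test func failurePathIsAsserted() async throws {
  // Fails the test if the closure does NOT throw.
  try await #require(throws: Error.self) {
    try await unsupportedOperation()
  }
}
```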
Tests/SparkConnectTests/IcebergTests.swift

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+import SparkConnect
+
+/// A test suite for `Apache Iceberg` integration
+@Suite(.serialized)
+struct IcebergTests {
+  let ICEBERG_DATABASE = "local.db"
+  let icebergEnabled = ProcessInfo.processInfo.environment["SPARK_ICEBERG_TEST_ENABLED"] != nil
+
+  @Test
+  func test() async throws {
+    guard icebergEnabled else { return }
+    let t1 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let t2 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+
+    let spark = try await SparkSession.builder.getOrCreate()
+
+    try await SQLHelper.withTable(spark, t1, t2)({
+      try await spark.sql("CREATE TABLE \(t1) (id BIGINT, data STRING) USING ICEBERG").count()
+      try await spark.sql("CREATE TABLE \(t2) (id BIGINT, data STRING) USING ICEBERG").count()
+
+      #expect(try await spark.catalog.tableExists(t1))
+      #expect(try await spark.catalog.tableExists(t2))
+
+      #expect(try await spark.table(t1).count() == 0)
+      #expect(try await spark.table(t2).count() == 0)
+
+      try await spark.sql("INSERT INTO \(t1) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
+      #expect(try await spark.table(t1).count() == 3)
+      #expect(try await spark.table(t2).count() == 0)
+
+      try await spark.table(t1).writeTo(t2).append()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).append()
+      #expect(try await spark.table(t2).count() == 6)
+
+      try await spark.table(t1).writeTo(t2).replace()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).overwrite("true")
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).overwrite("false")
+      #expect(try await spark.table(t2).count() == 6)
+    })
+
+    await spark.stop()
+  }
+}
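The final two expectations pin down the `overwrite(_:)` semantics: `overwrite("true")` matches every existing row, so the table contents are fully replaced and the count stays at 3, while `overwrite("false")` matches no rows, deletes nothing, and effectively appends the source rows, doubling the count to 6.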
