diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 5f8ab34..927d4d2 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -135,3 +135,28 @@ jobs:
           ./start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5
           cd -
           swift test --no-parallel -c release
+
+  integration-test-mac-iceberg:
+    runs-on: macos-15
+    env:
+      SPARK_ICEBERG_TEST_ENABLED: "true"
+    steps:
+      - uses: actions/checkout@v4
+      - uses: swift-actions/setup-swift@d10500c1ac8822132eebbd74c48c3372c71d7ff5
+        with:
+          swift-version: "6.1"
+      - name: Install Java
+        uses: actions/setup-java@v4
+        with:
+          distribution: zulu
+          java-version: 17
+      - name: Test
+        run: |
+          curl -LO https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
+          tar xvfz spark-3.5.5-bin-hadoop3.tgz
+          mv spark-3.5.5-bin-hadoop3 /tmp/spark
+          cd /tmp/spark/sbin
+          ./start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0 -c spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog -c spark.sql.catalog.local.type=hadoop -c spark.sql.catalog.local.warehouse=/tmp/spark/warehouse -c spark.sql.defaultCatalog=local
+          cd -
+          swift test --filter DataFrameWriterV2Tests -c release
+          swift test --filter IcebergTest -c release
diff --git a/Sources/SparkConnect/DataFrameWriterV2.swift b/Sources/SparkConnect/DataFrameWriterV2.swift
index 6deac16..3a2b6af 100644
--- a/Sources/SparkConnect/DataFrameWriterV2.swift
+++ b/Sources/SparkConnect/DataFrameWriterV2.swift
@@ -108,7 +108,7 @@ public actor DataFrameWriterV2: Sendable {
   /// Overwrite rows matching the given filter condition with the contents of the ``DataFrame`` in the
   /// output table.
   /// - Parameter condition: A filter condition.
-  public func overwrite(condition: String) async throws {
+  public func overwrite(_ condition: String) async throws {
     try await executeWriteOperation(.overwrite, condition)
   }
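The `DataFrameWriterV2` change above is a source-breaking rename: the filter condition loses its argument label. A minimal call-site sketch, assuming an async context, a reachable Spark Connect server, and a hypothetical Iceberg table `local.db.events` (none of which are part of this patch):

```swift
import SparkConnect

// Sketch only: "local.db.events" is an illustrative table name.
let spark = try await SparkSession.builder.getOrCreate()

// Before this patch the condition required a label:
//   try await spark.range(10).writeTo("local.db.events").overwrite(condition: "id < 5")
// After this patch it is positional:
try await spark.range(10).writeTo("local.db.events").overwrite("id < 5")

await spark.stop()
```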
diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
index ab3252f..938caa8 100644
--- a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
@@ -24,6 +24,7 @@ import Testing
 /// A test suite for `DataFrameWriterV2`
 @Suite(.serialized)
 struct DataFrameWriterV2Tests {
+  let icebergEnabled = ProcessInfo.processInfo.environment["SPARK_ICEBERG_TEST_ENABLED"] != nil
 
   @Test
   func create() async throws {
@@ -48,9 +49,12 @@ struct DataFrameWriterV2Tests {
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
-      try await #require(throws: Error.self) {
+      if icebergEnabled {
         try await write.createOrReplace()
+      } else {
+        try await #require(throws: Error.self) {
+          try await write.createOrReplace()
+        }
       }
     })
     await spark.stop()
@@ -64,9 +68,12 @@ struct DataFrameWriterV2Tests {
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
-      try await #require(throws: Error.self) {
+      if icebergEnabled {
         try await write.replace()
+      } else {
+        try await #require(throws: Error.self) {
+          try await write.replace()
+        }
       }
     })
     await spark.stop()
@@ -80,9 +87,12 @@ struct DataFrameWriterV2Tests {
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      // TODO: Use Iceberg to verify success case after Iceberg supports Apache Spark 4
-      try await #require(throws: Error.self) {
+      if icebergEnabled {
         try await write.append()
+      } else {
+        try await #require(throws: Error.self) {
+          try await write.append()
+        }
       }
     })
     await spark.stop()
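These tests now branch at runtime on `SPARK_ICEBERG_TEST_ENABLED`: with the variable set they assert the Iceberg success path, otherwise they keep asserting that the plain ORC path throws. One trade-off of the in-body `guard`/`if` style is that a gated-off run still reports as passed. A hedged alternative sketch using swift-testing's `.enabled(if:)` condition trait, which reports such runs as skipped instead (the suite below is illustrative, not code from this patch):

```swift
import Foundation
import Testing

// Hypothetical gate expressed as a condition trait rather than an
// in-body `guard`, so disabled runs show up as "skipped" in reports.
let icebergEnabled =
  ProcessInfo.processInfo.environment["SPARK_ICEBERG_TEST_ENABLED"] != nil

@Suite(.serialized, .enabled(if: icebergEnabled, "Needs an Iceberg-enabled Spark Connect server"))
struct GatedIcebergSuite {
  @Test
  func placeholder() async throws {
    // Real Iceberg assertions would go here.
  }
}
```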
diff --git a/Tests/SparkConnectTests/IcebergTests.swift b/Tests/SparkConnectTests/IcebergTests.swift
new file mode 100644
index 0000000..ef3be51
--- /dev/null
+++ b/Tests/SparkConnectTests/IcebergTests.swift
@@ -0,0 +1,71 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+import SparkConnect
+
+/// A test suite for `Apache Iceberg` integration
+@Suite(.serialized)
+struct IcebergTests {
+  let ICEBERG_DATABASE = "local.db"
+  let icebergEnabled = ProcessInfo.processInfo.environment["SPARK_ICEBERG_TEST_ENABLED"] != nil
+
+  @Test
+  func test() async throws {
+    guard icebergEnabled else { return }
+    let t1 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+    let t2 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
+
+    let spark = try await SparkSession.builder.getOrCreate()
+
+    try await SQLHelper.withTable(spark, t1, t2)({
+      try await spark.sql("CREATE TABLE \(t1) (id BIGINT, data STRING) USING ICEBERG").count()
+      try await spark.sql("CREATE TABLE \(t2) (id BIGINT, data STRING) USING ICEBERG").count()
+
+      #expect(try await spark.catalog.tableExists(t1))
+      #expect(try await spark.catalog.tableExists(t2))
+
+      #expect(try await spark.table(t1).count() == 0)
+      #expect(try await spark.table(t2).count() == 0)
+
+      try await spark.sql("INSERT INTO \(t1) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
+      #expect(try await spark.table(t1).count() == 3)
+      #expect(try await spark.table(t2).count() == 0)
+
+      try await spark.table(t1).writeTo(t2).append()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).append()
+      #expect(try await spark.table(t2).count() == 6)
+
+      try await spark.table(t1).writeTo(t2).replace()
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).overwrite("true")
+      #expect(try await spark.table(t2).count() == 3)
+
+      try await spark.table(t1).writeTo(t2).overwrite("false")
+      #expect(try await spark.table(t2).count() == 6)
+    })
+
+    await spark.stop()
+  }
+}
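The tail of this test pins down the semantics of the positional `overwrite(_:)`: the condition selects which existing rows are deleted before the source rows are appended, so `"true"` replaces the whole table (3 - 3 + 3 = 3 rows) while `"false"` deletes nothing and degenerates into an append (3 - 0 + 3 = 6 rows). A standalone sketch of the same flow, assuming an async context and the Iceberg-enabled server configured in the workflow above (`spark.sql.defaultCatalog=local`); the table names are illustrative:

```swift
import SparkConnect

// Sketch only, using the same APIs the test exercises.
let spark = try await SparkSession.builder.getOrCreate()

try await spark.sql("CREATE TABLE local.db.src (id BIGINT, data STRING) USING ICEBERG").count()
try await spark.sql("CREATE TABLE local.db.dst (id BIGINT, data STRING) USING ICEBERG").count()
try await spark.sql("INSERT INTO local.db.src VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()

try await spark.table("local.db.src").writeTo("local.db.dst").append()
// dst now has 3 rows.

try await spark.table("local.db.src").writeTo("local.db.dst").overwrite("true")
// "true" matched (deleted) all 3 existing rows, then appended 3: still 3 rows.

try await spark.table("local.db.src").writeTo("local.db.dst").overwrite("false")
// "false" matched nothing, then appended 3: now 6 rows.

await spark.stop()
```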