Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions Tests/SparkConnectTests/IcebergTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
//

import Foundation
import Testing

import SparkConnect
import Testing

/// A test suite for `Apache Iceberg` integration
@Suite(.serialized)
Expand All @@ -31,8 +30,10 @@ struct IcebergTests {
/// Exercises append, replace, and overwrite writes plus SQL DML against two
/// uniquely named tables under `ICEBERG_DATABASE`, checking the row count of the
/// target table after each step.
///
/// Returns immediately (without failing) when `icebergEnabled` is false.
///
/// NOTE(review): this listing comes from a diff view; part of the body is elided
/// at the hunk marker below, so the table-creation and initial-load steps are not
/// visible here.
@Test
func test() async throws {
// Nothing to verify when Iceberg support is not enabled for this run.
guard icebergEnabled else { return }
// NOTE(review): `t1`/`t2` are declared twice below — this looks like the removed
// (single-line) and added (wrapped) sides of the diff shown together; the real
// file presumably contains only one pair. Confirm against the repository.
let t1 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t2 = "\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
// Table names are "<database>.TABLE_<uuid>" with the UUID's dashes stripped.
let t1 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t2 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")

let spark = try await SparkSession.builder.getOrCreate()

// NOTE(review): diff hunk marker — the statements between the session setup and
// the append below are not shown in this view.
Expand All @@ -56,14 +57,103 @@ struct IcebergTests {
// Append t1's rows to t2; the expectation below requires t2 to hold 6 rows afterwards.
try await spark.table(t1).writeTo(t2).append()
#expect(try await spark.table(t2).count() == 6)

// Three more rows via SQL: 6 -> 9.
try await spark.sql("INSERT INTO \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
#expect(try await spark.table(t2).count() == 9)

// replace() is expected to leave t2 with 3 rows (presumably t1's contents — the
// load of t1 is in the elided part).
try await spark.table(t1).writeTo(t2).replace()
#expect(try await spark.table(t2).count() == 3)

// INSERT OVERWRITE with three literal rows keeps the count at 3.
try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a'), (2, 'b'), (3, 'c')").count()
#expect(try await spark.table(t2).count() == 3)

// Row-level delete of id = 1: 3 -> 2.
try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
#expect(try await spark.table(t2).count() == 2)

// Row-level update; exactly one row should now carry data = 'new'.
try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
#expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)

// overwrite with the condition "true": the test expects t2 to end up with 3 rows.
try await spark.table(t1).writeTo(t2).overwrite("true")
#expect(try await spark.table(t2).count() == 3)

// overwrite with the condition "false": the test expects the count to grow from 3 to 6.
try await spark.table(t1).writeTo(t2).overwrite("false")
#expect(try await spark.table(t2).count() == 6)

// Reset t2 to a single row (id = 1) before the conditional overwrite.
try await spark.sql("INSERT OVERWRITE \(t2) VALUES (1, 'a')").count()
#expect(try await spark.table(t2).count() == 1)

// overwrite with the condition "id = 1": the test expects 3 rows afterwards.
try await spark.table(t1).writeTo(t2).overwrite("id = 1")
#expect(try await spark.table(t2).count() == 3)
// Closes a closure opened in the elided part (presumably a
// `SQLHelper.withTable(spark, t1, t2)` wrapper — TODO confirm).
})

await spark.stop()
}

@Test
func partition() async throws {
guard icebergEnabled else { return }
let t1 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let t2 =
"\(ICEBERG_DATABASE).TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")

let spark = try await SparkSession.builder.getOrCreate()

try await SQLHelper.withTable(spark, t1, t2)({
try await spark.sql(
"""
CREATE TABLE \(t1) (
id BIGINT,
data STRING,
category STRING)
USING ICEBERG
PARTITIONED BY (category)
"""
).count()
try await spark.sql(
"""
CREATE TABLE \(t2) (
id BIGINT,
data STRING,
category STRING)
USING ICEBERG
PARTITIONED BY (category)
"""
).count()

#expect(try await spark.catalog.tableExists(t1))
#expect(try await spark.catalog.tableExists(t2))

#expect(try await spark.table(t1).count() == 0)
#expect(try await spark.table(t2).count() == 0)

try await spark.sql("INSERT INTO \(t1) VALUES (1, 'a', 'A'), (2, 'b', 'B'), (3, 'c', 'C')")
.count()
#expect(try await spark.table(t1).count() == 3)
#expect(try await spark.table(t2).count() == 0)

try await spark.table(t1).writeTo(t2).append()
#expect(try await spark.table(t2).count() == 3)

try await spark.table(t1).writeTo(t2).replace()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("DELETE FROM \(t2) WHERE id = 1").count()
#expect(try await spark.table(t2).count() == 2)

try await spark.sql("UPDATE \(t2) SET data = 'new' WHERE id = 2").count()
#expect(try await spark.sql("SELECT * FROM \(t2) WHERE data = 'new'").count() == 1)

try await spark.table(t1).writeTo(t2).overwritePartitions()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1)").count()
#expect(try await spark.table(t2).count() == 3)

try await spark.sql("INSERT OVERWRITE \(t2) SELECT * FROM \(t1) WHERE category = 'C'").count()
#expect(try await spark.table(t2).count() == 1)

try await spark.table(t1).writeTo(t2).overwrite("category = 'C'")
#expect(try await spark.table(t2).count() == 3)
})

await spark.stop()
Expand Down
Loading