Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/SparkConnect/Catalog.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public actor Catalog: Sendable {
catalog.catType = .setCurrentCatalog(setCurrentCatalog)
return catalog
})
_ = try await df.count()
try await df.count()
}

/// Returns a list of catalogs in this session.
Expand Down Expand Up @@ -156,7 +156,7 @@ public actor Catalog: Sendable {
catalog.catType = .setCurrentDatabase(setCurrentDatabase)
return catalog
})
_ = try await df.count()
try await df.count()
}

/// Returns a list of databases available across all sessions.
Expand Down
3 changes: 3 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public actor DataFrame: Sendable {

/// Return the total number of rows.
/// - Returns: a `Int64` value.
@discardableResult
public func count() async throws -> Int64 {
let counter = Atomic(Int64(0))

Expand Down Expand Up @@ -440,6 +441,7 @@ public actor DataFrame: Sendable {

/// Persist this `DataFrame` with the given storage level.
/// - Parameter storageLevel: A storage level to apply.
@discardableResult
public func persist(storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK) async throws
-> DataFrame
{
Expand All @@ -456,6 +458,7 @@ public actor DataFrame: Sendable {
/// This will not un-persist any cached data that is built upon this `DataFrame`.
/// - Parameter blocking: Whether to block until all blocks are deleted.
/// - Returns: A `DataFrame`
@discardableResult
public func unpersist(blocking: Bool = false) async throws -> DataFrame {
try await withGPRC { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
Expand Down
3 changes: 2 additions & 1 deletion Sources/SparkConnect/DataFrameReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ public actor DataFrameReader: Sendable {
/// the schema inference step, and thus speed up data loading.
/// - Parameter schema: A DDL schema string.
/// - Returns: A `DataFrameReader`.
@discardableResult
public func schema(_ schema: String) async throws -> DataFrameReader {
// Validate by parsing.
do {
_ = try await sparkSession.client.ddlParse(schema)
try await sparkSession.client.ddlParse(schema)
} catch {
throw SparkConnectError.InvalidTypeException
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/SparkConnect/DataFrameWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public actor DataFrameWriter: Sendable {
var command = Spark_Connect_Command()
command.writeOperation = write

_ = try await df.spark.client.execute(df.spark.sessionID, command)
try await df.spark.client.execute(df.spark.sessionID, command)
}

/// Saves the content of the `DataFrame` in CSV format at the specified path.
Expand Down
2 changes: 1 addition & 1 deletion Sources/SparkConnect/DataFrameWriterV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ public actor DataFrameWriterV2: Sendable {

var command = Spark_Connect_Command()
command.writeOperationV2 = write
_ = try await df.spark.client.execute(df.spark.sessionID, command)
try await df.spark.client.execute(df.spark.sessionID, command)
}
}
4 changes: 2 additions & 2 deletions Sources/SparkConnect/RuntimeConf.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ public actor RuntimeConf {
/// - key: A string for the configuration key.
/// - value: A string for the configuration value.
public func set(_ key: String, _ value: String) async throws {
_ = try await client.setConf(map: [key: value])
try await client.setConf(map: [key: value])
}

/// Reset a configuration.
/// - Parameters:
/// - key: A string for the configuration key.
public func unset(_ key: String) async throws {
_ = try await client.unsetConf(keys: [key])
try await client.unsetConf(keys: [key])
}

/// Get a configuration.
Expand Down
12 changes: 10 additions & 2 deletions Sources/SparkConnect/SparkConnectClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public actor SparkConnectClient {
/// As a test connection, this sends the server `SparkVersion` request.
/// - Parameter sessionID: A string for the session ID.
/// - Returns: An `AnalyzePlanResponse` instance for `SparkVersion`
@discardableResult
func connect(_ sessionID: String) async throws -> AnalyzePlanResponse {
try await withGPRC { client in
// To prevent server-side `INVALID_HANDLE.FORMAT (SQLSTATE: HY000)` exception.
Expand Down Expand Up @@ -137,14 +138,15 @@ public actor SparkConnectClient {
/// Request the server to set a map of configurations for this session.
/// - Parameter map: A map of key-value pairs to set.
/// - Returns: Always return true.
@discardableResult
func setConf(map: [String: String]) async throws -> Bool {
try await withGPRC { client in
let service = SparkConnectService.Client(wrapping: client)
var request = getConfigRequestSet(map: map)
request.clientType = clientType
request.userContext = userContext
request.sessionID = self.sessionID!
let _ = try await service.config(request)
_ = try await service.config(request)
return true
}
}
Expand All @@ -160,7 +162,11 @@ public actor SparkConnectClient {
request.operation.opType = .unset(unset)
return request
}


/// Request the server to unset keys
/// - Parameter keys: An array of keys
/// - Returns: Always return true
@discardableResult
func unsetConf(keys: [String]) async throws -> Bool {
try await withGPRC { client in
let service = SparkConnectService.Client(wrapping: client)
Expand Down Expand Up @@ -509,6 +515,7 @@ public actor SparkConnectClient {
self.result.append(response)
}

@discardableResult
func execute(_ sessionID: String, _ command: Command) async throws -> [ExecutePlanResponse] {
self.result.removeAll()
try await withGPRC { client in
Expand Down Expand Up @@ -555,6 +562,7 @@ public actor SparkConnectClient {
/// Parse a DDL string to ``Spark_Connect_DataType`` instance.
/// - Parameter ddlString: A string to parse.
/// - Returns: A ``Spark_Connect_DataType`` instance.
@discardableResult
func ddlParse(_ ddlString: String) async throws -> Spark_Connect_DataType {
try await withGPRC { client in
let service = SparkConnectService.Client(wrapping: client)
Expand Down
2 changes: 1 addition & 1 deletion Sources/SparkConnect/SparkFileUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public enum SparkFileUtils {
static func createDirectory(root: String, namePrefix: String = "spark") -> URL {
let tempDir = URL(fileURLWithPath: root).appendingPathComponent(
"\(namePrefix)-\(UUID().uuidString)")
_ = createDirectory(at: tempDir)
createDirectory(at: tempDir)
return tempDir
}

Expand Down
1 change: 1 addition & 0 deletions Sources/SparkConnect/SparkSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public actor SparkSession {

/// Remove all stored configurations.
/// - Returns: self
@discardableResult
func clear() -> Builder {
sparkConf.removeAll()
return self
Expand Down
2 changes: 1 addition & 1 deletion Tests/SparkConnectTests/BuilderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct BuilderTests {
// Don't try to connect
let builder = await SparkSession.builder.remote("sc://spark:1234")
#expect(await builder.sparkConf["spark.remote"] == "sc://spark:1234")
_ = await builder.clear()
await builder.clear()
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion Tests/SparkConnectTests/CatalogTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct CatalogTests {
let dbName = "DB_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
#expect(try await spark.catalog.databaseExists(dbName) == false)
try await SQLHelper.withDatabase(spark, dbName) ({
_ = try await spark.sql("CREATE DATABASE \(dbName)").count()
try await spark.sql("CREATE DATABASE \(dbName)").count()
#expect(try await spark.catalog.databaseExists(dbName))
})
#expect(try await spark.catalog.databaseExists(dbName) == false)
Expand Down
6 changes: 3 additions & 3 deletions Tests/SparkConnectTests/DataFrameReaderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ struct DataFrameReaderTests {
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let spark = try await SparkSession.builder.getOrCreate()
try await SQLHelper.withTable(spark, tableName)({
_ = try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
#expect(try await spark.read.table(tableName).count() == 2)
})
await spark.stop()
Expand All @@ -103,10 +103,10 @@ struct DataFrameReaderTests {
func invalidSchema() async throws {
let spark = try await SparkSession.builder.getOrCreate()
await #expect(throws: SparkConnectError.InvalidTypeException) {
_ = try await spark.read.schema("invalid-name SHORT")
try await spark.read.schema("invalid-name SHORT")
}
await #expect(throws: SparkConnectError.InvalidTypeException) {
_ = try await spark.read.schema("age UNKNOWN_TYPE")
try await spark.read.schema("age UNKNOWN_TYPE")
}
await spark.stop()
}
Expand Down
14 changes: 7 additions & 7 deletions Tests/SparkConnectTests/DataFrameTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ struct DataFrameTests {
func selectInvalidColumn() async throws {
let spark = try await SparkSession.builder.getOrCreate()
try await #require(throws: Error.self) {
let _ = try await spark.range(1).select("invalid").schema
try await spark.range(1).select("invalid").schema
}
try await #require(throws: Error.self) {
let _ = try await spark.range(1).select("id + 1").schema
try await spark.range(1).select("id + 1").schema
}
await spark.stop()
}
Expand Down Expand Up @@ -447,7 +447,7 @@ struct DataFrameTests {
try await #require(throws: Error.self) {
var invalidLevel = StorageLevel.DISK_ONLY
invalidLevel.replication = 0
let _ = try await spark.range(9999).persist(storageLevel: invalidLevel).count()
try await spark.range(9999).persist(storageLevel: invalidLevel).count()
}
await spark.stop()
}
Expand Down Expand Up @@ -707,14 +707,14 @@ struct DataFrameTests {
let spark = try await SparkSession.builder.getOrCreate()
let df = try await spark.range(1)

_ = try await df.unpersist()
try await df.unpersist()
#expect(try await df.storageLevel == StorageLevel.NONE)
_ = try await df.persist()
try await df.persist()
#expect(try await df.storageLevel == StorageLevel.MEMORY_AND_DISK)

_ = try await df.unpersist()
try await df.unpersist()
#expect(try await df.storageLevel == StorageLevel.NONE)
_ = try await df.persist(storageLevel: StorageLevel.MEMORY_ONLY)
try await df.persist(storageLevel: StorageLevel.MEMORY_ONLY)
#expect(try await df.storageLevel == StorageLevel.MEMORY_ONLY)

await spark.stop()
Expand Down
8 changes: 4 additions & 4 deletions Tests/SparkConnectTests/RuntimeConfTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct RuntimeConfTests {
@Test
func get() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
_ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)

#expect(try await !conf.get("spark.app.name").isEmpty)
Expand All @@ -45,7 +45,7 @@ struct RuntimeConfTests {
@Test
func set() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
_ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)
try await conf.set("spark.test.key1", "value1")
#expect(try await conf.get("spark.test.key1") == "value1")
Expand All @@ -55,7 +55,7 @@ struct RuntimeConfTests {
@Test
func reset() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
_ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)

// Success with a key that doesn't exist
Expand All @@ -76,7 +76,7 @@ struct RuntimeConfTests {
@Test
func getAll() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
_ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)
let map = try await conf.getAll()
#expect(map.count > 0)
Expand Down
4 changes: 2 additions & 2 deletions Tests/SparkConnectTests/SQLHelper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct SQLHelper {
f,
{
for name in dbNames {
_ = try await spark.sql("DROP DATABASE IF EXISTS \(name) CASCADE").count()
try await spark.sql("DROP DATABASE IF EXISTS \(name) CASCADE").count()
}
})
}
Expand All @@ -47,7 +47,7 @@ struct SQLHelper {
f,
{
for name in tableNames {
_ = try await spark.sql("DROP TABLE IF EXISTS \(name)").count()
try await spark.sql("DROP TABLE IF EXISTS \(name)").count()
}
})
}
Expand Down
10 changes: 5 additions & 5 deletions Tests/SparkConnectTests/SparkConnectClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ struct SparkConnectClientTests {
func connectWithInvalidUUID() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
try await #require(throws: SparkConnectError.InvalidSessionIDException) {
let _ = try await client.connect("not-a-uuid-format")
try await client.connect("not-a-uuid-format")
}
await client.stop()
}

@Test
func connect() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
let _ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
await client.stop()
}

@Test
func tags() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
let _ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
let plan = await client.getPlanRange(0, 1, 1)

#expect(await client.getExecutePlanRequest(plan).tags.isEmpty)
Expand All @@ -79,7 +79,7 @@ struct SparkConnectClientTests {
@Test
func ddlParse() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
let _ = try await client.connect(UUID().uuidString)
try await client.connect(UUID().uuidString)
#expect(try await client.ddlParse("a int").simpleString == "struct<a:int>")
await client.stop()
}
Expand All @@ -91,7 +91,7 @@ struct SparkConnectClientTests {
let response = try await client.connect(UUID().uuidString)
if response.sparkVersion.version.starts(with: "4.") {
let json =
#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
#expect(try await client.jsonToDdl(json) == "id BIGINT NOT NULL")
}
await client.stop()
Expand Down
2 changes: 1 addition & 1 deletion Tests/SparkConnectTests/SparkSessionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct SparkSessionTests {
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
let spark = try await SparkSession.builder.getOrCreate()
try await SQLHelper.withTable(spark, tableName)({
_ = try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
try await spark.sql("CREATE TABLE \(tableName) USING ORC AS VALUES (1), (2)").count()
#expect(try await spark.table(tableName).count() == 2)
})
await spark.stop()
Expand Down
Loading