Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion Sources/SparkConnect/RuntimeConf.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,69 @@ public actor RuntimeConf {
try await client.setConf(map: [key: value])
}

/// Set a new configuration.
/// - Parameters:
/// - key: A string for the configuration key.
/// - value: A boolean value for the configuration.
public func set(_ key: String, _ value: Bool) async throws {
try await client.setConf(map: [key: String(value)])
}

/// Set a new configuration.
/// - Parameters:
/// - key: A string for the configuration key.
/// - value: A Int64 value for the configuration.
public func set(_ key: String, _ value: Int64) async throws {
try await client.setConf(map: [key: String(value)])
}

/// Reset a configuration.
/// - Parameters:
/// - key: A string for the configuration key.
public func unset(_ key: String) async throws {
try await client.unsetConf(keys: [key])
}

/// Get a configuration.
/// Returns the value of Spark runtime configuration property for the given key. If the key is
/// not set yet, return its default value if possible, otherwise `NoSuchElementException` will be
/// thrown.
/// - Parameter key: A string for the configuration look-up.
/// - Returns: A string for the configuration.
public func get(_ key: String) async throws -> String {
return try await client.getConf(key)
}

/// Returns the value of Spark runtime configuration property for the given key. If the key is
/// not set yet, return the user given `value`. This is useful when its default value defined
/// by Apache Spark is not the desired one.
/// - Parameters:
/// - key: A string for the configuration key.
/// - value: A default string value for the configuration.
public func get(_ key: String, _ value: String) async throws -> String {
return try await client.getConfWithDefault(key, value)
}

/// Get all configurations.
/// - Returns: A map of configuration key-values.
public func getAll() async throws -> [String: String] {
return try await client.getConfAll()
}

/// Returns the value of Spark runtime configuration property for the given key. If the key is
/// not set yet, return its default value if possible, otherwise `nil` will be returned.
/// - Parameter key: A string for the configuration look-up.
/// - Returns: A string for the configuration or nil.
public func getOption(_ key: String) async throws -> String? {
return try await client.getConfOption(key)
}

/// Indicates whether the configuration property with the given key is modifiable in the current
/// session.
/// - Parameter key: A string for the configuration look-up.
/// - Returns: `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid
/// (not existing) and other non-modifiable configuration properties, the returned value is
/// `false`.
public func isModifiable(_ key: String) async throws -> Bool {
return try await client.isModifiable(key)
}
}
96 changes: 96 additions & 0 deletions Sources/SparkConnect/SparkConnectClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,73 @@ public actor SparkConnectClient {
}
}

/// Create a ``ConfigRequest`` instance for `GetWithDefault` operation.
/// - Parameter pairs: A key-value dictionary.
/// - Returns: A `ConfigRequest` instance.
func getConfigRequestGetWithDefault(_ pairs: [String: String]) -> ConfigRequest {
var request = ConfigRequest()
request.operation = ConfigRequest.Operation()
var getWithDefault = ConfigRequest.GetWithDefault()
getWithDefault.pairs = pairs.toSparkConnectKeyValue
request.operation.opType = .getWithDefault(getWithDefault)
return request
}

/// Returns the value of Spark runtime configuration property for the given key. If the key is
/// not set yet, return the user given `value`. This is useful when its default value defined
/// by Apache Spark is not the desired one.
/// - Parameters:
/// - key: A string for the configuration key.
/// - value: A default value for the configuration.
func getConfWithDefault(_ key: String, _ value: String) async throws -> String {
try await withGPRC { client in
let service = SparkConnectService.Client(wrapping: client)
var request = getConfigRequestGetWithDefault([key: value])
request.clientType = clientType
request.userContext = userContext
request.sessionID = self.sessionID!
let response = try await service.config(request)
let result = if response.pairs[0].hasValue {
response.pairs[0].value
} else {
value
}
return result
}
}

/// Create a ``ConfigRequest`` instance for `GetOption` operation.
/// - Parameter keys: An array of keys to get.
/// - Returns: A `ConfigRequest` instance.
func getConfigRequestGetOption(_ keys: [String]) -> ConfigRequest {
var request = ConfigRequest()
request.operation = ConfigRequest.Operation()
var getOption = ConfigRequest.GetOption()
getOption.keys = keys
request.operation.opType = .getOption(getOption)
return request
}

/// Request the server to get a value of the given key.
/// - Parameter key: A string for key to look up.
/// - Returns: A string or nil for the value of the key.
func getConfOption(_ key: String) async throws -> String? {
try await withGPRC { client in
let service = SparkConnectService.Client(wrapping: client)
var request = getConfigRequestGetOption([key])
request.clientType = clientType
request.userContext = userContext
request.sessionID = self.sessionID!
let response = try await service.config(request)
let result: String? = if response.pairs[0].hasValue {
response.pairs[0].value
} else {
nil
}
return result
}
}

/// Create a ``ConfigRequest`` for `GetAll` operation.
/// - Returns: A `ConfigRequest` instance.
func getConfigRequestGetAll() -> ConfigRequest {
Expand Down Expand Up @@ -234,6 +301,35 @@ public actor SparkConnectClient {
}
}

/// Create a ``ConfigRequest`` for `IsModifiable` operation.
/// - Returns: A `ConfigRequest` instance.
func getConfigRequestIsModifiable(_ keys: [String]) -> ConfigRequest {
var request = ConfigRequest()
request.operation = ConfigRequest.Operation()
var isModifiable = ConfigRequest.IsModifiable()
isModifiable.keys = keys
request.operation.opType = .isModifiable(isModifiable)
return request
}

/// Indicates whether the configuration property with the given key is modifiable in the current
/// session.
/// - Parameter key: A string for the configuration look-up.
/// - Returns: `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid
/// (not existing) and other non-modifiable configuration properties, the returned value is
/// `false`.
func isModifiable(_ key: String) async throws -> Bool {
try await withGPRC { client in
let service = SparkConnectService.Client(wrapping: client)
var request = getConfigRequestIsModifiable([key])
request.clientType = clientType
request.userContext = userContext
request.sessionID = self.sessionID!
let response = try await service.config(request)
return response.pairs[0].value == "true"
}
}

func getLocalRelation() -> Plan {
var localRelation = Spark_Connect_LocalRelation()
localRelation.schema = ""
Expand Down
31 changes: 31 additions & 0 deletions Tests/SparkConnectTests/RuntimeConfTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ struct RuntimeConfTests {
await client.stop()
}

@Test
func getWithDefault() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)
#expect(try await conf.get("spark.sql.adaptive.customCostEvaluatorClass", "XYZ") == "XYZ")
#expect(try await conf.get("spark.test.non-exist", "my_default") == "my_default")
await client.stop()
}

@Test
func getOption() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)
#expect(try await conf.getOption("spark.app.name") != nil)
#expect(try await conf.getOption("spark.test.non-exist") == nil)
await client.stop()
}

@Test
func set() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
Expand Down Expand Up @@ -86,4 +106,15 @@ struct RuntimeConfTests {
#expect(map["spark.master"] != nil)
await client.stop()
}

@Test
func isModifiable() async throws {
let client = SparkConnectClient(remote: TEST_REMOTE)
try await client.connect(UUID().uuidString)
let conf = RuntimeConf(client)
#expect(try await conf.isModifiable("spark.sql.adaptive.customCostEvaluatorClass"))
#expect(try await conf.isModifiable("spark.sql.warehouse.dir") == false)
#expect(try await conf.isModifiable("spark.test.non-exist") == false)
await client.stop()
}
}
Loading