diff --git a/Sources/SparkConnect/RuntimeConf.swift b/Sources/SparkConnect/RuntimeConf.swift index b0ff45c..f096e42 100644 --- a/Sources/SparkConnect/RuntimeConf.swift +++ b/Sources/SparkConnect/RuntimeConf.swift @@ -35,6 +35,22 @@ public actor RuntimeConf { try await client.setConf(map: [key: value]) } + /// Set a new configuration. + /// - Parameters: + /// - key: A string for the configuration key. + /// - value: A boolean value for the configuration. + public func set(_ key: String, _ value: Bool) async throws { + try await client.setConf(map: [key: String(value)]) + } + + /// Set a new configuration. + /// - Parameters: + /// - key: A string for the configuration key. + /// - value: A Int64 value for the configuration. + public func set(_ key: String, _ value: Int64) async throws { + try await client.setConf(map: [key: String(value)]) + } + /// Reset a configuration. /// - Parameters: /// - key: A string for the configuration key. @@ -42,16 +58,46 @@ public actor RuntimeConf { try await client.unsetConf(keys: [key]) } - /// Get a configuration. + /// Returns the value of Spark runtime configuration property for the given key. If the key is + /// not set yet, return its default value if possible, otherwise `NoSuchElementException` will be + /// thrown. /// - Parameter key: A string for the configuration look-up. /// - Returns: A string for the configuration. public func get(_ key: String) async throws -> String { return try await client.getConf(key) } + /// Returns the value of Spark runtime configuration property for the given key. If the key is + /// not set yet, return the user given `value`. This is useful when its default value defined + /// by Apache Spark is not the desired one. + /// - Parameters: + /// - key: A string for the configuration key. + /// - value: A default string value for the configuration. + public func get(_ key: String, _ value: String) async throws -> String { + return try await client.getConfWithDefault(key, value) + } + /// Get all configurations. /// - Returns: A map of configuration key-values. public func getAll() async throws -> [String: String] { return try await client.getConfAll() } + + /// Returns the value of Spark runtime configuration property for the given key. If the key is + /// not set yet, return its default value if possible, otherwise `nil` will be returned. + /// - Parameter key: A string for the configuration look-up. + /// - Returns: A string for the configuration or nil. + public func getOption(_ key: String) async throws -> String? { + return try await client.getConfOption(key) + } + + /// Indicates whether the configuration property with the given key is modifiable in the current + /// session. + /// - Parameter key: A string for the configuration look-up. + /// - Returns: `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid + /// (not existing) and other non-modifiable configuration properties, the returned value is + /// `false`. + public func isModifiable(_ key: String) async throws -> Bool { + return try await client.isModifiable(key) + } } diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index 82f350d..309b5a0 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -206,6 +206,73 @@ public actor SparkConnectClient { } } + /// Create a ``ConfigRequest`` instance for `GetWithDefault` operation. + /// - Parameter pairs: A key-value dictionary. + /// - Returns: A `ConfigRequest` instance. + func getConfigRequestGetWithDefault(_ pairs: [String: String]) -> ConfigRequest { + var request = ConfigRequest() + request.operation = ConfigRequest.Operation() + var getWithDefault = ConfigRequest.GetWithDefault() + getWithDefault.pairs = pairs.toSparkConnectKeyValue + request.operation.opType = .getWithDefault(getWithDefault) + return request + } + + /// Returns the value of Spark runtime configuration property for the given key. If the key is + /// not set yet, return the user given `value`. This is useful when its default value defined + /// by Apache Spark is not the desired one. + /// - Parameters: + /// - key: A string for the configuration key. + /// - value: A default value for the configuration. + func getConfWithDefault(_ key: String, _ value: String) async throws -> String { + try await withGPRC { client in + let service = SparkConnectService.Client(wrapping: client) + var request = getConfigRequestGetWithDefault([key: value]) + request.clientType = clientType + request.userContext = userContext + request.sessionID = self.sessionID! + let response = try await service.config(request) + let result = if response.pairs[0].hasValue { + response.pairs[0].value + } else { + value + } + return result + } + } + + /// Create a ``ConfigRequest`` instance for `GetOption` operation. + /// - Parameter keys: An array of keys to get. + /// - Returns: A `ConfigRequest` instance. + func getConfigRequestGetOption(_ keys: [String]) -> ConfigRequest { + var request = ConfigRequest() + request.operation = ConfigRequest.Operation() + var getOption = ConfigRequest.GetOption() + getOption.keys = keys + request.operation.opType = .getOption(getOption) + return request + } + + /// Request the server to get a value of the given key. + /// - Parameter key: A string for key to look up. + /// - Returns: A string or nil for the value of the key. + func getConfOption(_ key: String) async throws -> String? { + try await withGPRC { client in + let service = SparkConnectService.Client(wrapping: client) + var request = getConfigRequestGetOption([key]) + request.clientType = clientType + request.userContext = userContext + request.sessionID = self.sessionID! + let response = try await service.config(request) + let result: String? = if response.pairs[0].hasValue { + response.pairs[0].value + } else { + nil + } + return result + } + } + /// Create a ``ConfigRequest`` for `GetAll` operation. /// - Returns: A `ConfigRequest` instance. func getConfigRequestGetAll() -> ConfigRequest { @@ -234,6 +301,35 @@ public actor SparkConnectClient { } } + /// Create a ``ConfigRequest`` for `IsModifiable` operation. + /// - Returns: A `ConfigRequest` instance. + func getConfigRequestIsModifiable(_ keys: [String]) -> ConfigRequest { + var request = ConfigRequest() + request.operation = ConfigRequest.Operation() + var isModifiable = ConfigRequest.IsModifiable() + isModifiable.keys = keys + request.operation.opType = .isModifiable(isModifiable) + return request + } + + /// Indicates whether the configuration property with the given key is modifiable in the current + /// session. + /// - Parameter key: A string for the configuration look-up. + /// - Returns: `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid + /// (not existing) and other non-modifiable configuration properties, the returned value is + /// `false`. + func isModifiable(_ key: String) async throws -> Bool { + try await withGPRC { client in + let service = SparkConnectService.Client(wrapping: client) + var request = getConfigRequestIsModifiable([key]) + request.clientType = clientType + request.userContext = userContext + request.sessionID = self.sessionID! + let response = try await service.config(request) + return response.pairs[0].value == "true" + } + } + func getLocalRelation() -> Plan { var localRelation = Spark_Connect_LocalRelation() localRelation.schema = "" diff --git a/Tests/SparkConnectTests/RuntimeConfTests.swift b/Tests/SparkConnectTests/RuntimeConfTests.swift index 127dd3c..53f3813 100644 --- a/Tests/SparkConnectTests/RuntimeConfTests.swift +++ b/Tests/SparkConnectTests/RuntimeConfTests.swift @@ -42,6 +42,26 @@ struct RuntimeConfTests { await client.stop() } + @Test + func getWithDefault() async throws { + let client = SparkConnectClient(remote: TEST_REMOTE) + try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + #expect(try await conf.get("spark.sql.adaptive.customCostEvaluatorClass", "XYZ") == "XYZ") + #expect(try await conf.get("spark.test.non-exist", "my_default") == "my_default") + await client.stop() + } + + @Test + func getOption() async throws { + let client = SparkConnectClient(remote: TEST_REMOTE) + try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + #expect(try await conf.getOption("spark.app.name") != nil) + #expect(try await conf.getOption("spark.test.non-exist") == nil) + await client.stop() + } + @Test func set() async throws { let client = SparkConnectClient(remote: TEST_REMOTE) @@ -86,4 +106,15 @@ struct RuntimeConfTests { #expect(map["spark.master"] != nil) await client.stop() } + + @Test + func isModifiable() async throws { + let client = SparkConnectClient(remote: TEST_REMOTE) + try await client.connect(UUID().uuidString) + let conf = RuntimeConf(client) + #expect(try await conf.isModifiable("spark.sql.adaptive.customCostEvaluatorClass")) + #expect(try await conf.isModifiable("spark.sql.warehouse.dir") == false) + #expect(try await conf.isModifiable("spark.test.non-exist") == false) + await client.stop() + } }