Skip to content

Commit 18584e8

Browse files
committed
[SPARK-51481] Add RuntimeConf actor
### What changes were proposed in this pull request? This PR aims to add `RuntimeConf` actor. ```mermaid classDiagram class RuntimeConf { -client: SparkConnectClient +init(client: SparkConnectClient) +set(key: String, value: String) void +unset(key: String) void +get(key: String) String +getAll() [String: String] } ``` ### Why are the changes needed? This is required before adding `SparkSession`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #9 from dongjoon-hyun/SPARK-51481. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c405943 commit 18584e8

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
/// User-facing configuration API, accessible through `SparkSession.conf`.
21+
public actor RuntimeConf {
22+
private let client: SparkConnectClient
23+
24+
/// Create a `RuntimeConf` instance with the given client.
25+
/// - Parameter client: A client to talk to the Spark Connect server.
26+
init(_ client: SparkConnectClient) {
27+
self.client = client
28+
}
29+
30+
/// Set a new configuration.
31+
/// - Parameters:
32+
/// - key: A string for the configuration key.
33+
/// - value: A string for the configuration value.
34+
public func set(_ key: String, _ value: String) async throws {
35+
_ = try await client.setConf(map: [key: value])
36+
}
37+
38+
/// Reset a configuration.
39+
/// - Parameters:
40+
/// - key: A string for the configuration key.
41+
public func unset(_ key: String) async throws {
42+
_ = try await client.unsetConf(keys: [key])
43+
}
44+
45+
/// Get a configuration.
46+
/// - Parameter key: A string for the configuration look-up.
47+
/// - Returns: A string for the configuration.
48+
public func get(_ key: String) async throws -> String {
49+
return try await client.getConf(key)
50+
}
51+
52+
/// Get all configurations.
53+
/// - Returns: A map of configuration key-values.
54+
public func getAll() async throws -> [String: String] {
55+
return try await client.getConfAll()
56+
}
57+
}

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,35 @@ public actor SparkConnectClient {
107107
}
108108
}
109109

110+
/// Create a ``ConfigRequest`` instance for `Unset` operation.
111+
/// - Parameter key: A string for key to unset.
112+
/// - Returns: A ``ConfigRequest`` instance.
113+
func getConfigRequestUnset(keys: [String]) -> ConfigRequest {
114+
var request = ConfigRequest()
115+
request.operation = ConfigRequest.Operation()
116+
var unset = ConfigRequest.Unset()
117+
unset.keys = keys
118+
request.operation.opType = .unset(unset)
119+
return request
120+
}
121+
122+
func unsetConf(keys: [String]) async throws -> Bool {
123+
try await withGRPCClient(
124+
transport: .http2NIOPosix(
125+
target: .dns(host: self.host, port: self.port),
126+
transportSecurity: .plaintext
127+
)
128+
) { client in
129+
let service = SparkConnectService.Client(wrapping: client)
130+
var request = getConfigRequestUnset(keys: keys)
131+
request.clientType = clientType
132+
request.userContext = userContext
133+
request.sessionID = self.sessionID!
134+
let _ = try await service.config(request)
135+
return true
136+
}
137+
}
138+
110139
/// Create a ``ConfigRequest`` instance for `Get` operation.
111140
/// - Parameter keys: An array of keys to get.
112141
/// - Returns: A `ConfigRequest` instance.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
import Foundation
21+
import Testing
22+
23+
@testable import SparkConnect
24+
25+
/// A test suite for `RuntimeConf`
26+
@Suite(.serialized)
27+
struct RuntimeConfTests {
28+
@Test
29+
func get() async throws {
30+
let client = SparkConnectClient(remote: "sc://localhost", user: "test")
31+
_ = try await client.connect(UUID().uuidString)
32+
let conf = RuntimeConf(client)
33+
34+
#expect(try await conf.get("spark.app.name") == "Spark Connect server")
35+
36+
try await #require(throws: Error.self) {
37+
try await conf.get("spark.test.non-exist")
38+
}
39+
40+
await client.stop()
41+
}
42+
43+
@Test
44+
func set() async throws {
45+
let client = SparkConnectClient(remote: "sc://localhost", user: "test")
46+
_ = try await client.connect(UUID().uuidString)
47+
let conf = RuntimeConf(client)
48+
try await conf.set("spark.test.key1", "value1")
49+
#expect(try await conf.get("spark.test.key1") == "value1")
50+
await client.stop()
51+
}
52+
53+
@Test
54+
func reset() async throws {
55+
let client = SparkConnectClient(remote: "sc://localhost", user: "test")
56+
_ = try await client.connect(UUID().uuidString)
57+
let conf = RuntimeConf(client)
58+
59+
// Success with a key that doesn't exist
60+
try await conf.unset("spark.test.key1")
61+
62+
// Make it sure that `spark.test.key1` exists before testing `reset`.
63+
try await conf.set("spark.test.key1", "value1")
64+
#expect(try await conf.get("spark.test.key1") == "value1")
65+
66+
try await conf.unset("spark.test.key1")
67+
try await #require(throws: Error.self) {
68+
try await conf.get("spark.test.key1")
69+
}
70+
71+
await client.stop()
72+
}
73+
74+
@Test
75+
func getAll() async throws {
76+
let client = SparkConnectClient(remote: "sc://localhost", user: "test")
77+
_ = try await client.connect(UUID().uuidString)
78+
let conf = RuntimeConf(client)
79+
let map = try await conf.getAll()
80+
#expect(map.count > 0)
81+
#expect(map["spark.app.id"] != nil)
82+
#expect(map["spark.app.startTime"] != nil)
83+
#expect(map["spark.executor.id"] == "driver")
84+
#expect(map["spark.master"] != nil)
85+
await client.stop()
86+
}
87+
}

0 commit comments

Comments
 (0)