Skip to content

Commit 2eeefda

Browse files
committed
[SPARK-51852] Support SPARK_CONNECT_AUTHENTICATE_TOKEN
### What changes were proposed in this pull request? This PR aims to support `SPARK_CONNECT_AUTHENTICATE_TOKEN`. ### Why are the changes needed? To provide a secure connection between the Spark Connect server and the `Swift` client. ### Does this PR introduce _any_ user-facing change? No, but this enables connecting to a secured server. ### How was this patch tested? Pass the CIs - Unsecured environment with the existing test pipelines. - Secured environment with the newly added `integration-test-token` test pipeline. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #74 from dongjoon-hyun/SPARK-51852. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 1169bea commit 2eeefda

File tree

4 files changed

+77
-3
lines changed

4 files changed

+77
-3
lines changed

.github/workflows/build_and_test.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,21 @@ jobs:
8888
./start-connect-server.sh
8989
cd ../..
9090
swift test --no-parallel
91+
92+
integration-test-token:
93+
runs-on: macos-15
94+
env:
95+
SPARK_CONNECT_AUTHENTICATE_TOKEN: ${{ github.run_id }}-${{ github.run_attempt }}
96+
steps:
97+
- uses: actions/checkout@v4
98+
- uses: swift-actions/[email protected]
99+
with:
100+
swift-version: "6.1"
101+
- name: Test
102+
run: |
103+
curl -LO https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc4-bin/spark-4.0.0-bin-hadoop3.tgz
104+
tar xvfz spark-4.0.0-bin-hadoop3.tgz
105+
cd spark-4.0.0-bin-hadoop3/sbin
106+
./start-connect-server.sh
107+
cd ../..
108+
swift test --no-parallel
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
import Foundation
21+
import GRPCCore
22+
23+
/// A gRPC client interceptor that attaches a bearer token to every outgoing RPC.
///
/// The token is sent in the `authorization` metadata entry as
/// `Bearer <token>`, matching the Spark Connect server's
/// `SPARK_CONNECT_AUTHENTICATE_TOKEN` check.
struct BearerTokenInterceptor: ClientInterceptor {
  /// The authentication token included with each request.
  let token: String

  /// Creates an interceptor that authenticates with the given token.
  /// - Parameter token: The bearer token to attach to outgoing requests.
  init(token: String) {
    self.token = token
  }

  /// Intercepts an outgoing RPC, adds the bearer-token metadata entry,
  /// and forwards the request to the next interceptor in the chain.
  /// - Parameters:
  ///   - request: The outgoing streaming request.
  ///   - context: The client context for this call.
  ///   - next: The next handler in the interceptor chain.
  /// - Returns: The response produced by the rest of the chain.
  func intercept<Input: Sendable, Output: Sendable>(
    request: StreamingClientRequest<Input>,
    context: ClientContext,
    next: (
      _ request: StreamingClientRequest<Input>,
      _ context: ClientContext
    ) async throws -> StreamingClientResponse<Output>
  ) async throws -> StreamingClientResponse<Output> {
    var request = request
    // gRPC metadata keys must be lowercase ASCII (HTTP/2, RFC 7540 §8.1.2,
    // treats uppercase header field names as malformed); use "authorization",
    // not "Authorization", so strict HTTP/2 transports do not reject the call.
    request.metadata.addString("Bearer \(self.token)", forKey: "authorization")

    // Forward the request to the next interceptor.
    return try await next(request, context)
  }
}

Sources/SparkConnect/DataFrame.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public actor DataFrame: Sendable {
117117
transport: .http2NIOPosix(
118118
target: .dns(host: spark.client.host, port: spark.client.port),
119119
transportSecurity: .plaintext
120-
)
120+
),
121+
interceptors: spark.client.getIntercepters()
121122
) { client in
122123
return try await f(client)
123124
}

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public actor SparkConnectClient {
2828
let url: URL
2929
let host: String
3030
let port: Int
31+
let token: String?
32+
var intercepters: [ClientInterceptor] = []
3133
let userContext: UserContext
3234
var sessionID: String? = nil
3335
var tags = Set<String>()
@@ -36,10 +38,14 @@ public actor SparkConnectClient {
3638
/// - Parameters:
3739
/// - remote: A string to connect `Spark Connect` server.
3840
/// - user: A string for the user ID of this connection.
39-
init(remote: String, user: String) {
41+
init(remote: String, user: String, token: String? = nil) {
4042
self.url = URL(string: remote)!
4143
self.host = url.host() ?? "localhost"
4244
self.port = self.url.port ?? 15002
45+
self.token = token ?? ProcessInfo.processInfo.environment["SPARK_CONNECT_AUTHENTICATE_TOKEN"]
46+
if let token = self.token {
47+
self.intercepters.append(BearerTokenInterceptor(token: token))
48+
}
4349
self.userContext = user.toUserContext
4450
}
4551

@@ -75,12 +81,17 @@ public actor SparkConnectClient {
7581
transport: .http2NIOPosix(
7682
target: .dns(host: self.host, port: self.port),
7783
transportSecurity: .plaintext
78-
)
84+
),
85+
interceptors: self.intercepters
7986
) { client in
8087
return try await f(client)
8188
}
8289
}
8390

91+
/// Returns the client interceptors configured for this connection,
/// e.g. the bearer-token interceptor when an authentication token is set.
/// - Returns: The interceptor list to install on a gRPC client.
public func getIntercepters() -> [ClientInterceptor] {
  self.intercepters
}
94+
8495
/// Create a ``ConfigRequest`` instance for `Set` operation.
8596
/// - Parameter map: A map of key-value string pairs.
8697
/// - Returns: A ``ConfigRequest`` instance.

0 commit comments

Comments (0)