Skip to content

Commit 5c3c00f

Browse files
committed
[SPARK-52068] Add StreamingQuery actor
### What changes were proposed in this pull request? This PR aims to add `StreamingQuery` actor. ### Why are the changes needed? To implement `StreamingQueryManager`, we need this first. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #125 from dongjoon-hyun/SPARK-52068. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a9b267e commit 5c3c00f

File tree

4 files changed

+250
-0
lines changed

4 files changed

+250
-0
lines changed

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,22 @@ public actor SparkConnectClient {
981981
try await execute(self.sessionID!, command)
982982
}
983983

984+
/// Wraps a streaming-query command in a `Spark_Connect_Command` and executes it.
///
/// The query is addressed by a `Spark_Connect_StreamingQueryInstanceId` built from `id` and
/// `runID`. The current `sessionID` is force-unwrapped, so this traps if no session ID is set.
/// - Parameters:
///   - id: The query id, as a string.
///   - runID: The run id of the query, as a string.
///   - command: The streaming-query command to send.
/// - Returns: The responses returned by `execute`.
func executeStreamingQueryCommand(
  _ id: String,
  _ runID: String,
  _ command: StreamingQueryCommand.OneOf_Command,
) async throws -> [ExecutePlanResponse] {
  // Identify which query (and which run of it) the command targets.
  var instanceID = Spark_Connect_StreamingQueryInstanceId()
  instanceID.id = id
  instanceID.runID = runID

  // Attach the requested operation to that query.
  var queryCommand = Spark_Connect_StreamingQueryCommand()
  queryCommand.queryID = instanceID
  queryCommand.command = command

  // Wrap it in the generic command envelope; a distinct local name avoids
  // shadowing the `command` parameter.
  var envelope = Spark_Connect_Command()
  envelope.streamingQueryCommand = queryCommand
  return try await execute(self.sessionID!, envelope)
}
999+
9841000
private enum URIParams {
9851001
static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
9861002
static let PARAM_SESSION_ID = "session_id"
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
import Foundation
20+
21+
/// The details of a failure reported for a streaming query.
///
/// The stored properties are `public` so that callers outside this module can read the
/// values of an instance handed to them by public API.
public struct StreamingQueryException: Sendable {
  /// The exception message.
  public let exceptionMessage: String

  /// The error class of the exception.
  public let errorClass: String

  /// The stack trace of the exception, as text.
  public let stackTrace: String
}
26+
27+
/// A snapshot of the status reported for a streaming query.
///
/// The stored properties are `public` so that callers outside this module can read the
/// values of an instance handed to them by public API.
public struct StreamingQueryStatus: Sendable {
  /// The status message.
  public let statusMessage: String

  /// The reported "data available" flag.
  public let isDataAvailable: Bool

  /// The reported "trigger active" flag.
  public let isTriggerActive: Bool

  /// The reported "active" flag.
  public let isActive: Bool
}
33+
34+
/// A handle to a query that is executing continuously in the background as new data arrives.
///
/// Every operation that talks to the server sends one streaming-query command through the
/// client of ``sparkSession`` and reads the first response. The first response is
/// force-unwrapped, so an empty response list traps instead of throwing.
public actor StreamingQuery: Sendable {
  /// The unique id of this query that persists across restarts from checkpoint data. That
  /// is, this id is generated when a query is started for the first time, and will be the same
  /// every time it is restarted from checkpoint data. Also see ``runId``.
  public let id: UUID

  /// The unique id of this run of the query. That is, every start/restart of a query will
  /// generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will
  /// have the same ``id`` but different ``runId``s.
  public let runId: UUID

  /// The user-specified name of the query. This name can be specified in the
  /// `org.apache.spark.sql.streaming.DataStreamWriter` as
  /// `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across
  /// all active queries.
  ///
  /// NOTE(review): the type is non-optional, so how an unnamed query is represented is decided
  /// by whoever constructs this handle.
  public let name: String

  /// The `SparkSession` associated with this query.
  public let sparkSession: SparkSession

  /// Creates a handle; no request is sent to the server.
  /// - Parameters:
  ///   - id: The query id.
  ///   - runId: The id of this run of the query.
  ///   - name: The query name.
  ///   - sparkSession: The session whose client is used to send commands.
  init(_ id: UUID, _ runId: UUID, _ name: String, _ sparkSession: SparkSession) {
    self.id = id
    self.runId = runId
    self.name = name
    self.sparkSession = sparkSession
  }

  /// Sends `command` for this query, addressed by the string forms of ``id`` and ``runId``.
  /// - Parameter command: The streaming-query command to execute.
  /// - Returns: The responses returned by the client.
  @discardableResult
  private func executeCommand(
    _ command: StreamingQueryCommand.OneOf_Command
  ) async throws -> [ExecutePlanResponse] {
    return try await self.sparkSession.client.executeStreamingQueryCommand(
      self.id.uuidString,
      self.runId.uuidString,
      command
    )
  }

  /// Returns `true` if this query is actively running.
  ///
  /// This is the `isActive` field of ``status()`` and costs one status request.
  public var isActive: Bool {
    get async throws {
      return try await status().isActive
    }
  }

  /// Returns the ``StreamingQueryException`` if the query was terminated by an exception.
  /// - Returns: A ``StreamingQueryException``, or `nil` when the response carries no exception
  ///   message.
  public func exception() async throws -> StreamingQueryException? {
    let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.exception(true))
    let result = response.first!.streamingQueryCommandResult.exception
    // The fields of the exception result are optional on the wire. Without a message there
    // is no exception to report, so do not fabricate one out of empty default strings.
    guard result.hasExceptionMessage else {
      return nil
    }
    return StreamingQueryException(
      exceptionMessage: result.exceptionMessage,
      errorClass: result.errorClass,
      stackTrace: result.stackTrace
    )
  }

  /// Returns the current status of the query.
  /// - Returns: A ``StreamingQueryStatus`` built from the status result of the first response.
  public func status() async throws -> StreamingQueryStatus {
    let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.status(true))
    let result = response.first!.streamingQueryCommandResult.status
    return StreamingQueryStatus(
      statusMessage: result.statusMessage,
      isDataAvailable: result.isDataAvailable,
      isTriggerActive: result.isTriggerActive,
      isActive: result.isActive
    )
  }

  /// Returns an array of the most recent progress updates for this query, each as a JSON string.
  /// The number of progress updates retained for each stream is configured by Spark session
  /// configuration `spark.sql.streaming.numRecentProgressUpdates`.
  public var recentProgress: [String] {
    get async throws {
      let response = try await executeCommand(
        StreamingQueryCommand.OneOf_Command.recentProgress(true))
      let result = response.first!.streamingQueryCommandResult.recentProgress
      return result.recentProgressJson
    }
  }

  /// Returns the most recent progress update of this streaming query as a JSON string, or `nil`
  /// when the response contains no progress entries.
  public var lastProgress: String? {
    get async throws {
      let response = try await executeCommand(
        StreamingQueryCommand.OneOf_Command.lastProgress(true))
      // The reply to a `lastProgress` command is read from the same `recentProgress`
      // result field that a `recentProgress` command uses.
      let result = response.first!.streamingQueryCommandResult.recentProgress
      return result.recentProgressJson.first
    }
  }

  /// Waits for the termination of `this` query, either by `query.stop()` or by an exception.
  /// If the query has terminated with an exception, then the exception will be thrown.
  ///
  /// If the query has terminated, then all subsequent calls to this method will either return
  /// immediately (if the query was terminated by `stop()`), or throw the exception immediately
  /// (if the query has terminated with exception).
  /// - Parameter timeoutMs: A timeout in milliseconds. When `nil`, no timeout is set on the
  ///   command.
  /// - Returns: The `terminated` flag of the response.
  public func awaitTermination(_ timeoutMs: Int64? = nil) async throws -> Bool? {
    var command = Spark_Connect_StreamingQueryCommand.AwaitTerminationCommand()
    if let timeoutMs {
      command.timeoutMs = timeoutMs
    }
    let response = try await executeCommand(
      StreamingQueryCommand.OneOf_Command.awaitTermination(command))
    return response.first!.streamingQueryCommandResult.awaitTermination.terminated
  }

  /// Blocks until all available data in the source has been processed and committed to the sink.
  ///
  /// This method is intended for testing. Note that in the case of continually arriving data, this
  /// method may block forever. Additionally, this method is only guaranteed to block until data
  /// that has been synchronously appended to a
  /// `org.apache.spark.sql.execution.streaming.Source` prior to invocation.
  /// (i.e. `getOffset` must immediately reflect the addition).
  public func processAllAvailable() async throws {
    try await executeCommand(StreamingQueryCommand.OneOf_Command.processAllAvailable(true))
  }

  /// Stops the execution of this query if it is running. This waits until the termination of the
  /// query execution threads or until a timeout is hit.
  ///
  /// By default stop will block indefinitely. You can configure a timeout by the configuration
  /// `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block
  /// indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the
  /// issue persists, it is advisable to kill the Spark application.
  public func stop() async throws {
    try await executeCommand(StreamingQueryCommand.OneOf_Command.stop(true))
  }

  /// Prints the physical plan to the console for debugging purposes.
  /// - Parameter extended: Whether to do extended explain or not.
  public func explain(_ extended: Bool = false) async throws {
    var command = Spark_Connect_StreamingQueryCommand.ExplainCommand()
    command.extended = extended
    let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.explain(command))
    print(response.first!.streamingQueryCommandResult.explain.result)
  }
}

Sources/SparkConnect/TypeAliases.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ typealias SetOpType = SetOperation.SetOpType
5555
typealias ShowString = Spark_Connect_ShowString
5656
typealias SparkConnectService = Spark_Connect_SparkConnectService
5757
typealias Sort = Spark_Connect_Sort
58+
typealias StreamingQueryCommand = Spark_Connect_StreamingQueryCommand
5859
typealias StructType = Spark_Connect_DataType.Struct
5960
typealias Tail = Spark_Connect_Tail
6061
typealias UserContext = Spark_Connect_UserContext
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
import Foundation
21+
import Testing
22+
23+
@testable import SparkConnect
24+
25+
/// Tests for the `StreamingQuery` actor.
@Suite(.serialized)
struct StreamingQueryTests {

  /// Builds a handle from freshly generated ids, checks that it stores what it was given,
  /// and checks that the server-backed operations throw for it.
  @Test
  func create() async throws {
    let session = try await SparkSession.builder.getOrCreate()
    let queryID = UUID()
    let runID = UUID()
    let handle = StreamingQuery(queryID, runID, "name", session)

    // The stored properties echo the constructor arguments.
    #expect(await handle.id == queryID)
    #expect(await handle.runId == runID)
    #expect(await handle.name == "name")

    // Streaming query xxx is not found
    try await #require(throws: Error.self) {
      try await handle.isActive
    }
    try await #require(throws: Error.self) {
      try await handle.recentProgress
    }
    try await #require(throws: Error.self) {
      try await handle.lastProgress
    }
    try await #require(throws: Error.self) {
      try await handle.awaitTermination()
    }
    try await #require(throws: Error.self) {
      try await handle.awaitTermination(1000)
    }

    await session.stop()
  }
}

0 commit comments

Comments
 (0)