Skip to content

Commit e010c80

Browse files
committed
[SPARK-52089] Support StreamingQueryManager
### What changes were proposed in this pull request? This PR aims to support `StreamingQueryManager`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #137 from dongjoon-hyun/SPARK-52089. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 6819b42 commit e010c80

File tree

5 files changed

+301
-2
lines changed

5 files changed

+301
-2
lines changed

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -986,17 +986,27 @@ public actor SparkConnectClient {
986986
_ runID: String,
987987
_ command: StreamingQueryCommand.OneOf_Command
988988
) async throws -> [ExecutePlanResponse] {
989-
var queryID = Spark_Connect_StreamingQueryInstanceId()
989+
var queryID = StreamingQueryInstanceId()
990990
queryID.id = id
991991
queryID.runID = runID
992-
var streamingQueryCommand = Spark_Connect_StreamingQueryCommand()
992+
var streamingQueryCommand = StreamingQueryCommand()
993993
streamingQueryCommand.queryID = queryID
994994
streamingQueryCommand.command = command
995995
var command = Spark_Connect_Command()
996996
command.streamingQueryCommand = streamingQueryCommand
997997
return try await execute(self.sessionID!, command)
998998
}
999999

1000+
func executeStreamingQueryManagerCommand(
1001+
_ command: StreamingQueryManagerCommand.OneOf_Command
1002+
) async throws -> [ExecutePlanResponse] {
1003+
var streamingQueryManagerCommand = StreamingQueryManagerCommand()
1004+
streamingQueryManagerCommand.command = command
1005+
var command = Spark_Connect_Command()
1006+
command.streamingQueryManagerCommand = streamingQueryManagerCommand
1007+
return try await execute(self.sessionID!, command)
1008+
}
1009+
10001010
private enum URIParams {
10011011
static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
10021012
static let PARAM_SESSION_ID = "session_id"

Sources/SparkConnect/SparkSession.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,14 @@ public actor SparkSession {
374374
return try await client.semanticHash(plan)
375375
}
376376

377+
/// Returns a `StreamingQueryManager` that allows managing all the `StreamingQuery`s active on
378+
/// `this`.
379+
public var streams: StreamingQueryManager {
380+
get {
381+
StreamingQueryManager(self)
382+
}
383+
}
384+
377385
/// This is defined as the return type of `SparkSession.sparkContext` method.
378386
/// This is an empty `Struct` type because `sparkContext` method is designed to throw
379387
/// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
import Foundation
20+
21+
/// Information about progress made for a source in the execution of a ``StreamingQuery``
22+
/// during a trigger. See ``StreamingQueryProgress`` for more information.
23+
public struct SourceProgress: Sendable {
24+
let description: String
25+
let startOffset: String
26+
let endOffset: String
27+
let latestOffset: String
28+
let numInputRows: Int64
29+
let inputRowsPerSecond: Double
30+
let processedRowsPerSecond: Double
31+
let metrics: [String: String] = [:]
32+
}
33+
34+
/// Information about progress made for a sink in the execution of a ``StreamingQuery``
35+
/// during a trigger. See ``StreamingQueryProgress`` for more information.
36+
public struct SinkProgress: Sendable {
37+
let description: String
38+
let numOutputRows: Int64
39+
let metrics: [String: String] = [:]
40+
41+
init(_ description: String) {
42+
self.description = description
43+
self.numOutputRows = -1
44+
}
45+
46+
init(_ description: String, _ numOutputRows: Int64) {
47+
self.description = description
48+
self.numOutputRows = numOutputRows
49+
}
50+
}
51+
52+
/// Information about updates made to stateful operators in a ``StreamingQuery``
53+
/// during a trigger. See ``StreamingQueryProgress`` for more information.
54+
public struct StateOperatorProgress: Sendable {
55+
let operatorName: String
56+
let numRowsTotal: Int64
57+
let numRowsUpdated: Int64
58+
let allUpdatesTimeMs: Int64
59+
let numRowsRemoved: Int64
60+
let allRemovalsTimeMs: Int64
61+
let commitTimeMs: Int64
62+
let memoryUsedBytes: Int64
63+
let numRowsDroppedByWatermark: Int64
64+
let numShufflePartitions: Int64
65+
let numStateStoreInstances: Int64
66+
let customMetrics: [String: Int64]
67+
}
68+
69+
/// Information about progress made in the execution of a ``StreamingQuery``
70+
/// during a trigger. Each event relates to processing done for a single trigger of
71+
/// the streaming query. Events are emitted even when no new data is available to be processed.
72+
public struct StreamingQueryProcess {
73+
let id: UUID
74+
let runId: UUID
75+
let name: String
76+
let timestamp: String
77+
let batchId: Int64
78+
let batchDuration: Int64
79+
let durationMs: [String: Int64]
80+
let eventTime: [String: String]
81+
let stateOperators: [StateOperatorProgress]
82+
let sources: [SourceProgress]
83+
let sink: SinkProgress
84+
let observedMetrics: [String: Row]
85+
86+
func numInputRows() -> Int64 {
87+
return sources.map { $0.numInputRows }.reduce(0, +)
88+
}
89+
90+
func inputRowsPerSecond() -> Double {
91+
return sources.map { $0.inputRowsPerSecond }.reduce(0, +)
92+
}
93+
94+
func processedRowsPerSecond() -> Double {
95+
return sources.map { $0.processedRowsPerSecond }.reduce(0, +)
96+
}
97+
}
98+
99+
/// A class to manage all the ``StreamingQuery``s active in a ``SparkSession``.
100+
public actor StreamingQueryManager {
101+
let sparkSession: SparkSession
102+
103+
init(_ sparkSession: SparkSession) {
104+
self.sparkSession = sparkSession
105+
}
106+
107+
/// Returns a list of active queries associated with this SQLContext
108+
public var active: [StreamingQuery] {
109+
get async throws {
110+
let command = StreamingQueryManagerCommand.OneOf_Command.active(true)
111+
let response = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command)
112+
return response.first!.streamingQueryManagerCommandResult.active.activeQueries.map {
113+
StreamingQuery(
114+
UUID(uuidString: $0.id.id)!,
115+
UUID(uuidString: $0.id.runID)!,
116+
$0.name,
117+
self.sparkSession
118+
)
119+
}
120+
}
121+
}
122+
123+
/// Returns the query if there is an active query with the given id, or null.
124+
/// - Parameter id: an UUID.
125+
/// - Returns: A ``StreamingQuery``.
126+
public func get(_ id: UUID) async throws -> StreamingQuery {
127+
return try await get(id.uuidString)
128+
}
129+
130+
/// Returns the query if there is an active query with the given id, or null.
131+
/// - Parameter id: an UUID String
132+
/// - Returns: A ``StreamingQuery``.
133+
public func get(_ id: String) async throws -> StreamingQuery {
134+
let command = StreamingQueryManagerCommand.OneOf_Command.getQuery(id)
135+
let response = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command)
136+
let query = response.first!.streamingQueryManagerCommandResult.query
137+
guard query.hasID else {
138+
throw SparkConnectError.InvalidArgumentException
139+
}
140+
return StreamingQuery(
141+
UUID(uuidString: query.id.id)!,
142+
UUID(uuidString: query.id.runID)!,
143+
query.name,
144+
self.sparkSession
145+
)
146+
}
147+
148+
/// Wait until any of the queries on the associated SQLContext has terminated since the creation
149+
/// of the context, or since `resetTerminated()` was called. If any query was terminated with an
150+
/// exception, then the exception will be thrown.
151+
/// - Parameter timeoutMs: A timeout in milliseconds.
152+
@discardableResult
153+
public func awaitAnyTermination(_ timeoutMs: Int64? = nil) async throws -> Bool {
154+
var awaitAnyTerminationCommand = StreamingQueryManagerCommand.AwaitAnyTerminationCommand()
155+
if let timeoutMs {
156+
guard timeoutMs > 0 else {
157+
throw SparkConnectError.InvalidArgumentException
158+
}
159+
awaitAnyTerminationCommand.timeoutMs = timeoutMs
160+
}
161+
let command = StreamingQueryManagerCommand.OneOf_Command.awaitAnyTermination(
162+
awaitAnyTerminationCommand)
163+
let response = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command)
164+
return response.first!.streamingQueryManagerCommandResult.awaitAnyTermination.terminated
165+
}
166+
167+
/// Forget about past terminated queries so that `awaitAnyTermination()` can be used again to
168+
/// wait for new terminations.
169+
public func resetTerminated() async throws {
170+
let command = StreamingQueryManagerCommand.OneOf_Command.resetTerminated(true)
171+
_ = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command)
172+
}
173+
}

Sources/SparkConnect/TypeAliases.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ typealias ShowString = Spark_Connect_ShowString
5656
typealias SparkConnectService = Spark_Connect_SparkConnectService
5757
typealias Sort = Spark_Connect_Sort
5858
typealias StreamingQueryCommand = Spark_Connect_StreamingQueryCommand
59+
typealias StreamingQueryInstanceId = Spark_Connect_StreamingQueryInstanceId
60+
typealias StreamingQueryManagerCommand = Spark_Connect_StreamingQueryManagerCommand
5961
typealias StructType = Spark_Connect_DataType.Struct
6062
typealias Tail = Spark_Connect_Tail
6163
typealias UserContext = Spark_Connect_UserContext
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
import Foundation
21+
import SparkConnect
22+
import Testing
23+
24+
/// A test suite for `StreamingQueryManager`
25+
@Suite(.serialized)
26+
struct StreamingQueryManagerTests {
27+
28+
@Test
29+
func active() async throws {
30+
let spark = try await SparkSession.builder.getOrCreate()
31+
#expect(try await spark.streams.active.count == 0)
32+
await spark.stop()
33+
}
34+
35+
@Test
36+
func get() async throws {
37+
let spark = try await SparkSession.builder.getOrCreate()
38+
await #expect(throws: SparkConnectError.InvalidArgumentException) {
39+
try await spark.streams.get(UUID())
40+
}
41+
await #expect(throws: SparkConnectError.InvalidArgumentException) {
42+
try await spark.streams.get(UUID().uuidString)
43+
}
44+
await spark.stop()
45+
}
46+
47+
@Test
48+
func awaitAnyTermination() async throws {
49+
let spark = try await SparkSession.builder.getOrCreate()
50+
try await spark.streams.awaitAnyTermination(1)
51+
await #expect(throws: SparkConnectError.InvalidArgumentException) {
52+
try await spark.streams.awaitAnyTermination(-1)
53+
}
54+
await spark.stop()
55+
}
56+
57+
@Test
58+
func resetTerminated() async throws {
59+
let spark = try await SparkSession.builder.getOrCreate()
60+
try await spark.streams.resetTerminated()
61+
await spark.stop()
62+
}
63+
64+
@Test
65+
func query() async throws {
66+
let spark = try await SparkSession.builder.getOrCreate()
67+
68+
// Prepare directories
69+
let input = "/tmp/input-" + UUID().uuidString
70+
let checkpoint = "/tmp/checkpoint-" + UUID().uuidString
71+
let output = "/tmp/output-" + UUID().uuidString
72+
try await spark.range(2025).write.orc(input)
73+
74+
// Create a streaming dataframe.
75+
let df =
76+
try await spark
77+
.readStream
78+
.schema("id LONG")
79+
.orc(input)
80+
#expect(try await df.isStreaming())
81+
#expect(try await spark.streams.active.count == 0)
82+
83+
// Start a streaming query
84+
let query =
85+
try await df
86+
.selectExpr("id", "id * 10 as value")
87+
.writeStream
88+
.option("checkpointLocation", checkpoint)
89+
.outputMode("append")
90+
.format("orc")
91+
.trigger(Trigger.ProcessingTime(1000))
92+
.start(output)
93+
#expect(try await query.isActive)
94+
#expect(try await spark.streams.active.count == 1)
95+
#expect(try await spark.streams.get(query.id).isActive)
96+
#expect(try await spark.streams.get(query.id.uuidString).isActive)
97+
// Wait for processing
98+
try await Task.sleep(nanoseconds: 2_000_000_000)
99+
100+
try await query.stop()
101+
#expect(try await spark.streams.active.count == 0)
102+
#expect(try await query.isActive == false)
103+
104+
await spark.stop()
105+
}
106+
}

0 commit comments

Comments
 (0)