From 1a3ec7d8a270298326221e5678274cd4f01045a0 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 12 May 2025 11:57:40 -0700 Subject: [PATCH 1/3] [SPARK-52089] Support `StreamingQueryManager` --- Sources/SparkConnect/SparkConnectClient.swift | 14 +- Sources/SparkConnect/SparkSession.swift | 8 + .../SparkConnect/StreamingQueryManager.swift | 173 ++++++++++++++++++ Sources/SparkConnect/TypeAliases.swift | 2 + .../StreamingQueryManagerTests.swift | 106 +++++++++++ 5 files changed, 301 insertions(+), 2 deletions(-) create mode 100644 Sources/SparkConnect/StreamingQueryManager.swift create mode 100644 Tests/SparkConnectTests/StreamingQueryManagerTests.swift diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index 943131d..c67e30e 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -986,10 +986,10 @@ public actor SparkConnectClient { _ runID: String, _ command: StreamingQueryCommand.OneOf_Command ) async throws -> [ExecutePlanResponse] { - var queryID = Spark_Connect_StreamingQueryInstanceId() + var queryID = StreamingQueryInstanceId() queryID.id = id queryID.runID = runID - var streamingQueryCommand = Spark_Connect_StreamingQueryCommand() + var streamingQueryCommand = StreamingQueryCommand() streamingQueryCommand.queryID = queryID streamingQueryCommand.command = command var command = Spark_Connect_Command() @@ -997,6 +997,16 @@ public actor SparkConnectClient { return try await execute(self.sessionID!, command) } + func executeStreamingQueryManagerCommand( + _ command: StreamingQueryManagerCommand.OneOf_Command + ) async throws -> [ExecutePlanResponse] { + var streamingQueryManagerCommand = StreamingQueryManagerCommand() + streamingQueryManagerCommand.command = command + var command = Spark_Connect_Command() + command.streamingQueryManagerCommand = streamingQueryManagerCommand + return try await execute(self.sessionID!, command) + } + private enum URIParams { static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size" static let PARAM_SESSION_ID = "session_id" diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift index 2ef0cef..e565126 100644 --- a/Sources/SparkConnect/SparkSession.swift +++ b/Sources/SparkConnect/SparkSession.swift @@ -374,6 +374,14 @@ public actor SparkSession { return try await client.semanticHash(plan) } + /// Returns a `StreamingQueryManager` that allows managing all the `StreamingQuery`s active on + /// `this`. + public var streams: StreamingQueryManager { + get { + StreamingQueryManager(self) + } + } + /// This is defined as the return type of `SparkSession.sparkContext` method. /// This is an empty `Struct` type because `sparkContext` method is designed to throw /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`. diff --git a/Sources/SparkConnect/StreamingQueryManager.swift b/Sources/SparkConnect/StreamingQueryManager.swift new file mode 100644 index 0000000..ebd0796 --- /dev/null +++ b/Sources/SparkConnect/StreamingQueryManager.swift @@ -0,0 +1,173 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +import Foundation + +/// Information about progress made for a sink in the execution of a ``StreamingQuery`` +/// during a trigger. See ``StreamingQueryProgress`` for more information. +public struct SourceProgress: Sendable { + let description: String + let startOffset: String + let endOffset: String + let latestOffset: String + let numInputRows: Int64 + let inputRowsPerSecond: Double + let processedRowsPerSecond: Double + let metrics: [String: String] = [:] +} + +/// Information about progress made for a sink in the execution of a ``StreamingQuery`` +/// during a trigger. See ``StreamingQueryProgress`` for more information. +public struct SinkProgress: Sendable { + let description: String + let numOutputRows: Int64 + let metrics: [String: String] = [:] + + init(_ description: String) { + self.description = description + self.numOutputRows = -1 + } + + init(_ description: String, _ numOutputRows: Int64) { + self.description = description + self.numOutputRows = numOutputRows + } +} + +/// Information about updates made to stateful operators in a ``StreamingQuery`` +/// during a trigger. See ``StreamingQueryProgress`` for more information. +public struct StateOperatorProgress: Sendable { + let operatorName: String + let numRowsTotal: Int64 + let numRowsUpdated: Int64 + let allUpdatesTimeMs: Int64 + let numRowsRemoved: Int64 + let allRemovalsTimeMs: Int64 + let commitTimeMs: Int64 + let memoryUsedBytes: Int64 + let numRowsDroppedByWatermark: Int64 + let numShufflePartitions: Int64 + let numStateStoreInstances: Int64 + let customMetrics: [String: Int64] +} + +/// Information about progress made in the execution of a ``StreamingQuery`` +/// during a trigger. Each event relates to processing done for a single trigger of +/// the streaming query. Events are emitted even when no new data is available to be processed. +public struct StreamingQueryProcess { + let id: UUID + let runId: UUID + let name: String + let timestamp: String + let batchId: Int64 + let batchDuration: Int64 + let durationMs: [String: Int64] + let eventTime: [String: String] + let stateOperators: [StateOperatorProgress] + let sources: [SourceProgress] + let sink: SinkProgress + let observedMetrics: [String: Row] + + func numInputRows() -> Int64 { + return sources.map { $0.numInputRows }.reduce(0, +) + } + + func inputRowsPerSecond() -> Double { + return sources.map { $0.inputRowsPerSecond }.reduce(0, +) + } + + func processedRowsPerSecond() -> Double { + return sources.map { $0.processedRowsPerSecond }.reduce(0, +) + } +} + +/// A class to manage all the ``StreamingQuery`` active in a ``SparkSession``. 
+public actor StreamingQueryManager { + let sparkSession: SparkSession + + init(_ sparkSession: SparkSession) { + self.sparkSession = sparkSession + } + + /// Returns a list of active queries associated with this SQLContext + public var active: [StreamingQuery] { + get async throws { + let command = StreamingQueryManagerCommand.OneOf_Command.active(true) + let response = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command) + return response.first!.streamingQueryManagerCommandResult.active.activeQueries.map { + StreamingQuery( + UUID(uuidString: $0.id.id)!, + UUID(uuidString: $0.id.runID)!, + $0.name, + self.sparkSession + ) + } + } + } + + /// Returns the query if there is an active query with the given id, or null. + /// - Parameter id: an UUID. + /// - Returns: A ``StreamingQuery``. + public func get(_ id: UUID) async throws -> StreamingQuery { + return try await get(id.uuidString) + } + + /// Returns the query if there is an active query with the given id, or null. + /// - Parameter id: an UUID String + /// - Returns: A ``StreamingQuery``. + public func get(_ id: String) async throws -> StreamingQuery { + let command = StreamingQueryManagerCommand.OneOf_Command.getQuery(id) + let response = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command) + let query = response.first!.streamingQueryManagerCommandResult.query + guard query.hasID else { + throw SparkConnectError.InvalidArgumentException + } + return StreamingQuery( + UUID(uuidString: query.id.id)!, + UUID(uuidString: query.id.runID)!, + query.name, + self.sparkSession + ) + } + + /// Wait until any of the queries on the associated SQLContext has terminated since the creation + /// of the context, or since `resetTerminated()` was called. If any query was terminated with an + /// exception, then the exception will be thrown. + /// - Parameter timeoutMs: A timeout in milliseconds. + @discardableResult + public func awaitAnyTermination(_ timeoutMs: Int64? = nil) async throws -> Bool { + var awaitAnyTerminationCommand = StreamingQueryManagerCommand.AwaitAnyTerminationCommand() + if let timeoutMs { + guard timeoutMs > 0 else { + throw SparkConnectError.InvalidArgumentException + } + awaitAnyTerminationCommand.timeoutMs = timeoutMs + } + let command = StreamingQueryManagerCommand.OneOf_Command.awaitAnyTermination( + awaitAnyTerminationCommand) + let response = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command) + return response.first!.streamingQueryManagerCommandResult.awaitAnyTermination.terminated + } + + /// Forget about past terminated queries so that `awaitAnyTermination()` can be used again to + /// wait for new terminations. 
+ public func resetTerminated() async throws { + let command = StreamingQueryManagerCommand.OneOf_Command.resetTerminated(true) + _ = try await self.sparkSession.client.executeStreamingQueryManagerCommand(command) + } +} diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift index e0543b6..537ea42 100644 --- a/Sources/SparkConnect/TypeAliases.swift +++ b/Sources/SparkConnect/TypeAliases.swift @@ -56,6 +56,8 @@ typealias ShowString = Spark_Connect_ShowString typealias SparkConnectService = Spark_Connect_SparkConnectService typealias Sort = Spark_Connect_Sort typealias StreamingQueryCommand = Spark_Connect_StreamingQueryCommand +typealias StreamingQueryInstanceId = Spark_Connect_StreamingQueryInstanceId +typealias StreamingQueryManagerCommand = Spark_Connect_StreamingQueryManagerCommand typealias StructType = Spark_Connect_DataType.Struct typealias Tail = Spark_Connect_Tail typealias UserContext = Spark_Connect_UserContext diff --git a/Tests/SparkConnectTests/StreamingQueryManagerTests.swift b/Tests/SparkConnectTests/StreamingQueryManagerTests.swift new file mode 100644 index 0000000..ada8517 --- /dev/null +++ b/Tests/SparkConnectTests/StreamingQueryManagerTests.swift @@ -0,0 +1,106 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// + +import Foundation +import SparkConnect +import Testing + +/// A test suite for `StreamingQueryManager` +@Suite(.serialized) +struct StreamingQueryManagerTests { + + @Test + func active() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.streams.active.count == 0) + await spark.stop() + } + + @Test + func get() async throws { + let spark = try await SparkSession.builder.getOrCreate() + await #expect(throws: SparkConnectError.InvalidArgumentException) { + try await spark.streams.get(UUID()) + } + await #expect(throws: SparkConnectError.InvalidArgumentException) { + try await spark.streams.get(UUID().uuidString) + } + await spark.stop() + } + + @Test + func awaitAnyTermination() async throws { + let spark = try await SparkSession.builder.getOrCreate() + try await spark.streams.awaitAnyTermination(1) + await #expect(throws: SparkConnectError.InvalidArgumentException) { + try await spark.streams.awaitAnyTermination(-1) + } + await spark.stop() + } + + @Test + func resetTerminated() async throws { + let spark = try await SparkSession.builder.getOrCreate() + try await spark.streams.resetTerminated() + await spark.stop() + } + + @Test + func query() async throws { + let spark = try await SparkSession.builder.getOrCreate() + + // Prepare directories + let input = "/tmp/input-" + UUID().uuidString + let checkpoint = "/tmp/checkpoint-" + UUID().uuidString + let output = "/tmp/output-" + UUID().uuidString + try await spark.range(2025).write.orc(input) + + // Create a streaming dataframe. + let df = + try await spark + .readStream + .schema("id LONG") + .orc(input) + #expect(try await df.isStreaming()) + #expect(try await spark.streams.active.count == 0) + + // Start a streaming query + let query = + try await df + .selectExpr("id", "id * 10 as value") + .writeStream + .option("checkpointLocation", checkpoint) + .outputMode("append") + .format("orc") + .trigger(Trigger.ProcessingTime(1000)) + .start(output) + #expect(try await query.isActive) + #expect(try await spark.streams.active.count == 1) + #expect(try await spark.streams.get(query.id).isActive) + #expect(try await spark.streams.get(query.id.uuidString).isActive) + // Wait for processing + try await Task.sleep(nanoseconds: 2_000_000_000) + + try await query.stop() + #expect(try await spark.streams.active.count == 0) + #expect(try await query.isActive == false) + + await spark.stop() + } +} From 3e45c6c3fc5d476107532bab243f3de8a46a2f68 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 12 May 2025 13:45:29 -0700 Subject: [PATCH 2/3] Update Sources/SparkConnect/StreamingQueryManager.swift Co-authored-by: Liang-Chi Hsieh --- Sources/SparkConnect/StreamingQueryManager.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/SparkConnect/StreamingQueryManager.swift b/Sources/SparkConnect/StreamingQueryManager.swift index ebd0796..3616080 100644 --- a/Sources/SparkConnect/StreamingQueryManager.swift +++ b/Sources/SparkConnect/StreamingQueryManager.swift @@ -18,7 +18,7 @@ // import Foundation -/// Information about progress made for a sink in the execution of a ``StreamingQuery`` +/// Information about progress made for a source in the execution of a ``StreamingQuery`` /// during a trigger. See ``StreamingQueryProgress`` for more information. 
public struct SourceProgress: Sendable { let description: String From fcd153177a5d139144bc5eb5af75307259750ec6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 12 May 2025 13:45:39 -0700 Subject: [PATCH 3/3] Update Sources/SparkConnect/StreamingQueryManager.swift Co-authored-by: Liang-Chi Hsieh --- Sources/SparkConnect/StreamingQueryManager.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/SparkConnect/StreamingQueryManager.swift b/Sources/SparkConnect/StreamingQueryManager.swift index 3616080..b670aa3 100644 --- a/Sources/SparkConnect/StreamingQueryManager.swift +++ b/Sources/SparkConnect/StreamingQueryManager.swift @@ -96,7 +96,7 @@ public struct StreamingQueryProcess { } } -/// A class to manage all the ``StreamingQuery`` active in a ``SparkSession``. +/// A class to manage all the ``StreamingQuery``s active in a ``SparkSession``. public actor StreamingQueryManager { let sparkSession: SparkSession
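A minimal usage sketch of the `StreamingQueryManager` API added by this patch series, assuming a reachable Spark Connect server and an async calling context; the wrapper function name and the 5-second timeout below are illustrative only and not part of the patch.

import Foundation
import SparkConnect

func demoStreamingQueryManager() async throws {
  let spark = try await SparkSession.builder.getOrCreate()

  // List the streaming queries currently active on this session.
  let queries = try await spark.streams.active
  print("active streaming queries: \(queries.count)")

  // Looking up an id with no matching active query throws
  // SparkConnectError.InvalidArgumentException instead of returning nil.
  do {
    _ = try await spark.streams.get(UUID())
  } catch {
    print("no active query with that id: \(error)")
  }

  // Wait up to 5 seconds (an arbitrary timeout) for any active query to terminate,
  // then forget past terminations so awaitAnyTermination() can be used again.
  _ = try await spark.streams.awaitAnyTermination(5000)
  try await spark.streams.resetTerminated()

  await spark.stop()
}

The new StreamingQueryManagerTests in this patch exercise the same calls end-to-end against a file-based streaming query started with `writeStream.start(...)`.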