Skip to content

Commit 9499253

Browse files
committed
[SPARK-51992] Support interrupt(Tag|Operation|All) in SparkSession
### What changes were proposed in this pull request? This PR aims to support `interrupt(Tag|Operation|All)` API of `SparkSession`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #108 from dongjoon-hyun/SPARK-51992. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 48263da commit 9499253

File tree

4 files changed

+99
-5
lines changed

4 files changed

+99
-5
lines changed

Sources/SparkConnect/Documentation.docc/SparkSession.md

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,13 @@ let csvDf = spark.read.csv("path/to/file.csv")
3232

3333
### Creating Sessions
3434

35-
- ``builder()``
36-
- ``active()``
35+
- ``builder``
3736
- ``stop()``
3837

39-
### DataFrame Operations
38+
### DataFrame Operations
4039

41-
- ``range(_:_:)``
40+
- ``range(_:_:_:)``
4241
- ``sql(_:)``
43-
- ``createDataFrame(_:_:)``
4442

4543
### Data I/O
4644

@@ -53,3 +51,13 @@ let csvDf = spark.read.csv("path/to/file.csv")
5351
### Catalog Operations
5452

5553
- ``catalog``
54+
55+
### Managing Operations
56+
57+
- ``addTag(_:)``
58+
- ``removeTag(_:)``
59+
- ``getTags()``
60+
- ``clearTags()``
61+
- ``interruptAll()``
62+
- ``interruptTag(_:)``
63+
- ``interruptOperation(_:)``

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,50 @@ public actor SparkConnectClient {
559559
tags.removeAll()
560560
}
561561

562+
public func interruptAll() async throws -> [String] {
563+
var request = Spark_Connect_InterruptRequest()
564+
request.sessionID = self.sessionID!
565+
request.userContext = self.userContext
566+
request.clientType = self.clientType
567+
request.interruptType = .all
568+
569+
return try await withGPRC { client in
570+
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
571+
let response = try await service.interrupt(request)
572+
return response.interruptedIds
573+
}
574+
}
575+
576+
public func interruptTag(_ tag: String) async throws -> [String] {
577+
var request = Spark_Connect_InterruptRequest()
578+
request.sessionID = self.sessionID!
579+
request.userContext = self.userContext
580+
request.clientType = self.clientType
581+
request.interruptType = .tag
582+
request.operationTag = tag
583+
584+
return try await withGPRC { client in
585+
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
586+
let response = try await service.interrupt(request)
587+
return response.interruptedIds
588+
}
589+
}
590+
591+
public func interruptOperation(_ operationId: String) async throws -> [String] {
592+
var request = Spark_Connect_InterruptRequest()
593+
request.sessionID = self.sessionID!
594+
request.userContext = self.userContext
595+
request.clientType = self.clientType
596+
request.interruptType = .operationID
597+
request.operationID = operationId
598+
599+
return try await withGPRC { client in
600+
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
601+
let response = try await service.interrupt(request)
602+
return response.interruptedIds
603+
}
604+
}
605+
562606
/// Parse a DDL string to ``Spark_Connect_DataType`` instance.
563607
/// - Parameter ddlString: A string to parse.
564608
/// - Returns: A ``Spark_Connect_DataType`` instance.

Sources/SparkConnect/SparkSession.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,27 @@ public actor SparkSession {
314314
await client.clearTags()
315315
}
316316

317+
/// Request to interrupt all currently running operations of this session.
318+
/// - Returns: Sequence of operation IDs requested to be interrupted.
319+
@discardableResult
320+
public func interruptAll() async throws -> [String] {
321+
return try await client.interruptAll()
322+
}
323+
324+
/// Request to interrupt all currently running operations of this session with the given job tag.
325+
/// - Returns: Sequence of operation IDs requested to be interrupted.
326+
@discardableResult
327+
public func interruptTag(_ tag: String) async throws -> [String] {
328+
return try await client.interruptTag(tag)
329+
}
330+
331+
/// Request to interrupt an operation of this session, given its operation ID.
332+
/// - Returns: Sequence of operation IDs requested to be interrupted.
333+
@discardableResult
334+
public func interruptOperation(_ operationId: String) async throws -> [String] {
335+
return try await client.interruptOperation(operationId)
336+
}
337+
317338
func sameSemantics(_ plan: Plan, _ otherPlan: Plan) async throws -> Bool {
318339
return try await client.sameSemantics(plan, otherPlan)
319340
}

Tests/SparkConnectTests/SparkSessionTests.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,25 @@ struct SparkSessionTests {
136136
}
137137
await spark.stop()
138138
}
139+
140+
@Test
141+
func interruptAll() async throws {
142+
let spark = try await SparkSession.builder.getOrCreate()
143+
#expect(try await spark.interruptAll() == [])
144+
await spark.stop()
145+
}
146+
147+
@Test
148+
func interruptTag() async throws {
149+
let spark = try await SparkSession.builder.getOrCreate()
150+
#expect(try await spark.interruptTag("etl") == [])
151+
await spark.stop()
152+
}
153+
154+
@Test
155+
func interruptOperation() async throws {
156+
let spark = try await SparkSession.builder.getOrCreate()
157+
#expect(try await spark.interruptOperation("id") == [])
158+
await spark.stop()
159+
}
139160
}

0 commit comments

Comments
 (0)