Skip to content

Commit 52e217f

Browse files
committed
[SPARK-52748] Support defineDataset
### What changes were proposed in this pull request? This PR aims to support the `defineDataset` API in order to support `Declarative Pipelines` (SPARK-51727) of Apache Spark `4.1.0-preview1`. ### Why are the changes needed? To support the new feature incrementally. ### Does this PR introduce _any_ user-facing change? No, this is a new feature. ### How was this patch tested? Pass the CIs with the `4.1.0-preview1` test pipeline because we added it. - #210 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #211 from dongjoon-hyun/SPARK-52748. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8818f07 commit 52e217f

File tree

5 files changed

+71
-0
lines changed

5 files changed

+71
-0
lines changed

Sources/SparkConnect/Extension.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,18 @@ extension String {
181181
default: .UNRECOGNIZED(-1)
182182
}
183183
}
184+
185+
/// Converts this string to the corresponding `DatasetType` enum value.
/// Recognized names are "unspecified", "materializedView", "table", and
/// "temporaryView"; any other string maps to `DatasetType.UNRECOGNIZED(-1)`.
var toDatasetType: DatasetType {
  // Return the switch expression directly, matching the style of the
  // neighboring conversion property in this extension; the intermediate
  // `mode` binding in the original added nothing.
  switch self {
  case "unspecified": .unspecified
  case "materializedView": .materializedView
  case "table": .table
  case "temporaryView": .temporaryView
  default: .UNRECOGNIZED(-1)
  }
}
184196
}
185197

186198
extension [String: String] {

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ public actor SparkConnectClient {
145145
throw SparkConnectError.InvalidViewName
146146
case let m where m.contains("DATA_SOURCE_NOT_FOUND"):
147147
throw SparkConnectError.DataSourceNotFound
148+
case let m where m.contains("DATASET_TYPE_UNSPECIFIED"):
149+
throw SparkConnectError.DatasetTypeUnspecified
148150
default:
149151
throw error
150152
}
@@ -1237,6 +1239,39 @@ public actor SparkConnectClient {
12371239
}
12381240
}
12391241

1242+
/// Defines a dataset (table, materialized view, or temporary view) in an
/// existing dataflow graph by sending a `DefineDataset` pipeline command.
///
/// - Parameters:
///   - dataflowGraphID: UUID string identifying the target dataflow graph.
///   - datasetName: Name of the dataset to define.
///   - datasetType: Dataset type name; converted via `String.toDatasetType`
///     ("unspecified", "materializedView", "table", or "temporaryView").
///   - comment: Optional comment attached to the dataset definition.
/// - Returns: `true` if any server response carries a pipeline command result.
/// - Throws: `SparkConnectError.InvalidArgument` when `dataflowGraphID` is not
///   a valid UUID string, or any error raised while executing the command.
@discardableResult
func defineDataset(
  _ dataflowGraphID: String,
  _ datasetName: String,
  _ datasetType: String,
  _ comment: String? = nil
) async throws -> Bool {
  try await withGPRC { client in
    // Reject malformed graph IDs before building or sending any command.
    guard UUID(uuidString: dataflowGraphID) != nil else {
      throw SparkConnectError.InvalidArgument
    }

    // Assemble the DefineDataset payload.
    var dataset = Spark_Connect_PipelineCommand.DefineDataset()
    dataset.dataflowGraphID = dataflowGraphID
    dataset.datasetName = datasetName
    dataset.datasetType = datasetType.toDatasetType
    if let comment {
      dataset.comment = comment
    }

    // Wrap it in a pipeline command, then in a generic Spark Connect command.
    var pipeline = Spark_Connect_PipelineCommand()
    pipeline.commandType = .defineDataset(dataset)

    var command = Spark_Connect_Command()
    command.commandType = .pipelineCommand(pipeline)

    // Success is signaled by a pipeline-command result in any response.
    let responses = try await execute(self.sessionID!, command)
    return responses.contains {
      $0.responseType == .pipelineCommandResult(Spark_Connect_PipelineCommandResult())
    }
  }
}
1274+
12401275
private enum URIParams {
12411276
static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
12421277
static let PARAM_SESSION_ID = "session_id"

Sources/SparkConnect/SparkConnectError.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public enum SparkConnectError: Error {
2222
case CatalogNotFound
2323
case ColumnNotFound
2424
case DataSourceNotFound
25+
case DatasetTypeUnspecified
2526
case InvalidArgument
2627
case InvalidSessionID
2728
case InvalidType

Sources/SparkConnect/TypeAliases.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ typealias AnalyzePlanResponse = Spark_Connect_AnalyzePlanResponse
2323
typealias Command = Spark_Connect_Command
2424
typealias ConfigRequest = Spark_Connect_ConfigRequest
2525
typealias DataSource = Spark_Connect_Read.DataSource
26+
/// Shorthand for the generated Spark Connect `Spark_Connect_DatasetType`.
typealias DatasetType = Spark_Connect_DatasetType
2627
typealias DataType = Spark_Connect_DataType
2728
typealias DayTimeInterval = Spark_Connect_DataType.DayTimeInterval
2829
typealias Drop = Spark_Connect_Drop

Tests/SparkConnectTests/SparkConnectClientTests.swift

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,26 @@ struct SparkConnectClientTests {
124124
}
125125
await client.stop()
126126
}
127+
128+
/// Exercises `SparkConnectClient.defineDataset`: a malformed graph ID must
/// fail client-side validation, and against a Spark 4.1+ server each concrete
/// dataset type can be defined while "unspecified" is rejected.
@Test
func defineDataset() async throws {
  let client = SparkConnectClient(remote: TEST_REMOTE)
  let response = try await client.connect(UUID().uuidString)

  // Client-side validation: a non-UUID graph ID is rejected locally.
  try await #require(throws: SparkConnectError.InvalidArgument) {
    try await client.defineDataset("not-a-uuid-format", "ds1", "table")
  }

  // The pipeline API exists only on Spark 4.1 and later servers.
  if response.sparkVersion.version.starts(with: "4.1") {
    let dataflowGraphID = try await client.createDataflowGraph()
    #expect(UUID(uuidString: dataflowGraphID) != nil)

    // The server refuses the "unspecified" dataset type.
    try await #require(throws: SparkConnectError.DatasetTypeUnspecified) {
      try await client.defineDataset(dataflowGraphID, "ds1", "unspecified")
    }

    // Each concrete dataset type is accepted.
    for (name, type) in [
      ("ds2", "materializedView"), ("ds3", "table"), ("ds4", "temporaryView"),
    ] {
      #expect(try await client.defineDataset(dataflowGraphID, name, type))
    }
  }
  await client.stop()
}
127149
}

0 commit comments

Comments
 (0)