Skip to content

Commit 6687d72

Browse files
committed
[SPARK-51792] Support saveAsTable and insertInto
### What changes were proposed in this pull request?

This PR aims to support `saveAsTable` and `insertInto` APIs in `DataFrameWriter`.

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

No, this is a new addition.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #56 from dongjoon-hyun/SPARK-51792.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f73c824 commit 6687d72

File tree

2 files changed

+82
-7
lines changed

2 files changed

+82
-7
lines changed

Sources/SparkConnect/DataFrameWriter.swift

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,16 +113,48 @@ public actor DataFrameWriter: Sendable {
113113
}
114114

115115
private func saveInternal(_ path: String?) async throws {
116-
var write = WriteOperation()
116+
try await executeWriteOperation({
117+
var write = WriteOperation()
118+
if let path = path {
119+
write.path = path
120+
}
121+
return write
122+
})
123+
}
124+
125+
/// Saves the content of the ``DataFrame`` as the specified table.
126+
/// - Parameter tableName: A table name.
127+
public func saveAsTable(_ tableName: String) async throws {
128+
try await executeWriteOperation({
129+
var write = WriteOperation()
130+
write.table.tableName = tableName
131+
write.table.saveMethod = .saveAsTable
132+
return write
133+
})
134+
}
135+
136+
/// Inserts the content of the ``DataFrame`` to the specified table. It requires that the schema of
137+
/// the ``DataFrame`` is the same as the schema of the table. Unlike ``saveAsTable``,
138+
/// ``insertInto`` ignores the column names and just uses position-based resolution.
139+
/// - Parameter tableName: A table name.
140+
public func insertInto(_ tableName: String) async throws {
141+
try await executeWriteOperation({
142+
var write = WriteOperation()
143+
write.table.tableName = tableName
144+
write.table.saveMethod = .insertInto
145+
return write
146+
})
147+
}
148+
149+
private func executeWriteOperation(_ f: () -> WriteOperation) async throws {
150+
var write = f()
151+
152+
// Cannot both be set
153+
assert(!(!write.path.isEmpty && !write.table.tableName.isEmpty))
154+
117155
let plan = await self.df.getPlan() as! Plan
118156
write.input = plan.root
119157
write.mode = self.saveMode.toSaveMode
120-
if let path = path {
121-
write.path = path
122-
}
123-
124-
// Cannot both be set
125-
// require(!(builder.hasPath && builder.hasTable))
126158

127159
if let source = self.source {
128160
write.source = source

Tests/SparkConnectTests/DataFrameWriterTests.swift

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,49 @@ struct DataFrameWriterTests {
101101
await spark.stop()
102102
}
103103

104+
@Test
105+
func saveAsTable() async throws {
106+
let spark = try await SparkSession.builder.getOrCreate()
107+
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
108+
try await SQLHelper.withTable(spark, tableName)({
109+
try await spark.range(1).write.saveAsTable(tableName)
110+
#expect(try await spark.read.table(tableName).count() == 1)
111+
112+
try await #require(throws: Error.self) {
113+
try await spark.range(1).write.saveAsTable(tableName)
114+
}
115+
116+
try await spark.range(1).write.mode("overwrite").saveAsTable(tableName)
117+
#expect(try await spark.read.table(tableName).count() == 1)
118+
119+
try await spark.range(1).write.mode("append").saveAsTable(tableName)
120+
#expect(try await spark.read.table(tableName).count() == 2)
121+
})
122+
await spark.stop()
123+
}
124+
125+
@Test
126+
func insertInto() async throws {
127+
let spark = try await SparkSession.builder.getOrCreate()
128+
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
129+
try await SQLHelper.withTable(spark, tableName)({
130+
// Table doesn't exist.
131+
try await #require(throws: Error.self) {
132+
try await spark.range(1).write.insertInto(tableName)
133+
}
134+
135+
try await spark.range(1).write.saveAsTable(tableName)
136+
#expect(try await spark.read.table(tableName).count() == 1)
137+
138+
try await spark.range(1).write.insertInto(tableName)
139+
#expect(try await spark.read.table(tableName).count() == 2)
140+
141+
try await spark.range(1).write.insertInto(tableName)
142+
#expect(try await spark.read.table(tableName).count() == 3)
143+
})
144+
await spark.stop()
145+
}
146+
104147
@Test
105148
func partitionBy() async throws {
106149
let tmpDir = "/tmp/" + UUID().uuidString

0 commit comments

Comments (0)