Skip to content

Commit ac3c85e

Browse files
committed
[SPARK-51970] Support to create and drop temporary views in DataFrame and Catalog
### What changes were proposed in this pull request?

This PR aims to add the following APIs.

- `DataFrame`
  - `createTempView`
  - `createOrReplaceTempView`
  - `createGlobalTempView`
  - `createOrReplaceGlobalTempView`
- `Catalog`
  - `dropTempView`
  - `dropGlobalTempView`
- `SQLHelper`
  - `withTempView`
  - `withGlobalTempView`

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #98 from dongjoon-hyun/SPARK-51970.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a39c346 commit ac3c85e

File tree

5 files changed

+219
-0
lines changed

5 files changed

+219
-0
lines changed

Sources/SparkConnect/Catalog.swift

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,4 +393,36 @@ public actor Catalog: Sendable {
393393
})
394394
try await df.count()
395395
}
396+
397+
/// Drops the local temporary view with the given view name in the catalog. If the view has been
/// cached before, then it will also be uncached.
/// - Parameter viewName: The name of the temporary view to be dropped.
/// - Returns: `true` if the view is dropped successfully, `false` otherwise. A response that
///   contains no row, or whose first value is not a string, is also reported as `false`.
/// - Throws: Any error raised while executing the request or reading the result.
@discardableResult
public func dropTempView(_ viewName: String) async throws -> Bool {
  let df = getDataFrame({
    var dropTempView = Spark_Connect_DropTempView()
    dropTempView.viewName = viewName
    var catalog = Spark_Connect_Catalog()
    catalog.dropTempView = dropTempView
    return catalog
  })
  // Avoid `first!` / `as!`: an unexpected response should not crash the caller's process.
  guard let result = try await df.collect().first?.get(0) as? String else {
    return false
  }
  return result == "true"
}
412+
413+
/// Drops the global temporary view with the given view name in the catalog. If the view has been
/// cached before, then it will also be uncached.
/// - Parameter viewName: The unqualified name of the temporary view to be dropped.
/// - Returns: `true` if the view is dropped successfully, `false` otherwise. A response that
///   contains no row, or whose first value is not a string, is also reported as `false`.
/// - Throws: Any error raised while executing the request or reading the result.
@discardableResult
public func dropGlobalTempView(_ viewName: String) async throws -> Bool {
  let df = getDataFrame({
    var dropGlobalTempView = Spark_Connect_DropGlobalTempView()
    dropGlobalTempView.viewName = viewName
    var catalog = Spark_Connect_Catalog()
    catalog.dropGlobalTempView = dropGlobalTempView
    return catalog
  })
  // Avoid `first!` / `as!`: an unexpected response should not crash the caller's process.
  guard let result = try await df.collect().first?.get(0) as? String else {
    return false
  }
  return result == "true"
}
396428
}

Sources/SparkConnect/DataFrame.swift

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,38 @@ public actor DataFrame: Sendable {
856856
return GroupedData(self, GroupType.cube, cols)
857857
}
858858

859+
/// Registers this ``DataFrame`` as a local temporary view under the given name. The lifetime of
/// the view is tied to the `SparkSession` that was used to create this ``DataFrame``.
/// - Parameter viewName: A view name.
/// - Throws: An error if the request fails, for example when a view with the same name already
///   exists or the name is not a valid view name.
public func createTempView(_ viewName: String) async throws {
  try await self.createTempView(
    viewName,
    replace: false,
    global: false
  )
}
865+
866+
/// Creates or replaces a local temporary view using the given name. The lifetime of this
/// temporary view is tied to the `SparkSession` that was used to create this ``DataFrame``.
/// - Parameter viewName: A view name.
/// - Throws: An error if the request fails, for example when the name is not a valid view name.
public func createOrReplaceTempView(_ viewName: String) async throws {
  try await self.createTempView(
    viewName,
    replace: true,
    global: false
  )
}
872+
873+
/// Registers this ``DataFrame`` as a global temporary view under the given name. The lifetime of
/// the view is tied to this Spark application, but is cross-session.
/// - Parameter viewName: A view name.
/// - Throws: An error if the request fails, for example when a global view with the same name
///   already exists or the name is not a valid view name.
public func createGlobalTempView(_ viewName: String) async throws {
  try await self.createTempView(
    viewName,
    replace: false,
    global: true
  )
}
879+
880+
/// Creates or replaces a global temporary view using the given name. The lifetime of this
/// temporary view is tied to this Spark application, but is cross-session.
/// - Parameter viewName: A view name.
/// - Throws: An error if the request fails, for example when the name is not a valid view name.
public func createOrReplaceGlobalTempView(_ viewName: String) async throws {
  try await self.createTempView(
    viewName,
    replace: true,
    global: true
  )
}
886+
887+
/// Shared implementation behind the public view-creation methods: forwards the root relation of
/// this ``DataFrame``'s plan to the client together with the requested flags.
/// - Parameters:
///   - viewName: A view name.
///   - replace: Passed through to the client as `replace`.
///   - global: Passed through to the client as `isGlobal`.
func createTempView(_ viewName: String, replace: Bool, global: Bool) async throws {
  let relation = self.plan.root
  try await spark.client.createTempView(
    relation, viewName, replace: replace, isGlobal: global)
}
890+
859891
/// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
860892
public var write: DataFrameWriter {
861893
get {

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,20 @@ public actor SparkConnectClient {
729729
return plan
730730
}
731731

732+
/// Builds a `Spark_Connect_CreateDataFrameViewCommand` for the given relation and executes it
/// with the current session ID.
/// - Parameters:
///   - child: The relation to expose as a view.
///   - viewName: The name of the view.
///   - replace: Value stored in the command's `replace` field.
///   - isGlobal: Value stored in the command's `isGlobal` field.
/// - Note: Force-unwraps `sessionID`, so it traps if no session ID has been set.
func createTempView(
  _ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool
) async throws {
  var createView = Spark_Connect_CreateDataFrameViewCommand()
  createView.name = viewName
  createView.input = child
  createView.isGlobal = isGlobal
  createView.replace = replace

  var wrapped = Spark_Connect_Command()
  wrapped.createDataframeView = createView
  try await execute(self.sessionID!, wrapped)
}
745+
732746
private enum URIParams {
733747
static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
734748
static let PARAM_SESSION_ID = "session_id"

Tests/SparkConnectTests/CatalogTests.swift

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,117 @@ struct CatalogTests {
154154
}
155155
await spark.stop()
156156
}
157+
158+
/// Checks that `createTempView` registers a new local temporary view and throws when the name
/// is already taken or is not a valid view name.
@Test
func createTempView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let view = "VIEW_" + UUID().uuidString.filter { $0 != "-" }
  let withView = SQLHelper.withTempView(spark, view)
  try await withView {
    #expect(try await spark.catalog.tableExists(view) == false)
    try await spark.range(1).createTempView(view)
    #expect(try await spark.catalog.tableExists(view))

    // Creating the same view a second time must fail.
    try await #require(throws: Error.self) {
      try await spark.range(1).createTempView(view)
    }
  }

  try await #require(throws: Error.self) {
    try await spark.range(1).createTempView("invalid view name")
  }

  await spark.stop()
}
178+
179+
/// Checks that `createOrReplaceTempView` can be called repeatedly with the same name and throws
/// for a name that is not a valid view name.
@Test
func createOrReplaceTempView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let view = "VIEW_" + UUID().uuidString.filter { $0 != "-" }
  let withView = SQLHelper.withTempView(spark, view)
  try await withView {
    #expect(try await spark.catalog.tableExists(view) == false)
    try await spark.range(1).createOrReplaceTempView(view)
    #expect(try await spark.catalog.tableExists(view))
    // A second call with the same name must not throw.
    try await spark.range(1).createOrReplaceTempView(view)
  }

  try await #require(throws: Error.self) {
    try await spark.range(1).createOrReplaceTempView("invalid view name")
  }

  await spark.stop()
}
196+
197+
/// Checks that `createGlobalTempView` registers a view under `global_temp`, throws when the name
/// is already taken or invalid, and that the view is gone after the helper's cleanup.
@Test
func createGlobalTempView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let view = "VIEW_" + UUID().uuidString.filter { $0 != "-" }
  let qualified = "global_temp.\(view)"
  let withView = SQLHelper.withGlobalTempView(spark, view)
  try await withView {
    #expect(try await spark.catalog.tableExists(qualified) == false)
    try await spark.range(1).createGlobalTempView(view)
    #expect(try await spark.catalog.tableExists(qualified))

    // Creating the same global view a second time must fail.
    try await #require(throws: Error.self) {
      try await spark.range(1).createGlobalTempView(view)
    }
  }
  #expect(try await spark.catalog.tableExists(qualified) == false)

  try await #require(throws: Error.self) {
    try await spark.range(1).createGlobalTempView("invalid view name")
  }

  await spark.stop()
}
218+
219+
/// Checks that `createOrReplaceGlobalTempView` can be called repeatedly with the same name,
/// throws for an invalid name, and that the view is gone after the helper's cleanup.
@Test
func createOrReplaceGlobalTempView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let view = "VIEW_" + UUID().uuidString.filter { $0 != "-" }
  let qualified = "global_temp.\(view)"
  let withView = SQLHelper.withGlobalTempView(spark, view)
  try await withView {
    #expect(try await spark.catalog.tableExists(qualified) == false)
    try await spark.range(1).createOrReplaceGlobalTempView(view)
    #expect(try await spark.catalog.tableExists(qualified))
    // A second call with the same name must not throw.
    try await spark.range(1).createOrReplaceGlobalTempView(view)
  }
  #expect(try await spark.catalog.tableExists(qualified) == false)

  try await #require(throws: Error.self) {
    try await spark.range(1).createOrReplaceGlobalTempView("invalid view name")
  }

  await spark.stop()
}
237+
238+
/// Verifies that `Catalog.dropTempView` removes an existing local temporary view and reports
/// `true` for it, and reports `false` for names that do not refer to an existing view.
@Test
func dropTempView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
  try await SQLHelper.withTempView(spark, viewName)({
    #expect(try await spark.catalog.tableExists(viewName) == false)
    try await spark.range(1).createTempView(viewName)
    #expect(try await spark.catalog.tableExists(viewName))
    // Assert the success path too; otherwise only the `false` results are ever checked.
    #expect(try await spark.catalog.dropTempView(viewName))
    #expect(try await spark.catalog.tableExists(viewName) == false)
  })

  #expect(try await spark.catalog.dropTempView("non_exist_view") == false)
  #expect(try await spark.catalog.dropTempView("invalid view name") == false)
  await spark.stop()
}
252+
253+
/// Verifies that `Catalog.dropGlobalTempView` removes an existing global temporary view and
/// reports `false` for names that do not refer to an existing view.
@Test
func dropGlobalTempView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let viewName = "VIEW_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
  // Use the global-view helper so that a failure in the middle of the block still drops the
  // global view instead of attempting to drop a local view of the same name.
  try await SQLHelper.withGlobalTempView(spark, viewName)({
    #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false)
    try await spark.range(1).createGlobalTempView(viewName)
    #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))
    try await spark.catalog.dropGlobalTempView(viewName)
    #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false)
  })

  #expect(try await spark.catalog.dropGlobalTempView("non_exist_view") == false)
  #expect(try await spark.catalog.dropGlobalTempView("invalid view name") == false)
  await spark.stop()
}
157268
#endif
158269

159270
@Test

Tests/SparkConnectTests/SQLHelper.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,34 @@ struct SQLHelper {
5353
}
5454
return body
5555
}
56+
57+
/// Returns a runner that executes a block through `ErrorUtils.tryWithSafeFinally`, using a
/// cleanup action that drops each of the given local temporary views.
/// - Parameters:
///   - spark: The session whose catalog is used to drop the views.
///   - viewNames: Names of the local temporary views to drop in the cleanup action.
/// - Returns: A function that takes the block to execute.
public static func withTempView(_ spark: SparkSession, _ viewNames: String...) -> (
  () async throws -> Void
) async throws -> Void {
  func runner(_ block: () async throws -> Void) async throws {
    try await ErrorUtils.tryWithSafeFinally(
      block,
      {
        for view in viewNames {
          try await spark.catalog.dropTempView(view)
        }
      })
  }
  return runner
}
71+
72+
/// Returns a runner that executes a block through `ErrorUtils.tryWithSafeFinally`, using a
/// cleanup action that drops each of the given global temporary views.
/// - Parameters:
///   - spark: The session whose catalog is used to drop the views.
///   - viewNames: Unqualified names of the global temporary views to drop in the cleanup action.
/// - Returns: A function that takes the block to execute.
public static func withGlobalTempView(_ spark: SparkSession, _ viewNames: String...) -> (
  () async throws -> Void
) async throws -> Void {
  func runner(_ block: () async throws -> Void) async throws {
    try await ErrorUtils.tryWithSafeFinally(
      block,
      {
        for view in viewNames {
          try await spark.catalog.dropGlobalTempView(view)
        }
      })
  }
  return runner
}
5686
}

0 commit comments

Comments
 (0)