Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Sources/SparkConnect/DataFrameReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,33 @@ public actor DataFrameReader: Sendable {
self.sparkSession = sparkSession
}

/// Loads the table or view named `tableName` as a ``DataFrame``.
///
/// For a table, batch reading must be supported and the resulting ``DataFrame`` is that table's
/// batch scan query plan. For a view, the resulting ``DataFrame`` is the view's own query plan,
/// which may be either a batch or a streaming plan.
///
/// The reader's accumulated `extraOptions` are attached to the named-table read.
///
/// - Parameter tableName: A qualified or unqualified table/view name. When a database is given,
///   the table/view is looked up in that database. Otherwise a temporary view with that name is
///   tried first, followed by a table/view in the current database. The global temporary view
///   database is also accepted here.
/// - Returns: A ``DataFrame`` instance.
public func table(_ tableName: String) -> DataFrame {
  // Snapshot the reader options once; they travel with the named-table read.
  let readerOptions = extraOptions.toStringDictionary()

  var tableSource = NamedTable()
  tableSource.options = readerOptions
  tableSource.unparsedIdentifier = tableName

  var readNode = Read()
  readNode.namedTable = tableSource

  var relationNode = Relation()
  relationNode.read = readNode

  // The read relation becomes the root of the plan handed to the DataFrame.
  var queryPlan = Plan()
  queryPlan.opType = .root(relationNode)

  return DataFrame(spark: sparkSession, plan: queryPlan)
}

/// Specifies the input data source format.
/// - Parameter source: A string.
/// - Returns: A `DataFrameReader`.
Expand Down
14 changes: 14 additions & 0 deletions Sources/SparkConnect/SparkSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ public actor SparkSession {
}
}

/// Loads the table or view named `tableName` as a ``DataFrame`` by delegating to this session's
/// reader (`read.table(_:)`).
///
/// For a table, batch reading must be supported and the resulting ``DataFrame`` is that table's
/// batch scan query plan. For a view, the resulting ``DataFrame`` is the view's own query plan,
/// which may be either a batch or a streaming plan.
///
/// - Parameter tableName: A qualified or unqualified table/view name. When a database is given,
///   the table/view is looked up in that database. Otherwise a temporary view with that name is
///   tried first, followed by a table/view in the current database. The global temporary view
///   database is also accepted here.
/// - Returns: A ``DataFrame`` instance.
public func table(_ tableName: String) async throws -> DataFrame {
  // NOTE(review): the body contains no throwing call; `throws` is kept as part of the signature.
  let reader = read
  return await reader.table(tableName)
}

/// Executes some code block and prints to stdout the time taken to execute the block.
/// - Parameter f: A function to execute.
/// - Returns: The result of the executed code.
Expand Down
1 change: 1 addition & 0 deletions Sources/SparkConnect/TypeAliases.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ typealias Filter = Spark_Connect_Filter
typealias KeyValue = Spark_Connect_KeyValue
typealias Limit = Spark_Connect_Limit
typealias MapType = Spark_Connect_DataType.Map
typealias NamedTable = Spark_Connect_Read.NamedTable
typealias OneOf_Analyze = AnalyzePlanRequest.OneOf_Analyze
typealias Plan = Spark_Connect_Plan
typealias Project = Spark_Connect_Project
Expand Down
10 changes: 10 additions & 0 deletions Tests/SparkConnectTests/DataFrameReaderTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,14 @@ struct DataFrameReaderTests {
#expect(try await spark.read.parquet(path, path).count() == 4)
await spark.stop()
}

/// Creates a two-row table, reads it back through `spark.read.table`, then drops it.
@Test
func table() async throws {
  // UUID-derived identifier with the dashes stripped out.
  let name = UUID().uuidString.replacingOccurrences(of: "-", with: "")
  let session = try await SparkSession.builder.getOrCreate()

  let createdRows = try await session.sql("CREATE TABLE \(name) AS VALUES (1), (2)").count()
  #expect(createdRows == 0)

  let scannedRows = try await session.read.table(name).count()
  #expect(scannedRows == 2)

  let droppedRows = try await session.sql("DROP TABLE \(name)").count()
  #expect(droppedRows == 0)

  await session.stop()
}
}
10 changes: 10 additions & 0 deletions Tests/SparkConnectTests/SparkSessionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ struct SparkSessionTests {
await spark.stop()
}

@Test
func table() async throws {
let tableName = UUID().uuidString.replacingOccurrences(of: "-", with: "")
let spark = try await SparkSession.builder.getOrCreate()
#expect(try await spark.sql("CREATE TABLE \(tableName) AS VALUES (1), (2)").count() == 0)
Copy link
Member

@viirya viirya Apr 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to also test views at this point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review. For views, there are multiple variants such as permanent views, temporary views, and global temporary views, so I have postponed that test coverage for now. However, it will be added soon because I'm implementing Catalog, @viirya.

#expect(try await spark.table(tableName).count() == 2)
#expect(try await spark.sql("DROP TABLE \(tableName)").count() == 0)
await spark.stop()
}

@Test
func time() async throws {
let spark = try await SparkSession.builder.getOrCreate()
Expand Down
Loading