Skip to content

Commit b828523

Browse files
committed
[SPARK-52168] Support to for DataFrame
1 parent 2c2a5f1 commit b828523

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ import Synchronization
9494
/// - ``show(_:_:_:)``
9595
///
9696
/// ### Transformation Operations
97+
/// - ``to(_:)``
9798
/// - ``toDF(_:)``
9899
/// - ``toJSON()``
99100
/// - ``select(_:)``
@@ -478,6 +479,19 @@ public actor DataFrame: Sendable {
478479
return df
479480
}
480481

482+
/// Returns a new DataFrame where each row is reconciled to match the specified schema.
483+
/// - Parameter schema: The given schema.
484+
/// - Returns: A ``DataFrame`` with the given schema.
485+
public func to(_ schema: String) async throws -> DataFrame {
486+
// Validate by parsing.
487+
do {
488+
let dataType = try await sparkSession.client.ddlParse(schema)
489+
return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
490+
} catch {
491+
throw SparkConnectError.InvalidTypeException
492+
}
493+
}
494+
481495
/// Returns the content of the Dataset as a Dataset of JSON strings.
482496
/// - Returns: A ``DataFrame`` with a single string column whose content is JSON.
483497
public func toJSON() -> DataFrame {

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,17 @@ public actor SparkConnectClient {
505505
return plan
506506
}
507507

508+
static func getToSchema(_ child: Relation, _ schema: Spark_Connect_DataType) -> Plan {
509+
var toSchema = Spark_Connect_ToSchema()
510+
toSchema.input = child
511+
toSchema.schema = schema
512+
var relation = Relation()
513+
relation.toSchema = toSchema
514+
var plan = Plan()
515+
plan.opType = .root(relation)
516+
return plan
517+
}
518+
508519
static func getProjectExprs(_ child: Relation, _ exprs: [String]) -> Plan {
509520
var project = Project()
510521
project.input = child

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,26 @@ struct DataFrameTests {
201201
await spark.stop()
202202
}
203203

204+
@Test
205+
func to() async throws {
206+
let spark = try await SparkSession.builder.getOrCreate()
207+
208+
let schema1 = try await spark.range(1).to("shortID SHORT").schema
209+
#expect(
210+
schema1
211+
== #"{"struct":{"fields":[{"name":"shortID","dataType":{"short":{}},"nullable":true}]}}"#
212+
)
213+
214+
let schema2 = try await spark.sql("SELECT '1'").to("id INT").schema
215+
print(schema2)
216+
#expect(
217+
schema2
218+
== #"{"struct":{"fields":[{"name":"id","dataType":{"integer":{}},"nullable":true}]}}"#
219+
)
220+
221+
await spark.stop()
222+
}
223+
204224
@Test
205225
func selectMultipleColumns() async throws {
206226
let spark = try await SparkSession.builder.getOrCreate()

0 commit comments

Comments
 (0)