Skip to content

Commit c386e26

Browse files
committed
[SPARK-52168] Support `to` for DataFrame
### What changes were proposed in this pull request? This PR aims to support `to` for `DataFrame`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #156 from dongjoon-hyun/SPARK-52168. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 03c2f45 commit c386e26

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ import Synchronization
9494
/// - ``show(_:_:_:)``
9595
///
9696
/// ### Transformation Operations
97+
/// - ``to(_:)``
9798
/// - ``toDF(_:)``
9899
/// - ``toJSON()``
99100
/// - ``select(_:)``
@@ -479,6 +480,19 @@ public actor DataFrame: Sendable {
479480
return df
480481
}
481482

483+
/// Returns a new ``DataFrame`` where each row is reconciled to match the specified schema.
/// - Parameter schema: The given schema as a DDL-formatted string, e.g. `"id INT, name STRING"`.
/// - Returns: A ``DataFrame`` with the given schema.
/// - Throws: `SparkConnectError.InvalidTypeException` when `schema` cannot be parsed.
public func to(_ schema: String) async throws -> DataFrame {
  // Validate the schema string by parsing it server-side. Only the parse step is
  // wrapped in `do/catch` so that unrelated failures are not misreported as an
  // invalid-type error.
  let dataType: Spark_Connect_DataType
  do {
    dataType = try await sparkSession.client.ddlParse(schema)
  } catch {
    throw SparkConnectError.InvalidTypeException
  }
  return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
}
495+
482496
/// Returns the content of the Dataset as a Dataset of JSON strings.
483497
/// - Returns: A ``DataFrame`` with a single string column whose content is JSON.
484498
public func toJSON() -> DataFrame {

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,17 @@ public actor SparkConnectClient {
505505
return plan
506506
}
507507

508+
/// Builds a `Plan` whose root is a `ToSchema` relation that reconciles `child`
/// to the given schema.
/// - Parameters:
///   - child: The input relation to be reconciled.
///   - schema: The target schema as a Spark Connect data type.
/// - Returns: A `Plan` rooted at the new `ToSchema` relation.
static func getToSchema(_ child: Relation, _ schema: Spark_Connect_DataType) -> Plan {
  // Assemble the ToSchema message from its input relation and target schema.
  var reconciled = Spark_Connect_ToSchema()
  reconciled.input = child
  reconciled.schema = schema

  // Wrap it in a relation and promote that relation to the plan root.
  var rootRelation = Relation()
  rootRelation.toSchema = reconciled

  var result = Plan()
  result.opType = .root(rootRelation)
  return result
}
518+
508519
static func getProjectExprs(_ child: Relation, _ exprs: [String]) -> Plan {
509520
var project = Project()
510521
project.input = child

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,26 @@ struct DataFrameTests {
201201
await spark.stop()
202202
}
203203

204+
@Test
func to() async throws {
  let spark = try await SparkSession.builder.getOrCreate()

  // Reconcile a LONG range column down to SHORT via a DDL schema string.
  let schema1 = try await spark.range(1).to("shortID SHORT").schema
  #expect(
    schema1
      == #"{"struct":{"fields":[{"name":"shortID","dataType":{"short":{}},"nullable":true}]}}"#
  )

  // Reconcile a STRING literal column to INT.
  // NOTE: removed a leftover debug `print(schema2)` statement.
  let schema2 = try await spark.sql("SELECT '1'").to("id INT").schema
  #expect(
    schema2
      == #"{"struct":{"fields":[{"name":"id","dataType":{"integer":{}},"nullable":true}]}}"#
  )

  await spark.stop()
}
223+
204224
@Test
205225
func selectMultipleColumns() async throws {
206226
let spark = try await SparkSession.builder.getOrCreate()

0 commit comments

Comments
 (0)