Commit ccaa92b
[SPARK-51995] Support toDF, distinct and dropDuplicates(WithinWatermark)? in DataFrame
### What changes were proposed in this pull request?

This PR aims to support the following APIs in `DataFrame`.

- `toDF`
- `distinct`
- `dropDuplicates`
- `dropDuplicatesWithinWatermark`

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #111 from dongjoon-hyun/SPARK-51995.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent feddf09 commit ccaa92b

File tree: 3 files changed, +96 −1 lines
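Before the per-file diffs, a minimal usage sketch of the four new APIs, assuming a reachable Spark Connect server and an async context to run in; the sample data mirrors the test fixtures further below:

    import SparkConnect

    let spark = try await SparkSession.builder.getOrCreate()
    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")

    // toDF() with no arguments is a no-op; with names it projects by column.
    let projected = await df.toDF("a")
    print(try await projected.columns)  // ["a"]

    // distinct() and dropDuplicates() deduplicate over all columns;
    // dropDuplicates("a") keys the deduplication on the given columns.
    print(try await df.distinct().count())           // 3
    print(try await df.dropDuplicates("a").count())  // 3

    // Same keying, but intended for streaming queries, where deduplication
    // state is bounded by the event-time watermark. On this batch DataFrame
    // it deduplicates the same way, as the tests below confirm.
    print(try await df.dropDuplicatesWithinWatermark("a").count())  // 3

    await spark.stop()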

Sources/SparkConnect/DataFrame.swift

Lines changed: 42 additions & 1 deletion
@@ -91,6 +91,7 @@ import Synchronization
 /// - ``show(_:_:_:)``
 ///
 /// ### Transformation Operations
+/// - ``toDF(_:)``
 /// - ``select(_:)``
 /// - ``selectExpr(_:)``
 /// - ``filter(_:)``
@@ -100,6 +101,9 @@ import Synchronization
 /// - ``limit(_:)``
 /// - ``offset(_:)``
 /// - ``drop(_:)``
+/// - ``dropDuplicates(_:)``
+/// - ``dropDuplicatesWithinWatermark(_:)``
+/// - ``distinct()``
 /// - ``withColumnRenamed(_:_:)``
 ///
 /// ### Join Operations
@@ -440,13 +444,25 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: plan)
   }
 
-  /// Projects a set of expressions and returns a new ``DataFrame``.
+  /// Selects a subset of existing columns using column names.
   /// - Parameter cols: Column names
   /// - Returns: A ``DataFrame`` with subset of columns.
   public func select(_ cols: String...) -> DataFrame {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
   }
 
+  /// Selects a subset of existing columns using column names.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame`` with subset of columns.
+  public func toDF(_ cols: String...) -> DataFrame {
+    let df = if cols.isEmpty {
+      DataFrame(spark: self.spark, plan: self.plan)
+    } else {
+      DataFrame(spark: self.spark, plan: SparkConnectClient.getProject(self.plan.root, cols))
+    }
+    return df
+  }
+
   /// Projects a set of expressions and returns a new ``DataFrame``.
   /// - Parameter exprs: Expression strings
   /// - Returns: A ``DataFrame`` with subset of columns.
@@ -461,6 +477,24 @@ public actor DataFrame: Sendable {
     return DataFrame(spark: self.spark, plan: SparkConnectClient.getDrop(self.plan.root, cols))
   }
 
+  /// Returns a new ``DataFrame`` that contains only the unique rows from this ``DataFrame``.
+  /// This is an alias for `distinct`. If column names are given, Spark considers only those columns.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame``.
+  public func dropDuplicates(_ cols: String...) -> DataFrame {
+    let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: false)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
+  /// Returns a new ``DataFrame`` with duplicate rows removed, within watermark.
+  /// If column names are given, Spark considers only those columns.
+  /// - Parameter cols: Column names
+  /// - Returns: A ``DataFrame``.
+  public func dropDuplicatesWithinWatermark(_ cols: String...) -> DataFrame {
+    let plan = SparkConnectClient.getDropDuplicates(self.plan.root, cols, withinWatermark: true)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
   /// Returns a new Dataset with a column renamed. This is a no-op if schema doesn't contain existingName.
   /// - Parameters:
   ///   - existingName: An existing column name to be renamed.
@@ -1108,6 +1142,13 @@ public actor DataFrame: Sendable {
     return buildRepartition(numPartitions: numPartitions, shuffle: false)
   }
 
+  /// Returns a new ``DataFrame`` that contains only the unique rows from this ``DataFrame``.
+  /// This is an alias for `dropDuplicates`.
+  /// - Returns: A ``DataFrame``.
+  public func distinct() -> DataFrame {
+    return dropDuplicates()
+  }
+
   /// Groups the DataFrame using the specified columns.
   ///
   /// This method is used to perform aggregations on groups of data.

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 19 additions & 0 deletions
@@ -455,6 +455,25 @@ public actor SparkConnectClient {
     return plan
   }
 
+  static func getDropDuplicates(
+    _ child: Relation,
+    _ columnNames: [String],
+    withinWatermark: Bool = false
+  ) -> Plan {
+    var deduplicate = Spark_Connect_Deduplicate()
+    deduplicate.input = child
+    if columnNames.isEmpty {
+      deduplicate.allColumnsAsKeys = true
+    } else {
+      deduplicate.columnNames = columnNames
+    }
+    // Propagate the flag to the relation; as committed, the parameter was
+    // accepted but never assigned, so it would have been silently ignored.
+    deduplicate.withinWatermark = withinWatermark
+    var relation = Relation()
+    relation.deduplicate = deduplicate
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
 static func getSort(_ child: Relation, _ cols: [String]) -> Plan {
   var sort = Sort()
   sort.input = child
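
As a reading aid: both `DataFrame` entry points route through this single helper and differ only in the `withinWatermark` flag. A hedged sketch of the two call shapes (the `child` relation is a placeholder; real callers pass `self.plan.root`, and the helper is internal to the module):

    // Placeholder upstream relation; real callers pass an existing plan root.
    let child = Relation()

    // dropDuplicates() / distinct(): no key columns, so the helper sets
    // allColumnsAsKeys = true and deduplicates over every column.
    let overAllColumns = SparkConnectClient.getDropDuplicates(child, [], withinWatermark: false)

    // dropDuplicatesWithinWatermark("a"): keyed on "a", with the relation
    // flagged so a streaming query can bound its state by the watermark.
    let boundedByWatermark = SparkConnectClient.getDropDuplicates(child, ["a"], withinWatermark: true)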

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 35 additions & 0 deletions
@@ -183,6 +183,7 @@ struct DataFrameTests {
   @Test
   func select() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).select().columns.isEmpty)
     let schema = try await spark.range(1).select("id").schema
     #expect(
       schema
@@ -191,6 +192,14 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func toDF() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).toDF().columns == ["id"])
+    #expect(try await spark.range(1).toDF("id").columns == ["id"])
+    await spark.stop()
+  }
+
   @Test
   func selectMultipleColumns() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
@@ -647,6 +656,32 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func distinct() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.distinct().count() == 3)
+    await spark.stop()
+  }
+
+  @Test
+  func dropDuplicates() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.dropDuplicates().count() == 3)
+    #expect(try await df.dropDuplicates("a").count() == 3)
+    await spark.stop()
+  }
+
+  @Test
+  func dropDuplicatesWithinWatermark() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql("SELECT * FROM VALUES (1), (2), (3), (1), (3) T(a)")
+    #expect(try await df.dropDuplicatesWithinWatermark().count() == 3)
+    #expect(try await df.dropDuplicatesWithinWatermark("a").count() == 3)
+    await spark.stop()
+  }
+
   @Test
   func groupBy() async throws {
     let spark = try await SparkSession.builder.getOrCreate()