Skip to content

Commit e346c9e

Browse files
committed
[SPARK-52182] Support withWatermark for DataFrame
### What changes were proposed in this pull request? This PR aims to ad `withWatermark` for `DataFrame`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #158 from dongjoon-hyun/SPARK-52182. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent aa61fff commit e346c9e

File tree

3 files changed

+47
-0
lines changed

3 files changed

+47
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ import Synchronization
117117
/// - ``transpose()``
118118
/// - ``transpose(_:)``
119119
/// - ``hint(_:_:)``
120+
/// - ``withWatermark(_:_:)``
120121
///
121122
/// ### Join Operations
122123
/// - ``join(_:)``
@@ -1444,6 +1445,19 @@ public actor DataFrame: Sendable {
14441445
try await checkpoint(eager, false, storageLevel)
14451446
}
14461447

1448+
/// Defines an event time watermark for this ``DataFrame``. A watermark tracks a point in time
1449+
/// before which we assume no more late data is going to arrive.
1450+
/// - Parameters:
1451+
/// - eventTime: the name of the column that contains the event time of the row.
1452+
/// - delayThreshold: the minimum delay to wait to data to arrive late, relative to
1453+
/// the latest record that has been processed in the form of an interval (e.g. "1 minute" or "5 hours").
1454+
/// NOTE: This should not be negative.
1455+
/// - Returns: A ``DataFrame`` instance.
1456+
public func withWatermark(_ eventTime: String, _ delayThreshold: String) -> DataFrame {
1457+
let plan = SparkConnectClient.getWithWatermark(self.plan.root, eventTime, delayThreshold)
1458+
return DataFrame(spark: self.spark, plan: plan)
1459+
}
1460+
14471461
/// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
14481462
public var write: DataFrameWriter {
14491463
get {

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,23 @@ public actor SparkConnectClient {
10221022
return plan
10231023
}
10241024

1025+
static func getWithWatermark(
1026+
_ child: Relation,
1027+
_ eventTime: String,
1028+
_ delayThreshold: String
1029+
) -> Plan {
1030+
var withWatermark = Spark_Connect_WithWatermark()
1031+
withWatermark.input = child
1032+
withWatermark.eventTime = eventTime
1033+
withWatermark.delayThreshold = delayThreshold
1034+
1035+
var relation = Relation()
1036+
relation.withWatermark = withWatermark
1037+
var plan = Plan()
1038+
plan.opType = .root(relation)
1039+
return plan
1040+
}
1041+
10251042
func createTempView(
10261043
_ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool
10271044
) async throws {

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,22 @@ struct DataFrameTests {
744744
await spark.stop()
745745
}
746746

747+
@Test
748+
func withWatermark() async throws {
749+
let spark = try await SparkSession.builder.getOrCreate()
750+
let df = try await spark
751+
.sql("""
752+
SELECT * FROM VALUES
753+
(1, now()),
754+
(1, now() - INTERVAL 1 HOUR),
755+
(1, now() - INTERVAL 2 HOUR)
756+
T(data, eventTime)
757+
""")
758+
.withWatermark("eventTime", "1 minute") // This tests only API for now
759+
#expect(try await df.dropDuplicatesWithinWatermark("data").count() == 1)
760+
await spark.stop()
761+
}
762+
747763
@Test
748764
func describe() async throws {
749765
let spark = try await SparkSession.builder.getOrCreate()

0 commit comments

Comments
 (0)