From 614f89479533abfe2f0af157652a958a8ede4856 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 15 Apr 2025 22:43:00 +0900 Subject: [PATCH 1/2] [SPARK-51809] Support `offset` in `DataFrame` --- Sources/SparkConnect/DataFrame.swift | 7 +++++++ Sources/SparkConnect/SparkConnectClient.swift | 11 +++++++++++ Tests/SparkConnectTests/DataFrameTests.swift | 10 ++++++++++ 3 files changed, 28 insertions(+) diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index 0c93234..8f7ed01 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -330,6 +330,13 @@ public actor DataFrame: Sendable { return DataFrame(spark: self.spark, plan: SparkConnectClient.getLimit(self.plan.root, n)) } + /// Returns a new Dataset by skipping the first `n` rows. + /// - Parameter n: Number of records to skip. + /// - Returns: A subset of the records + public func offset(_ n: Int32) -> DataFrame { + return DataFrame(spark: self.spark, plan: SparkConnectClient.getOffset(self.plan.root, n)) + } + /// Returns a new ``Dataset`` by sampling a fraction of rows, using a user-supplied seed. /// - Parameters: /// - withReplacement: Sample with replacement or not. diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift index d76f533..904e76e 100644 --- a/Sources/SparkConnect/SparkConnectClient.swift +++ b/Sources/SparkConnect/SparkConnectClient.swift @@ -397,6 +397,17 @@ public actor SparkConnectClient { return plan } + static func getOffset(_ child: Relation, _ n: Int32) -> Plan { + var offset = Spark_Connect_Offset() + offset.input = child + offset.offset = n + var relation = Relation() + relation.offset = offset + var plan = Plan() + plan.opType = .root(relation) + return plan + } + static func getSample(_ child: Relation, _ withReplacement: Bool, _ fraction: Double, _ seed: Int64) -> Plan { var sample = Sample() sample.input = child diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift index aee0c93..b9c927d 100644 --- a/Tests/SparkConnectTests/DataFrameTests.swift +++ b/Tests/SparkConnectTests/DataFrameTests.swift @@ -218,6 +218,16 @@ struct DataFrameTests { await spark.stop() } + @Test + func offset() async throws { + let spark = try await SparkSession.builder.getOrCreate() + #expect(try await spark.range(10).offset(0).count() == 10) + #expect(try await spark.range(10).offset(1).count() == 9) + #expect(try await spark.range(10).offset(2).count() == 8) + #expect(try await spark.range(10).offset(15).count() == 0) + await spark.stop() + } + @Test func sample() async throws { let spark = try await SparkSession.builder.getOrCreate() From b5212ba03729fdc8065540b939d64841a652e2d7 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 16 Apr 2025 07:57:59 +0900 Subject: [PATCH 2/2] Update Sources/SparkConnect/DataFrame.swift Co-authored-by: Liang-Chi Hsieh --- Sources/SparkConnect/DataFrame.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index 8f7ed01..be5c5e7 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -331,8 +331,8 @@ public actor DataFrame: Sendable { } /// Returns a new Dataset by skipping the first `n` rows. - /// - Parameter n: Number of records to skip. - /// - Returns: A subset of the records + /// - Parameter n: Number of rows to skip. + /// - Returns: A subset of the rows public func offset(_ n: Int32) -> DataFrame { return DataFrame(spark: self.spark, plan: SparkConnectClient.getOffset(self.plan.root, n)) }