Skip to content

Commit 21724f5

Browse files
committed
[SPARK-51809] Support offset in DataFrame
### What changes were proposed in this pull request? This PR aims to support `offset` API in `DataFrame`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No, this is a new addition. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #62 from dongjoon-hyun/SPARK-51809. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 7bbc493 commit 21724f5

File tree

3 files changed

+28
-0
lines changed

3 files changed

+28
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,13 @@ public actor DataFrame: Sendable {
330330
return DataFrame(spark: self.spark, plan: SparkConnectClient.getLimit(self.plan.root, n))
331331
}
332332

333+
/// Returns a new Dataset by skipping the first `n` rows.
334+
/// - Parameter n: Number of rows to skip.
335+
/// - Returns: A subset of the rows
336+
public func offset(_ n: Int32) -> DataFrame {
337+
return DataFrame(spark: self.spark, plan: SparkConnectClient.getOffset(self.plan.root, n))
338+
}
339+
333340
/// Returns a new ``Dataset`` by sampling a fraction of rows, using a user-supplied seed.
334341
/// - Parameters:
335342
/// - withReplacement: Sample with replacement or not.

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,17 @@ public actor SparkConnectClient {
397397
return plan
398398
}
399399

400+
static func getOffset(_ child: Relation, _ n: Int32) -> Plan {
401+
var offset = Spark_Connect_Offset()
402+
offset.input = child
403+
offset.offset = n
404+
var relation = Relation()
405+
relation.offset = offset
406+
var plan = Plan()
407+
plan.opType = .root(relation)
408+
return plan
409+
}
410+
400411
static func getSample(_ child: Relation, _ withReplacement: Bool, _ fraction: Double, _ seed: Int64) -> Plan {
401412
var sample = Sample()
402413
sample.input = child

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,16 @@ struct DataFrameTests {
218218
await spark.stop()
219219
}
220220

221+
@Test
222+
func offset() async throws {
223+
let spark = try await SparkSession.builder.getOrCreate()
224+
#expect(try await spark.range(10).offset(0).count() == 10)
225+
#expect(try await spark.range(10).offset(1).count() == 9)
226+
#expect(try await spark.range(10).offset(2).count() == 8)
227+
#expect(try await spark.range(10).offset(15).count() == 0)
228+
await spark.stop()
229+
}
230+
221231
@Test
222232
func sample() async throws {
223233
let spark = try await SparkSession.builder.getOrCreate()

0 commit comments

Comments
 (0)