Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ public actor DataFrame: Sendable {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getLimit(self.plan.root, n))
}

/// Returns a new Dataset by skipping the first `n` rows.
/// - Parameter n: Number of records to skip.
/// - Returns: A subset of the records
public func offset(_ n: Int32) -> DataFrame {
return DataFrame(spark: self.spark, plan: SparkConnectClient.getOffset(self.plan.root, n))
}

/// Returns a new ``Dataset`` by sampling a fraction of rows, using a user-supplied seed.
/// - Parameters:
/// - withReplacement: Sample with replacement or not.
Expand Down
11 changes: 11 additions & 0 deletions Sources/SparkConnect/SparkConnectClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,17 @@ public actor SparkConnectClient {
return plan
}

static func getOffset(_ child: Relation, _ n: Int32) -> Plan {
var offset = Spark_Connect_Offset()
offset.input = child
offset.offset = n
var relation = Relation()
relation.offset = offset
var plan = Plan()
plan.opType = .root(relation)
return plan
}

static func getSample(_ child: Relation, _ withReplacement: Bool, _ fraction: Double, _ seed: Int64) -> Plan {
var sample = Sample()
sample.input = child
Expand Down
10 changes: 10 additions & 0 deletions Tests/SparkConnectTests/DataFrameTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,16 @@ struct DataFrameTests {
await spark.stop()
}

@Test
func offset() async throws {
let spark = try await SparkSession.builder.getOrCreate()
#expect(try await spark.range(10).offset(0).count() == 10)
#expect(try await spark.range(10).offset(1).count() == 9)
#expect(try await spark.range(10).offset(2).count() == 8)
#expect(try await spark.range(10).offset(15).count() == 0)
await spark.stop()
}

@Test
func sample() async throws {
let spark = try await SparkSession.builder.getOrCreate()
Expand Down
Loading