File tree Expand file tree Collapse file tree 4 files changed +48
-0
lines changed Expand file tree Collapse file tree 4 files changed +48
-0
lines changed Original file line number Diff line number Diff line change @@ -297,6 +297,21 @@ public actor DataFrame: Sendable {
297
297
return DataFrame ( spark: self . spark, plan: SparkConnectClient . getLimit ( self . plan. root, n) )
298
298
}
299
299
300
/// Limits this ``DataFrame`` to `n` rows and collects the result.
/// - Parameter n: The number of rows to take from the front. (default: 1)
/// - Returns: The collected rows as ``[[String?]]``.
/// - Throws: Any error raised while collecting the limited ``DataFrame``.
public func head(_ n: Int32 = 1) async throws -> [[String?]] {
  try await self.limit(n).collect()
}
306
+
307
/// Wraps this ``DataFrame``'s plan root in a tail plan of `n` rows and collects the result.
/// - Parameter n: The number of rows to take from the end.
/// - Returns: The collected rows as ``[[String?]]``.
/// - Throws: Any error raised while collecting the derived ``DataFrame``.
public func tail(_ n: Int32) async throws -> [[String?]] {
  let tailPlan = SparkConnectClient.getTail(self.plan.root, n)
  return try await DataFrame(spark: self.spark, plan: tailPlan).collect()
}
314
+
300
315
/// Checks if the ``DataFrame`` is empty and returns a boolean value.
301
316
/// - Returns: `true` if the ``DataFrame`` is empty, `false` otherwise.
302
317
public func isEmpty( ) async throws -> Bool {
Original file line number Diff line number Diff line change @@ -373,6 +373,17 @@ public actor SparkConnectClient {
373
373
return plan
374
374
}
375
375
376
/// Creates a ``Plan`` whose root relation is a ``Tail`` over `child`.
/// - Parameters:
///   - child: The relation assigned as the tail's input.
///   - n: The value assigned to the tail's limit.
/// - Returns: A ``Plan`` with its `opType` set to the root relation wrapping the tail.
static func getTail(_ child: Relation, _ n: Int32) -> Plan {
  var tailMessage = Tail()
  tailMessage.limit = n
  tailMessage.input = child

  var root = Relation()
  root.tail = tailMessage

  var result = Plan()
  result.opType = .root(root)
  return result
}
386
+
376
387
var result : [ ExecutePlanResponse ] = [ ]
377
388
private func addResponse( _ response: ExecutePlanResponse ) {
378
389
self . result. append ( response)
Original file line number Diff line number Diff line change @@ -41,6 +41,7 @@ typealias SaveMode = Spark_Connect_WriteOperation.SaveMode
41
41
typealias SparkConnectService = Spark_Connect_SparkConnectService
42
42
typealias Sort = Spark_Connect_Sort
43
43
typealias StructType = Spark_Connect_DataType . Struct
44
+ typealias Tail = Spark_Connect_Tail
44
45
typealias UserContext = Spark_Connect_UserContext
45
46
typealias UnresolvedAttribute = Spark_Connect_Expression . UnresolvedAttribute
46
47
typealias WriteOperation = Spark_Connect_WriteOperation
Original file line number Diff line number Diff line change @@ -247,6 +247,27 @@ struct DataFrameTests {
247
247
await spark. stop ( )
248
248
}
249
249
250
@Test
func head() async throws {
  let spark = try await SparkSession.builder.getOrCreate()

  // A zero-length range has nothing to return.
  let noRows = try await spark.range(0).head()
  #expect(noRows.isEmpty)

  // Calling without an argument uses the default row count.
  let defaultRows = try await spark.range(2).sort("id").head()
  #expect(defaultRows == [["0"]])

  // Explicit counts, including one larger than the two-row input.
  let cases: [(n: Int32, rows: [[String?]])] = [
    (1, [["0"]]),
    (2, [["0"], ["1"]]),
    (3, [["0"], ["1"]]),
  ]
  for (n, rows) in cases {
    let actual = try await spark.range(2).sort("id").head(n)
    #expect(actual == rows)
  }

  await spark.stop()
}
260
+
261
@Test
func tail() async throws {
  let spark = try await SparkSession.builder.getOrCreate()

  // A zero-length range has nothing to return.
  let noRows = try await spark.range(0).tail(1)
  #expect(noRows.isEmpty)

  // Explicit counts, including one larger than the two-row input.
  let cases: [(n: Int32, rows: [[String?]])] = [
    (1, [["1"]]),
    (2, [["0"], ["1"]]),
    (3, [["0"], ["1"]]),
  ]
  for (n, rows) in cases {
    let actual = try await spark.range(2).sort("id").tail(n)
    #expect(actual == rows)
  }

  await spark.stop()
}
270
+
250
271
@Test
251
272
func show( ) async throws {
252
273
let spark = try await SparkSession . builder. getOrCreate ( )
You can’t perform that action at this time.
0 commit comments