Skip to content

Commit d069034

Browse files
committed
[SPARK-51912] Support semanticHash and sameSemantics in DataFrame
### What changes were proposed in this pull request? This PR aims to support `semanticHash` and `sameSemantics` in `DataFrame`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#89 from dongjoon-hyun/SPARK-51912. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 2fb55a0 commit d069034

File tree

4 files changed

+62
-0
lines changed

4 files changed

+62
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,20 @@ public actor DataFrame: Sendable {
468468
}
469469
}
470470

471+
/// Returns a `hashCode` of the logical query plan against this ``DataFrame``.
472+
/// - Returns: A hashcode value.
473+
public func semanticHash() async throws -> Int32 {
474+
return try await self.spark.semanticHash(self.plan)
475+
}
476+
477+
/// Returns `true` when the logical query plans inside both ``Dataset``s are equal and therefore
478+
/// return same results.
479+
/// - Parameter other: A ``DataFrame`` to compare.
480+
/// - Returns: Whether the both logical plans are equal.
481+
public func sameSemantics(other: DataFrame) async throws -> Bool {
482+
return try await self.spark.sameSemantics(self.plan, other.getPlan() as! Plan)
483+
}
484+
471485
/// Prints the physical plan to the console for debugging purposes.
472486
public func explain() async throws {
473487
try await explain("simple")

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,33 @@ public actor SparkConnectClient {
568568
}
569569
}
570570

571+
func sameSemantics(_ plan: Plan, _ otherPlan: Plan) async throws -> Bool {
572+
try await withGPRC { client in
573+
let service = SparkConnectService.Client(wrapping: client)
574+
let request = analyze(self.sessionID!, {
575+
var sameSemantics = AnalyzePlanRequest.SameSemantics()
576+
sameSemantics.targetPlan = plan
577+
sameSemantics.otherPlan = otherPlan
578+
return OneOf_Analyze.sameSemantics(sameSemantics)
579+
})
580+
let response = try await service.analyzePlan(request)
581+
return response.sameSemantics.result
582+
}
583+
}
584+
585+
func semanticHash(_ plan: Plan) async throws -> Int32 {
586+
try await withGPRC { client in
587+
let service = SparkConnectService.Client(wrapping: client)
588+
let request = analyze(self.sessionID!, {
589+
var semanticHash = AnalyzePlanRequest.SemanticHash()
590+
semanticHash.plan = plan
591+
return OneOf_Analyze.semanticHash(semanticHash)
592+
})
593+
let response = try await service.analyzePlan(request)
594+
return response.semanticHash.result
595+
}
596+
}
597+
571598
static func getJoin(
572599
_ left: Relation, _ right: Relation, _ joinType: JoinType,
573600
joinCondition: String? = nil, usingColumns: [String]? = nil

Sources/SparkConnect/SparkSession.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,14 @@ public actor SparkSession {
171171
await client.clearTags()
172172
}
173173

174+
func sameSemantics(_ plan: Plan, _ otherPlan: Plan) async throws -> Bool {
175+
return try await client.sameSemantics(plan, otherPlan)
176+
}
177+
178+
func semanticHash(_ plan: Plan) async throws -> Int32 {
179+
return try await client.semanticHash(plan)
180+
}
181+
174182
/// This is defined as the return type of `SparkSession.sparkContext` method.
175183
/// This is an empty `Struct` type because `sparkContext` method is designed to throw
176184
/// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,19 @@ struct DataFrameTests {
124124
await spark.stop()
125125
}
126126

127+
@Test
128+
func sameSemantics() async throws {
129+
let spark = try await SparkSession.builder.getOrCreate()
130+
let df1 = try await spark.range(1)
131+
let df2 = try await spark.range(1)
132+
let df3 = try await spark.range(2)
133+
#expect(try await df1.sameSemantics(other: df2))
134+
#expect(try await df1.semanticHash() == df2.semanticHash())
135+
#expect(try await df1.sameSemantics(other: df3) == false)
136+
#expect(try await df1.semanticHash() != df3.semanticHash())
137+
await spark.stop()
138+
}
139+
127140
@Test
128141
func explain() async throws {
129142
let spark = try await SparkSession.builder.getOrCreate()

0 commit comments

Comments
 (0)