Skip to content

Commit 66ff752

Browse files
ericm-dbdongjoon-hyun
authored andcommitted
[SPARK-52008][FOLLOWUP] Fixing StateStoreCoordinator warn compilation messages from batch commit tracking
### What changes were proposed in this pull request? Changing the empty `context.reply()` in the StateStoreCoordinator to use `context.reply(())` ### Why are the changes needed? This PR: #51706 introduced the following `warn` compilation messages, ``` [warn] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:397:22: adaptation of an empty argument list by inserting () is deprecated: leaky (Object-receiving) target makes this especially dangerous [warn] signature: RpcCallContext.reply(response: Any): Unit [warn] given arguments: <none> [warn] after adaptation: RpcCallContext.reply((): Unit) [warn] Applicable -Wconf / nowarn filters for this warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator.receiveAndReply, origin=org.apache.spark.rpc.RpcCallContext.reply, version=2.11.0 [warn] context.reply() [warn] ^ [warn] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:407:24: adaptation of an empty argument list by inserting () is deprecated: leaky (Object-receiving) target makes this especially dangerous [warn] signature: RpcCallContext.reply(response: Any): Unit [warn] given arguments: <none> [warn] after adaptation: RpcCallContext.reply((): Unit) [warn] Applicable -Wconf / nowarn filters for this warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator.receiveAndReply, origin=org.apache.spark.rpc.RpcCallContext.reply, version=2.11.0 [warn] context.reply() [warn] ^ [warn] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:410:24: adaptation of an empty argument list by inserting () is deprecated: leaky (Object-receiving) target makes this especially dangerous [warn] signature: RpcCallContext.reply(response: Any): Unit [warn] given arguments: <none> [warn] after adaptation: RpcCallContext.reply((): Unit) [warn] Applicable -Wconf / nowarn filters for this warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator.receiveAndReply, origin=org.apache.spark.rpc.RpcCallContext.reply, version=2.11.0 [warn] context.reply() [warn] ^ [warn] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:420:26: adaptation of an empty argument list by inserting () is deprecated: leaky (Object-receiving) target makes this especially dangerous [warn] signature: RpcCallContext.reply(response: Any): Unit [warn] given arguments: <none> [warn] after adaptation: RpcCallContext.reply((): Unit) [warn] Applicable -Wconf / nowarn filters for this warning: msg=<part of the message>, cat=deprecation, site=org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator.receiveAndReply, origin=org.apache.spark.rpc.RpcCallContext.reply, version=2.11.0 [warn] context.reply() ``` By changing these to `context.reply(())` these messages do not show up anymore. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By running `build/sbt clean "sql/Test/compile"` locally. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51975 from ericm-db/feb-df-consumption-followup. Authored-by: Eric Marnadi <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8d5e602 commit 66ff752

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private class StateStoreCoordinator(
394394
batchCommitTrackers.put(key, new BatchCommitTracker(runId, batchId, expectedStores))
395395
logInfo(s"Started tracking commits for batch $batchId with " +
396396
s"${expectedStores.values.map(_.values.sum).sum} expected stores")
397-
context.reply()
397+
context.reply(())
398398
}
399399

400400
case ReportStateStoreCommit(storeId, version, storeName) =>
@@ -404,10 +404,10 @@ private class StateStoreCoordinator(
404404
batchCommitTrackers.get(key) match {
405405
case Some(tracker) =>
406406
tracker.recordCommit(storeId, storeName)
407-
context.reply()
407+
context.reply(())
408408
case None =>
409409
// In case no commit tracker for this batch was found
410-
context.reply()
410+
context.reply(())
411411
}
412412

413413
case ValidateStateStoreCommitForBatch(runId, batchId) =>
@@ -417,7 +417,7 @@ private class StateStoreCoordinator(
417417
try {
418418
tracker.validateAllCommitted()
419419
batchCommitTrackers.remove(key) // Clean up after validation
420-
context.reply()
420+
context.reply(())
421421
} catch {
422422
case e: StateStoreCommitValidationFailed =>
423423
batchCommitTrackers.remove(key) // Clean up even on failure

0 commit comments

Comments
 (0)