Skip to content

Commit f27464f

Browse files
committed
[SPARK-51445][SQL][SS][CONNECT] Change the never changed var to val
### What changes were proposed in this pull request? This PR replaces unchanged `var` with `val`. ### Why are the changes needed? Use `val` instead of `var` when possible. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #50219 from LuciferYang/var-to-val. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 2893999 commit f27464f

File tree

6 files changed

+10
-10
lines changed

6 files changed

+10
-10
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
7474
.build[ExecuteKey, ExecuteInfo]()
7575

7676
/** The time when the last execution was removed. */
77-
private var lastExecutionTimeNs: AtomicLong = new AtomicLong(System.nanoTime())
77+
private val lastExecutionTimeNs: AtomicLong = new AtomicLong(System.nanoTime())
7878

7979
/** Executor for the periodic maintenance */
80-
private var scheduledExecutor: AtomicReference[ScheduledExecutorService] =
80+
private val scheduledExecutor: AtomicReference[ScheduledExecutorService] =
8181
new AtomicReference[ScheduledExecutorService]()
8282

8383
/**

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private[sql] class ServerSideListenerHolder(val sessionHolder: SessionHolder) {
7878
* final ResultComplete response.
7979
*/
8080
def cleanUp(): Unit = {
81-
var listener = streamingQueryServerSideListener.getAndSet(null)
81+
val listener = streamingQueryServerSideListener.getAndSet(null)
8282
if (listener != null) {
8383
sessionHolder.session.streams.removeListener(listener)
8484
listener.sendResultComplete()

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class SparkConnectSessionManager extends Logging {
4949
.build[SessionKey, SessionHolderInfo]()
5050

5151
/** Executor for the periodic maintenance */
52-
private var scheduledExecutor: AtomicReference[ScheduledExecutorService] =
52+
private val scheduledExecutor: AtomicReference[ScheduledExecutorService] =
5353
new AtomicReference[ScheduledExecutorService]()
5454

5555
private def validateSessionId(

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ private[connect] class SparkConnectStreamingQueryCache(
186186
private[service] val taggedQueries: ConcurrentMap[String, QueryCacheKeySet] =
187187
new ConcurrentHashMap[String, QueryCacheKeySet]
188188

189-
private var scheduledExecutor: AtomicReference[ScheduledExecutorService] =
189+
private val scheduledExecutor: AtomicReference[ScheduledExecutorService] =
190190
new AtomicReference[ScheduledExecutorService]()
191191

192192
/** Schedules periodic checks if it is not already scheduled */
@@ -218,7 +218,7 @@ private[connect] class SparkConnectStreamingQueryCache(
218218
(k, v) => {
219219
if (v == null || !v.addKey(queryKey)) {
220220
// Create a new QueryCacheKeySet if the entry is absent or being removed.
221-
var keys = mutable.HashSet.empty[QueryCacheKey]
221+
val keys = mutable.HashSet.empty[QueryCacheKey]
222222
keys.add(queryKey)
223223
new QueryCacheKeySet(keys = keys)
224224
} else {

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class TransformWithStateInPandasStateServer(
114114
// A map to store the iterator id -> Iterator[(Row, Row)] mapping. This is to keep track of the
115115
// current key-value iterator position for each iterator id in a map state for a grouping key in
116116
// case user tries to fetch another state variable before the current iterator is exhausted.
117-
private var keyValueIterators = if (keyValueIteratorMapForTest != null) {
117+
private val keyValueIterators = if (keyValueIteratorMapForTest != null) {
118118
keyValueIteratorMapForTest
119119
} else {
120120
new mutable.HashMap[String, Iterator[(Row, Row)]]()

sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
749749
// The initial size of the buffer backing a cached dataframe column is 128KB.
750750
// See `ColumnBuilder`.
751751
val numKeys = 128 * 1024
752-
var keyIterator = (0 until numKeys).iterator
752+
val keyIterator = (0 until numKeys).iterator
753753
val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""")
754754
val jsonStr = s"{${entries.mkString(", ")}}"
755755
val query = s"""select parse_json('${jsonStr}') v from range(0, 10)"""
@@ -792,7 +792,7 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
792792
// The initial size of the buffer backing a cached dataframe column is 128KB.
793793
// See `ColumnBuilder`.
794794
val numKeys = 128 * 1024
795-
var keyIterator = (0 until numKeys).iterator
795+
val keyIterator = (0 until numKeys).iterator
796796
val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""")
797797
val jsonStr = s"{${entries.mkString(", ")}}"
798798
val query = s"""select array(parse_json('${jsonStr}')) v from range(0, 10)"""
@@ -839,7 +839,7 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
839839
// The initial size of the buffer backing a cached dataframe column is 128KB.
840840
// See `ColumnBuilder`.
841841
val numKeys = 128 * 1024
842-
var keyIterator = (0 until numKeys).iterator
842+
val keyIterator = (0 until numKeys).iterator
843843
val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""")
844844
val jsonStr = s"{${entries.mkString(", ")}}"
845845
val query = s"""select named_struct(

0 commit comments

Comments
 (0)