Commit eea3f55
MrBago authored and HyukjinKwon committed
[SPARK-27446][R] Use existing spark conf if available.
## What changes were proposed in this pull request?

The RBackend and RBackendHandler create new conf objects that don't pick up conf values from the existing SparkSession, so they always use the default conf values instead of the values specified by the user. This fix checks whether the Spark env already exists and, if it does, takes the conf from there, falling back to creating a new conf otherwise. This follows the pattern already used in other places, including:
https://github.com/apache/spark/blob/3725b1324f731d57dc776c256bc1a100ec9e6cd0/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala#L261

Closes apache#24353 from MrBago/r-backend-use-existing-conf.

Authored-by: Bago Amirbekian <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent: 4704af4 · commit: eea3f55

File tree: 2 files changed, +5 −5 lines changed
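Before the file-by-file diff, here is a minimal, self-contained sketch of the pattern this commit applies, lifted from the changed lines below; the object and method names are hypothetical, chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkEnv}

object RConfUtil {
  // If a SparkEnv is already running in this JVM (i.e. a SparkContext has
  // been initialized), reuse its conf so user-specified values are honored;
  // otherwise fall back to a fresh SparkConf, which only sees defaults and
  // system properties.
  def currentConf(): SparkConf =
    Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
}
```

This mirrors the existing pattern in BaseRRunner.scala linked in the commit message above.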

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -30,7 +30,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
 import io.netty.handler.timeout.ReadTimeoutHandler
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.R._
@@ -47,7 +47,7 @@ private[spark] class RBackend {
   private[r] val jvmObjectTracker = new JVMObjectTracker
 
   def init(): (Int, RAuthHelper) = {
-    val conf = new SparkConf()
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
     val workerGroup = bossGroup
@@ -124,7 +124,7 @@ private[spark] object RBackend extends Logging {
     val listenPort = serverSocket.getLocalPort()
     // Connection timeout is set by socket client. To make it configurable we will pass the
     // timeout value to client inside the temp file
-    val conf = new SparkConf()
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
 
     // tell the R process via temporary file
```
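The practical effect of this file's change is that R backend settings configured on an existing session become visible to RBackend. A usage sketch, assuming a local session and an arbitrary timeout value (R_BACKEND_CONNECTION_TIMEOUT corresponds to the `spark.r.backendConnectionTimeout` key):

```scala
import org.apache.spark.sql.SparkSession

// With this change, the timeout below is read by RBackend.init() via
// SparkEnv.get.conf; previously a fresh SparkConf silently ignored it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("r-backend-conf-demo")
  .config("spark.r.backendConnectionTimeout", "120")
  .getOrCreate()
```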

core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -26,7 +26,7 @@ import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.handler.timeout.ReadTimeoutException
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.api.r.SerDe._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.R._
@@ -98,7 +98,7 @@ private[r] class RBackendHandler(server: RBackend)
         ctx.write(pingBaos.toByteArray)
       }
     }
-    val conf = new SparkConf()
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
     val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
     val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
     val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
```
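For context on the last hunk: the heartbeat interval is capped just below the connection timeout so the channel never idles long enough to be closed between pings. A worked sketch of that arithmetic, assuming what I believe are the stock defaults (`spark.r.heartBeatInterval` = 100, `spark.r.backendConnectionTimeout` = 6000, both in seconds):

```scala
// Hypothetical standalone reproduction of the interval computation above.
val heartBeatInterval = 100          // assumed default of spark.r.heartBeatInterval
val backendConnectionTimeout = 6000  // assumed default of spark.r.backendConnectionTimeout
// Ping at the heartbeat interval, but never slower than (timeout - 1), so the
// read-timeout handler on the connection does not fire between pings.
val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)  // == 100
```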
