Skip to content

Commit 5f38636

Browse files
a0x8o and cloud-fan committed
[SPARK-51281][SQL] DataFrameWriterV2 should respect the path option
### What changes were proposed in this pull request?

Unlike `DataFrameWriter.saveAsTable`, where we explicitly get the "path" option and treat it as the table location, `DataFrameWriterV2` doesn't do this and treats the "path" option as a normal option, which doesn't have any real impact. This PR fixes it, and adds a legacy config to restore the old behavior.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

Yes, now `DataFrameWriterV2` can correctly write data to the specified path for file source tables.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50040 from cloud-fan/prop.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 69bd57c commit 5f38636

File tree

183 files changed

+4672
-2211
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

183 files changed

+4672
-2211
lines changed

LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,7 @@ dev.ludovic.netlib:blas
476476
dev.ludovic.netlib:arpack
477477
dev.ludovic.netlib:lapack
478478
net.razorvine:pickle
479+
org.bouncycastle:bcprov-jdk18on
479480
org.checkerframework:checker-qual
480481
org.typelevel:algebra_2.13:jar
481482
org.typelevel:cats-kernel_2.13

assembly/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,16 @@
136136
<artifactId>guava</artifactId>
137137
<scope>${hadoop.deps.scope}</scope>
138138
</dependency>
139+
140+
<!--
141+
SPARK-51311: HDFS-15098 (3.4.0) adds hard dependency on bcprov-jdk18on, Spark fails to submit
142+
to Kerberized cluster without this dependency, until HADOOP-19152 (3.5.0, unreleased)
143+
-->
144+
<dependency>
145+
<groupId>org.bouncycastle</groupId>
146+
<artifactId>bcprov-jdk18on</artifactId>
147+
<scope>${hadoop.deps.scope}</scope>
148+
</dependency>
139149
</dependencies>
140150

141151
<build>

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3493,6 +3493,11 @@
34933493
"message" : [
34943494
"Scalar subquery must return only one column, but got <number>."
34953495
]
3496+
},
3497+
"STREAMING_QUERY" : {
3498+
"message" : [
3499+
"Streaming query is not allowed in subquery expressions."
3500+
]
34963501
}
34973502
},
34983503
"sqlState" : "42823"

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -806,8 +806,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
806806

807807
protected def caseConvert(tableName: String): String = tableName
808808

809-
private def withOrWithout(isDistinct: Boolean): String = if (isDistinct) "with" else "without"
810-
811809
Seq(true, false).foreach { isDistinct =>
812810
val distinct = if (isDistinct) "DISTINCT " else ""
813811
val withOrWithout = if (isDistinct) "with" else "without"

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,9 @@ class SparkContext(config: SparkConf) extends Logging {
722722
}
723723
appStatusSource.foreach(_env.metricsSystem.registerSource(_))
724724
_plugins.foreach(_.registerMetrics(applicationId))
725+
726+
new CallerContext("DRIVER", config.get(APP_CALLER_CONTEXT),
727+
Some(applicationId), applicationAttemptId).setCurrentContext()
725728
} catch {
726729
case NonFatal(e) =>
727730
logError("Error initializing SparkContext.", e)

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
707707
}
708708
}
709709

710+
private[this] val idleTimeoutMillis: Long = TimeUnit.SECONDS.toMillis(idleTimeoutSeconds)
710711
private[this] var pythonWorkerKilled: Boolean = false
711712

712713
override def read(b: Array[Byte], off: Int, len: Int): Int = {
@@ -742,8 +743,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
742743
val buf = ByteBuffer.wrap(b, off, len)
743744
var n = 0
744745
while (n == 0) {
745-
val selected = worker.selector.select(TimeUnit.SECONDS.toMillis(idleTimeoutSeconds))
746-
if (selected == 0) {
746+
val start = System.currentTimeMillis()
747+
val selected = worker.selector.select(idleTimeoutMillis)
748+
val end = System.currentTimeMillis()
749+
if (selected == 0
750+
// Avoid logging if no timeout or the selector doesn't wait for the idle timeout
751+
// as it can return 0 in some case.
752+
&& idleTimeoutMillis > 0 && (end - start) >= idleTimeoutMillis) {
747753
if (pythonWorkerKilled) {
748754
logWarning(
749755
log"Waiting for Python worker process to terminate after idle timeout: " +

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,12 @@ private[spark] class SparkSubmit extends Logging {
249249
val childArgs = new ArrayBuffer[String]()
250250
val childClasspath = new ArrayBuffer[String]()
251251
val sparkConf = args.toSparkConf()
252-
if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
253252
var childMainClass = ""
254253

255254
// Set the cluster manager
256255
val clusterManager: Int = args.maybeMaster match {
257256
case Some(v) =>
258-
assert(args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect"))
257+
assert(args.maybeRemote.isEmpty)
259258
v match {
260259
case "yarn" => YARN
261260
case m if m.startsWith("spark") => STANDALONE
@@ -643,14 +642,11 @@ private[spark] class SparkSubmit extends Logging {
643642
// All cluster managers
644643
OptionAssigner(
645644
// If remote is not set, sets the master,
646-
// In local remote mode, starts the default master to to start the server.
647-
if (args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect")) args.master
645+
if (args.maybeRemote.isEmpty) args.master
648646
else args.maybeMaster.orNull,
649647
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
650648
OptionAssigner(
651-
// In local remote mode, do not set remote.
652-
if (sparkConf.contains("spark.local.connect")) null
653-
else args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
649+
args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
654650
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
655651
confKey = SUBMIT_DEPLOY_MODE.key),
656652
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
@@ -767,8 +763,7 @@ private[spark] class SparkSubmit extends Logging {
767763
// In case of shells, spark.ui.showConsoleProgress can be true by default or by user. Except,
768764
// when Spark Connect is in local mode, because Spark Connect support its own progress
769765
// reporting.
770-
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS) &&
771-
!sparkConf.contains("spark.local.connect")) {
766+
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
772767
sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
773768
}
774769

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
253253
if (args.length == 0) {
254254
printUsageAndExit(-1)
255255
}
256-
if (!sparkProperties.contains("spark.local.connect") &&
257-
maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
256+
if (maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
258257
error("Remote cannot be specified with master and/or deploy mode.")
259258
}
260259
if (primaryResource == null) {

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1911,14 +1911,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
19111911
| Helper methods for accessing private methods and fields |
19121912
* ------------------------------------------------------- */
19131913

1914-
private val _numExecutorsToAddPerResourceProfileId =
1915-
PrivateMethod[mutable.HashMap[Int, Int]](
1916-
Symbol("numExecutorsToAddPerResourceProfileId"))
1917-
private val _numExecutorsTargetPerResourceProfileId =
1918-
PrivateMethod[mutable.HashMap[Int, Int]](
1919-
Symbol("numExecutorsTargetPerResourceProfileId"))
1920-
private val _maxNumExecutorsNeededPerResourceProfile =
1921-
PrivateMethod[Int](Symbol("maxNumExecutorsNeededPerResourceProfile"))
19221914
private val _addTime = PrivateMethod[Long](Symbol("addTime"))
19231915
private val _schedule = PrivateMethod[Unit](Symbol("schedule"))
19241916
private val _doUpdateRequest = PrivateMethod[Unit](Symbol("doUpdateRequest"))
@@ -1932,8 +1924,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
19321924
PrivateMethod[mutable.HashMap[Int, Int]](Symbol("numLocalityAwareTasksPerResourceProfileId"))
19331925
private val _rpIdToHostToLocalTaskCount =
19341926
PrivateMethod[Map[Int, Map[String, Int]]](Symbol("rpIdToHostToLocalTaskCount"))
1935-
private val _onSpeculativeTaskSubmitted =
1936-
PrivateMethod[Unit](Symbol("onSpeculativeTaskSubmitted"))
19371927
private val _totalRunningTasksPerResourceProfile =
19381928
PrivateMethod[Int](Symbol("totalRunningTasksPerResourceProfile"))
19391929

@@ -1946,24 +1936,18 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
19461936
private def numExecutorsToAdd(
19471937
manager: ExecutorAllocationManager,
19481938
rp: ResourceProfile): Int = {
1949-
val nmap = manager invokePrivate _numExecutorsToAddPerResourceProfileId()
1939+
val nmap = manager.numExecutorsToAddPerResourceProfileId
19501940
nmap(rp.id)
19511941
}
19521942

1953-
private def updateAndSyncNumExecutorsTarget(
1954-
manager: ExecutorAllocationManager,
1955-
now: Long): Unit = {
1956-
manager invokePrivate _updateAndSyncNumExecutorsTarget(now)
1957-
}
1958-
19591943
private def numExecutorsTargetForDefaultProfileId(manager: ExecutorAllocationManager): Int = {
19601944
numExecutorsTarget(manager, defaultProfile.id)
19611945
}
19621946

19631947
private def numExecutorsTarget(
19641948
manager: ExecutorAllocationManager,
19651949
rpId: Int): Int = {
1966-
val numMap = manager invokePrivate _numExecutorsTargetPerResourceProfileId()
1950+
val numMap = manager.numExecutorsTargetPerResourceProfileId
19671951
numMap(rpId)
19681952
}
19691953

@@ -1982,7 +1966,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
19821966
rp: ResourceProfile
19831967
): Int = {
19841968
val maxNumExecutorsNeeded =
1985-
manager invokePrivate _maxNumExecutorsNeededPerResourceProfile(rp.id)
1969+
manager.maxNumExecutorsNeededPerResourceProfile(rp.id)
19861970
manager invokePrivate
19871971
_addExecutorsToTarget(maxNumExecutorsNeeded, rp.id, updatesNeeded)
19881972
}
@@ -2005,7 +1989,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
20051989
private def maxNumExecutorsNeededPerResourceProfile(
20061990
manager: ExecutorAllocationManager,
20071991
rp: ResourceProfile): Int = {
2008-
manager invokePrivate _maxNumExecutorsNeededPerResourceProfile(rp.id)
1992+
manager.maxNumExecutorsNeededPerResourceProfile(rp.id)
20091993
}
20101994

20111995
private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
@@ -2033,10 +2017,6 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
20332017
manager invokePrivate _onSchedulerQueueEmpty()
20342018
}
20352019

2036-
private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = {
2037-
manager invokePrivate _onSpeculativeTaskSubmitted(id)
2038-
}
2039-
20402020
private def localityAwareTasksForDefaultProfile(manager: ExecutorAllocationManager): Int = {
20412021
val localMap = manager invokePrivate _localityAwareTasksPerResourceProfileId()
20422022
localMap(defaultProfile.id)
@@ -2052,7 +2032,4 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
20522032
rpIdToHostLocal(defaultProfile.id)
20532033
}
20542034

2055-
private def getResourceProfileIdOfExecutor(manager: ExecutorAllocationManager): Int = {
2056-
defaultProfile.id
2057-
}
20582035
}

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.google.common.io.Files
2929
import org.apache.hadoop.conf.Configuration
3030
import org.apache.hadoop.fs.Path
3131
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
32+
import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
3233
import org.apache.hadoop.mapred.TextInputFormat
3334
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
3435
import org.apache.logging.log4j.{Level, LogManager}
@@ -1460,6 +1461,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
14601461
}
14611462
sc.stop()
14621463
}
1464+
1465+
test("SPARK-51095: Test caller context initialization") {
1466+
val conf = new SparkConf().setAppName("test").setMaster("local")
1467+
sc = new SparkContext(conf)
1468+
val hadoopCallerContext = HadoopCallerContext.getCurrent()
1469+
assert(hadoopCallerContext.getContext().startsWith("SPARK_DRIVER"))
1470+
sc.stop()
1471+
}
1472+
14631473
}
14641474

14651475
object SparkContextSuite {

0 commit comments

Comments (0)