@@ -28,15 +28,15 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
 import org.apache.spark.deploy.kubernetes.SSLUtils
+import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackendFactory
 import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
 import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
 import org.apache.spark.deploy.kubernetes.submit.{Client, ClientArguments, JavaMainAppResource, KeyAndCertPem, MainAppResource, PythonMainAppResource}
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{RedirectThread, Utils}
 
 private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   import KubernetesSuite._
@@ -74,13 +74,31 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("Include HADOOP_CONF for HDFS based jobs") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+    // Ensuring that HADOOP_CONF_DIR env variable is set
+    val builder = new ProcessBuilder(
+      Seq("/bin/bash", "-c", "export HADOOP_CONF_DIR=" +
+        "test-data/hadoop-conf-files && exec").asJava)
+    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+    val process = builder.start()
+    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+    val exitCode = process.waitFor()
+    if (exitCode != 0) {
+      logInfo(s"exitCode: $exitCode")
+    }
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
     runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
   }
 
   test("Run PySpark Job on file from SUBMITTER with --py-files") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
-
+    // Ensuring that HADOOP_CONF_DIR env variable is unset
+    val builder = new ProcessBuilder(
+      Seq("/bin/bash", "-c", "export HADOOP_CONF_DIR=" +
+        " && exec").asJava)
+    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+    val process = builder.start()
+    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+    val exitCode = process.waitFor()
     launchStagingServer(SSLOptions(), None)
     sparkConf
       .set(DRIVER_DOCKER_IMAGE,
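
Both added blocks follow the same pattern: spawn a short-lived /bin/bash subprocess to export (or clear) HADOOP_CONF_DIR, merge its stderr into stdout, and stream the combined output back through org.apache.spark.util.RedirectThread. Below is a minimal sketch of that subprocess pattern, assuming it runs inside the same private[spark] test suite; the echoed command and thread name are illustrative placeholders, not taken from the patch.

// Illustrative sketch (not part of the patch): run a shell command in a
// subprocess, merge its stderr into stdout, and stream that output to the
// test JVM's stdout via Spark's RedirectThread utility.
import scala.collection.JavaConverters._
import org.apache.spark.util.RedirectThread

val builder = new ProcessBuilder(
  Seq("/bin/bash", "-c", "echo HADOOP_CONF_DIR is $HADOOP_CONF_DIR").asJava)
builder.redirectErrorStream(true)   // merge stderr into stdout
val process = builder.start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
val exitCode = process.waitFor()    // block until the shell command exits
require(exitCode == 0, s"subprocess failed with exit code $exitCode")

redirectErrorStream(true) is what lets a single RedirectThread carry both streams, which is why the patch only wires up process.getInputStream.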