
Commit 8cc1955

Author: Justin Uang
Message: resolve conflicts
1 parent: e5af603

36 files changed: 110 additions, 1716 deletions


build/mvn

Lines changed: 1 addition & 9 deletions
@@ -159,20 +159,12 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
 
 echo "Using \`mvn\` from path: $MVN_BIN" 1>&2
 
-<<<<<<< HEAD
-# Last, call the `mvn` command as usual
-"${MVN_BIN}" -DzincPort=${ZINC_PORT} -e "$@"
-||||||| merged common ancestors
-# Last, call the `mvn` command as usual
-"${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
-=======
 # call the `mvn` command as usual
 # SPARK-25854
-"${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
+"${MVN_BIN}" -DzincPort=${ZINC_PORT} -e "$@"
 MVN_RETCODE=$?
 
 # Try to shut down zinc explicitly if the server is still running.
 "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
 
 exit $MVN_RETCODE
->>>>>>> 3404a73~1
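The resolution keeps the upstream shape from SPARK-25854 (capture mvn's exit code, shut zinc down unconditionally, then exit with mvn's code) while restoring this fork's `-e` flag. A minimal Scala sketch of the same run/cleanup/exit pattern, with illustrative command names and port, not the script itself:

import scala.sys.process._

// Run the build, remember its exit code, always run cleanup,
// then exit with the build's status rather than the cleanup's.
def buildThenCleanup(args: Seq[String]): Nothing = {
  val mvnRet = (Seq("mvn", "-e") ++ args).!    // analogous to MVN_RETCODE=$?
  Seq("zinc", "-shutdown", "-port", "3030").!  // cleanup runs regardless
  sys.exit(mvnRet)                             // preserve mvn's exit status
}

Capturing the code before cleanup matters: otherwise the zinc shutdown's status would silently replace the build's.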

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 4 additions & 23 deletions
@@ -106,22 +106,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (memoryMb.isDefined) {
       envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
     }
-<<<<<<< HEAD
-    val worker: Socket = env.createPythonWorker(
-      pythonExec, envVars.asScala.toMap, condaInstructions)
-    // Whether is the worker released into idle pool
-    val released = new AtomicBoolean(false)
-||||||| merged common ancestors
-    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
-    // Whether is the worker released into idle pool
-    val released = new AtomicBoolean(false)
-=======
-    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap,
+      condaInstructions)
     // Whether is the worker released into idle pool or closed. When any codes try to release or
     // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
     // sure there is only one winner that is going to release or close the worker.
     val releasedOrClosed = new AtomicBoolean(false)
->>>>>>> 3404a73~1
 
     // Start a thread to feed the process input from our parent's iterator
    val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
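The upstream comment kept in this hunk describes a single-winner protocol: whichever code path flips `releasedOrClosed` first gets to release or close the worker, and everyone else backs off. A self-contained sketch of that idiom (the class and the pool are hypothetical), which also matches the release path in the last hunk of this file:

import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

// Sketch: exactly one of release()/close() acts on the worker,
// no matter how many threads race to call them.
class WorkerHandle(worker: Socket) {
  private val releasedOrClosed = new AtomicBoolean(false)

  def release(): Unit =
    if (releasedOrClosed.compareAndSet(false, true)) {
      // winner: hand the worker back to an idle pool (elided)
    }

  def close(): Unit =
    if (releasedOrClosed.compareAndSet(false, true)) {
      worker.close() // winner: nobody released it first, so close it
    }
}

This replaces the fork's plain `released.set(true)`, which could not prevent two racing callers from both acting.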
@@ -351,6 +341,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
           }
         }
         dataOut.flush()
+
         dataOut.writeInt(evalType)
         writeCommand(dataOut)
         writeIteratorToStream(dataOut)
@@ -476,18 +467,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
           }
           // Check whether the worker is ready to be re-used.
           if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-<<<<<<< HEAD
-            if (reuseWorker) {
-              env.releasePythonWorker(pythonExec, envVars.asScala.toMap, condaInstructions, worker)
-              released.set(true)
-||||||| merged common ancestors
-            if (reuseWorker) {
-              env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)
-              released.set(true)
-=======
             if (reuseWorker && releasedOrClosed.compareAndSet(false, true)) {
-              env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)
->>>>>>> 3404a73~1
+              env.releasePythonWorker(pythonExec, envVars.asScala.toMap, condaInstructions, worker)
             }
           }
           eos = true

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 0 additions & 15 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.api.python
 
 import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
-import java.nio.charset.StandardCharsets
 import java.util.Arrays
 import javax.annotation.concurrent.GuardedBy
 
@@ -33,18 +32,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
 
-<<<<<<< HEAD
 private[spark] class PythonWorkerFactory(requestedPythonExec: Option[String],
     requestedEnvVars: Map[String, String],
     condaInstructions: Option[CondaSetupInstructions])
-  extends Logging {
-||||||| merged common ancestors
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
-  extends Logging {
-=======
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
   extends Logging { self =>
->>>>>>> 3404a73~1
 
   import PythonWorkerFactory._
 
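The merge keeps the fork's constructor but adopts upstream's `self =>` alias on the class. A short sketch of why such an alias is useful in this file (class and members hypothetical): nested helpers can name the enclosing instance, which is what annotations like `@GuardedBy("self")` in the next hunk refer to.

class WorkerFactory { self =>
  private var idleWorkers = 0 // guarded by self's monitor

  private class MonitorThread extends Thread {
    override def run(): Unit = self.synchronized {
      // `this` here is the MonitorThread; `self` still names the
      // enclosing WorkerFactory, so we lock the intended monitor
      idleWorkers += 1
    }
  }
}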
@@ -101,7 +92,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   @GuardedBy("self")
   private val simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
 
-<<<<<<< HEAD
   private[this] val condaEnv = {
     // Set up conda environment if there are any conda packages requested
     condaInstructions.map(CondaEnvironmentManager.createCondaEnvironment)
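The fork-only `condaEnv` block comes through the merge intact: because `condaInstructions` is an `Option`, `map` builds the environment only when instructions were supplied. A tiny sketch of the same pattern with stand-in types (the real `CondaSetupInstructions`/`CondaEnvironmentManager` are specific to this fork):

case class Instructions(packages: Seq[String])
case class Environment(root: String)
def create(i: Instructions): Environment = Environment("/tmp/conda-env")

val some: Option[Instructions] = Some(Instructions(Seq("numpy")))
val none: Option[Instructions] = None
val built: Option[Environment]   = some.map(create) // Some(Environment(...))
val skipped: Option[Environment] = none.map(create) // None: nothing created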
@@ -122,11 +112,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }
 
   val pythonPath = PythonUtils.mergePythonPaths(
-||||||| merged common ancestors
-  val pythonPath = PythonUtils.mergePythonPaths(
-=======
-  private val pythonPath = PythonUtils.mergePythonPaths(
->>>>>>> 3404a73~1
     PythonUtils.sparkPythonPath,
     envVars.getOrElse("PYTHONPATH", ""),
     sys.env.getOrElse("PYTHONPATH", ""))
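The resolution keeps the fork's public `val pythonPath` over upstream's `private val`. For context, a plausible sketch of what a `mergePythonPaths`-style helper does — an assumption about its behavior, not the verified implementation: drop empty entries and join the rest with the platform's path separator.

import java.io.File

def mergePythonPaths(paths: String*): String =
  paths.filter(_.nonEmpty).mkString(File.pathSeparator)

// mergePythonPaths("/opt/spark/python", "", "/home/user/libs")
//   => "/opt/spark/python:/home/user/libs" on Linux (empty PYTHONPATH dropped)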

dev/deps/spark-deps-hadoop-2.7

Lines changed: 0 additions & 199 deletions
This file was deleted.
