Skip to content

Commit 83bde70

Browse files
authored
Revert inheritablethreadlocal and just restore taskcontext (apache-spark-on-k8s#216)
1 parent ba7b8c7 commit 83bde70

File tree

3 files changed

+3
-22
lines changed

3 files changed

+3
-22
lines changed

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ object TaskContext {
4848
}
4949
}
5050

51-
private[this] val taskContext: InheritableThreadLocal[TaskContext] =
52-
new InheritableThreadLocal[TaskContext]
51+
private[this] val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
5352

5453
// Note: protected[spark] instead of private[spark] to prevent the following two from
5554
// showing up in JavaDoc.

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -234,26 +234,6 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
234234
assert(threadTestValue === originalTestValue)
235235
}
236236

237-
test("taskContext accessed from a thread pool") {
238-
TaskContext.setTaskContext(TaskContext.empty())
239-
assert(TaskContext.get() != null, "taskContext should have been defined")
240-
var throwable: Option[Throwable] = None
241-
val thread = new Thread() {
242-
override def run() {
243-
try {
244-
assert(TaskContext.get() != null, "taskContext should have been inherited")
245-
} catch {
246-
case t: Throwable =>
247-
throwable = Option(t)
248-
}
249-
}
250-
}
251-
thread.start()
252-
thread.join()
253-
throwable.foreach { t => throw improveStackTrace(t) }
254-
TaskContext.unset()
255-
}
256-
257237
/**
258238
* Improve the stack trace of an error thrown from within a thread.
259239
* Otherwise it's difficult to tell which line in the test the error came from.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,8 +577,10 @@ object ParquetFileFormat extends Logging {
577577
val parFiles = partFiles.par
578578
val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
579579
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
580+
val tc = TaskContext.get()
580581
try {
581582
parFiles.flatMap { currentFile =>
583+
TaskContext.setTaskContext(tc)
582584
try {
583585
// Skips row group information since we only need the schema.
584586
// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,

0 commit comments

Comments
 (0)