
Commit e2e1f0a

xuanyuanking authored and cloud-fan committed
[SPARK-25921][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse
## What changes were proposed in this pull request?

Running a barrier job after a normal Spark job causes the barrier job to run without a BarrierTaskContext. This happens when Python workers are reused: after a normal Spark job has run first, `BarrierTaskContext._getOrCreate()` still returns the leftover plain `TaskContext`, so the barrier job fails with `AttributeError: 'TaskContext' object has no attribute 'barrier'`. Fix this by adding a check in `BarrierTaskContext._getOrCreate()` so that it always returns a `BarrierTaskContext` in this scenario.

## How was this patch tested?

Added a new unit test in pyspark-core.

Closes apache#22962 from xuanyuanking/SPARK-25921.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c00e72f)
Signed-off-by: Wenchen Fan <[email protected]>
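The failure mode can be reproduced outside Spark with a minimal sketch. The class names below mirror PySpark's, but the bodies are hypothetical stand-ins: the key point is that `_taskContext` is a class attribute on the parent, so a plain `TaskContext` left over from a previous job is inherited by the subclass and passes the old `is None` check.

```python
# Hypothetical, trimmed-down versions of PySpark's context classes, only to
# illustrate why the `is None` check breaks when a worker is reused.

class TaskContext:
    _taskContext = None  # singleton slot; subclasses see it via inheritance

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = TaskContext()
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    def barrier(self):
        pass  # stand-in for the real global sync

    @classmethod
    def _getOrCreate_old(cls):
        # old check: a leftover plain TaskContext is not None, so it leaks through
        if cls._taskContext is None:
            cls._taskContext = object.__new__(cls)
        return cls._taskContext

    @classmethod
    def _getOrCreate(cls):
        # fixed check: anything that is not a BarrierTaskContext gets replaced
        if not isinstance(cls._taskContext, BarrierTaskContext):
            cls._taskContext = object.__new__(cls)
        return cls._taskContext


# A reused worker ran a normal job first, leaving a plain TaskContext behind:
TaskContext._getOrCreate()

stale = BarrierTaskContext._getOrCreate_old()
print(type(stale).__name__)   # TaskContext: calling .barrier() would raise AttributeError

fixed = BarrierTaskContext._getOrCreate()
print(type(fixed).__name__)   # BarrierTaskContext
```

Note the fix uses `object.__new__(cls)` rather than `BarrierTaskContext()`, so the replacement instance is created without re-running `__init__`, matching how the real context object is populated elsewhere.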
1 parent 65e5b26 commit e2e1f0a

File tree

2 files changed: +17 −2 lines

python/pyspark/taskcontext.py (2 additions, 2 deletions)

@@ -147,8 +147,8 @@ def __init__(self):
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global BarrierTaskContext."""
-        if cls._taskContext is None:
-            cls._taskContext = BarrierTaskContext()
+        if not isinstance(cls._taskContext, BarrierTaskContext):
+            cls._taskContext = object.__new__(cls)
         return cls._taskContext

     @classmethod

python/pyspark/tests.py (15 additions, 0 deletions)

@@ -614,6 +614,21 @@ def context_barrier(x):
         times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
         self.assertTrue(max(times) - min(times) < 1)

+    def test_barrier_with_python_worker_reuse(self):
+        """
+        Verify that BarrierTaskContext.barrier() works with a reused python worker.
+        """
+        self.sc._conf.set("spark.python.worker.reuse", "true")
+        rdd = self.sc.parallelize(range(4), 4)
+        # start a normal job first to start all workers
+        result = rdd.map(lambda x: x ** 2).collect()
+        self.assertEqual([0, 1, 4, 9], result)
+        # make sure `spark.python.worker.reuse=true`
+        self.assertEqual(self.sc._conf.get("spark.python.worker.reuse"), "true")
+
+        # workers will be reused in this barrier job
+        self.test_barrier()
+
     def test_barrier_infos(self):
         """
         Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the
