Commit 20b7c68

[SPARK-25248][.1][PYSPARK] update barrier Python API

## What changes were proposed in this pull request?

I made one pass over the Python APIs for barrier mode and updated them to match the Scala doc in apache#22240. Major changes:

* export the public classes
* expand the docs
* add doc for BarrierTaskInfo.address

cc: jiangxb1987

Closes apache#22261 from mengxr/SPARK-25248.1.

Authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>

1 parent 3864480 commit 20b7c68

File tree: 3 files changed, +44 / -16 lines


python/pyspark/__init__.py

Lines changed: 9 additions & 3 deletions
@@ -36,22 +36,27 @@
       Finer-grained cache persistence levels.
   - :class:`TaskContext`:
       Information about the current running task, available on the workers and experimental.
-
+  - :class:`RDDBarrier`:
+      Wraps an RDD under a barrier stage for barrier execution.
+  - :class:`BarrierTaskContext`:
+      A :class:`TaskContext` that provides extra info and tooling for barrier execution.
+  - :class:`BarrierTaskInfo`:
+      Information about a barrier task.
 """
 
 from functools import wraps
 import types
 
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, RDDBarrier
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.status import *
-from pyspark.taskcontext import TaskContext
+from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
 from pyspark.profiler import Profiler, BasicProfiler
 from pyspark.version import __version__
 from pyspark._globals import _NoValue

@@ -113,4 +118,5 @@ def wrapper(self, *args, **kwargs):
     "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
     "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
     "StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext",
+    "RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo",
 ]
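
With these exports in place, the barrier-mode classes are importable directly from the top-level pyspark package. A minimal sanity check, assuming PySpark 2.4.0 or later is installed:

# All three barrier-mode classes now sit in pyspark's public namespace,
# next to the existing RDD and TaskContext exports.
from pyspark import RDD, RDDBarrier, TaskContext, BarrierTaskContext, BarrierTaskInfo

# They are also listed in __all__, so star imports and doc tooling pick them up.
import pyspark
assert "RDDBarrier" in pyspark.__all__
assert "BarrierTaskContext" in pyspark.__all__
assert "BarrierTaskInfo" in pyspark.__all__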

python/pyspark/rdd.py

Lines changed: 18 additions & 4 deletions
@@ -2390,7 +2390,18 @@ def barrier(self):
         """
         .. note:: Experimental

-        Indicates that Spark must launch the tasks together for the current stage.
+        Marks the current stage as a barrier stage, where Spark must launch all tasks together.
+        In case of a task failure, instead of only restarting the failed task, Spark will abort the
+        entire stage and relaunch all tasks for this stage.
+        The barrier execution mode feature is experimental and it only handles limited scenarios.
+        Please read the linked SPIP and design docs to understand the limitations and future plans.
+
+        :return: an :class:`RDDBarrier` instance that provides actions within a barrier stage.
+
+        .. seealso:: :class:`BarrierTaskContext`
+        .. seealso:: `SPIP: Barrier Execution Mode \
+            <http://jira.apache.org/jira/browse/SPARK-24374>`_
+        .. seealso:: `Design Doc <https://jira.apache.org/jira/browse/SPARK-24582>`_

         .. versionadded:: 2.4.0
         """
@@ -2430,8 +2441,8 @@ class RDDBarrier(object):
     """
     .. note:: Experimental

-    An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage
-    contains this RDD together.
+    Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together.
+    :class:`RDDBarrier` instances are created by :func:`RDD.barrier`.

     .. versionadded:: 2.4.0
     """
@@ -2443,7 +2454,10 @@ def mapPartitions(self, f, preservesPartitioning=False):
         """
         .. note:: Experimental

-        Return a new RDD by applying a function to each partition of this RDD.
+        Returns a new RDD by applying a function to each partition of the wrapped RDD,
+        where tasks are launched together in a barrier stage.
+        The interface is the same as :func:`RDD.mapPartitions`.
+        Please see the API doc there.

         .. versionadded:: 2.4.0
         """

python/pyspark/taskcontext.py

Lines changed: 17 additions & 9 deletions
@@ -131,9 +131,8 @@ class BarrierTaskContext(TaskContext):
     """
     .. note:: Experimental

-    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
-    for a running task, use:
-    L{BarrierTaskContext.get()}.
+    A :class:`TaskContext` with extra contextual info and tooling for tasks in a barrier stage.
+    Use :func:`BarrierTaskContext.get` to obtain the barrier context for a running barrier task.

     .. versionadded:: 2.4.0
     """
@@ -155,8 +154,11 @@ def _getOrCreate(cls):
     @classmethod
     def get(cls):
         """
-        Return the currently active BarrierTaskContext. This can be called inside of user functions
-        to access contextual information about running tasks.
+        .. note:: Experimental
+
+        Return the currently active :class:`BarrierTaskContext`.
+        This can be called inside of user functions to access contextual information about
+        running tasks.

         .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
         """
@@ -176,7 +178,12 @@ def barrier(self):
         .. note:: Experimental

         Sets a global barrier and waits until all tasks in this stage hit this barrier.
-        Note this method is only allowed for a BarrierTaskContext.
+        Similar to the `MPI_Barrier` function in MPI, this function blocks until all tasks
+        in the same stage have reached this routine.
+
+        .. warning:: In a barrier stage, each task must have the same number of `barrier()`
+            calls, in all possible code branches.
+            Otherwise, you may get the job hanging or a SparkException after timeout.

         .. versionadded:: 2.4.0
         """
@@ -190,9 +197,8 @@ def getTaskInfos(self):
         """
         .. note:: Experimental

-        Returns the all task infos in this barrier stage, the task infos are ordered by
-        partitionId.
-        Note this method is only allowed for a BarrierTaskContext.
+        Returns :class:`BarrierTaskInfo` for all tasks in this barrier stage,
+        ordered by partition ID.

         .. versionadded:: 2.4.0
         """
@@ -210,6 +216,8 @@ class BarrierTaskInfo(object):

     Carries all task infos of a barrier task.

+    :var address: The IPv4 address (host:port) of the executor that the barrier task is running on
+
     .. versionadded:: 2.4.0
     """
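
The worker-side pieces documented above compose as follows: a barrier task obtains its BarrierTaskContext via get(), synchronizes with its peers via barrier(), and inspects the stage via getTaskInfos(), whose entries expose the documented address attribute. A sketch under assumptions: a local[2] master so both tasks get slots, and an illustrative function named coordinate:

from pyspark import SparkContext, BarrierTaskContext

sc = SparkContext("local[2]", "barrier-context-sketch")

def coordinate(iterator):
    # get() must be called on the worker, inside a barrier task.
    context = BarrierTaskContext.get()
    # Exactly one barrier() call per task, on every code path, so all
    # tasks in the stage block here until the last one arrives.
    context.barrier()
    # One BarrierTaskInfo per task, ordered by partition ID; address is
    # the host:port of the executor running each task in the stage.
    addresses = [info.address for info in context.getTaskInfos()]
    yield (context.partitionId(), addresses)

result = sc.parallelize(range(4), 2).barrier().mapPartitions(coordinate).collect()
for partition_id, addresses in sorted(result):
    print(partition_id, addresses)
sc.stop()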
