@@ -65,12 +65,16 @@ class PyTorchFunctionTask(PythonFunctionTask[PyTorch]):
"""

_PYTORCH_TASK_TYPE = "pytorch"
_PYTORCH_TASK_TYPE_STANDALONE = "python-task"

def __init__(self, task_config: PyTorch, task_function: Callable, **kwargs):

task_type = self._PYTORCH_TASK_TYPE_STANDALONEE if task_config.num_workers == 0 else self._PYTORCH_TASK_TYPE

super().__init__(
task_config,
task_function,
task_type=self._PYTORCH_TASK_TYPE,
task_type=task_type,
**kwargs,
)

Member:
Two questions:

  • I think we might need to handle workers=0 separately in get_custom, since in that case we don't want to create a PyTorchJob. (See how this is done for the elastic task below.)
  • Currently, task_config=PyTorch(num_workers=0) is equivalent to no task_config at all. However, torch.distributed.init_process_group() will not work without the env vars set by the operator. We could solve this by overriding the execute method and simply setting the env vars WORLD_SIZE=1, RANK=0, and potentially the master address (we would have to try whether it is required); a sketch follows below.
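
A rough sketch of both ideas combined (hypothetical: it relies on the plugin's existing imports, assumes get_custom and execute have the signatures used here, and the MASTER_ADDR/MASTER_PORT defaults would still need verification):

```python
import os
from typing import Any, Dict, Optional

from flytekit.configuration import SerializationSettings


class PyTorchFunctionTask(PythonFunctionTask[PyTorch]):  # the class from the diff above
    def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any]]:
        if self.task_config.num_workers == 0:
            # No PyTorchJob spec at all, so the task runs like a plain python task.
            return None
        ...  # existing logic that builds the distributed PyTorchJob spec stays here

    def execute(self, **kwargs) -> Any:
        # The training operator normally injects these env vars; provide
        # single-process defaults so torch.distributed.init_process_group() works.
        if self.task_config.num_workers == 0:
            os.environ.setdefault("WORLD_SIZE", "1")
            os.environ.setdefault("RANK", "0")
            os.environ.setdefault("MASTER_ADDR", "localhost")  # may not be strictly required
            os.environ.setdefault("MASTER_PORT", "29500")
        return super().execute(**kwargs)
```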

Contributor:

What do y'all think about throwing an error if workers=0 and telling people to use a standard python config if they want to run it on a single machine?

If people really want to set workers to 0, then I understand having a smooth fallback, but otherwise it could confuse people if they make a mistake.
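
If we go that way, it could be a simple guard in __init__ (a sketch only; the error message wording is just an example):

```python
    # inside PyTorchFunctionTask
    def __init__(self, task_config: PyTorch, task_function: Callable, **kwargs):
        if task_config.num_workers == 0:
            # Fail fast instead of silently falling back to a plain python task.
            raise ValueError(
                "num_workers must be >= 1; use a standard @task without a "
                "PyTorch task_config to run on a single machine."
            )
        super().__init__(
            task_config,
            task_function,
            task_type=self._PYTORCH_TASK_TYPE,
            **kwargs,
        )
```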

Member:

@ByronHsu @wild-endeavor the new pytorch elastic task can run locally and in a single k8s pod, but also with multiple workers using the kubeflow training operator. I'd say its functionality is a superset of the already existing PyTorch task config. What do you think about using that one to debug distributed training with a single worker, @ByronHsu?

I think falling back to a normal pod (without the kubeflow operator) when doing task_config=PyTorch(num_workers=0) doesn't make much sense: the env vars like MASTER_ADDR, RANK, ... required by torch.distributed.init_process_group() would be set neither by the kubeflow operator nor by the pytorch task logic, so distributed training could not be tested.

I would propose to either 1) allow num_workers=0 in the PyTorch task but still use the kubeflow training operator in that case (users who don't want the training operator can use Elastic), or 2) keep disallowing num_workers=0, as is the case now.
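
For illustration, single-worker debugging with the elastic task could look roughly like this (the Elastic parameter names nnodes/nproc_per_node are from memory and worth double-checking against the plugin):

```python
from flytekit import task
from flytekitplugins.kfpytorch import Elastic


@task(task_config=Elastic(nnodes=1, nproc_per_node=1))
def debug_training() -> None:
    import torch.distributed as dist

    # The elastic launcher sets RANK, WORLD_SIZE, MASTER_ADDR, etc.,
    # so init_process_group works even with a single local worker.
    dist.init_process_group(backend="gloo")
```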

11 changes: 11 additions & 0 deletions plugins/flytekit-kf-pytorch/tests/test_pytorch_task.py
@@ -31,3 +31,14 @@ def my_pytorch_task(x: int, y: str) -> int:
    assert my_pytorch_task.resources.limits == Resources()
    assert my_pytorch_task.resources.requests == Resources(cpu="1")
    assert my_pytorch_task.task_type == "pytorch"

def test_zero_worker():
    @task(
        task_config=PyTorch(num_workers=0),
        cache=True,
        cache_version="1",
        requests=Resources(cpu="1"),
    )
    def my_pytorch_task(x: int, y: str) -> int:
        return x
    assert my_pytorch_task.task_type == "python-task"