Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.15.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Key Features and Updates
*
* Developer API enhancements
* FEAT-#4359: Add __dataframe__ method to the protocol dataframe (#4360)
* FIX-#4479: Prevent users from using a local filepath when performing a distributed write (#4484)
* Update testing suite
* TEST-#4363: Use Ray from pypi in CI (#4364)
* FIX-#4422: get rid of case sensitivity for `warns_that_defaulting_to_pandas` (#4423)
Expand Down
11 changes: 11 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from modin.core.execution.ray.common import RayTask, SignalActor
from ..dataframe import PandasOnRayDataframe
from ..partitioning import PandasOnRayDataframePartition
from modin.core.io.utils import is_local_path


class PandasOnRayIO(RayIO):
Expand Down Expand Up @@ -165,6 +166,11 @@ def to_csv(cls, qc, **kwargs):
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]):
raise ValueError(
"`path_or_buf` must point to a networked file or buffer when in cluster mode."
)

signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)

def func(df, **kw):
Expand Down Expand Up @@ -277,6 +283,11 @@ def to_parquet(cls, qc, **kwargs):
if not cls._to_parquet_check_support(kwargs):
return RayIO.to_parquet(qc, **kwargs)

if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]):
raise ValueError(
"`path_or_buf` must point to a networked file or buffer when in cluster mode."
)

def func(df, **kw):
"""
Dump a chunk of rows as parquet, then save them to target maintaining order.
Expand Down
75 changes: 75 additions & 0 deletions modin/core/io/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Collection of utility functions for distributed io."""

import os
import pathlib
import re
from typing import Union

S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)")


def is_local_path(path_or_buf) -> bool:
"""
Return ``True`` if the specified `path_or_buf` is a local path, ``False`` otherwise.
Parameters
----------
path_or_buf : str, path object or file-like object
The path or buffer to check.
Returns
-------
Whether the `path_or_buf` points to a local file.
"""
if isinstance(path_or_buf, str):
if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf:
return False # S3 or network path.
if isinstance(path_or_buf, (str, pathlib.PurePath)):
if os.path.exists(path_or_buf):
return True
local_device_id = os.stat(os.getcwd()).st_dev
path_device_id = get_device_id(path_or_buf)
if path_device_id == local_device_id:
return True
return False


def get_device_id(path: Union[str, pathlib.PurePath]) -> Union[int, None]:
"""
Return the result of `os.stat(path).st_dev` for the portion of `path` that exists locally.
Parameters
----------
path : str, path object
The path to check.
Returns
-------
The `st_dev` field of `os.stat` of the portion of the `path` that exists locally, None if no
part of the path exists locally.
"""
index = 1
path_list = list(pathlib.Path(path).parts)
if path_list[0] == "/":
index += 1
try:
os.stat(os.path.join(*path_list[:index]))
except:
return None
while os.path.exists(os.path.join(*path_list[:index])):
index += 1
index -= 1
return os.stat(os.path.join(*path_list[:index])).st_dev