From 45325197ce63515a3481c14b516e19d58fafefb8 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 12 Dec 2024 11:20:40 -0800 Subject: [PATCH 1/5] remove open_async Signed-off-by: Yee Hing Tong --- flytekit/core/data_persistence.py | 44 +++++++------------ .../unit/core/test_data_persistence.py | 18 +++++++- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index 7035147016..ccab432b47 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -199,7 +199,8 @@ def get_filesystem( elif protocol == "s3": s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous) s3kwargs.update(kwargs) - return fsspec.filesystem(protocol, **s3kwargs) # type: ignore + fs = fsspec.filesystem(protocol, **s3kwargs) # type: ignore + return fs elif protocol == "gs": if anonymous: kwargs["token"] = _ANON @@ -220,8 +221,8 @@ async def get_async_filesystem_for_path( ) -> Union[AsyncFileSystem, fsspec.AbstractFileSystem]: protocol = get_protocol(path) loop = asyncio.get_running_loop() - - return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs) + fs = self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs) + return fs def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem: protocol = get_protocol(path) @@ -425,45 +426,30 @@ async def async_put_raw_data( # raw bytes if isinstance(lpath, bytes): - fs = await self.get_async_filesystem_for_path(to_path) - if isinstance(fs, AsyncFileSystem): - async with fs.open_async(to_path, "wb", **kwargs) as s: - s.write(lpath) - else: - with fs.open(to_path, "wb", **kwargs) as s: - s.write(lpath) - + fs = self.get_filesystem_for_path(to_path) + with fs.open(to_path, "wb", **kwargs) as s: + s.write(lpath) return to_path # If lpath is a buffered reader of some kind if isinstance(lpath, io.BufferedReader) or isinstance(lpath, io.BytesIO): if not lpath.readable(): raise FlyteAssertion("Buffered reader must be readable") - fs = await self.get_async_filesystem_for_path(to_path) + fs = self.get_filesystem_for_path(to_path) lpath.seek(0) - if isinstance(fs, AsyncFileSystem): - async with fs.open_async(to_path, "wb", **kwargs) as s: - while data := lpath.read(read_chunk_size_bytes): - s.write(data) - else: - with fs.open(to_path, "wb", **kwargs) as s: - while data := lpath.read(read_chunk_size_bytes): - s.write(data) + with fs.open(to_path, "wb", **kwargs) as s: + while data := lpath.read(read_chunk_size_bytes): + s.write(data) return to_path if isinstance(lpath, io.StringIO): if not lpath.readable(): raise FlyteAssertion("Buffered reader must be readable") - fs = await self.get_async_filesystem_for_path(to_path) + fs = self.get_filesystem_for_path(to_path) lpath.seek(0) - if isinstance(fs, AsyncFileSystem): - async with fs.open_async(to_path, "wb", **kwargs) as s: - while data_str := lpath.read(read_chunk_size_bytes): - s.write(data_str.encode(encoding)) - else: - with fs.open(to_path, "wb", **kwargs) as s: - while data_str := lpath.read(read_chunk_size_bytes): - s.write(data_str.encode(encoding)) + with fs.open(to_path, "wb", **kwargs) as s: + while data_str := lpath.read(read_chunk_size_bytes): + s.write(data_str.encode(encoding)) return to_path raise FlyteAssertion(f"Unsupported lpath type {type(lpath)}") diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index d992ed1fa5..a938e86426 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -1,16 +1,17 @@ import io import os -import fsspec import pathlib import random import string import sys import tempfile +import fsspec import mock import pytest from azure.identity import ClientSecretCredential, DefaultAzureCredential +from flytekit.configuration import Config from flytekit.core.data_persistence import FileAccessProvider from flytekit.core.local_fsspec import FlyteLocalFileSystem @@ -207,3 +208,18 @@ def __init__(self, *args, **kwargs): fp = FileAccessProvider("/tmp", "s3://my-bucket") fp.get_filesystem("testgetfs", test_arg="test_arg") + + +@pytest.mark.sandbox_text +def test_put_raw_data_bytes(): + dc = Config.for_sandbox().data_config + raw_output = f"s3://my-s3-bucket/" + provider = FileAccessProvider(local_sandbox_dir="/tmp/unittest", raw_output_prefix=raw_output, data_config=dc) + prefix = provider.get_random_string() + provider.put_raw_data(lpath=b"hello", upload_prefix=prefix, file_name="hello_bytes") + provider.put_raw_data(lpath=io.BytesIO(b"hello"), upload_prefix=prefix, file_name="hello_bytes_io") + provider.put_raw_data(lpath=io.StringIO("hello"), upload_prefix=prefix, file_name="hello_string_io") + + fs = provider.get_filesystem("s3") + listing = fs.ls(f"{raw_output}{prefix}/") + assert len(listing) == 3 From 34331e16ef485935044b488b0cf102668409c7e1 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 12 Dec 2024 12:32:31 -0800 Subject: [PATCH 2/5] f'ing glaucoma' git push Signed-off-by: Yee Hing Tong --- tests/flytekit/unit/core/test_data_persistence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/unit/core/test_data_persistence.py b/tests/flytekit/unit/core/test_data_persistence.py index a938e86426..116717b92d 100644 --- a/tests/flytekit/unit/core/test_data_persistence.py +++ b/tests/flytekit/unit/core/test_data_persistence.py @@ -210,7 +210,7 @@ def __init__(self, *args, **kwargs): fp.get_filesystem("testgetfs", test_arg="test_arg") -@pytest.mark.sandbox_text +@pytest.mark.sandbox_test def test_put_raw_data_bytes(): dc = Config.for_sandbox().data_config raw_output = f"s3://my-s3-bucket/" From d70b5283d9648d927fc0c5dc2ae22e3cd900d991 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 12 Dec 2024 17:24:58 -0800 Subject: [PATCH 3/5] polars update Signed-off-by: Yee Hing Tong --- .../flytekitplugins/polars/sd_transformers.py | 3 ++- .../tests/test_polars_plugin_sd.py | 27 ++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/plugins/flytekit-polars/flytekitplugins/polars/sd_transformers.py b/plugins/flytekit-polars/flytekitplugins/polars/sd_transformers.py index 474901544d..e6359641ca 100644 --- a/plugins/flytekit-polars/flytekitplugins/polars/sd_transformers.py +++ b/plugins/flytekit-polars/flytekitplugins/polars/sd_transformers.py @@ -69,9 +69,10 @@ def encode( df.to_parquet(output_bytes) if structured_dataset.uri is not None: + output_bytes.seek(0) fs = ctx.file_access.get_filesystem_for_path(path=structured_dataset.uri) with fs.open(structured_dataset.uri, "wb") as s: - s.write(output_bytes) + s.write(output_bytes.read()) output_uri = structured_dataset.uri else: remote_fn = "00000" # 00000 is our default unnamed parquet filename diff --git a/plugins/flytekit-polars/tests/test_polars_plugin_sd.py b/plugins/flytekit-polars/tests/test_polars_plugin_sd.py index c2d4a39be7..9acae1c274 100644 --- a/plugins/flytekit-polars/tests/test_polars_plugin_sd.py +++ b/plugins/flytekit-polars/tests/test_polars_plugin_sd.py @@ -5,7 +5,7 @@ import pytest from flytekitplugins.polars.sd_transformers import PolarsDataFrameRenderer from typing_extensions import Annotated -from packaging import version +import numpy as np from polars.testing import assert_frame_equal from flytekit import kwtypes, task, workflow @@ -134,3 +134,28 @@ def consume_sd_return_sd(sd: StructuredDataset) -> StructuredDataset: opened_sd = opened_sd.collect() assert_frame_equal(opened_sd, polars_df) + + +def test_with_uri(): + temp_file = tempfile.mktemp() + + @task + def random_dataframe(num_rows: int) -> StructuredDataset: + feature_1_list = np.random.randint(low=100, high=999, size=(num_rows,)) + feature_2_list = np.random.normal(loc=0, scale=1, size=(num_rows, )) + pl_df = pl.DataFrame({'protein_length': feature_1_list, + 'protein_feature': feature_2_list}) + sd = StructuredDataset(dataframe=pl_df, uri=temp_file) + return sd + + @task + def consume(df: pd.DataFrame): + print(df.head(5)) + print(df.describe()) + + @workflow + def my_wf(num_rows: int): + pl = random_dataframe(num_rows=num_rows) + consume(pl) + + my_wf(num_rows=100) From 90ea9132b8793e11a6f3c9ead8f3b2bddb86ff7b Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 12 Dec 2024 17:33:26 -0800 Subject: [PATCH 4/5] remove some debugging Signed-off-by: Yee Hing Tong --- flytekit/core/data_persistence.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index ccab432b47..b099fdddaa 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -199,8 +199,7 @@ def get_filesystem( elif protocol == "s3": s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous) s3kwargs.update(kwargs) - fs = fsspec.filesystem(protocol, **s3kwargs) # type: ignore - return fs + return fsspec.filesystem(protocol, **s3kwargs) # type: ignore elif protocol == "gs": if anonymous: kwargs["token"] = _ANON @@ -221,8 +220,7 @@ async def get_async_filesystem_for_path( ) -> Union[AsyncFileSystem, fsspec.AbstractFileSystem]: protocol = get_protocol(path) loop = asyncio.get_running_loop() - fs = self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs) - return fs + return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs) def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem: protocol = get_protocol(path) From 935b831ed03cebcc15c88577ad210cbb68ecac65 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 12 Dec 2024 18:17:30 -0800 Subject: [PATCH 5/5] add comment and revert newline Signed-off-by: Yee Hing Tong --- flytekit/core/data_persistence.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index b099fdddaa..0640bc2eb5 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -220,6 +220,7 @@ async def get_async_filesystem_for_path( ) -> Union[AsyncFileSystem, fsspec.AbstractFileSystem]: protocol = get_protocol(path) loop = asyncio.get_running_loop() + return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs) def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem: @@ -422,6 +423,8 @@ async def async_put_raw_data( r = await self._put(from_path, to_path, **kwargs) return r or to_path + # See https://github.com/fsspec/s3fs/issues/871 for more background and pending work on the fsspec side to + # support effectively async open(). For now these use-cases below will revert to sync calls. # raw bytes if isinstance(lpath, bytes): fs = self.get_filesystem_for_path(to_path)