@machichima machichima commented Aug 14, 2025

Tracking issue

Why are the changes needed?

Related to the discussion in the Slack channel: when using FlyteFile with Elastic, the file cannot be downloaded and we get the following warning:

.venv/lib/python3.12/site-packages/flytekit/types/file/file.py:356: RuntimeWarning: coroutine 'FileAccessProvider.async_get_data' was never awaited

It seems that FlyteFile does not play well with the underlying multiprocessing spawn.

Based on my experiments, when we create the _downloader partial function, we use the
context of the main process:

self._downloader = partial(
    ctx.file_access.get_data,
    ctx=ctx,
    remote_path=self._remote_source,  # type: ignore
    local_path=self._local_path,
)

But when we use Elastic and run the FlyteFile within each process via spawn, memory
is not shared, yet our _downloader function still uses the main process's context, which
cannot be found in the sub-process. Since the context required to convert the coroutine to a
sync call (loop_manager.synced) isn't present, the coroutine is returned as-is.
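The symptom described above can be reproduced in isolation. The following is a minimal sketch using only the standard library, not flytekit's code: async_get_data and sync_get_data are hypothetical stand-ins, and asyncio.run is used here in place of whatever mechanism loop_manager.synced relies on. It shows that a partial over an async function hands back a coroutine object when nothing runs it, which is what triggers the "was never awaited" warning.

```python
import asyncio
import inspect
from functools import partial


async def async_get_data(remote_path: str, local_path: str) -> str:
    # Hypothetical stand-in for an async download; not flytekit's implementation.
    await asyncio.sleep(0)
    return local_path


def sync_get_data(remote_path: str, local_path: str) -> str:
    # Assumption: a sync wrapper that drives the coroutine to completion.
    return asyncio.run(async_get_data(remote_path, local_path))


def returns_coroutine(downloader) -> bool:
    """Call the downloader and report whether it returned an un-awaited coroutine."""
    result = downloader()
    is_coro = inspect.iscoroutine(result)
    if is_coro:
        # Close it explicitly so this demo does not emit the RuntimeWarning itself.
        result.close()
    return is_coro


# Partial over the raw async function: calling it only creates a coroutine.
raw_downloader = partial(
    async_get_data, remote_path="s3://bucket/key", local_path="/tmp/key"
)

# Partial over the sync wrapper: calling it actually performs the "download".
synced_downloader = partial(
    sync_get_data, remote_path="s3://bucket/key", local_path="/tmp/key"
)
```

Checking the return value with inspect.iscoroutine, as the linked test script appears to do with is_coro2, is a cheap way to detect that the sync conversion did not happen.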

Below is the script for testing async calls under spawn with different contexts. "Test
2" is the one that causes the problem: what the partial function returns is a
coroutine ("Partial call is coroutine: {is_coro2}" is True), which shows that loop_manager.synced somehow does not work here.

https://github.com/machichima/flytekit_test_scripts/blob/main/elastic-async-fsspec/compare_direct_vs_partial.py

What changes were proposed in this pull request?

Ensure that every time we call download, we fetch the current_context of the current process, so that get_data is called synchronously.
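The difference between binding the context once and looking it up on every call can be sketched as follows. This is an illustration under stated assumptions, not the code merged in this PR: FakeContext, current_context, set_current_context and LateBoundDownloader are all hypothetical names, and a module-level variable stands in for the per-process context.

```python
from dataclasses import dataclass
from functools import partial
from typing import Callable


@dataclass
class FakeContext:
    # Hypothetical stand-in for a per-process context object.
    name: str

    def get_data(self, remote_path: str, local_path: str) -> str:
        return f"{self.name}: {remote_path} -> {local_path}"


_current = FakeContext("main")


def current_context() -> FakeContext:
    """Return whatever context is current at the time of the call."""
    return _current


def set_current_context(ctx: FakeContext) -> None:
    """Simulate ending up in a different process with its own context."""
    global _current
    _current = ctx


class LateBoundDownloader:
    """Looks up the context on every call instead of capturing it at creation."""

    def __init__(
        self,
        remote_path: str,
        local_path: str,
        context_getter: Callable[[], FakeContext] = current_context,
    ) -> None:
        self._remote_path = remote_path
        self._local_path = local_path
        self._context_getter = context_getter

    def __call__(self) -> str:
        ctx = self._context_getter()  # resolved at call time
        return ctx.get_data(self._remote_path, self._local_path)


# Early binding: the partial keeps a reference to the context that existed
# when it was created, like the original _downloader partial.
eager = partial(
    current_context().get_data, remote_path="s3://b/k", local_path="/tmp/k"
)

# Late binding: only the paths are stored; the context is fetched per call.
late = LateBoundDownloader(remote_path="s3://b/k", local_path="/tmp/k")
```

After set_current_context(FakeContext("worker")), eager() still reports the "main" context while late() reports "worker", which is the behavior the change aims for.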

How was this patch tested?

Tested with the following script:

from pathlib import Path

from flytekit import task, workflow, FlyteFile, ImageSpec, Resources
from flytekitplugins.kfpytorch.task import Elastic, PyTorch


DEFAULT_REMOTE_PATH = "s3://my-s3-bucket/test/test.txt"

image_spec = ImageSpec(
    registry="localhost:30000",
    packages=[
        "flytekitplugins-kfpytorch[elastic]",
        "numpy",
        "torch",
    ],
)


@task(
    task_config=Elastic(nnodes=1, nproc_per_node=4),
    container_image=image_spec,
    requests=Resources(cpu="4", mem="4Gi"),
    limits=Resources(cpu="4", mem="4Gi"),
)
def my_task(dataset: FlyteFile):
    path = dataset.download()
    print(f"path: {path}", flush=True)

    assert Path(path).is_file()  # will pass now

@workflow
def wf():
    file = FlyteFile(path=DEFAULT_REMOTE_PATH)

    outputs = my_task(dataset=file)

Setup process

Screenshots

Result

[Two screenshots; images not preserved]

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This pull request enhances the FlyteFile class for improved file downloading by implementing a new downloader class. It resolves coroutine warnings in multiprocessing scenarios by ensuring the current process context is used for synchronous calls, thereby increasing reliability and functionality.

@machichima machichima changed the title [Fix] Issue when using FlyteFile with Elastic [WIP] [Fix] Issue when using FlyteFile with Elastic Aug 14, 2025
Signed-off-by: machichima <[email protected]>
codecov bot commented Aug 15, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 76.54%. Comparing base (eb5a67f) to head (818cf34).
⚠️ Report is 4 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (eb5a67f) and HEAD (818cf34).

HEAD has 97 uploads less than BASE
Flag    BASE (eb5a67f)    HEAD (818cf34)
        100               3
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3313      +/-   ##
==========================================
- Coverage   85.26%   76.54%   -8.72%     
==========================================
  Files         386      218     -168     
  Lines       30276    22719    -7557     
  Branches     2969     2950      -19     
==========================================
- Hits        25814    17391    -8423     
- Misses       3615     4527     +912     
+ Partials      847      801      -46     

@machichima machichima force-pushed the elastic-flytefile-issue branch from fd35be5 to 818cf34 Compare August 15, 2025 08:32
@machichima machichima marked this pull request as ready for review August 15, 2025 08:36
@machichima machichima changed the title [WIP] [Fix] Issue when using FlyteFile with Elastic [Fix] Issue when using FlyteFile with Elastic Aug 15, 2025
@machichima machichima requested a review from fg91 as a code owner August 16, 2025 00:39
@machichima machichima force-pushed the elastic-flytefile-issue branch from a54dbb4 to 818cf34 Compare August 16, 2025 04:36
@pingsutw pingsutw merged commit a09252a into flyteorg:master Aug 16, 2025
339 of 343 checks passed
Atharva1723 pushed a commit to Atharva1723/flytekit that referenced this pull request Oct 5, 2025