Skip to content

Commit 83d43fd

Browse files
authored
Add fsspec support to id_gen io (#972)
Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com>
1 parent fae00de commit 83d43fd

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

ray-curator/ray_curator/stages/deduplication/id_generator.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
import json
1717
import uuid
18+
from typing import Any
1819

20+
import fsspec
1921
import ray
2022
from loguru import logger
2123
from ray.actor import ActorHandle
@@ -57,8 +59,9 @@ def get_batch_range(self, files: str | list[str] | None, key: str | None) -> tup
5759

5860
return self.batch_registry[key]
5961

60-
def to_disk(self, filepath: str) -> None:
61-
with open(filepath, "w") as f:
62+
def to_disk(self, filepath: str, storage_options: dict[str, Any] | None = None) -> None:
63+
storage_options = storage_options or {}
64+
with fsspec.open(filepath, mode="w", **storage_options) as f:
6265
json.dump(
6366
{
6467
"next_id": self.next_id,
@@ -68,8 +71,9 @@ def to_disk(self, filepath: str) -> None:
6871
)
6972

7073
@classmethod
71-
def from_disk(cls, filepath: str) -> "IdGeneratorBase":
72-
with open(filepath) as f:
74+
def from_disk(cls, filepath: str, storage_options: dict[str, Any] | None = None) -> "IdGeneratorBase":
75+
storage_options = storage_options or {}
76+
with fsspec.open(filepath, mode="r", **storage_options) as f:
7377
data = json.load(f)
7478
return cls(start_id=data["next_id"], batch_registry=data["batch_registry"])
7579

@@ -87,12 +91,13 @@ def kill_id_generator_actor() -> None:
8791
ray.kill(get_id_generator_actor())
8892

8993

90-
def create_id_generator_actor(filepath: str | None = None) -> None:
94+
def create_id_generator_actor(filepath: str | None = None, storage_options: dict[str, Any] | None = None) -> None:
9195
"""Create an id generator actor.
9296
9397
Args:
9498
filepath (str): Path from where we want to load the id generator state json file.
9599
If None, a new actor is created.
100+
storage_options (dict[str, Any] | None): Storage options to pass to fsspec.open.
96101
"""
97102
register_loguru_serializer() # TODO: instead of calling before each ray.init we can call it a packages __init__
98103
ray.init(ignore_reinit_error=True)
@@ -105,7 +110,8 @@ def create_id_generator_actor(filepath: str | None = None) -> None:
105110
else:
106111
# Create actor from saved state on disk
107112
# First load the data from disk
108-
with open(filepath) as f:
113+
storage_options = storage_options or {}
114+
with fsspec.open(filepath, mode="r", **storage_options) as f:
109115
data = json.load(f)
110116
# Create actor with loaded data
111117
_ = IdGenerator.options(
@@ -120,5 +126,6 @@ def create_id_generator_actor(filepath: str | None = None) -> None:
120126
ray.shutdown()
121127

122128

123-
def write_id_generator_to_disk(filepath: str) -> None:
124-
ray.get(get_id_generator_actor().to_disk.remote(filepath))
129+
def write_id_generator_to_disk(filepath: str, storage_options: dict[str, Any] | None = None) -> None:
130+
storage_options = storage_options or {}
131+
ray.get(get_id_generator_actor().to_disk.remote(filepath, storage_options))

0 commit comments

Comments
 (0)