Skip to content

Commit ffe2103

Browse files
committed
Improve FS clients creation
1 parent 73d4ee6 commit ffe2103

File tree

3 files changed: 66 additions (+66) and 163 deletions (-163).

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 25 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -56,21 +56,17 @@ def __init__(
5656
self,
5757
*,
5858
metadata: DatasetMetadata,
59-
storage_dir: Path,
59+
path_to_dataset: Path,
6060
lock: asyncio.Lock,
61-
directory_name: str | None = None,
6261
) -> None:
6362
"""Initialize a new instance.
6463
6564
Preferably use the `FileSystemDatasetClient.open` class method to create a new instance.
6665
"""
6766
self._metadata = metadata
6867

69-
self._storage_dir = storage_dir
70-
"""The base directory where the storage data are being persisted."""
71-
72-
self._directory_name = directory_name
73-
"""The directory name to use for this dataset. If None, uses metadata.name or default."""
68+
self._path_to_dataset = path_to_dataset
69+
"""The full path to the dataset directory."""
7470

7571
self._lock = lock
7672
"""A lock to ensure that only one operation is performed at a time."""
@@ -82,14 +78,7 @@ async def get_metadata(self) -> DatasetMetadata:
8278
@property
8379
def path_to_dataset(self) -> Path:
8480
"""The full path to the dataset directory."""
85-
# Use the explicit directory name if provided, otherwise fall back to metadata.name or default
86-
if self._directory_name is not None:
87-
return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name
88-
89-
if self._metadata.name is None:
90-
return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT
91-
92-
return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name
81+
return self._path_to_dataset
9382

9483
@property
9584
def path_to_metadata(self) -> Path:
@@ -124,12 +113,12 @@ async def open(
124113
ValueError: If a dataset with the specified ID is not found, if metadata is invalid,
125114
or if both name and alias are provided.
126115
"""
127-
# Validate parameters - exactly one of name or alias should be provided (or neither for default)
128-
if name is not None and alias is not None:
129-
raise ValueError('Cannot specify both name and alias parameters')
116+
# Validate parameters
117+
specified_params = sum(1 for param in [id, name, alias] if param is not None)
118+
if specified_params > 1:
119+
raise ValueError('Only one of "id", "name", or "alias" can be specified, not multiple.')
130120

131-
storage_dir = Path(configuration.storage_dir)
132-
dataset_base_path = storage_dir / cls._STORAGE_SUBDIR
121+
dataset_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR
133122

134123
if not dataset_base_path.exists():
135124
await asyncio.to_thread(dataset_base_path.mkdir, parents=True, exist_ok=True)
@@ -141,21 +130,20 @@ async def open(
141130
if not dataset_dir.is_dir():
142131
continue
143132

144-
metadata_path = dataset_dir / METADATA_FILENAME
145-
if not metadata_path.exists():
133+
path_to_metadata = dataset_dir / METADATA_FILENAME
134+
if not path_to_metadata.exists():
146135
continue
147136

148137
try:
149-
file = await asyncio.to_thread(metadata_path.open)
138+
file = await asyncio.to_thread(path_to_metadata.open)
150139
try:
151140
file_content = json.load(file)
152141
metadata = DatasetMetadata(**file_content)
153142
if metadata.id == id:
154143
client = cls(
155144
metadata=metadata,
156-
storage_dir=storage_dir,
145+
path_to_dataset=dataset_base_path / dataset_dir,
157146
lock=asyncio.Lock(),
158-
directory_name=dataset_dir.name, # Use the actual directory name
159147
)
160148
await client._update_metadata(update_accessed_at=True)
161149
found = True
@@ -170,48 +158,29 @@ async def open(
170158

171159
# Get a new instance by name or alias.
172160
else:
173-
# Determine the directory name and metadata name based on whether this is a named or alias storage
174-
if alias is not None:
175-
# For alias storages, use the alias as directory name and set metadata.name to None
176-
# Special case: alias='default' should use the same directory as default storage
177-
directory_name = None if alias == 'default' else alias
178-
actual_name = None
179-
elif name is not None:
180-
# For named storages, use the name as both directory name and metadata.name
181-
directory_name = name
182-
actual_name = name
183-
else:
184-
# For default storage (no name or alias), use None for both - same as alias='default'
185-
directory_name = None
186-
actual_name = None
187-
188-
dataset_path = (
189-
dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT
190-
if directory_name is None
191-
else dataset_base_path / directory_name
192-
)
193-
metadata_path = dataset_path / METADATA_FILENAME
161+
dataset_dir = Path(name) if name else Path(alias) if alias else Path('default')
162+
path_to_dataset = dataset_base_path / dataset_dir
163+
path_to_metadata = path_to_dataset / METADATA_FILENAME
194164

195165
# If the dataset directory exists, reconstruct the client from the metadata file.
196-
if dataset_path.exists() and metadata_path.exists():
197-
file = await asyncio.to_thread(open, metadata_path)
166+
if path_to_dataset.exists() and path_to_metadata.exists():
167+
file = await asyncio.to_thread(open, path_to_metadata)
198168
try:
199169
file_content = json.load(file)
200170
finally:
201171
await asyncio.to_thread(file.close)
202172
try:
203173
metadata = DatasetMetadata(**file_content)
204-
# For aliases, ensure the metadata.name is None
205-
if alias is not None:
206-
metadata = metadata.model_copy(update={'name': None})
207174
except ValidationError as exc:
208-
raise ValueError(f'Invalid metadata file for dataset "{name}"') from exc
175+
raise ValueError(f'Invalid metadata file for dataset "{name or alias}"') from exc
176+
177+
# Update metadata name to match the resolution.
178+
metadata.name = name
209179

210180
client = cls(
211181
metadata=metadata,
212-
storage_dir=storage_dir,
182+
path_to_dataset=path_to_dataset,
213183
lock=asyncio.Lock(),
214-
directory_name=directory_name,
215184
)
216185

217186
await client._update_metadata(update_accessed_at=True)
@@ -221,17 +190,16 @@ async def open(
221190
now = datetime.now(timezone.utc)
222191
metadata = DatasetMetadata(
223192
id=crypto_random_object_id(),
224-
name=actual_name, # Use actual_name which will be None for aliases
193+
name=name,
225194
created_at=now,
226195
accessed_at=now,
227196
modified_at=now,
228197
item_count=0,
229198
)
230199
client = cls(
231200
metadata=metadata,
232-
storage_dir=storage_dir,
201+
path_to_dataset=path_to_dataset,
233202
lock=asyncio.Lock(),
234-
directory_name=directory_name,
235203
)
236204
await client._update_metadata()
237205

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 21 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -55,21 +55,17 @@ def __init__(
5555
self,
5656
*,
5757
metadata: KeyValueStoreMetadata,
58-
storage_dir: Path,
58+
path_to_kvs: Path,
5959
lock: asyncio.Lock,
60-
directory_name: str | None = None,
6160
) -> None:
6261
"""Initialize a new instance.
6362
6463
Preferably use the `FileSystemKeyValueStoreClient.open` class method to create a new instance.
6564
"""
6665
self._metadata = metadata
6766

68-
self._storage_dir = storage_dir
69-
"""The base directory where the storage data are being persisted."""
70-
71-
self._directory_name = directory_name
72-
"""The directory name to use for this key-value store. If None, uses metadata.name or default."""
67+
self._path_to_kvs = path_to_kvs
68+
"""The full path to the key-value store directory."""
7369

7470
self._lock = lock
7571
"""A lock to ensure that only one operation is performed at a time."""
@@ -81,14 +77,7 @@ async def get_metadata(self) -> KeyValueStoreMetadata:
8177
@property
8278
def path_to_kvs(self) -> Path:
8379
"""The full path to the key-value store directory."""
84-
# Use the explicit directory name if provided, otherwise fall back to metadata.name or default
85-
if self._directory_name is not None:
86-
return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name
87-
88-
if self._metadata.name is None:
89-
return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT
90-
91-
return self._storage_dir / self._STORAGE_SUBDIR / self._metadata.name
80+
return self._path_to_kvs
9281

9382
@property
9483
def path_to_metadata(self) -> Path:
@@ -127,8 +116,7 @@ async def open(
127116
if name is not None and alias is not None:
128117
raise ValueError('Cannot specify both name and alias parameters')
129118

130-
storage_dir = Path(configuration.storage_dir)
131-
kvs_base_path = storage_dir / cls._STORAGE_SUBDIR
119+
kvs_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR
132120

133121
if not kvs_base_path.exists():
134122
await asyncio.to_thread(kvs_base_path.mkdir, parents=True, exist_ok=True)
@@ -140,21 +128,20 @@ async def open(
140128
if not kvs_dir.is_dir():
141129
continue
142130

143-
metadata_path = kvs_dir / METADATA_FILENAME
144-
if not metadata_path.exists():
131+
path_to_metadata = kvs_dir / METADATA_FILENAME
132+
if not path_to_metadata.exists():
145133
continue
146134

147135
try:
148-
file = await asyncio.to_thread(metadata_path.open)
136+
file = await asyncio.to_thread(path_to_metadata.open)
149137
try:
150138
file_content = json.load(file)
151139
metadata = KeyValueStoreMetadata(**file_content)
152140
if metadata.id == id:
153141
client = cls(
154142
metadata=metadata,
155-
storage_dir=storage_dir,
143+
path_to_kvs=kvs_base_path / kvs_dir,
156144
lock=asyncio.Lock(),
157-
directory_name=kvs_dir.name, # Use the actual directory name
158145
)
159146
await client._update_metadata(update_accessed_at=True)
160147
found = True
@@ -169,48 +156,29 @@ async def open(
169156

170157
# Get a new instance by name or alias.
171158
else:
172-
# Determine the directory name and metadata name based on whether this is a named or alias storage
173-
if alias is not None:
174-
# For alias storages, use the alias as directory name and set metadata.name to None
175-
# Special case: alias='default' should use the same directory as default storage
176-
directory_name = None if alias == 'default' else alias
177-
actual_name = None
178-
elif name is not None:
179-
# For named storages, use the name as both directory name and metadata.name
180-
directory_name = name
181-
actual_name = name
182-
else:
183-
# For default storage (no name or alias), use None for both - same as alias='default'
184-
directory_name = None
185-
actual_name = None
186-
187-
kvs_path = (
188-
kvs_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT
189-
if directory_name is None
190-
else kvs_base_path / directory_name
191-
)
192-
metadata_path = kvs_path / METADATA_FILENAME
159+
kvs_dir = Path(name) if name else Path(alias) if alias else Path('default')
160+
path_to_kvs = kvs_base_path / kvs_dir
161+
path_to_metadata = path_to_kvs / METADATA_FILENAME
193162

194163
# If the key-value store directory exists, reconstruct the client from the metadata file.
195-
if kvs_path.exists() and metadata_path.exists():
196-
file = await asyncio.to_thread(open, metadata_path)
164+
if path_to_kvs.exists() and path_to_metadata.exists():
165+
file = await asyncio.to_thread(open, path_to_metadata)
197166
try:
198167
file_content = json.load(file)
199168
finally:
200169
await asyncio.to_thread(file.close)
201170
try:
202171
metadata = KeyValueStoreMetadata(**file_content)
203-
# For aliases, ensure the metadata.name is None
204-
if alias is not None:
205-
metadata = metadata.model_copy(update={'name': None})
206172
except ValidationError as exc:
207-
raise ValueError(f'Invalid metadata file for key-value store "{name}"') from exc
173+
raise ValueError(f'Invalid metadata file for key-value store "{name or alias}"') from exc
174+
175+
# Update metadata name to match the resolution.
176+
metadata.name = name
208177

209178
client = cls(
210179
metadata=metadata,
211-
storage_dir=storage_dir,
180+
path_to_kvs=path_to_kvs,
212181
lock=asyncio.Lock(),
213-
directory_name=directory_name,
214182
)
215183

216184
await client._update_metadata(update_accessed_at=True)
@@ -220,16 +188,15 @@ async def open(
220188
now = datetime.now(timezone.utc)
221189
metadata = KeyValueStoreMetadata(
222190
id=crypto_random_object_id(),
223-
name=actual_name, # Use actual_name which will be None for aliases
191+
name=name,
224192
created_at=now,
225193
accessed_at=now,
226194
modified_at=now,
227195
)
228196
client = cls(
229197
metadata=metadata,
230-
storage_dir=storage_dir,
198+
path_to_kvs=path_to_kvs,
231199
lock=asyncio.Lock(),
232-
directory_name=directory_name,
233200
)
234201
await client._update_metadata()
235202

0 commit comments

Comments
 (0)