11from __future__ import annotations
22
3+ import asyncio
34import logging
45from collections .abc import Mapping
56from pathlib import Path , PurePosixPath , PureWindowsPath
@@ -99,10 +100,10 @@ def subfold(name: str, folds: tuple[int, ...]) -> tuple[str, ...]:
99100 )
100101
101102
102- class ZarrTable (Table ):
103+ class ExternalZarrTable (Table ):
103104 """
104- The table tracking externally stored Zarr hierarchies.
105- Declare as ZarrTable (connection, database)
105+ The table tracking externally stored objects, with special support for Zarr hierarchies.
106+ Declare as ExternalZarrTable (connection, database)
106107 """
107108
108109 def __init__ (self , connection , store , database ):
@@ -141,7 +142,7 @@ def definition(self):
141142
142143 @property
143144 def table_name (self ):
144- return f"zarr_ { self .store } "
145+ return f"{ EXTERNAL_TABLE_ROOT } _zarr_ { self .store } "
145146
146147 @property
147148 def s3 (self ):
@@ -194,37 +195,14 @@ def _copy_store(self, source_store: zarr.abc.Store, dest_path: str, metadata=Non
194195 elif self .spec ["protocol" ] == "file" :
195196 # For file system, use LocalStore
196197 from zarr .storage import LocalStore
197- import os
198- os .makedirs (str (dest_path ), exist_ok = True )
199198 dest_store = LocalStore (str (dest_path ))
200199 else :
201200 raise ValueError (f"Unsupported protocol: { self .spec ['protocol' ]} " )
202201
203202 # Copy all keys from source to destination store
204203 import asyncio
205- asyncio .run (self . _manual_copy_store (source_store , dest_store ))
204+ asyncio .run (_copy_zarr_store (source_store , dest_store ))
206205
207- async def _manual_copy_store (self , source_store , dest_store ):
208- """Manually copy store contents using list_dir and set"""
209- import asyncio
210-
211- # Get all keys from the source store using list_dir
212- try :
213- keys_generator = source_store .list_dir (prefix = "" )
214- keys = [key async for key in keys_generator ]
215- except Exception as e :
216- raise ValueError (f"Cannot enumerate keys in source store: { e } " )
217-
218- # Copy each key using get/set
219- for key in keys :
220- try :
221- value = await source_store .get (key )
222- await dest_store .set (key , value )
223- except Exception as e :
224- # Skip keys we can't copy but log the issue
225- print (f"Warning: Could not copy key '{ key } ': { e } " )
226- continue
227-
228206 def _download_file (self , external_path , download_path ):
229207 if self .spec ["protocol" ] == "s3" :
230208 self .s3 .fget (external_path , download_path )
@@ -619,7 +597,7 @@ def __getitem__(self, store):
619597 :return: the ExternalTable object for the store
620598 """
621599 if store not in self ._tables :
622- self ._tables [store ] = ZarrTable (
600+ self ._tables [store ] = ExternalZarrTable (
623601 connection = self .schema .connection ,
624602 store = store ,
625603 database = self .schema .database ,
@@ -631,3 +609,19 @@ def __len__(self):
631609
632610 def __iter__ (self ):
633611 return iter (self ._tables )
612+
613+
614+ async def _copy_zarr_store (source_store : zarr .abc .store .Store , dest_store : zarr .abc .store .Store ) -> None :
615+ """Copy the contents of a Zarr store using list_dir and set. This is a brittle, temporary
616+ implementation that should be made more robust to handle the failure of individual keys
617+ to copy.
618+ """
619+
620+ async for key in source_store .list_dir (prefix = "" ):
621+ try :
622+ value = await source_store .get (key )
623+ await dest_store .set (key , value )
624+ except Exception as e :
625+ # Skip keys we can't copy but log the issue
626+ print (f"Warning: Could not copy key '{ key } ': { e } " )
627+ continue
0 commit comments