1
1
from __future__ import annotations
2
2
3
+ import asyncio
3
4
import io
4
5
import os
5
6
import shutil
8
9
9
10
from zarr .abc .store import ByteRangeRequest , Store
10
11
from zarr .core .buffer import Buffer
11
- from zarr .core .common import concurrent_map , to_thread
12
+ from zarr .core .common import concurrent_map
12
13
13
14
if TYPE_CHECKING :
14
15
from collections .abc import AsyncGenerator , Iterable
@@ -134,7 +135,7 @@ async def get(
134
135
path = self .root / key
135
136
136
137
try :
137
- return await to_thread (_get , path , prototype , byte_range )
138
+ return await asyncio . to_thread (_get , path , prototype , byte_range )
138
139
except (FileNotFoundError , IsADirectoryError , NotADirectoryError ):
139
140
return None
140
141
@@ -159,7 +160,7 @@ async def get_partial_values(
159
160
assert isinstance (key , str )
160
161
path = self .root / key
161
162
args .append ((_get , path , prototype , byte_range ))
162
- return await concurrent_map (args , to_thread , limit = None ) # TODO: fix limit
163
+ return await concurrent_map (args , asyncio . to_thread , limit = None ) # TODO: fix limit
163
164
164
165
async def set (self , key : str , value : Buffer ) -> None :
165
166
return await self ._set (key , value )
@@ -178,7 +179,7 @@ async def _set(self, key: str, value: Buffer, exclusive: bool = False) -> None:
178
179
if not isinstance (value , Buffer ):
179
180
raise TypeError ("LocalStore.set(): `value` must a Buffer instance" )
180
181
path = self .root / key
181
- await to_thread (_put , path , value , start = None , exclusive = exclusive )
182
+ await asyncio . to_thread (_put , path , value , start = None , exclusive = exclusive )
182
183
183
184
async def set_partial_values (
184
185
self , key_start_values : Iterable [tuple [str , int , bytes | bytearray | memoryview ]]
@@ -189,19 +190,19 @@ async def set_partial_values(
189
190
assert isinstance (key , str )
190
191
path = self .root / key
191
192
args .append ((_put , path , value , start ))
192
- await concurrent_map (args , to_thread , limit = None ) # TODO: fix limit
193
+ await concurrent_map (args , asyncio . to_thread , limit = None ) # TODO: fix limit
193
194
194
195
async def delete (self , key : str ) -> None :
195
196
self ._check_writable ()
196
197
path = self .root / key
197
198
if path .is_dir (): # TODO: support deleting directories? shutil.rmtree?
198
199
shutil .rmtree (path )
199
200
else :
200
- await to_thread (path .unlink , True ) # Q: we may want to raise if path is missing
201
+ await asyncio . to_thread (path .unlink , True ) # Q: we may want to raise if path is missing
201
202
202
203
async def exists (self , key : str ) -> bool :
203
204
path = self .root / key
204
- return await to_thread (path .is_file )
205
+ return await asyncio . to_thread (path .is_file )
205
206
206
207
async def list (self ) -> AsyncGenerator [str , None ]:
207
208
"""Retrieve all keys in the store.
0 commit comments