Skip to content

Commit cce7e99

Browse files
committed
Support write mode in ZonalFile and override related methods
1 parent 47c2d76 commit cce7e99

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed

gcsfs/zb_hns_utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,18 @@ async def download_range(offset, length, mrd):
1111
buffer = BytesIO()
1212
await mrd.download_ranges([(offset, length, buffer)])
1313
return buffer.getvalue()
14+
15+
16+
async def init_aaow(grpc_client, bucket_name, object_name, generation=None):
17+
"""
18+
Creates and opens the AsyncAppendableObjectWriter.
19+
"""
20+
21+
writer = AsyncAppendableObjectWriter(
22+
client=grpc_client,
23+
bucket_name=bucket_name,
24+
object_name=object_name,
25+
generation=generation,
26+
)
27+
await writer.open()
28+
return writer

gcsfs/zonal_file.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ def __init__(self, *args, **kwargs):
2222
self.mrd = asyn.sync(
2323
self.gcsfs.loop, self._init_mrd, self.bucket, self.key, self.generation
2424
)
25+
elif "w" in self.mode:
26+
self.aaow = asyn.sync(
27+
self.gcsfs.loop, self._init_aaow, self.bucket, self.key, self.generation
28+
)
2529
else:
2630
raise NotImplementedError(
2731
"Only read operations are currently supported for Zonal buckets."
@@ -47,10 +51,76 @@ def _fetch_range(self, start, end):
4751
return b""
4852
raise
4953

54+
async def _init_aaow(
55+
self, bucket_name, object_name, generation=None, overwrite=True
56+
):
57+
"""
58+
Initializes the AsyncAppendableObjectWriter.
59+
"""
60+
if generation is None and await self.gcsfs._exists(self.path):
61+
info = self.gcsfs.info(self.path)
62+
generation = info.get("generation")
63+
64+
return await zb_hns_utils.init_aaow(
65+
self.gcsfs.grpc_client, bucket_name, object_name, generation, overwrite
66+
)
67+
68+
def flush(self, force=False):
69+
raise NotImplementedError(
70+
"Write operations are not yet implemented for Zonal buckets."
71+
)
72+
73+
def commit(self):
74+
raise NotImplementedError(
75+
"Write operations are not yet implemented for Zonal buckets."
76+
)
77+
78+
def discard(self):
79+
raise NotImplementedError(
80+
"Write operations are not yet implemented for Zonal buckets."
81+
)
82+
83+
async def initiate_upload(
84+
fs,
85+
bucket,
86+
key,
87+
content_type="application/octet-stream",
88+
metadata=None,
89+
fixed_key_metadata=None,
90+
mode="overwrite",
91+
kms_key_name=None,
92+
):
93+
raise NotImplementedError(
94+
"Write operations are not yet implemented for Zonal buckets."
95+
)
96+
97+
async def simple_upload(
98+
fs,
99+
bucket,
100+
key,
101+
datain,
102+
metadatain=None,
103+
consistency=None,
104+
content_type="application/octet-stream",
105+
fixed_key_metadata=None,
106+
mode="overwrite",
107+
kms_key_name=None,
108+
):
109+
raise NotImplementedError(
110+
"Write operations are not yet implemented for Zonal buckets."
111+
)
112+
113+
async def upload_chunk(fs, location, data, offset, size, content_type):
114+
raise NotImplementedError(
115+
"Write operations are not yet implemented for Zonal buckets."
116+
)
117+
50118
def close(self):
51119
"""
52120
Closes the ZonalFile and the underlying AsyncMultiRangeDownloader.
53121
"""
54122
if self.mrd:
55123
asyn.sync(self.gcsfs.loop, self.mrd.close)
124+
if self.aaow:
125+
asyn.sync(self.gcsfs.loop, self.aaow.close)
56126
super().close()

0 commit comments

Comments
 (0)