1+ import logging
2+
13from fsspec import asyn
24from google .cloud .storage ._experimental .asyncio .async_multi_range_downloader import (
35 AsyncMultiRangeDownloader ,
46)
57
68from gcsfs .core import GCSFile
79
10+ logger = logging .getLogger ("gcsfs.zonal_file" )
11+
812
913class ZonalFile (GCSFile ):
1014 """
@@ -65,19 +69,51 @@ async def _init_aaow(
6569 self .gcsfs .grpc_client , bucket_name , object_name , generation , overwrite
6670 )
6771
72+ def write (self , data ):
73+ """
74+ Writes data using AsyncAppendableObjectWriter.
75+ """
76+ if not self .writable ():
77+ raise ValueError ("File not in write mode" )
78+ if self .closed :
79+ raise ValueError ("I/O operation on closed file." )
80+ if self .forced :
81+ raise ValueError ("This file has been force-flushed, can only close" )
82+
83+ asyn .sync (self .gcsfs .loop , self .aaow .append , data )
84+
6885 def flush (self , force = False ):
69- raise NotImplementedError (
70- "Write operations are not yet implemented for Zonal buckets."
71- )
86+ """
87+ Flushes the AsyncAppendableObjectWriter, sending all buffered data
88+ to the server.
89+ """
90+ if self .closed :
91+ raise ValueError ("Flush on closed file" )
92+ if force and self .forced :
93+ raise ValueError ("Force flush cannot be called more than once" )
94+ if force :
95+ self .forced = True
96+
97+ if self .readable ():
98+ # no-op to flush on read-mode
99+ return
100+
101+ asyn .sync (self .gcsfs .loop , self .aaow .flush )
72102
73103 def commit (self ):
74- raise NotImplementedError (
75- "Write operations are not yet implemented for Zonal buckets."
76- )
104+ """
105+ Commits the write by finalizing the AsyncAppendableObjectWriter.
106+ """
107+ if not self .writable ():
108+ raise ValueError ("File not in write mode" )
109+ self .autocommit = True
110+ asyn .sync (self .gcsfs .loop , self .aaow .finalize )
77111
78112 def discard (self ):
79- raise NotImplementedError (
80- "Write operations are not yet implemented for Zonal buckets."
113+ """Discard is not applicable for Zonal Buckets. Log a warning instead."""
114+ logger .warning (
115+ "Discard is unavailable for Zonal Buckets. \
116+ Data is uploaded via streaming and cannot be cancelled."
81117 )
82118
83119 async def initiate_upload (
@@ -91,7 +127,7 @@ async def initiate_upload(
91127 kms_key_name = None ,
92128 ):
93129 raise NotImplementedError (
94- "Write operations are not yet implemented for Zonal buckets."
130+ "Initiate_upload operation is not applicable for Zonal buckets. Please use write() instead ."
95131 )
96132
97133 async def simple_upload (
@@ -107,20 +143,23 @@ async def simple_upload(
107143 kms_key_name = None ,
108144 ):
109145 raise NotImplementedError (
110- "Write operations are not yet implemented for Zonal buckets."
146+ "Simple_upload operation is not applicable for Zonal buckets. Please use write() instead ."
111147 )
112148
113149 async def upload_chunk (fs , location , data , offset , size , content_type ):
114150 raise NotImplementedError (
115- "Write operations are not yet implemented for Zonal buckets."
151+ "Upload_chunk operation is not applicable for Zonal buckets. Please use write() instead ."
116152 )
117153
118154 def close (self ):
119155 """
120- Closes the ZonalFile and the underlying AsyncMultiRangeDownloader.
156+ Closes the ZonalFile and the underlying AsyncMultiRangeDownloader and AsyncAppendableObjectWriter.
157+ If in write mode, finalizes the write if autocommit is True.
121158 """
122- if self .mrd :
159+ if hasattr ( self , "mrd" ) and self .mrd :
123160 asyn .sync (self .gcsfs .loop , self .mrd .close )
124- if self .aaow :
125- asyn .sync (self .gcsfs .loop , self .aaow .close )
161+ if hasattr (self , "aaow" ) and self .aaow :
162+ asyn .sync (
163+ self .gcsfs .loop , self .aaow .close , finalize_on_close = self .autocommit
164+ )
126165 super ().close ()
0 commit comments