Skip to content

Commit 0c719f5

Browse files
committed
add logic to route upload methods to zonal implementation
1 parent feea4d3 commit 0c719f5

File tree

3 files changed

+125
-30
lines changed

3 files changed

+125
-30
lines changed

gcsfs/core.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,7 @@ def __init__(
18641864
self.bucket = bucket
18651865
self.key = key
18661866
self.acl = acl
1867+
self.consistency = consistency
18671868
self.checker = get_consistency_checker(consistency)
18681869

18691870
if "a" in self.mode:
@@ -2073,6 +2074,18 @@ def _convert_fixed_key_metadata(metadata, *, from_google=False):
20732074

20742075

20752076
async def upload_chunk(fs, location, data, offset, size, content_type):
2077+
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
2078+
AsyncAppendableObjectWriter,
2079+
)
2080+
2081+
from .extended_gcsfs import ExtendedGcsFileSystem
2082+
from .extended_gcsfs import upload_chunk as ext_upload_chunk
2083+
2084+
if isinstance(fs, ExtendedGcsFileSystem) and isinstance(
2085+
location, AsyncAppendableObjectWriter
2086+
):
2087+
2088+
return await ext_upload_chunk(fs, location, data, offset, size, content_type)
20762089
head = {}
20772090
l = len(data)
20782091
range = "bytes %i-%i/%i" % (offset, offset + l - 1, size)
@@ -2101,6 +2114,22 @@ async def initiate_upload(
21012114
mode="overwrite",
21022115
kms_key_name=None,
21032116
):
2117+
from .extended_gcsfs import ExtendedGcsFileSystem
2118+
from .extended_gcsfs import initiate_upload as ext_initiate_upload
2119+
2120+
if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
2121+
2122+
return await ext_initiate_upload(
2123+
fs,
2124+
bucket,
2125+
key,
2126+
content_type,
2127+
metadata,
2128+
fixed_key_metadata,
2129+
mode,
2130+
kms_key_name,
2131+
)
2132+
21042133
j = {"name": key}
21052134
if metadata:
21062135
j["metadata"] = metadata
@@ -2135,6 +2164,24 @@ async def simple_upload(
21352164
mode="overwrite",
21362165
kms_key_name=None,
21372166
):
2167+
from .extended_gcsfs import ExtendedGcsFileSystem
2168+
from .extended_gcsfs import simple_upload as ext_simple_upload
2169+
2170+
if isinstance(fs, ExtendedGcsFileSystem) and await fs._is_zonal_bucket(bucket):
2171+
2172+
return await ext_simple_upload(
2173+
fs,
2174+
bucket,
2175+
key,
2176+
datain,
2177+
metadatain,
2178+
consistency,
2179+
content_type,
2180+
fixed_key_metadata,
2181+
mode,
2182+
kms_key_name,
2183+
)
2184+
21382185
checker = get_consistency_checker(consistency)
21392186
path = f"{fs._location}/upload/storage/v1/b/{quote(bucket)}/o"
21402187
metadata = {"name": key}

gcsfs/extended_gcsfs.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,3 +252,41 @@ async def _cat_file(self, path, start=None, end=None, mrd=None, **kwargs):
252252
# Explicit cleanup if we created the MRD
253253
if mrd_created:
254254
await mrd.close()
255+
256+
257+
async def upload_chunk(fs, location, data, offset, size, content_type):
258+
raise NotImplementedError(
259+
"upload_chunk is not implemented yet for ExtendedGcsFileSystem. Please use write() instead."
260+
)
261+
262+
263+
async def initiate_upload(
264+
fs,
265+
bucket,
266+
key,
267+
content_type="application/octet-stream",
268+
metadata=None,
269+
fixed_key_metadata=None,
270+
mode="overwrite",
271+
kms_key_name=None,
272+
):
273+
raise NotImplementedError(
274+
"initiate_upload is not implemented yet for ExtendedGcsFileSystem. Please use write() instead."
275+
)
276+
277+
278+
async def simple_upload(
279+
fs,
280+
bucket,
281+
key,
282+
datain,
283+
metadatain=None,
284+
consistency=None,
285+
content_type="application/octet-stream",
286+
fixed_key_metadata=None,
287+
mode="overwrite",
288+
kms_key_name=None,
289+
):
290+
raise NotImplementedError(
291+
"simple_upload is not implemented yet for ExtendedGcsFileSystem. Please use write() instead."
292+
)

gcsfs/zonal_file.py

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ def write(self, data):
7575
"""
7676
Writes data using AsyncAppendableObjectWriter.
7777
"""
78-
if not self.writable():
79-
raise ValueError("File not in write mode")
8078
if self.closed:
8179
raise ValueError("I/O operation on closed file.")
80+
if not self.writable():
81+
raise ValueError("File not in write mode")
8282
if self.forced:
8383
raise ValueError("This file has been force-flushed, can only close")
8484

@@ -118,39 +118,49 @@ def discard(self):
118118
Data is uploaded via streaming and cannot be cancelled."
119119
)
120120

121-
async def initiate_upload(
122-
fs,
123-
bucket,
124-
key,
125-
content_type="application/octet-stream",
126-
metadata=None,
127-
fixed_key_metadata=None,
128-
mode="overwrite",
129-
kms_key_name=None,
130-
):
131-
raise NotImplementedError(
132-
"Initiate_upload operation is not implemented yet for Zonal buckets. Please use write() instead."
121+
def _initiate_upload(self):
122+
"""Initiates the upload for Zonal buckets using gRPC."""
123+
from gcsfs.extended_gcsfs import initiate_upload
124+
125+
self.location = asyn.sync(
126+
self.gcsfs.loop,
127+
initiate_upload,
128+
self.gcsfs,
129+
self.bucket,
130+
self.key,
131+
self.content_type,
132+
self.metadata,
133+
self.fixed_key_metadata,
134+
mode="create" if "x" in self.mode else "overwrite",
135+
kms_key_name=self.kms_key_name,
136+
timeout=self.timeout,
133137
)
134138

135-
async def simple_upload(
136-
fs,
137-
bucket,
138-
key,
139-
datain,
140-
metadatain=None,
141-
consistency=None,
142-
content_type="application/octet-stream",
143-
fixed_key_metadata=None,
144-
mode="overwrite",
145-
kms_key_name=None,
146-
):
147-
raise NotImplementedError(
148-
"Simple_upload operation is not implemented yet for Zonal buckets. Please use write() instead."
139+
def _simple_upload(self):
140+
"""Performs a simple upload for Zonal buckets using gRPC."""
141+
from gcsfs.extended_gcsfs import simple_upload
142+
143+
self.buffer.seek(0)
144+
data = self.buffer.read()
145+
asyn.sync(
146+
self.gcsfs.loop,
147+
simple_upload,
148+
self.gcsfs,
149+
self.bucket,
150+
self.key,
151+
data,
152+
self.metadata,
153+
self.consistency,
154+
self.content_type,
155+
self.fixed_key_metadata,
156+
mode="create" if "x" in self.mode else "overwrite",
157+
kms_key_name=self.kms_key_name,
158+
timeout=self.timeout,
149159
)
150160

151-
async def upload_chunk(fs, location, data, offset, size, content_type):
161+
def _upload_chunk(self, final=False):
152162
raise NotImplementedError(
153-
"Upload_chunk operation is not implemented yet for Zonal buckets. Please use write() instead."
163+
"_upload_chunk is not implemented yet for ZonalFile. Please use write() instead."
154164
)
155165

156166
def close(self):

0 commit comments

Comments
 (0)