 import fsspec
 import pytest
 import requests
+from google.cloud import storage

 from gcsfs import GCSFileSystem
-from gcsfs.tests.settings import TEST_BUCKET
+from gcsfs.tests.settings import TEST_BUCKET, TEST_VERSIONED_BUCKET, TEST_ZONAL_BUCKET

 files = {
     "test/accounts.1.json": (
@@ -57,7 +58,7 @@ def stop_docker(container):
     subprocess.call(["docker", "rm", "-f", "-v", cid])


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def docker_gcs():
     if "STORAGE_EMULATOR_HOST" in os.environ:
         # assume using real API or otherwise have a server already set up
@@ -88,7 +89,7 @@ def docker_gcs():
     stop_docker(container)


-@pytest.fixture
+@pytest.fixture(scope="session")
 def gcs_factory(docker_gcs):
     params["endpoint_url"] = docker_gcs

@@ -99,44 +100,83 @@ def factory(**kwargs):
     return factory


+@pytest.fixture(scope="session")
+def buckets_to_delete():
+    """A set to keep track of buckets created during the test session."""
+    return set()
+
+
 @pytest.fixture
-def gcs(gcs_factory, populate=True):
+def gcs(gcs_factory, buckets_to_delete, populate=True):
     gcs = gcs_factory()
-    try:
-        # ensure we're empty.
-        try:
-            gcs.rm(TEST_BUCKET, recursive=True)
-        except FileNotFoundError:
-            pass
-        try:
+    try:  # ensure we're empty.
+        # Create the bucket if it doesn't exist, otherwise clean it.
+        if not gcs.exists(TEST_BUCKET):
             gcs.mkdir(TEST_BUCKET)
-        except Exception:
-            pass
+            buckets_to_delete.add(TEST_BUCKET)
+        else:
+            try:
+                gcs.rm(gcs.find(TEST_BUCKET))
+            except Exception as e:
+                logging.warning(f"Failed to empty bucket {TEST_BUCKET}: {e}")

         if populate:
             gcs.pipe({TEST_BUCKET + "/" + k: v for k, v in allfiles.items()})
         gcs.invalidate_cache()
         yield gcs
     finally:
-        try:
-            gcs.rm(gcs.find(TEST_BUCKET))
-            gcs.rm(TEST_BUCKET)
-        except:  # noqa: E722
-            pass
+        _cleanup_gcs(gcs)


-def _cleanup_gcs(gcs, is_real_gcs):
-    """Only remove the bucket/contents if we are NOT using the real GCS, logging a warning on failure."""
-    if is_real_gcs:
-        return
+def _cleanup_gcs(gcs):
+    """Clean the bucket contents, logging a warning on failure."""
     try:
-        gcs.rm(TEST_BUCKET, recursive=True)
+        gcs.rm(gcs.find(TEST_BUCKET))
     except Exception as e:
         logging.warning(f"Failed to clean up GCS bucket {TEST_BUCKET}: {e}")


+@pytest.fixture(scope="session", autouse=True)
+def final_cleanup(gcs_factory, buckets_to_delete):
+    """A session-scoped fixture to delete the test buckets after all tests are run."""
+    yield
+    # This code runs after the entire test session finishes
+    use_extended_gcs = os.getenv(
+        "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT", "false"
+    ).lower() in (
+        "true",
+        "1",
+    )
+
+    if use_extended_gcs:
+        is_real_gcs = (
+            os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
+        )
+        mock_authentication_manager = (
+            patch("google.auth.default", return_value=(None, "fake-project"))
+            if not is_real_gcs
+            else nullcontext()
+        )
+    else:
+        mock_authentication_manager = nullcontext()
+
+    with mock_authentication_manager:
+        gcs = gcs_factory()
+        for bucket in buckets_to_delete:
+            # For real GCS, only delete if created by the test suite.
+            # For emulators, always delete.
+            try:
+                if gcs.exists(bucket):
+                    gcs.rm(bucket, recursive=True)
+                    logging.info(f"Cleaned up bucket: {bucket}")
+            except Exception as e:
+                logging.warning(
+                    f"Failed to perform final cleanup for bucket {bucket}: {e}"
+                )
+
+
 @pytest.fixture
-def extended_gcsfs(gcs_factory, populate=True):
+def extended_gcsfs(gcs_factory, buckets_to_delete, populate=True):
     # Check if we are running against a real GCS endpoint
     is_real_gcs = (
         os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
@@ -147,39 +187,96 @@ def extended_gcsfs(gcs_factory, populate=True):
         # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
         if not is_real_gcs:
             try:
-                extended_gcsfs.rm(TEST_BUCKET, recursive=True)
+                extended_gcsfs.rm(TEST_ZONAL_BUCKET, recursive=True)
             except FileNotFoundError:
                 pass
-            extended_gcsfs.mkdir(TEST_BUCKET)
+            extended_gcsfs.mkdir(TEST_ZONAL_BUCKET)
+            buckets_to_delete.add(TEST_ZONAL_BUCKET)
             if populate:
                 extended_gcsfs.pipe(
-                    {TEST_BUCKET + "/" + k: v for k, v in allfiles.items()}
+                    {TEST_ZONAL_BUCKET + "/" + k: v for k, v in allfiles.items()}
                 )
         extended_gcsfs.invalidate_cache()
         yield extended_gcsfs
     finally:
-        _cleanup_gcs(extended_gcsfs, is_real_gcs)
+        _cleanup_gcs(extended_gcsfs)


 @pytest.fixture
-def gcs_versioned(gcs_factory):
+def gcs_versioned(gcs_factory, buckets_to_delete):
     gcs = gcs_factory()
     gcs.version_aware = True
-    try:
+    is_real_gcs = (
+        os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
+    )
+    try:  # ensure we're empty.
+        # The versioned bucket might be created by `is_versioning_enabled`
+        # in test_core_versioned.py. We must register it for cleanup only if
+        # it was created by this test run.
         try:
-            gcs.rm(gcs.find(TEST_BUCKET, versions=True))
-        except FileNotFoundError:
-            pass
+            from gcsfs.tests.test_core_versioned import (
+                _VERSIONED_BUCKET_CREATED_BY_TESTS,
+            )

-        try:
-            gcs.mkdir(TEST_BUCKET, enable_versioning=True)
-        except Exception:
-            pass
+            if _VERSIONED_BUCKET_CREATED_BY_TESTS:
+                buckets_to_delete.add(TEST_VERSIONED_BUCKET)
+        except ImportError:
+            pass  # test_core_versioned is not being run
+        if is_real_gcs:
+            cleanup_versioned_bucket(gcs, TEST_VERSIONED_BUCKET)
+        else:
+            # For emulators, we delete and recreate the bucket for a clean state
+            try:
+                gcs.rm(TEST_VERSIONED_BUCKET, recursive=True)
+            except FileNotFoundError:
+                pass
+            gcs.mkdir(TEST_VERSIONED_BUCKET, enable_versioning=True)
+            buckets_to_delete.add(TEST_VERSIONED_BUCKET)
         gcs.invalidate_cache()
         yield gcs
     finally:
+        # Ensure the bucket is empty after the test.
         try:
-            gcs.rm(gcs.find(TEST_BUCKET, versions=True))
-            gcs.rm(TEST_BUCKET)
-        except:  # noqa: E722
-            pass
+            if is_real_gcs:
+                cleanup_versioned_bucket(gcs, TEST_VERSIONED_BUCKET)
+        except Exception as e:
+            logging.warning(
+                f"Failed to clean up versioned bucket {TEST_VERSIONED_BUCKET} after test: {e}"
+            )
+
+
+def cleanup_versioned_bucket(gcs, bucket_name, prefix=None):
+    """
+    Deletes all object versions in a bucket using the google-cloud-storage client,
+    ensuring it uses the same credentials as the gcsfs instance.
+    """
+    # Define a retry policy for API calls to handle rate limiting.
+    # This can retry on 429 Too Many Requests errors, which can happen
+    # when deleting many object versions quickly.
+    from google.api_core.retry import Retry
+
+    retry_policy = Retry(
+        initial=1.0,  # Initial delay in seconds
+        maximum=30.0,  # Maximum delay in seconds
+        multiplier=1.2,  # Backoff factor
+    )
+
+    client = storage.Client(
+        credentials=gcs.credentials.credentials, project=gcs.project
+    )
+
+    # List all blobs, including old versions
+    blobs_to_delete = list(client.list_blobs(bucket_name, versions=True, prefix=prefix))
+
+    if not blobs_to_delete:
+        logging.info("No object versions to delete in %s.", bucket_name)
+        return
+
+    logging.info(
+        "Deleting %d object versions from %s.", len(blobs_to_delete), bucket_name
+    )
+    time.sleep(2)
+    for blob in blobs_to_delete:
+        blob.delete(retry=retry_policy)
+
+    logging.info("Successfully deleted %d object versions.", len(blobs_to_delete))
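
For reference, a minimal sketch of how a test module might consume the reworked fixtures. Only the `gcs` fixture and `TEST_BUCKET` come from the conftest above; the test name, object path, and payload are hypothetical and just illustrate the intended lifecycle.

from gcsfs.tests.settings import TEST_BUCKET


def test_roundtrip(gcs):
    # `gcs` is the function-scoped fixture above: it reuses the session-scoped
    # gcs_factory, creates or empties TEST_BUCKET, and pre-populates `allfiles`.
    path = TEST_BUCKET + "/roundtrip.bin"  # hypothetical object name
    gcs.pipe_file(path, b"payload")
    assert gcs.cat_file(path) == b"payload"
    # No explicit teardown needed: _cleanup_gcs empties the bucket after the
    # test, and the autouse final_cleanup fixture deletes any buckets that
    # were registered in buckets_to_delete during the session.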