99import fsspec
1010import pytest
1111import requests
12+ from google .cloud import storage
1213
1314from gcsfs import GCSFileSystem
14- from gcsfs .tests .settings import TEST_BUCKET
15+ from gcsfs .tests .settings import TEST_BUCKET , TEST_VERSIONED_BUCKET , TEST_ZONAL_BUCKET
1516
1617files = {
1718 "test/accounts.1.json" : (
@@ -59,7 +60,7 @@ def stop_docker(container):
5960 subprocess .call (["docker" , "rm" , "-f" , "-v" , cid ])
6061
6162
62- @pytest .fixture (scope = "module " )
63+ @pytest .fixture (scope = "session " )
6364def docker_gcs ():
6465 if "STORAGE_EMULATOR_HOST" in os .environ :
6566 # assume using real API or otherwise have a server already set up
@@ -90,7 +91,7 @@ def docker_gcs():
9091 stop_docker (container )
9192
9293
93- @pytest .fixture
94+ @pytest .fixture ( scope = "session" )
9495def gcs_factory (docker_gcs ):
9596 params ["endpoint_url" ] = docker_gcs
9697
@@ -101,44 +102,102 @@ def factory(**kwargs):
101102 return factory
102103
103104
105+ @pytest .fixture (scope = "session" )
106+ def buckets_to_delete ():
107+ """
108+ Provides a session-scoped set to track the names of GCS buckets that are
109+ created by the test suite.
110+
111+ When tests run, they may create new GCS buckets. If these buckets are not
112+ deleted, they will persist after the test run, leading to resource leakage.
113+ This set acts as a registry of buckets that the `final_cleanup` fixture
114+ should remove at the end of the entire test session.
115+ """
116+ return set ()
117+
118+
104119@pytest .fixture
105- def gcs (gcs_factory , populate = True ):
120+ def gcs (gcs_factory , buckets_to_delete , populate = True ):
106121 gcs = gcs_factory ()
107- try :
108- # ensure we're empty.
109- try :
110- gcs .rm (TEST_BUCKET , recursive = True )
111- except FileNotFoundError :
112- pass
113- try :
122+ try : # ensure we're empty.
123+ # Create the bucket if it doesn't exist, otherwise clean it.
124+ if not gcs .exists (TEST_BUCKET ):
114125 gcs .mkdir (TEST_BUCKET )
115- except Exception :
116- pass
126+ # By adding the bucket name to this set, we are marking it for
127+ # deletion at the end of the test session. This ensures that if
128+ # the test suite creates the bucket, it will also be responsible
129+ # for deleting it. If the bucket already existed, we assume it's
130+ # managed externally and should not be deleted by the tests.
131+ buckets_to_delete .add (TEST_BUCKET )
132+ else :
133+ try :
134+ gcs .rm (gcs .find (TEST_BUCKET ))
135+ except Exception as e :
136+ logging .warning (f"Failed to empty bucket { TEST_BUCKET } : { e } " )
117137
118138 if populate :
119139 gcs .pipe ({TEST_BUCKET + "/" + k : v for k , v in allfiles .items ()})
120140 gcs .invalidate_cache ()
121141 yield gcs
122142 finally :
123- try :
124- gcs .rm (gcs .find (TEST_BUCKET ))
125- gcs .rm (TEST_BUCKET )
126- except : # noqa: E722
127- pass
143+ _cleanup_gcs (gcs )
128144
129145
130- def _cleanup_gcs (gcs , is_real_gcs ):
131- """Only remove the bucket/contents if we are NOT using the real GCS, logging a warning on failure."""
132- if is_real_gcs :
133- return
146+ def _cleanup_gcs (gcs ):
147+ """Clean the bucket contents, logging a warning on failure."""
134148 try :
135- gcs .rm (TEST_BUCKET , recursive = True )
149+ gcs .rm (gcs . find ( TEST_BUCKET ) )
136150 except Exception as e :
137151 logging .warning (f"Failed to clean up GCS bucket { TEST_BUCKET } : { e } " )
138152
139153
154+ @pytest .fixture (scope = "session" , autouse = True )
155+ def final_cleanup (gcs_factory , buckets_to_delete ):
156+ """
157+ A session-scoped, auto-use fixture that deletes all buckets registered
158+ in the `buckets_to_delete` set after the entire test session is complete.
159+ """
160+ yield
161+ # This code runs after the entire test session finishes
162+ use_extended_gcs = os .getenv (
163+ "GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT" , "false"
164+ ).lower () in (
165+ "true" ,
166+ "1" ,
167+ )
168+
169+ if use_extended_gcs :
170+ is_real_gcs = (
171+ os .environ .get ("STORAGE_EMULATOR_HOST" ) == "https://storage.googleapis.com"
172+ )
173+ # Mock authentication if not using a real GCS endpoint,
174+ # since grpc client in extended_gcsfs does not work with anon access
175+ mock_authentication_manager = (
176+ patch ("google.auth.default" , return_value = (None , "fake-project" ))
177+ if not is_real_gcs
178+ else nullcontext ()
179+ )
180+ else :
181+ mock_authentication_manager = nullcontext ()
182+
183+ with mock_authentication_manager :
184+ gcs = gcs_factory ()
185+ for bucket in buckets_to_delete :
186+ # The cleanup logic attempts to delete every bucket that was
187+ # added to the set during the session. For real GCS, only delete if
188+ # created by the test suite.
189+ try :
190+ if gcs .exists (bucket ):
191+ gcs .rm (bucket , recursive = True )
192+ logging .info (f"Cleaned up bucket: { bucket } " )
193+ except Exception as e :
194+ logging .warning (
195+ f"Failed to perform final cleanup for bucket { bucket } : { e } "
196+ )
197+
198+
140199@pytest .fixture
141- def extended_gcsfs (gcs_factory , populate = True ):
200+ def extended_gcsfs (gcs_factory , buckets_to_delete , populate = True ):
142201 # Check if we are running against a real GCS endpoint
143202 is_real_gcs = (
144203 os .environ .get ("STORAGE_EMULATOR_HOST" ) == "https://storage.googleapis.com"
@@ -158,39 +217,102 @@ def extended_gcsfs(gcs_factory, populate=True):
158217 # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
159218 if not is_real_gcs :
160219 try :
161- extended_gcsfs .rm (TEST_BUCKET , recursive = True )
220+ extended_gcsfs .rm (TEST_ZONAL_BUCKET , recursive = True )
162221 except FileNotFoundError :
163222 pass
164- extended_gcsfs .mkdir (TEST_BUCKET )
223+ extended_gcsfs .mkdir (TEST_ZONAL_BUCKET )
224+ # When running against the emulator, the zonal test bucket is
225+ # always created and added to the set for guaranteed cleanup.
226+ buckets_to_delete .add (TEST_ZONAL_BUCKET )
165227 if populate :
166228 extended_gcsfs .pipe (
167- {TEST_BUCKET + "/" + k : v for k , v in allfiles .items ()}
229+ {TEST_ZONAL_BUCKET + "/" + k : v for k , v in allfiles .items ()}
168230 )
169231 extended_gcsfs .invalidate_cache ()
170232 yield extended_gcsfs
171233 finally :
172- _cleanup_gcs (extended_gcsfs , is_real_gcs )
234+ _cleanup_gcs (extended_gcsfs )
173235
174236
175237@pytest .fixture
176- def gcs_versioned (gcs_factory ):
238+ def gcs_versioned (gcs_factory , buckets_to_delete ):
177239 gcs = gcs_factory ()
178240 gcs .version_aware = True
179- try :
241+ is_real_gcs = (
242+ os .environ .get ("STORAGE_EMULATOR_HOST" ) == "https://storage.googleapis.com"
243+ )
244+ try : # ensure we're empty.
245+ # The versioned bucket might be created by `is_versioning_enabled`
246+ # in test_core_versioned.py. We must register it for cleanup only if
247+ # it was created by this test run.
180248 try :
181- gcs . rm ( gcs . find ( TEST_BUCKET , versions = True ))
182- except FileNotFoundError :
183- pass
249+ from gcsfs . tests . test_core_versioned import (
250+ _VERSIONED_BUCKET_CREATED_BY_TESTS ,
251+ )
184252
185- try :
186- gcs .mkdir (TEST_BUCKET , enable_versioning = True )
187- except Exception :
188- pass
253+ if _VERSIONED_BUCKET_CREATED_BY_TESTS :
254+ # If the versioned bucket was created by the test suite, it's added
255+ # here for cleanup.
256+ buckets_to_delete .add (TEST_VERSIONED_BUCKET )
257+ except ImportError :
258+ pass # test_core_versioned is not being run
259+ if is_real_gcs :
260+ cleanup_versioned_bucket (gcs , TEST_VERSIONED_BUCKET )
261+ else :
262+ # For emulators, we delete and recreate the bucket for a clean state
263+ try :
264+ gcs .rm (TEST_VERSIONED_BUCKET , recursive = True )
265+ except FileNotFoundError :
266+ pass
267+ gcs .mkdir (TEST_VERSIONED_BUCKET , enable_versioning = True )
268+ # When using the emulator, the versioned bucket is always recreated
269+ # and added to the cleanup set.
270+ buckets_to_delete .add (TEST_VERSIONED_BUCKET )
189271 gcs .invalidate_cache ()
190272 yield gcs
191273 finally :
274+ # Ensure the bucket is empty after the test.
192275 try :
193- gcs .rm (gcs .find (TEST_BUCKET , versions = True ))
194- gcs .rm (TEST_BUCKET )
195- except : # noqa: E722
196- pass
276+ if is_real_gcs :
277+ cleanup_versioned_bucket (gcs , TEST_VERSIONED_BUCKET )
278+ except Exception as e :
279+ logging .warning (
280+ f"Failed to clean up versioned bucket { TEST_VERSIONED_BUCKET } after test: { e } "
281+ )
282+
283+
284+ def cleanup_versioned_bucket (gcs , bucket_name , prefix = None ):
285+ """
286+ Deletes all object versions in a bucket using the google-cloud-storage client,
287+ ensuring it uses the same credentials as the gcsfs instance.
288+ """
289+ # Define a retry policy for API calls to handle rate limiting.
290+ # This can retry on 429 Too Many Requests errors, which can happen
291+ # when deleting many object versions quickly.
292+ from google .api_core .retry import Retry
293+
294+ retry_policy = Retry (
295+ initial = 1.0 , # Initial delay in seconds
296+ maximum = 30.0 , # Maximum delay in seconds
297+ multiplier = 1.2 , # Backoff factor
298+ )
299+
300+ client = storage .Client (
301+ credentials = gcs .credentials .credentials , project = gcs .project
302+ )
303+
304+ # List all blobs, including old versions
305+ blobs_to_delete = list (client .list_blobs (bucket_name , versions = True , prefix = prefix ))
306+
307+ if not blobs_to_delete :
308+ logging .info ("No object versions to delete in %s." , bucket_name )
309+ return
310+
311+ logging .info (
312+ "Deleting %d object versions from %s." , len (blobs_to_delete ), bucket_name
313+ )
314+ time .sleep (2 )
315+ for blob in blobs_to_delete :
316+ blob .delete (retry = retry_policy )
317+
318+ logging .info ("Successfully deleted %d object versions." , len (blobs_to_delete ))
0 commit comments