Skip to content

Commit 8374fbe

Browse files
authored
Stale pubsub subscriptions cleaner (#35553)
* Implement PubSub subscription stale cleaner as a dry run. This cleaner will delete stale PubSub subscriptions that do not have any associated topic.
* Add tests for the PubSub subscription cleaner class.
1 parent 824e97e commit 8374fbe

File tree

2 files changed

+164
-3
lines changed

2 files changed

+164
-3
lines changed

.test-infra/tools/stale_cleaner.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
# Resource types
2727
PUBSUB_TOPIC_RESOURCE = "pubsub_topic"
28+
PUBSUB_SUBSCRIPTION_RESOURCE = "pubsub_subscription"
2829

2930
# Storage constants
3031
STORAGE_PREFIX = "stale_cleaner/"
@@ -34,6 +35,7 @@
3435

3536
# Time constants (in seconds)
3637
DEFAULT_PUBSUB_TOPIC_THRESHOLD = 86400 # 1 day
38+
DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD = 86400 # 1 day
3739
DEFAULT_TIME_THRESHOLD = 3600 # 1 hour
3840

3941
# Default values for testing
@@ -285,6 +287,12 @@ def delete_stale(self, dry_run: bool = True) -> None:
285287

286288
# PubSub topic cleaner
287289
class PubSubTopicCleaner(StaleCleaner):
290+
"""
291+
This cleaner will delete PubSub topics that are stale based on the time threshold.
292+
It uses the PubSub API to list and delete topics.
293+
It also applies prefix filtering to only delete topics that match the specified prefixes.
294+
"""
295+
288296
def __init__(self, project_id: str, bucket_name: str,
289297
prefixes: list = None, time_threshold: int = DEFAULT_PUBSUB_TOPIC_THRESHOLD,
290298
clock: Clock = None) -> None:
@@ -304,7 +312,50 @@ def _delete_resource(self, resource_name: str) -> None:
304312
print(f"{self.clock()} - Deleting PubSub topic {resource_name}")
305313
self.client.delete_topic(request={"topic": resource_name})
306314

307-
if __name__ == "__main__":
315+
# PubSub Subscription cleaner
316+
class PubSubSubscriptionCleaner(StaleCleaner):
    """
    Cleaner for PubSub subscriptions that no longer have a usable topic.

    A subscription becomes a deletion candidate when it matches one of the
    configured prefixes (or no prefixes are configured) and it is orphaned,
    i.e. either:

      * it is flagged as detached from its topic, or
      * its topic has been deleted, which the PubSub API reports by setting
        the subscription's ``topic`` field to ``"_deleted-topic_"``.

    Candidates are keyed by their fully qualified subscription name
    (``projects/<project>/subscriptions/<id>``). Applying the time threshold
    and honouring dry-run mode is left to the StaleCleaner base class.
    """

    # Placeholder the PubSub API stores in Subscription.topic after the topic is deleted.
    _DELETED_TOPIC = "_deleted-topic_"

    def __init__(self, project_id: str, bucket_name: str,
                 prefixes: list = None, time_threshold: int = DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD,
                 clock: Clock = None) -> None:
        """
        Args:
            project_id: GCP project whose subscriptions are inspected.
            bucket_name: Bucket name forwarded to StaleCleaner.
            prefixes: Subscription id prefixes to restrict the cleaner to;
                None or an empty list means every subscription is considered.
            time_threshold: Staleness threshold in seconds, forwarded to StaleCleaner.
            clock: Time source forwarded to StaleCleaner.
        """
        super().__init__(project_id, PUBSUB_SUBSCRIPTION_RESOURCE, bucket_name, prefixes, time_threshold, clock)
        # A fresh client is created in each method: every use runs inside a
        # ``with`` block, which closes the client on exit.
        self.client = None

    def _active_resources(self) -> dict:
        """
        List the orphaned subscriptions that match the configured prefixes.

        Returns:
            Dict mapping the fully qualified subscription name to a
            GoogleCloudResource created with this cleaner's clock.
        """
        subscription_root = f"{self.project_path}/subscriptions/"
        # str.startswith accepts a tuple, so every prefix is checked in one call.
        # An empty tuple means "no prefix filtering".
        wanted_prefixes = tuple(f"{subscription_root}{prefix}" for prefix in (self.prefixes or ()))

        d = {}
        self.client = pubsub_v1.SubscriberClient()

        with self.client:
            for subscription in self.client.list_subscriptions(request={"project": self.project_path}):
                subscription_name = subscription.name
                if wanted_prefixes and not subscription_name.startswith(wanted_prefixes):
                    continue
                # Orphaned means explicitly detached, or pointing at a topic that
                # has since been deleted; `detached` alone misses the latter.
                if subscription.detached or subscription.topic == self._DELETED_TOPIC:
                    d[subscription_name] = GoogleCloudResource(resource_name=subscription_name, clock=self.clock)

        return d

    def _delete_resource(self, resource_name: str) -> None:
        """
        Delete a single subscription.

        Args:
            resource_name: Either a fully qualified subscription name
                (``projects/<project>/subscriptions/<id>``, the form produced by
                ``_active_resources``) or a bare subscription id, which is
                expanded using this cleaner's project id.
        """
        self.client = pubsub_v1.SubscriberClient()
        print(f"{self.clock()} - Deleting PubSub subscription {resource_name}")
        with self.client:
            if resource_name.startswith("projects/"):
                # Already fully qualified; expanding it again would produce
                # "projects/<p>/subscriptions/projects/<p>/subscriptions/<id>".
                subscription_path = resource_name
            else:
                subscription_path = self.client.subscription_path(self.project_id, resource_name)
            self.client.delete_subscription(request={"subscription": subscription_path})
352+
353+
def clean_pubsub_topics():
354+
""" Clean up stale PubSub topics in the specified GCP project.
355+
This function initializes the PubSubTopicCleaner with the default project ID and bucket name,
356+
and a predefined list of topic prefixes.
357+
It then refreshes the resources and deletes any stale topics.
358+
"""
308359
project_id = DEFAULT_PROJECT_ID
309360
bucket_name = DEFAULT_BUCKET_NAME
310361

@@ -355,3 +406,32 @@ def _delete_resource(self, resource_name: str) -> None:
355406

356407
# Delete stale resources
357408
cleaner.delete_stale(dry_run=False)
409+
410+
def clean_pubsub_subscriptions():
    """Dry-run the stale PubSub subscription cleanup for the default project.

    Builds a PubSubSubscriptionCleaner for DEFAULT_PROJECT_ID and
    DEFAULT_BUCKET_NAME with no prefix restriction, refreshes it, and then
    asks it to delete stale subscriptions with dry_run=True.
    """
    subscription_cleaner = PubSubSubscriptionCleaner(
        project_id=DEFAULT_PROJECT_ID,
        bucket_name=DEFAULT_BUCKET_NAME,
        # Empty list: no prefix filter, so every subscription is in scope.
        prefixes=[],
        time_threshold=DEFAULT_PUBSUB_SUBSCRIPTION_THRESHOLD,
    )

    subscription_cleaner.refresh()

    # dry_run stays True to avoid accidental deletions during testing.
    subscription_cleaner.delete_stale(dry_run=True)
431+
432+
if __name__ == "__main__":
    # Run each cleanup in order: stale PubSub topics first, then stale
    # PubSub subscriptions.
    for run_cleanup in (clean_pubsub_topics, clean_pubsub_subscriptions):
        run_cleanup()

.test-infra/tools/test_stale_cleaner.py

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
GoogleCloudResource,
2626
StaleCleaner,
2727
PubSubTopicCleaner,
28+
PubSubSubscriptionCleaner,
2829
FakeClock,
2930
DEFAULT_TIME_THRESHOLD,
3031
PUBSUB_TOPIC_RESOURCE,
@@ -390,6 +391,86 @@ def test_delete_stale(self):
390391
self.assertNotIn("deleted-resource", self.cleaner.deleted_resources)
391392

392393

394+
class PubSubSubscriptionCleanerTest(unittest.TestCase):
    """Tests for the PubSubSubscriptionCleaner class."""

    def setUp(self):
        """Set up test fixtures."""
        self.project_id = "test-project"
        self.bucket_name = "test-bucket"
        self.prefixes = ["test-prefix"]
        self.time_threshold = 86400  # 1 day
        self.fake_clock = FakeClock("2025-05-28T10:00:00")

        # Mock PubSub client so that no real API calls are made.
        self.mock_client_patcher = mock.patch('google.cloud.pubsub_v1.SubscriberClient')
        self.MockSubscriberClientClass = self.mock_client_patcher.start()
        self.mock_subscriber_client = self.MockSubscriberClientClass.return_value

        # Create a test cleaner
        self.cleaner = PubSubSubscriptionCleaner(
            project_id=self.project_id,
            bucket_name=self.bucket_name,
            prefixes=self.prefixes,
            time_threshold=self.time_threshold,
            clock=self.fake_clock
        )

        # Replace the persistence hooks with mocks.
        self.cleaner._write_resources = SilencedMock()
        self.cleaner._stored_resources = SilencedMock(return_value={})

    def tearDown(self):
        """Tear down test fixtures."""
        self.mock_client_patcher.stop()

    def test_init(self):
        """Test initialization."""
        self.assertEqual(self.cleaner.project_id, self.project_id)
        self.assertEqual(self.cleaner.bucket_name, self.bucket_name)
        self.assertEqual(self.cleaner.prefixes, self.prefixes)
        self.assertEqual(self.cleaner.time_threshold, self.time_threshold)
        self.assertIsInstance(self.cleaner.clock, FakeClock)

    def test_active_resources(self):
        """Test that only orphaned subscriptions matching a prefix are returned."""
        # `detached` is set explicitly on every fixture: an attribute that is
        # never assigned on a mock.Mock is itself a truthy Mock, which would make
        # every subscription look detached and hide a broken filter.

        # Matching prefix, still attached to a live topic -> must be ignored.
        sub1 = mock.Mock()
        sub1.name = "projects/test-project/subscriptions/test-prefix-sub1"
        sub1.topic = "projects/test-project/topics/some-topic"
        sub1.detached = False

        # Matching prefix, orphaned -> must be reported.
        sub2 = mock.Mock()
        sub2.name = "projects/test-project/subscriptions/test-prefix-sub2-detached"
        sub2.topic = "_deleted-topic_"
        sub2.detached = True

        # Orphaned, but the prefix does not match -> must be ignored, which
        # proves the exclusion comes from the prefix filter.
        sub3 = mock.Mock()
        sub3.name = "projects/test-project/subscriptions/other-prefix-sub3"
        sub3.topic = "projects/test-project/topics/another-topic"
        sub3.detached = True

        self.mock_subscriber_client.list_subscriptions.return_value = [sub1, sub2, sub3]

        with SilencePrint():
            active = self.cleaner._active_resources()

        self.assertNotIn("projects/test-project/subscriptions/test-prefix-sub1", active)
        self.assertIn("projects/test-project/subscriptions/test-prefix-sub2-detached", active)
        self.assertNotIn("projects/test-project/subscriptions/other-prefix-sub3", active)
        self.assertEqual(len(active), 1)

    def test_delete_resource(self):
        """Test _delete_resource method with a bare subscription id."""
        sub_name = "test-sub-to-delete"
        subscription_path = f"projects/{self.project_id}/subscriptions/{sub_name}"
        self.mock_subscriber_client.subscription_path.return_value = subscription_path

        with SilencePrint():
            self.cleaner._delete_resource(sub_name)

        self.mock_subscriber_client.subscription_path.assert_called_once_with(self.project_id, sub_name)
        self.mock_subscriber_client.delete_subscription.assert_called_once_with(
            request={'subscription': subscription_path}
        )
472+
473+
393474
class PubSubTopicCleanerTest(unittest.TestCase):
394475
"""Tests for the PubSubTopicCleaner class."""
395476

@@ -484,10 +565,10 @@ def test_delete_resource(self):
484565
self.cleaner._delete_resource(resource_name)
485566

486567
# Check that delete_topic was called
487-
self.cleaner.client.delete_topic.assert_called_once_with(name=resource_name)
568+
self.cleaner.client.delete_topic.assert_called_once_with(request={'topic': resource_name})
488569

489570
# Check that correct message was printed
490-
mock_print.assert_called_once_with(f"{self.cleaner.clock()} - Deleting PubSub topic test-topic")
571+
mock_print.assert_called_once_with(f"{self.cleaner.clock()} - Deleting PubSub topic {resource_name}")
491572

492573
if __name__ == '__main__':
493574
unittest.main()

0 commit comments

Comments
 (0)