Skip to content

Commit f56d0fa

Browse files
authored
Stale claner implementation (#35439)
* Print clock at every print For logging purposes it is easier to print the time of deletion * Add prefixes to ignore subtopics * Setup a stale cleaner github action * Instead of a new github action, add the stale_cleaner script to the existing gradle workflow.
1 parent 9f431f7 commit f56d0fa

File tree

3 files changed

+46
-8
lines changed

3 files changed

+46
-8
lines changed

.test-infra/tools/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,15 @@ task removeStaleSpannerResources(type: Exec) {
4040
commandLine './stale_spanner_cleaner.sh'
4141
}
4242

43+
task runStaleCleaner(type: Exec) {
44+
commandLine 'bash', '-c', 'pip install -r requirements.txt && python3 stale_cleaner.py'
45+
}
46+
4347
task cleanupOtherStaleResources {
4448
// declared as finalizedBy dependency so that other task continue even if one dep task fails
4549
finalizedBy tasks.removeStaleBqDatasets
4650
finalizedBy tasks.removeStaleCbtInstances
4751
finalizedBy tasks.removeStaleK8sWorkload
4852
finalizedBy tasks.removeStaleSpannerResources
53+
finalizedBy tasks.runStaleCleaner
4954
}

.test-infra/tools/stale_cleaner.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def _write_resources(self, resources: dict) -> None:
168168
blob_json = json.dumps(resource_dict, indent=4)
169169

170170
blob.upload_from_string(blob_json, content_type="application/json")
171-
print(f"Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json")
171+
print(f"{self.clock()} - Resources written to {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json")
172172

173173
def _stored_resources(self) -> dict:
174174
"""
@@ -179,7 +179,7 @@ def _stored_resources(self) -> dict:
179179
blob = bucket.blob(f"{STORAGE_PREFIX}{self.resource_type}.json")
180180

181181
if not blob.exists():
182-
print(f"Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.")
182+
print(f"{self.clock()} - Blob {self.bucket_name}/{STORAGE_PREFIX}{self.resource_type}.json does not exist.")
183183
return {}
184184

185185
blob_string = blob.download_as_text()
@@ -211,7 +211,7 @@ def refresh(self) -> None:
211211

212212
for k, v in list(stored_resources.items()):
213213
if k not in active_resources:
214-
print(f"Resource {k} is no longer alive. Deleting it from the stored resources.")
214+
print(f"{self.clock()} - Resource {k} is no longer alive. Deleting it from the stored resources.")
215215
del stored_resources[k]
216216
else:
217217
v.update(clock=self.clock)
@@ -272,12 +272,12 @@ def delete_stale(self, dry_run: bool = True) -> None:
272272
for k, v in stale_resources_map.items():
273273
if k in active_resources_map:
274274
if dry_run:
275-
print(f"Dry run: Would delete resource {k}")
275+
print(f"{self.clock()} - Dry run: Would delete resource {k}")
276276
else:
277-
print(f"Deleting resource {k}")
277+
print(f"{self.clock()} - Deleting resource {k}")
278278
self._delete_resource(k)
279279
else:
280-
print(f"Resource {k} marked as stale but no longer exists in GCP. Skipping deletion.")
280+
print(f"{self.clock()} - Resource {k} marked as stale but no longer exists in GCP. Skipping deletion.")
281281

282282
if not dry_run:
283283
self.refresh()
@@ -302,5 +302,36 @@ def _active_resources(self) -> dict:
302302

303303
def _delete_resource(self, resource_name: str) -> None:
304304
topic_name = resource_name.split('/')[-1]
305-
print(f"Deleting PubSub topic {topic_name}")
305+
print(f"{self.clock()} - Deleting PubSub topic {topic_name}")
306306
self.client.delete_topic(name=resource_name)
307+
308+
if __name__ == "__main__":
309+
project_id = DEFAULT_PROJECT_ID
310+
bucket_name = DEFAULT_BUCKET_NAME
311+
312+
# Prefixes found after analyzing the PubSub topics in the project
313+
prefixes = [
314+
"psit_topic_input",
315+
"psit_topic_output",
316+
"wc_topic_input",
317+
"wc_topic_output",
318+
"leader_board_it_input_topic",
319+
"leader_board_it_output_topic",
320+
"exercise_streaming_metrics_topic_input",
321+
"exercise_streaming_metrics_topic_output",
322+
"pubsub_io_performance",
323+
"testing",
324+
"pubsubNamespace",
325+
"game_stats_it_input_topic",
326+
"game_stats_it_output_topic"
327+
]
328+
329+
# Create a PubSubTopicCleaner instance
330+
cleaner = PubSubTopicCleaner(project_id=project_id, bucket_name=bucket_name,
331+
prefixes=prefixes, time_threshold=DEFAULT_PUBSUB_TOPIC_THRESHOLD)
332+
333+
# Refresh resources
334+
cleaner.refresh()
335+
336+
# Delete stale resources
337+
cleaner.delete_stale()

.test-infra/tools/test_stale_cleaner.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,8 @@ def test_delete_resource(self):
476476
"""Test __delete_resource method."""
477477
# Create a resource name
478478
resource_name = "projects/test-project/topics/test-topic"
479+
# Set the clock to a specific time
480+
self.cleaner.clock.set("2025-05-28T10:00:00")
479481

480482
# Call the private method using name mangling
481483
with mock.patch('builtins.print') as mock_print:
@@ -485,7 +487,7 @@ def test_delete_resource(self):
485487
self.cleaner.client.delete_topic.assert_called_once_with(name=resource_name)
486488

487489
# Check that correct message was printed
488-
mock_print.assert_called_once_with("Deleting PubSub topic test-topic")
490+
mock_print.assert_called_once_with(f"{self.cleaner.clock()} - Deleting PubSub topic test-topic")
489491

490492
if __name__ == '__main__':
491493
unittest.main()

0 commit comments

Comments
 (0)