Skip to content

Commit cbcdcb2

Browse files
authored
GC: Collect resources in versioned bucket (#271)
1 parent fc7f0b2 commit cbcdcb2

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

karton/core/backend.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,21 @@ def list_objects(self, bucket: str) -> List[str]:
986986
objs.append(obj["Key"])
987987
return objs
988988

989+
def list_object_versions(self, bucket: str) -> Dict[str, List[str]]:
990+
"""
991+
List version identifiers of stored resource objects
992+
:param bucket: Bucket name
993+
:return: Dictionary of object version identifiers {key: [version_ids, ...]}
994+
"""
995+
objs = defaultdict(list)
996+
paginator = self.s3.get_paginator("list_object_versions")
997+
for page in paginator.paginate(Bucket=bucket):
998+
for obj in page.get("Versions", list()):
999+
objs[obj["Key"]].append(obj["VersionId"])
1000+
for obj in page.get("DeleteMarkers", list()):
1001+
objs[obj["Key"]].append(obj["VersionId"])
1002+
return dict(objs)
1003+
9891004
def remove_object(self, bucket: str, object_uid: str) -> None:
9901005
"""
9911006
Remove resource object from object storage
@@ -1002,7 +1017,28 @@ def remove_objects(self, bucket: str, object_uids: Iterable[str]) -> None:
10021017
:param bucket: Bucket name
10031018
:param object_uids: Object identifiers
10041019
"""
1005-
for delete_objects in chunks([{"Key": uid} for uid in object_uids], 1000):
1020+
for delete_objects in chunks([{"Key": uid} for uid in object_uids], 100):
1021+
self.s3.delete_objects(Bucket=bucket, Delete={"Objects": delete_objects})
1022+
1023+
def remove_object_versions(
1024+
self, bucket: str, object_versions: Dict[str, List[str]]
1025+
) -> None:
1026+
"""
1027+
Bulk remove resource object versions from object storage
1028+
1029+
:param bucket: Bucket name
1030+
:param object_versions: Object version identifiers
1031+
"""
1032+
versions = iter(
1033+
(uid, version_id)
1034+
for uid, versions in object_versions.items()
1035+
for version_id in versions
1036+
)
1037+
deletion_chunks = chunks(
1038+
[{"Key": uid, "VersionId": version_id} for uid, version_id in versions],
1039+
100,
1040+
)
1041+
for delete_objects in deletion_chunks:
10061042
self.s3.delete_objects(Bucket=bucket, Delete={"Objects": delete_objects})
10071043

10081044
def check_bucket_exists(self, bucket: str, create: bool = False) -> bool:

karton/system/system.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def _log_config(self):
7474
def gc_collect_resources(self) -> None:
7575
# Collects unreferenced resources left in object storage
7676
karton_bucket = self.backend.default_bucket_name
77-
resources_to_remove = set(self.backend.list_objects(karton_bucket))
77+
resources_to_remove = self.backend.list_object_versions(karton_bucket)
7878
# Note: it is important to get list of resources before getting list of tasks!
7979
# Task is created before resource upload to lock the reference to the resource.
8080
tasks = self.backend.iter_all_tasks()
@@ -85,10 +85,10 @@ def gc_collect_resources(self) -> None:
8585
resource.bucket == karton_bucket
8686
and resource.uid in resources_to_remove
8787
):
88-
resources_to_remove.remove(resource.uid)
88+
del resources_to_remove[resource.uid]
8989
# Remove unreferenced resources
9090
if resources_to_remove:
91-
self.backend.remove_objects(karton_bucket, resources_to_remove)
91+
self.backend.remove_object_versions(karton_bucket, resources_to_remove)
9292

9393
def gc_collect_tasks(self) -> None:
9494
self.log.debug("GC: gc_collect_tasks started")

0 commit comments

Comments
 (0)