Commit ca85293

Merge pull request #37 from AllenInstitute/feature/add-update_s3_path_tags-function
Add update_path_tags function for managing S3 object tags
2 parents 7ea2dad + a47a526 commit ca85293

File tree

2 files changed: +153 -11 lines changed
  • src/aibs_informatics_aws_utils
  • test/aibs_informatics_aws_utils


src/aibs_informatics_aws_utils/s3.py

Lines changed: 67 additions & 3 deletions
@@ -17,6 +17,7 @@
     Callable,
     Dict,
     List,
+    Literal,
     Mapping,
     Optional,
     Pattern,
@@ -575,9 +576,72 @@ def get_s3_path_stats(s3_path: S3URI, **kwargs) -> S3PathStats:
     )


-# TODO: Two things
-# 1. allow for a way to specify `size_only` for this fn when transfering large number of files
-# 2. add flag for failing if no source data exists.
+def update_path_tags(
+    s3_path: S3URI,
+    tags: Dict[str, str],
+    mode: Literal["replace", "append", "delete"] = "append",
+    recursive: bool = True,
+    **kwargs,
+):
+    """Update tags for all objects at or prefixed by the specified path
+
+    There are three modes for updating tags:
+    - replace: Replace all existing tags with new tags
+    - append: Merge new tags with existing tags
+    - delete: Delete specified tags from existing tags (values do not matter)
+
+    If recursive is True and s3_path is an object prefix, all objects under the prefix
+    will have their tags updated. If there is an object at s3_path, it will also have
+    its tags updated.
+
+    Args:
+        s3_path (S3URI): S3 path or prefix to update tags for
+        tags (Dict[str, str]): Tags to update
+        mode (Literal["replace", "append", "delete"]): Tag update mode.
+            Options:
+            - replace: Replace all existing tags with new tags
+            - append: Merge new tags with existing tags
+            - delete: Delete specified tags from existing tags (values do not matter)
+            Defaults to "append".
+        recursive (bool): Whether to update tags recursively for all objects under prefix.
+            Defaults to True.
+    """
+    if recursive and is_object_prefix(s3_path, **kwargs):
+        s3_paths = list_s3_paths(s3_path=s3_path, **kwargs)
+        logger.info(f"Updating tags for {len(s3_paths)} objects under prefix {s3_path}")
+        for nested_s3_path in s3_paths:
+            update_path_tags(nested_s3_path, tags, mode, recursive=False, **kwargs)
+    if is_object(s3_path, **kwargs):
+        s3 = get_s3_client(**kwargs)
+
+        current_tags = {
+            tag["Key"]: tag["Value"]
+            for tag in s3.get_object_tagging(
+                Bucket=s3_path.bucket,
+                Key=s3_path.key,
+            ).get("TagSet", [])
+        }
+
+        if mode == "replace":
+            new_tags = tags
+        elif mode in ["append", "delete"]:
+            new_tags = current_tags.copy()
+            if mode == "append":
+                new_tags.update(tags)
+            else:
+                for tag_key in tags.keys():
+                    if tag_key in new_tags:
+                        del new_tags[tag_key]
+        else:
+            raise ValueError(f"Unknown tag update mode: {mode}")  # pragma: no cover
+
+        s3.put_object_tagging(
+            Bucket=s3_path.bucket,
+            Key=s3_path.key,
+            Tagging={"TagSet": [{"Key": k, "Value": v} for k, v in new_tags.items()]},
+        )
+
+
 def sync_paths(
     source_path: Union[Path, S3URI],
     destination_path: Union[Path, S3URI],
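
For reference, here is a minimal usage sketch of the new function (not part of this commit's diff). It assumes update_path_tags is importable from aibs_informatics_aws_utils.s3, the module this diff modifies, and that S3URI can be imported from the same place and constructed from an "s3://" string; the bucket and keys are purely illustrative, and the region keyword simply mirrors how the tests call the function.

from aibs_informatics_aws_utils.s3 import S3URI, update_path_tags

prefix = S3URI("s3://example-bucket/dataset/2024/")

# append (the default): merge the new tags into each object's existing tags
update_path_tags(prefix, {"project": "demo", "owner": "informatics"}, region="us-west-2")

# replace: drop all existing tags and keep only the tags passed in
update_path_tags(prefix, {"project": "demo"}, mode="replace")

# delete: remove the named tag keys wherever they exist (the values are ignored)
update_path_tags(prefix, {"owner": ""}, mode="delete")

# recursive=False skips the prefix listing and only touches an object at the exact path
update_path_tags(S3URI("s3://example-bucket/dataset/2024/manifest.json"), {"qc": "passed"}, recursive=False)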

test/aibs_informatics_aws_utils/test_s3.py

Lines changed: 86 additions & 8 deletions
@@ -3,7 +3,7 @@
 import re
 from pathlib import Path
 from time import sleep
-from typing import Optional
+from typing import Dict, Optional

 import moto
 import requests
@@ -49,6 +49,7 @@
     process_transfer_requests,
     should_sync,
     sync_paths,
+    update_path_tags,
     update_s3_storage_class,
     upload_file,
     upload_json,
@@ -105,6 +106,17 @@ def client__list_objects_v2(self, **kwargs):
         kwargs["Bucket"] = self.DEFAULT_BUCKET_NAME
         return self.s3_client.list_objects_v2(**kwargs)

+    def _get_tag_dict(self, s3_path: S3URI) -> Dict[str, str]:
+        response = self.s3_client.get_object_tagging(Bucket=s3_path.bucket, Key=s3_path.key)
+        return {tag["Key"]: tag["Value"] for tag in response.get("TagSet", [])}
+
+    def _put_tags(self, s3_path: S3URI, tags: Dict[str, str]):
+        self.s3_client.put_object_tagging(
+            Bucket=s3_path.bucket,
+            Key=s3_path.key,
+            Tagging={"TagSet": [{"Key": key, "Value": value} for key, value in tags.items()]},
+        )
+
     def test__get_presigned_urls__default_generates_READ_ONLY_urls(self):
         ## Setup
         s3_path_a = self.get_s3_path("path/to/object_a")
@@ -737,6 +749,72 @@ def test__delete_s3_path__handles_folder(self):
         delete_s3_path(s3_path=s3_path)
         self.assertEqual(0, len(list_s3_paths(s3_path)))

+    def test__update_path_tags__replace_mode_overwrites_existing_tags(self):
+        s3_path = self.put_object("path/to/tagged.txt", "content")
+        self._put_tags(s3_path, {"old": "tag"})
+
+        update_path_tags(
+            s3_path,
+            {"new": "value"},
+            mode="replace",
+            region=self.DEFAULT_REGION,
+        )
+
+        self.assertDictEqual(self._get_tag_dict(s3_path), {"new": "value"})
+
+    def test__update_path_tags__append_mode_merges_and_overrides(self):
+        s3_path = self.put_object("path/to/append.txt", "content")
+        self._put_tags(s3_path, {"keep": "1", "override": "old"})
+
+        update_path_tags(
+            s3_path,
+            {"override": "new", "fresh": "2"},
+            mode="append",
+            region=self.DEFAULT_REGION,
+        )
+
+        self.assertDictEqual(
+            self._get_tag_dict(s3_path),
+            {"keep": "1", "override": "new", "fresh": "2"},
+        )
+
+    def test__update_path_tags__delete_mode_removes_requested_keys(self):
+        s3_path = self.put_object("path/to/delete.txt", "content")
+        self._put_tags(s3_path, {"keep": "1", "drop": "2"})
+
+        update_path_tags(
+            s3_path,
+            {"drop": "ignored", "missing": "value"},
+            mode="delete",
+            region=self.DEFAULT_REGION,
+        )
+
+        self.assertDictEqual(self._get_tag_dict(s3_path), {"keep": "1"})
+
+    def test__update_path_tags__recursive_prefix_updates_all_children(self):
+        prefix_path = self.get_s3_path("path/to/prefix")
+        child_one = self.put_object("path/to/prefix/a.txt", "first")
+        child_two = self.put_object("path/to/prefix/nested/b.txt", "second")
+        outside = self.put_object("path/to/other.txt", "outside")
+
+        self._put_tags(child_one, {"existing": "one"})
+        self._put_tags(child_two, {"existing": "two"})
+        self._put_tags(outside, {"existing": "outside"})
+
+        update_path_tags(
+            prefix_path,
+            {"batch": "true"},
+            mode="append",
+            region=self.DEFAULT_REGION,
+        )
+
+        for s3_path, label in ((child_one, "one"), (child_two, "two")):
+            tags = self._get_tag_dict(s3_path)
+            self.assertEqual(tags["existing"], label)
+            self.assertEqual(tags["batch"], "true")
+
+        self.assertNotIn("batch", self._get_tag_dict(outside))
+
     def test__update_s3_storage_class__handles_shallow_to_GLACIER(self):
         s3_root = self.get_s3_path("source/path/")

@@ -880,7 +958,7 @@ def test__should_sync__s3_to_local__multipart_upload_with_custom_chunk_size_work

         s3.upload_file(
             Filename=str(orig_file),
-            **source_path.as_dict(),
+            **source_path.as_dict(),  # type: ignore
             Config=TransferConfig(multipart_threshold=1024, multipart_chunksize=1024),
         )
         destination_path.write_text(orig_file.read_text())
@@ -897,7 +975,7 @@ def test__should_sync__s3_to_local__multipart_upload_chunksize__gt__default(self
         # This does not upload as multipart for custom chunk size
         self.s3_client.upload_file(
             Filename=str(orig_file),
-            **source_path1.as_dict(),
+            **source_path1.as_dict(),  # type: ignore
             Config=TransferConfig(
                 multipart_threshold=AWS_S3_DEFAULT_CHUNK_SIZE_BYTES * 2,
                 multipart_chunksize=AWS_S3_DEFAULT_CHUNK_SIZE_BYTES * 2,
@@ -906,7 +984,7 @@ def test__should_sync__s3_to_local__multipart_upload_chunksize__gt__default(self
         # This one uploads as multipart even though there is only one part
         self.s3_client.upload_file(
             Filename=str(orig_file),
-            **source_path2.as_dict(),
+            **source_path2.as_dict(),  # type: ignore
             Config=TransferConfig(
                 multipart_threshold=AWS_S3_DEFAULT_CHUNK_SIZE_BYTES,
                 multipart_chunksize=AWS_S3_DEFAULT_CHUNK_SIZE_BYTES * 2,
@@ -928,7 +1006,7 @@ def test__should_sync__handles_multipart_upload_chunksize(self):
         # This does not upload as multipart for custom chunk size
         self.s3_client.upload_file(
             Filename=str(orig_file),
-            **source_path.as_dict(),
+            **source_path.as_dict(),  # type: ignore
             Config=TransferConfig(multipart_threshold=MB, multipart_chunksize=MB),
         )

@@ -945,7 +1023,7 @@ def test__should_sync__handles_multipart_upload_chunksize__single_part(self):
         # This does not upload as multipart for custom chunk size
         self.s3_client.upload_file(
             Filename=str(orig_file),
-            **source_path.as_dict(),
+            **source_path.as_dict(),  # type: ignore
             Config=TransferConfig(multipart_threshold=MB, multipart_chunksize=2 * MB),
         )

@@ -962,7 +1040,7 @@ def test__should_sync__handles_multipart_upload__threshold_not_passed(self):
         # This does not upload as multipart for custom chunk size
         self.s3_client.upload_file(
             Filename=str(orig_file),
-            **source_path.as_dict(),
+            **source_path.as_dict(),  # type: ignore
             Config=TransferConfig(multipart_threshold=3 * MB, multipart_chunksize=MB),
         )

@@ -979,7 +1057,7 @@ def test__should_sync__handles_multipart_upload__gt_threshold_passed(self):
         # This does not upload as multipart for custom chunk size
         self.s3_client.upload_file(
             Filename=str(orig_file),
-            **source_path.as_dict(),
+            **source_path.as_dict(),  # type: ignore
             Config=TransferConfig(multipart_threshold=2 * MB, multipart_chunksize=MB),
         )
         destination_path.write_text(orig_file.read_text())
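The new tests run against moto's mocked S3 rather than a real bucket. As a standalone sanity check of the tagging round trip they rely on, a sketch like the following works (assuming moto >= 5, which exposes mock_aws; the bucket, key, and region names are illustrative and unrelated to this repository's test fixtures):

import boto3
from moto import mock_aws

@mock_aws
def check_tag_round_trip():
    # Everything below talks to moto's in-memory S3, not real AWS.
    s3 = boto3.client("s3", region_name="us-west-2")
    s3.create_bucket(
        Bucket="example-bucket",
        CreateBucketConfiguration={"LocationConstraint": "us-west-2"},
    )
    s3.put_object(Bucket="example-bucket", Key="path/to/obj.txt", Body=b"content")

    # Write a tag set, then read it back as a plain dict (the same shape the
    # _get_tag_dict / _put_tags test helpers above work with).
    s3.put_object_tagging(
        Bucket="example-bucket",
        Key="path/to/obj.txt",
        Tagging={"TagSet": [{"Key": "project", "Value": "demo"}]},
    )
    tag_set = s3.get_object_tagging(Bucket="example-bucket", Key="path/to/obj.txt")["TagSet"]
    assert {t["Key"]: t["Value"] for t in tag_set} == {"project": "demo"}

check_tag_round_trip()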