-
Notifications
You must be signed in to change notification settings - Fork 62
Support functionalities to add user-provided, original labels. #446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 28 commits
0aaddca
dce1a73
6b0dc4c
ed86bcb
38fa924
fcfdd56
de1fb45
c11abea
e471163
47195b9
c8931f2
cc320b5
d8c6ec5
ca6d69e
08549a7
a73801f
aa33064
a047f6a
2d4322c
6e80c2f
f62742e
91b8282
6364394
d7cfcdc
13dc761
a4d6a0b
3315be5
fc4b1fb
337fab8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import copy | ||
| import re | ||
| from logging import getLogger | ||
| from typing import Any, Optional, Union | ||
| from urllib.parse import urlsplit | ||
|
|
@@ -18,23 +19,25 @@ class GCSObjectMetadataClient: | |
| This class used for adding metadata as labels. | ||
| """ | ||
|
|
||
| @staticmethod | ||
| def _is_log_related_path(path: str) -> bool: | ||
| return re.match(r'^log/(processing_time/|task_info/|task_log/|module_versions/|random_seed/|task_params/).+', path) is not None | ||
|
|
||
| # This is the copied method of luigi.gcs._path_to_bucket_and_key(path). | ||
| @staticmethod | ||
| def path_to_bucket_and_key(path: str) -> tuple[str, str]: | ||
| def _path_to_bucket_and_key(path: str) -> tuple[str, str]: | ||
| (scheme, netloc, path, _, _) = urlsplit(path) | ||
| assert scheme == 'gs' | ||
| path_without_initial_slash = path[1:] | ||
| return netloc, path_without_initial_slash | ||
|
|
||
| @staticmethod | ||
| def add_task_state_labels( | ||
| path: str, | ||
| task_params: Optional[dict[Any, str]] = None, | ||
| ) -> None: | ||
| def add_task_state_labels(path: str, task_params: Optional[dict[str, str]] = None, custom_labels: Optional[dict[str, Any]] = None) -> None: | ||
| if GCSObjectMetadataClient._is_log_related_path(path): | ||
| return | ||
| # In gokart/object_storage.get_time_stamp, could find same call. | ||
| # _path_to_bucket_and_key is a private method, so, this might not be acceptable. | ||
| bucket, obj = GCSObjectMetadataClient.path_to_bucket_and_key(path) | ||
|
|
||
| bucket, obj = GCSObjectMetadataClient._path_to_bucket_and_key(path) | ||
| _response = GCSConfig().get_gcs_client().client.objects().get(bucket=bucket, object=obj).execute() | ||
| if _response is None: | ||
| logger.error(f'failed to get object from GCS bucket {bucket} and object {obj}.') | ||
|
|
@@ -50,6 +53,7 @@ def add_task_state_labels( | |
| patched_metadata = GCSObjectMetadataClient._get_patched_obj_metadata( | ||
| copy.deepcopy(original_metadata), | ||
| task_params, | ||
| custom_labels, | ||
| ) | ||
|
|
||
| if original_metadata != patched_metadata: | ||
|
|
@@ -71,30 +75,71 @@ def add_task_state_labels( | |
| if update_response is None: | ||
| logger.error(f'failed to patch object {obj} in bucket {bucket} and object {obj}.') | ||
|
|
||
| @staticmethod | ||
| def _normalize_labels(labels: Optional[dict[str, Any]]) -> dict[str, str]: | ||
| return {str(key): str(value) for key, value in labels.items()} if labels else {} | ||
|
|
||
|
Comment on lines
+80
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
| @staticmethod | ||
| def _get_patched_obj_metadata( | ||
| metadata: Any, | ||
| task_params: Optional[dict[Any, str]] = None, | ||
| task_params: Optional[dict[str, str]] = None, | ||
| custom_labels: Optional[dict[str, Any]] = None, | ||
| ) -> Union[dict, Any]: | ||
| # If metadata from response when getting bucket and object information is not dictionary, | ||
| # something wrong might be happened, so return original metadata, no patched. | ||
| if not isinstance(metadata, dict): | ||
| logger.warning(f'metadata is not a dict: {metadata}, something wrong was happened when getting response when get bucket and object information.') | ||
| return metadata | ||
|
|
||
| if not task_params: | ||
| if not task_params and not custom_labels: | ||
| return metadata | ||
| # Maximum size of metadata for each object is 8 KiB. | ||
| # [Link]: https://cloud.google.com/storage/quotas#objects | ||
| max_gcs_metadata_size, total_metadata_size, labels = 8 * 1024, 0, [] | ||
| for label_name, label_value in task_params.items(): | ||
| normalized_task_params_labels = GCSObjectMetadataClient._normalize_labels(task_params) | ||
| normalized_custom_labels = GCSObjectMetadataClient._normalize_labels(custom_labels) | ||
| # There is a possibility that the keys of user-provided labels(custom_labels) may conflict with those generated from task parameters (task_params_labels). | ||
| # However, users who utilize custom_labels are no longer expected to search using the labels generated from task parameters. | ||
| # Instead, users are expected to search using the labels they provided. | ||
| # Therefore, in the event of a key conflict, the value registered by the user-provided labels will take precedence. | ||
| _merged_labels = GCSObjectMetadataClient._merge_custom_labels_and_task_params_labels(normalized_task_params_labels, normalized_custom_labels) | ||
| return dict(metadata) | dict(GCSObjectMetadataClient._adjust_gcs_metadata_limit_size(_merged_labels)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh... sorry, try to recontribute. |
||
|
|
||
| @staticmethod | ||
| def _merge_custom_labels_and_task_params_labels( | ||
| normalized_task_params: dict[str, str], | ||
| normalized_custom_labels: dict[str, Any], | ||
| ) -> dict[str, str]: | ||
| merged_labels = copy.deepcopy(normalized_custom_labels) | ||
| for label_name, label_value in normalized_task_params.items(): | ||
| if len(label_value) == 0: | ||
| logger.warning(f'value of label_name={label_name} is empty. So skip to add as a metadata.') | ||
| continue | ||
| size = len(str(label_name).encode('utf-8')) + len(str(label_value).encode('utf-8')) | ||
| if total_metadata_size + size > max_gcs_metadata_size: | ||
| logger.warning(f'current metadata total size is {total_metadata_size} byte, and no more labels would be added.') | ||
| if label_name in merged_labels.keys(): | ||
| logger.warning(f'label_name={label_name} is already seen. So skip to add as a metadata.') | ||
| continue | ||
| merged_labels[label_name] = label_value | ||
| return merged_labels | ||
|
|
||
| # Google Cloud Storage(GCS) has a limitation of metadata size, 8 KiB. | ||
| # So, we need to adjust the size of metadata. | ||
| @staticmethod | ||
| def _adjust_gcs_metadata_limit_size(_labels: dict[str, str]) -> dict[str, str]: | ||
| def _get_label_size(label_name: str, label_value: str) -> int: | ||
| return len(label_name.encode('utf-8')) + len(label_value.encode('utf-8')) | ||
|
|
||
| labels = copy.deepcopy(_labels) | ||
| max_gcs_metadata_size, current_total_metadata_size = ( | ||
| 8 * 1024, | ||
| sum(_get_label_size(label_name, label_value) for label_name, label_value in labels.items()), | ||
| ) | ||
|
|
||
| if current_total_metadata_size <= max_gcs_metadata_size: | ||
| return labels | ||
|
|
||
| for label_name, label_value in reversed(labels.items()): | ||
| size = _get_label_size(label_name, label_value) | ||
| del labels[label_name] | ||
| current_total_metadata_size -= size | ||
| if current_total_metadata_size <= max_gcs_metadata_size: | ||
| break | ||
| total_metadata_size += size | ||
| labels.append((label_name, label_value)) | ||
| return dict(metadata) | dict(labels) | ||
| return labels | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nits]
you can write
dict[str, Any] | Nonesince 3.10, or, 3.7 withfrom __future__ import annotationsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for educational and helpful comment!
I'm not sure Optional expression is old expression.