-
Notifications
You must be signed in to change notification settings - Fork 62
Support functionalities to enhance task traceability with metadata for dependency search. #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
79a2881
0cfe7ee
3eee422
ec3bf4f
22a69d0
08e3f59
9b19a1c
accbf1d
6719f4d
0bcc16c
5c41035
0b951ab
10795a2
32b4343
6f70a41
637f5da
b607926
a8059a1
27b1abd
5ac1c4d
f4479da
e71833b
46aabcf
7bde3b0
4c44cea
dd6a629
d884c79
f1418f8
6a1c4c2
0b06455
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| [loggers] | ||
| keys=root,luigi,luigi-interface,gokart | ||
|
|
||
| [handlers] | ||
| keys=stderrHandler | ||
|
|
||
| [formatters] | ||
| keys=simpleFormatter | ||
|
|
||
| [logger_root] | ||
| level=INFO | ||
| handlers=stderrHandler | ||
|
|
||
| [logger_gokart] | ||
| level=INFO | ||
| handlers=stderrHandler | ||
| qualname=gokart | ||
| propagate=0 | ||
|
|
||
| [logger_luigi] | ||
| level=INFO | ||
| handlers=stderrHandler | ||
| qualname=luigi | ||
| propagate=0 | ||
|
|
||
| [logger_luigi-interface] | ||
| level=INFO | ||
| handlers=stderrHandler | ||
| qualname=luigi-interface | ||
| propagate=0 | ||
|
|
||
| [handler_stderrHandler] | ||
| class=StreamHandler | ||
| formatter=simpleFormatter | ||
| args=(sys.stdout,) | ||
|
|
||
| [formatter_simpleFormatter] | ||
| format=level=%(levelname)s time=%(asctime)s name=%(name)s file=%(filename)s line=%(lineno)d message=%(message)s | ||
| datefmt=%Y/%m/%d %H:%M:%S | ||
| class=logging.Formatter |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| [TaskOnKart] | ||
| workspace_directory=./resource | ||
| local_temporary_directory=./resource/tmp | ||
|
|
||
| [core] | ||
| logging_conf_file=logging.ini | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,18 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import copy | ||
| import json | ||
| import re | ||
| from collections.abc import Iterable | ||
| from logging import getLogger | ||
| from typing import Any | ||
| from urllib.parse import urlsplit | ||
|
|
||
| from googleapiclient.model import makepatch | ||
|
|
||
| from gokart.gcs_config import GCSConfig | ||
| from gokart.required_task_output import RequiredTaskOutput | ||
| from gokart.utils import FlattenableItems | ||
|
|
||
| logger = getLogger(__name__) | ||
|
|
||
|
|
@@ -21,7 +25,7 @@ class GCSObjectMetadataClient: | |
|
|
||
| @staticmethod | ||
| def _is_log_related_path(path: str) -> bool: | ||
| return re.match(r'^log/(processing_time/|task_info/|task_log/|module_versions/|random_seed/|task_params/).+', path) is not None | ||
| return re.match(r'^gs://.+?/log/(processing_time/|task_info/|task_log/|module_versions/|random_seed/|task_params/).+', path) is not None | ||
|
|
||
| # This is the copied method of luigi.gcs._path_to_bucket_and_key(path). | ||
| @staticmethod | ||
|
|
@@ -32,7 +36,12 @@ def _path_to_bucket_and_key(path: str) -> tuple[str, str]: | |
| return netloc, path_without_initial_slash | ||
|
|
||
| @staticmethod | ||
| def add_task_state_labels(path: str, task_params: dict[str, str] | None = None, custom_labels: dict[str, Any] | None = None) -> None: | ||
| def add_task_state_labels( | ||
| path: str, | ||
| task_params: dict[str, str] | None = None, | ||
| custom_labels: dict[str, Any] | None = None, | ||
| required_task_outputs: FlattenableItems[RequiredTaskOutput] | None = None, | ||
| ) -> None: | ||
| if GCSObjectMetadataClient._is_log_related_path(path): | ||
| return | ||
| # In gokart/object_storage.get_time_stamp, could find same call. | ||
|
|
@@ -42,20 +51,18 @@ def add_task_state_labels(path: str, task_params: dict[str, str] | None = None, | |
| if _response is None: | ||
| logger.error(f'failed to get object from GCS bucket {bucket} and object {obj}.') | ||
| return | ||
|
|
||
| response: dict[str, Any] = dict(_response) | ||
| original_metadata: dict[Any, Any] = {} | ||
| if 'metadata' in response.keys(): | ||
| _metadata = response.get('metadata') | ||
| if _metadata is not None: | ||
| original_metadata = dict(_metadata) | ||
|
|
||
| patched_metadata = GCSObjectMetadataClient._get_patched_obj_metadata( | ||
| copy.deepcopy(original_metadata), | ||
| task_params, | ||
| custom_labels, | ||
| required_task_outputs, | ||
| ) | ||
|
|
||
| if original_metadata != patched_metadata: | ||
| # If we use update api, existing object metadata are removed, so should use patch api. | ||
| # See the official document descriptions. | ||
|
|
@@ -71,7 +78,6 @@ def add_task_state_labels(path: str, task_params: dict[str, str] | None = None, | |
| ) | ||
| .execute() | ||
| ) | ||
|
|
||
| if update_response is None: | ||
| logger.error(f'failed to patch object {obj} in bucket {bucket} and object {obj}.') | ||
|
|
||
|
|
@@ -84,13 +90,13 @@ def _get_patched_obj_metadata( | |
| metadata: Any, | ||
| task_params: dict[str, str] | None = None, | ||
| custom_labels: dict[str, Any] | None = None, | ||
| required_task_outputs: FlattenableItems[RequiredTaskOutput] | None = None, | ||
| ) -> dict | Any: | ||
| # If metadata from response when getting bucket and object information is not dictionary, | ||
| # something wrong might be happened, so return original metadata, no patched. | ||
| if not isinstance(metadata, dict): | ||
| logger.warning(f'metadata is not a dict: {metadata}, something wrong was happened when getting response when get bucket and object information.') | ||
| return metadata | ||
|
|
||
| if not task_params and not custom_labels: | ||
| return metadata | ||
| # Maximum size of metadata for each object is 8 KiB. | ||
|
|
@@ -101,23 +107,44 @@ def _get_patched_obj_metadata( | |
| # However, users who utilize custom_labels are no longer expected to search using the labels generated from task parameters. | ||
| # Instead, users are expected to search using the labels they provided. | ||
| # Therefore, in the event of a key conflict, the value registered by the user-provided labels will take precedence. | ||
| _merged_labels = GCSObjectMetadataClient._merge_custom_labels_and_task_params_labels(normalized_task_params_labels, normalized_custom_labels) | ||
| normalized_labels = [normalized_custom_labels, normalized_task_params_labels] | ||
| if required_task_outputs: | ||
| normalized_labels.append({'__required_task_outputs': json.dumps(GCSObjectMetadataClient._get_serialized_string(required_task_outputs))}) | ||
|
|
||
| _merged_labels = GCSObjectMetadataClient._merge_custom_labels_and_task_params_labels(normalized_labels) | ||
| return GCSObjectMetadataClient._adjust_gcs_metadata_limit_size(dict(metadata) | _merged_labels) | ||
|
|
||
| @staticmethod | ||
| def _get_serialized_string(required_task_outputs: FlattenableItems[RequiredTaskOutput]) -> FlattenableItems[str]: | ||
| def _iterable_flatten(nested_list: Iterable) -> list[str]: | ||
| flattened_list: list[str] = [] | ||
| for item in nested_list: | ||
| if isinstance(item, Iterable): | ||
| flattened_list.extend(_iterable_flatten(item)) | ||
| else: | ||
| flattened_list.append(item) | ||
| return flattened_list | ||
|
|
||
| if isinstance(required_task_outputs, dict): | ||
| return {k: GCSObjectMetadataClient._get_serialized_string(v) for k, v in required_task_outputs.items()} | ||
| if isinstance(required_task_outputs, Iterable): | ||
| return _iterable_flatten([GCSObjectMetadataClient._get_serialized_string(ro) for ro in required_task_outputs]) | ||
| return [required_task_outputs.serialize()] | ||
|
|
||
| @staticmethod | ||
| def _merge_custom_labels_and_task_params_labels( | ||
| normalized_task_params: dict[str, str], | ||
| normalized_custom_labels: dict[str, Any], | ||
| normalized_labels_list: list[dict[str, Any]], | ||
| ) -> dict[str, str]: | ||
| merged_labels = copy.deepcopy(normalized_custom_labels) | ||
| for label_name, label_value in normalized_task_params.items(): | ||
| if len(label_value) == 0: | ||
| logger.warning(f'value of label_name={label_name} is empty. So skip to add as a metadata.') | ||
| continue | ||
| if label_name in merged_labels.keys(): | ||
| logger.warning(f'label_name={label_name} is already seen. So skip to add as a metadata.') | ||
| continue | ||
| merged_labels[label_name] = label_value | ||
| merged_labels: dict[str, str] = {} | ||
| for normalized_label in normalized_labels_list: | ||
| for label_name, label_value in normalized_label.items(): | ||
| if len(label_value) == 0: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [MUST] This code may fail, since it seems to assume that label_value is a str. I prefer checking whether it is a str first, and then checking the length, as,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thank you for reviewing my code!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @TlexCypher
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @TlexCypher Could you check this comment? If you have confirmed that label_value is str, you should
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I fixed here 0b06455 |
||
| logger.warning(f'value of label_name={label_name} is empty. So skip to add as a metadata.') | ||
| continue | ||
| if label_name in merged_labels.keys(): | ||
| logger.warning(f'label_name={label_name} is already seen. So skip to add as a metadata.') | ||
| continue | ||
| merged_labels[label_name] = label_value | ||
| return merged_labels | ||
|
|
||
| # Google Cloud Storage(GCS) has a limitation of metadata size, 8 KiB. | ||
|
|
@@ -132,10 +159,8 @@ def _get_label_size(label_name: str, label_value: str) -> int: | |
| 8 * 1024, | ||
| sum(_get_label_size(label_name, label_value) for label_name, label_value in labels.items()), | ||
| ) | ||
|
|
||
| if current_total_metadata_size <= max_gcs_metadata_size: | ||
| return labels | ||
|
|
||
| for label_name, label_value in reversed(labels.items()): | ||
| size = _get_label_size(label_name, label_value) | ||
| del labels[label_name] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| from dataclasses import dataclass | ||
|
|
||
|
|
||
| @dataclass | ||
| class RequiredTaskOutput: | ||
| task_name: str | ||
| output_path: str | ||
|
|
||
| def serialize(self) -> dict[str, str]: | ||
| return {'__gokart_task_name': self.task_name, '__gokart_output_path': self.output_path} |
Uh oh!
There was an error while loading. Please reload this page.