diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index 9071655a8..5d1691d9f 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -298,7 +298,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): from collections import namedtuple from datetime import datetime from enum import Enum -from typing import Dict, List, Optional, Set, Union +from typing import Callable, Dict, List, Optional, Set, Tuple, Union from ops import JujuVersion, Secret, SecretInfo, SecretNotFoundError from ops.charm import ( @@ -320,7 +320,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 20 +LIBPATCH = 23 PYDEPS = ["ops>=2.0.0"] @@ -377,11 +377,24 @@ class SecretsIllegalUpdateError(SecretError): """Secrets aren't yet available for Juju version used.""" -def get_encoded_field( +def get_encoded_dict( relation: Relation, member: Union[Unit, Application], field: str -) -> Union[str, List[str], Dict[str, str]]: +) -> Optional[Dict[str, str]]: """Retrieve and decode an encoded field from relation data.""" - return json.loads(relation.data[member].get(field, "{}")) + data = json.loads(relation.data[member].get(field, "{}")) + if isinstance(data, dict): + return data + logger.error("Unexpected datatype for %s instead of dict.", str(data)) + + +def get_encoded_list( + relation: Relation, member: Union[Unit, Application], field: str +) -> Optional[List[str]]: + """Retrieve and decode an encoded field from relation data.""" + data = json.loads(relation.data[member].get(field, "[]")) + if isinstance(data, list): + return data + logger.error("Unexpected datatype for %s instead of list.", str(data)) def set_encoded_field( @@ -406,16 +419,11 @@ def diff(event: RelationChangedEvent, bucket: Union[Unit, Application]) -> Diff: keys from the event relation databag. """ # Retrieve the old data from the data key in the application relation databag. - old_data = get_encoded_field(event.relation, bucket, "data") + old_data = get_encoded_dict(event.relation, bucket, "data") if not old_data: old_data = {} - if not isinstance(old_data, dict): - # We should never get here, added to re-assure pyright - logger.error("Previous databag diff is of a wrong type.") - old_data = {} - # Retrieve the new data from the event relation databag. new_data = ( {key: value for key, value in event.relation.data[event.app].items() if key != "data"} @@ -523,9 +531,14 @@ def get_content(self) -> Dict[str, str]: def set_content(self, content: Dict[str, str]) -> None: """Setting cached secret content.""" - if self.meta: + if not self.meta: + return + + if content: self.meta.set_content(content) self._secret_content = content + else: + self.meta.remove_all_revisions() def get_info(self) -> Optional[SecretInfo]: """Wrapper function to apply the corresponding call on the Secret object within CachedSecret if any.""" @@ -622,6 +635,16 @@ def _fetch_my_specific_relation_data( """Fetch data available (directily or indirectly -- i.e. secrets) from the relation for owner/this_app.""" raise NotImplementedError + @abstractmethod + def _update_relation_data(self, relation: Relation, data: Dict[str, str]) -> None: + """Update data available (directily or indirectly -- i.e. 
secrets) from the relation for owner/this_app.""" + raise NotImplementedError + + @abstractmethod + def _delete_relation_data(self, relation: Relation, fields: List[str]) -> None: + """Delete data available (directily or indirectly -- i.e. secrets) from the relation for owner/this_app.""" + raise NotImplementedError + # Internal helper methods @staticmethod @@ -688,9 +711,9 @@ def _group_secret_fields(secret_fields: List[str]) -> Dict[SecretGroup, List[str secret_fieldnames_grouped.setdefault(SecretGroup.EXTRA, []).append(key) return secret_fieldnames_grouped - def _retrieve_group_secret_contents( + def _get_group_secret_contents( self, - relation_id: int, + relation: Relation, group: SecretGroup, secret_fields: Optional[Union[Set[str], List[str]]] = None, ) -> Dict[str, str]: @@ -698,12 +721,30 @@ def _retrieve_group_secret_contents( if not secret_fields: secret_fields = [] - if (secret := self._get_relation_secret(relation_id, group)) and ( + if (secret := self._get_relation_secret(relation.id, group)) and ( secret_data := secret.get_content() ): return {k: v for k, v in secret_data.items() if k in secret_fields} return {} + @staticmethod + def _content_for_secret_group( + content: Dict[str, str], secret_fields: Set[str], group_mapping: SecretGroup + ) -> Dict[str, str]: + """Select : pairs from input, that belong to this particular Secret group.""" + if group_mapping == SecretGroup.EXTRA: + return { + k: v + for k, v in content.items() + if k in secret_fields and k not in SECRET_LABEL_MAP.keys() + } + + return { + k: v + for k, v in content.items() + if k in secret_fields and SECRET_LABEL_MAP.get(k) == group_mapping + } + @juju_secrets_only def _get_relation_secret_data( self, relation_id: int, group_mapping: SecretGroup, relation_name: Optional[str] = None @@ -713,6 +754,49 @@ def _get_relation_secret_data( if secret: return secret.get_content() + # Core operations on Relation Fields manipulations (regardless whether the field is in the databag or in a secret) + # Internal functions to be called directly from transparent public interface functions (+closely related helpers) + + def _process_secret_fields( + self, + relation: Relation, + req_secret_fields: Optional[List[str]], + impacted_rel_fields: List[str], + operation: Callable, + *args, + **kwargs, + ) -> Tuple[Dict[str, str], Set[str]]: + """Isolate target secret fields of manipulation, and execute requested operation by Secret Group.""" + result = {} + + # If the relation started on a databag, we just stay on the databag + # (Rolling upgrades may result in a relation starting on databag, getting secrets enabled on-the-fly) + # self.local_app is sufficient to check (ignored if Requires, never has secrets -- works if Provides) + fallback_to_databag = ( + req_secret_fields + and self.local_unit.is_leader() + and set(req_secret_fields) & set(relation.data[self.local_app]) + ) + + normal_fields = set(impacted_rel_fields) + if req_secret_fields and self.secrets_enabled and not fallback_to_databag: + normal_fields = normal_fields - set(req_secret_fields) + secret_fields = set(impacted_rel_fields) - set(normal_fields) + + secret_fieldnames_grouped = self._group_secret_fields(list(secret_fields)) + + for group in secret_fieldnames_grouped: + # operation() should return nothing when all goes well + if group_result := operation(relation, group, secret_fields, *args, **kwargs): + # If "meaningful" data was returned, we take it. (Some 'operation'-s only return success/failure.) 
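The old `get_encoded_field` returned a three-way union that every caller had to narrow by hand (hence the pyright "re-assurance" branch deleted from `diff()` above). Splitting it into `get_encoded_dict` and `get_encoded_list` pushes the type check into the helper itself. A minimal standalone sketch of that pattern, using a plain dict in place of `relation.data[member]`:

```python
import json
import logging
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


def get_encoded_dict(databag: Dict[str, str], field: str) -> Optional[Dict[str, str]]:
    """Decode a JSON field, returning it only if it really is a dict."""
    data = json.loads(databag.get(field, "{}"))
    if isinstance(data, dict):
        return data
    logger.error("Unexpected datatype for %s instead of dict.", str(data))
    return None  # implicit fall-through in the library; made explicit here


def get_encoded_list(databag: Dict[str, str], field: str) -> Optional[List[str]]:
    """Decode a JSON field, returning it only if it really is a list."""
    data = json.loads(databag.get(field, "[]"))
    if isinstance(data, list):
        return data
    logger.error("Unexpected datatype for %s instead of list.", str(data))
    return None


bag = {"data": '{"endpoints": "10.0.0.1:5432"}', "secret-fields": '["password"]'}
assert get_encoded_dict(bag, "data") == {"endpoints": "10.0.0.1:5432"}
assert get_encoded_list(bag, "secret-fields") == ["password"]
assert get_encoded_list(bag, "data") is None  # wrong shape is logged, not returned
```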
+ if isinstance(group_result, dict): + result.update(group_result) + else: + # If it wasn't found as a secret, let's give it a 2nd chance as "normal" field + # Needed when Juju3 Requires meets Juju2 Provider + normal_fields |= set(secret_fieldnames_grouped[group]) + return (result, normal_fields) + def _fetch_relation_data_without_secrets( self, app: Application, relation: Relation, fields: Optional[List[str]] ) -> Dict[str, str]: @@ -723,6 +807,9 @@ def _fetch_relation_data_without_secrets( This is used typically when the Provides side wants to read the Requires side's data, or when the Requires side may want to read its own data. """ + if app not in relation.data or not relation.data[app]: + return {} + if fields: return {k: relation.data[app][k] for k in fields if k in relation.data[app]} else: @@ -743,43 +830,66 @@ def _fetch_relation_data_with_secrets( Provides side itself). """ result = {} + normal_fields = [] - normal_fields = fields - if not normal_fields: - normal_fields = list(relation.data[app].keys()) - - if req_secret_fields and self.secrets_enabled: - if fields: - # Processing from what was requested - normal_fields = set(fields) - set(req_secret_fields) - secret_fields = set(fields) - set(normal_fields) - - secret_fieldnames_grouped = self._group_secret_fields(list(secret_fields)) - - for group in secret_fieldnames_grouped: - if contents := self._retrieve_group_secret_contents( - relation.id, group, secret_fields - ): - result.update(contents) - else: - # If it wasn't found as a secret, let's give it a 2nd chance as "normal" field - normal_fields |= set(secret_fieldnames_grouped[group]) - else: - # Processing from what is given, i.e. retrieving all - normal_fields = [ - f for f in relation.data[app].keys() if not self._is_secret_field(f) - ] - secret_fields = [f for f in relation.data[app].keys() if self._is_secret_field(f)] - for group in SecretGroup: - result.update( - self._retrieve_group_secret_contents(relation.id, group, req_secret_fields) - ) + if not fields: + if app not in relation.data or not relation.data[app]: + return {} + + all_fields = list(relation.data[app].keys()) + normal_fields = [field for field in all_fields if not self._is_secret_field(field)] + + # There must have been secrets there + if all_fields != normal_fields and req_secret_fields: + # So we assemble the full fields list (without 'secret-' fields) + fields = normal_fields + req_secret_fields + + if fields: + result, normal_fields = self._process_secret_fields( + relation, req_secret_fields, fields, self._get_group_secret_contents + ) # Processing "normal" fields. May include leftover from what we couldn't retrieve as a secret. 
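`_process_secret_fields` is the new single funnel for every read, update, and delete: it partitions the target fields into Secret groups, runs the supplied per-group operation, and hands back whatever could not be resolved as a secret so it can be retried as a plain databag field (the Juju 3 requirer meeting a Juju 2 provider, as the comments note). An illustrative reduction of that two-phase split, with a toy group mapping standing in for `SECRET_LABEL_MAP` and a lambda standing in for the real per-group operation:

```python
from typing import Callable, Dict, List, Optional, Set, Tuple

# Toy stand-in for SECRET_LABEL_MAP; fields not listed fall into "extra".
SECRET_GROUPS = {"username": "user", "password": "user", "tls-ca": "tls"}


def process_secret_fields(
    requested: List[str],
    fields: List[str],
    operation: Callable[[str, Set[str]], Optional[Dict[str, str]]],
) -> Tuple[Dict[str, str], Set[str]]:
    normal = set(fields) - set(requested)
    secret_fields = set(fields) - normal
    result: Dict[str, str] = {}
    for group in {SECRET_GROUPS.get(f, "extra") for f in secret_fields}:
        members = {f for f in secret_fields if SECRET_GROUPS.get(f, "extra") == group}
        if (found := operation(group, members)) is not None:
            result.update(found)
        else:
            normal |= members  # second chance as plain databag fields
    return result, normal


# A fake backend where only the "user" group was ever stored as a secret:
store = {"user": {"username": "admin", "password": "s3cr3t"}}
content, leftovers = process_secret_fields(
    ["username", "password", "tls-ca"],
    ["username", "password", "tls-ca", "endpoints"],
    lambda group, members: store.get(group),
)
assert content == {"username": "admin", "password": "s3cr3t"}
assert leftovers == {"endpoints", "tls-ca"}  # read from the databag instead
```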
- result.update({k: relation.data[app][k] for k in normal_fields if k in relation.data[app]}) + # (Typically when Juju3 Requires meets Juju2 Provides) + if normal_fields: + result.update( + self._fetch_relation_data_without_secrets(app, relation, list(normal_fields)) + ) return result - # Public methods + def _update_relation_data_without_secrets( + self, app: Application, relation: Relation, data: Dict[str, str] + ) -> None: + """Updating databag contents when no secrets are involved.""" + if app not in relation.data or relation.data[app] is None: + return + + if any(self._is_secret_field(key) for key in data.keys()): + raise SecretsIllegalUpdateError("Can't update secret {key}.") + + if relation: + relation.data[app].update(data) + + def _delete_relation_data_without_secrets( + self, app: Application, relation: Relation, fields: List[str] + ) -> None: + """Remove databag fields 'fields' from Relation.""" + if app not in relation.data or not relation.data[app]: + return + + for field in fields: + try: + relation.data[app].pop(field) + except KeyError: + logger.debug( + "Non-existing field was attempted to be removed from the databag %s, %s", + str(relation.id), + str(field), + ) + pass + + # Public interface methods + # Handling Relation Fields seamlessly, regardless if in databag or a Juju Secret def get_relation(self, relation_name, relation_id) -> Relation: """Safe way of retrieving a relation.""" @@ -790,9 +900,6 @@ def get_relation(self, relation_name, relation_id) -> Relation: "Relation %s %s couldn't be retrieved", relation_name, relation_id ) - if not relation.app: - raise DataInterfacesError("Relation's application missing") - return relation def fetch_relation_data( @@ -879,12 +986,19 @@ def fetch_my_relation_field( if relation_data := self.fetch_my_relation_data([relation_id], [field], relation_name): return relation_data.get(relation_id, {}).get(field) - # Public methods - mandatory override - - @abstractmethod + @leader_only def update_relation_data(self, relation_id: int, data: dict) -> None: """Update the data within the relation.""" - raise NotImplementedError + relation_name = self.relation_name + relation = self.get_relation(relation_name, relation_id) + return self._update_relation_data(relation, data) + + @leader_only + def delete_relation_data(self, relation_id: int, fields: List[str]) -> None: + """Remove field from the relation.""" + relation_name = self.relation_name + relation = self.get_relation(relation_name, relation_id) + return self._delete_relation_data(relation, fields) # Base DataProvides and DataRequires @@ -910,59 +1024,95 @@ def _diff(self, event: RelationChangedEvent) -> Diff: # Private methods handling secrets - @leader_only @juju_secrets_only def _add_relation_secret( - self, relation_id: int, content: Dict[str, str], group_mapping: SecretGroup - ) -> Optional[Secret]: + self, relation: Relation, content: Dict[str, str], group_mapping: SecretGroup + ) -> bool: """Add a new Juju Secret that will be registered in the relation databag.""" - relation = self.get_relation(self.relation_name, relation_id) - secret_field = self._generate_secret_field_name(group_mapping) if relation.data[self.local_app].get(secret_field): - logging.error("Secret for relation %s already exists, not adding again", relation_id) - return + logging.error("Secret for relation %s already exists, not adding again", relation.id) + return False - label = self._generate_secret_label(self.relation_name, relation_id, group_mapping) + label = self._generate_secret_label(self.relation_name, 
relation.id, group_mapping) secret = self.secrets.add(label, content, relation) # According to lint we may not have a Secret ID if secret.meta and secret.meta.id: relation.data[self.local_app][secret_field] = secret.meta.id - @leader_only + # Return the content that was added + return True + @juju_secrets_only def _update_relation_secret( - self, relation_id: int, content: Dict[str, str], group_mapping: SecretGroup - ): + self, relation: Relation, content: Dict[str, str], group_mapping: SecretGroup + ) -> bool: """Update the contents of an existing Juju Secret, referred in the relation databag.""" - secret = self._get_relation_secret(relation_id, group_mapping) + secret = self._get_relation_secret(relation.id, group_mapping) if not secret: - logging.error("Can't update secret for relation %s", relation_id) - return + logging.error("Can't update secret for relation %s", relation.id) + return False old_content = secret.get_content() full_content = copy.deepcopy(old_content) full_content.update(content) secret.set_content(full_content) - @staticmethod - def _secret_content_grouped( - content: Dict[str, str], secret_fields: Set[str], group_mapping: SecretGroup - ) -> Dict[str, str]: - if group_mapping == SecretGroup.EXTRA: - return { - k: v - for k, v in content.items() - if k in secret_fields and k not in SECRET_LABEL_MAP.keys() - } + # Return True on success + return True - return { - k: v - for k, v in content.items() - if k in secret_fields and SECRET_LABEL_MAP.get(k) == group_mapping - } + def _add_or_update_relation_secrets( + self, + relation: Relation, + group: SecretGroup, + secret_fields: Set[str], + data: Dict[str, str], + ) -> bool: + """Update contents for Secret group. If the Secret doesn't exist, create it.""" + secret_content = self._content_for_secret_group(data, secret_fields, group) + if self._get_relation_secret(relation.id, group): + return self._update_relation_secret(relation, secret_content, group) + else: + return self._add_relation_secret(relation, secret_content, group) + + @juju_secrets_only + def _delete_relation_secret( + self, relation: Relation, group: SecretGroup, secret_fields: List[str], fields: List[str] + ) -> bool: + """Update the contents of an existing Juju Secret, referred in the relation databag.""" + secret = self._get_relation_secret(relation.id, group) + + if not secret: + logging.error("Can't update secret for relation %s", str(relation.id)) + return False + + old_content = secret.get_content() + new_content = copy.deepcopy(old_content) + for field in fields: + try: + new_content.pop(field) + except KeyError: + logging.error( + "Non-existing secret was attempted to be removed %s, %s", + str(relation.id), + str(field), + ) + return False + + secret.set_content(new_content) + + # Remove secret from the relation if it's fully gone + if not new_content: + field = self._generate_secret_field_name(group) + try: + relation.data[self.local_app].pop(field) + except KeyError: + pass + + # Return the content that was removed + return True # Mandatory internal overrides @@ -1004,45 +1154,42 @@ def _fetch_my_specific_relation_data( """Fetching our own relation data.""" secret_fields = None if relation.app: - secret_fields = get_encoded_field(relation, relation.app, REQ_SECRET_FIELDS) + secret_fields = get_encoded_list(relation, relation.app, REQ_SECRET_FIELDS) return self._fetch_relation_data_with_secrets( self.local_app, - secret_fields if isinstance(secret_fields, list) else None, + secret_fields, relation, fields, ) - # Public methods -- mandatory overrides 
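With `_update_relation_data`/`_delete_relation_data` now abstract on the base class (added near the top of this patch) and the public `update_relation_data`/`delete_relation_data` made concrete and `@leader_only`, the hierarchy takes a template-method shape: the base owns leader gating and relation lookup, while the Provides/Requires subclasses own only the storage primitives. A reduced sketch of that shape, with illustrative names rather than the real ops API:

```python
from abc import ABC, abstractmethod
from typing import Dict, List


class DataRelation(ABC):
    def __init__(self, is_leader: bool):
        self._is_leader = is_leader

    def update_relation_data(self, relation_id: int, data: Dict[str, str]) -> None:
        if not self._is_leader:  # stands in for the @leader_only decorator
            return
        self._update_relation_data(relation_id, data)

    def delete_relation_data(self, relation_id: int, fields: List[str]) -> None:
        if not self._is_leader:
            return
        self._delete_relation_data(relation_id, fields)

    @abstractmethod
    def _update_relation_data(self, relation_id: int, data: Dict[str, str]) -> None:
        ...

    @abstractmethod
    def _delete_relation_data(self, relation_id: int, fields: List[str]) -> None:
        ...


class Requires(DataRelation):
    """Requires side: plain databag writes only (it never owns secrets)."""

    def __init__(self, is_leader: bool):
        super().__init__(is_leader)
        self.databag: Dict[str, str] = {}

    def _update_relation_data(self, relation_id: int, data: Dict[str, str]) -> None:
        self.databag.update(data)

    def _delete_relation_data(self, relation_id: int, fields: List[str]) -> None:
        for field in fields:
            self.databag.pop(field, None)


requirer = Requires(is_leader=True)
requirer.update_relation_data(0, {"database": "mydb"})
requirer.delete_relation_data(0, ["database"])
assert requirer.databag == {}
```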
- - @leader_only - def update_relation_data(self, relation_id: int, fields: Dict[str, str]) -> None: + def _update_relation_data(self, relation: Relation, data: Dict[str, str]) -> None: """Set values for fields not caring whether it's a secret or not.""" - relation = self.get_relation(self.relation_name, relation_id) - + req_secret_fields = [] if relation.app: - relation_secret_fields = get_encoded_field(relation, relation.app, REQ_SECRET_FIELDS) - else: - relation_secret_fields = [] + req_secret_fields = get_encoded_list(relation, relation.app, REQ_SECRET_FIELDS) - normal_fields = list(fields) - if relation_secret_fields and self.secrets_enabled: - normal_fields = set(fields.keys()) - set(relation_secret_fields) - secret_fields = set(fields.keys()) - set(normal_fields) + _, normal_fields = self._process_secret_fields( + relation, + req_secret_fields, + list(data), + self._add_or_update_relation_secrets, + data=data, + ) - secret_fieldnames_grouped = self._group_secret_fields(list(secret_fields)) + normal_content = {k: v for k, v in data.items() if k in normal_fields} + self._update_relation_data_without_secrets(self.local_app, relation, normal_content) - for group in secret_fieldnames_grouped: - secret_content = self._secret_content_grouped(fields, secret_fields, group) - if self._get_relation_secret(relation_id, group): - self._update_relation_secret(relation_id, secret_content, group) - else: - self._add_relation_secret(relation_id, secret_content, group) + def _delete_relation_data(self, relation: Relation, fields: List[str]) -> None: + """Delete fields from the Relation not caring whether it's a secret or not.""" + req_secret_fields = [] + if relation.app: + req_secret_fields = get_encoded_list(relation, relation.app, REQ_SECRET_FIELDS) - normal_content = {k: v for k, v in fields.items() if k in normal_fields} - relation.data[self.local_app].update( # pyright: ignore [reportGeneralTypeIssues] - normal_content + _, normal_fields = self._process_secret_fields( + relation, req_secret_fields, fields, self._delete_relation_secret, fields=fields ) + self._delete_relation_data_without_secrets(self.local_app, relation, list(normal_fields)) # Public methods - "native" @@ -1245,26 +1392,30 @@ def _fetch_my_specific_relation_data(self, relation, fields: Optional[List[str]] """Fetching our own relation data.""" return self._fetch_relation_data_without_secrets(self.local_app, relation, fields) - # Public methods -- mandatory overrides - - @leader_only - def update_relation_data(self, relation_id: int, data: dict) -> None: + def _update_relation_data(self, relation: Relation, data: dict) -> None: """Updates a set of key-value pairs in the relation. This function writes in the application data bag, therefore, only the leader unit can call it. Args: - relation_id: the identifier for a particular relation. + relation: the particular relation. data: dict containing the key-value pairs that should be updated in the relation. """ - if any(self._is_secret_field(key) for key in data.keys()): - raise SecretsIllegalUpdateError("Requires side can't update secrets.") + return self._update_relation_data_without_secrets(self.local_app, relation, data) - relation = self.charm.model.get_relation(self.relation_name, relation_id) - if relation: - relation.data[self.local_app].update(data) + def _delete_relation_data(self, relation: Relation, fields: List[str]) -> None: + """Deletes a set of fields from the relation. + + This function writes in the application data bag, therefore, + only the leader unit can call it. 
+ + Args: + relation: the particular relation. + fields: list containing the field names that should be removed from the relation. + """ + return self._delete_relation_data_without_secrets(self.local_app, relation, fields) # General events diff --git a/lib/charms/data_platform_libs/v0/s3.py b/lib/charms/data_platform_libs/v0/s3.py index 9fb518a56..7beb113b6 100644 --- a/lib/charms/data_platform_libs/v0/s3.py +++ b/lib/charms/data_platform_libs/v0/s3.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""A library for communicating with the S3 credentials providers and consumers. +r"""A library for communicating with the S3 credentials providers and consumers. This library provides the relevant interface code implementing the communication specification for fetching, retrieving, triggering, and responding to events related to @@ -113,7 +113,7 @@ def _on_credential_gone(self, event: CredentialsGoneEvent): import json import logging from collections import namedtuple -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union import ops.charm import ops.framework @@ -121,15 +121,13 @@ def _on_credential_gone(self, event: CredentialsGoneEvent): from ops.charm import ( CharmBase, CharmEvents, - EventSource, - Object, - ObjectEvents, RelationBrokenEvent, RelationChangedEvent, RelationEvent, RelationJoinedEvent, ) -from ops.model import Relation +from ops.framework import EventSource, Object, ObjectEvents +from ops.model import Application, Relation, RelationDataContent, Unit # The unique Charmhub library identifier, never change it LIBID = "fca396f6254246c9bfa565b1f85ab528" @@ -139,7 +137,7 @@ def _on_credential_gone(self, event: CredentialsGoneEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 4 logger = logging.getLogger(__name__) @@ -152,7 +150,7 @@ def _on_credential_gone(self, event: CredentialsGoneEvent): deleted - key that were deleted""" -def diff(event: RelationChangedEvent, bucket: str) -> Diff: +def diff(event: RelationChangedEvent, bucket: Union[Unit, Application]) -> Diff: """Retrieves the diff of the data in the relation changed databag. Args: @@ -166,9 +164,11 @@ def diff(event: RelationChangedEvent, bucket: str) -> Diff: # Retrieve the old data from the data key in the application relation databag. old_data = json.loads(event.relation.data[bucket].get("data", "{}")) # Retrieve the new data from the event relation databag. - new_data = { - key: value for key, value in event.relation.data[event.app].items() if key != "data" - } + new_data = ( + {key: value for key, value in event.relation.data[event.app].items() if key != "data"} + if event.app + else {} + ) # These are the keys that were added to the databag and triggered this event. 
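For context, the `diff()` helper here compares the snapshot persisted under the databag's `data` key against the live event databag; the `event.app` guard added above simply yields an empty "new" side when the remote application is already gone. A self-contained rendering of the computation (sample values are illustrative):

```python
import json
from collections import namedtuple

# Mirrors the library's Diff namedtuple.
Diff = namedtuple("Diff", "added changed deleted")

old_bag = {"data": json.dumps({"bucket": "logs", "region": "us-east-1"})}
new_bag = {"bucket": "logs-v2", "path": "/backups"}

old_data = json.loads(old_bag.get("data", "{}"))
new_data = {key: value for key, value in new_bag.items() if key != "data"}

result = Diff(
    added=new_data.keys() - old_data.keys(),
    changed={k for k in old_data.keys() & new_data.keys() if old_data[k] != new_data[k]},
    deleted=old_data.keys() - new_data.keys(),
)
assert result.added == {"path"}
assert result.changed == {"bucket"}
assert result.deleted == {"region"}
```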
added = new_data.keys() - old_data.keys() @@ -193,7 +193,10 @@ class BucketEvent(RelationEvent): @property def bucket(self) -> Optional[str]: """Returns the bucket was requested.""" - return self.relation.data[self.relation.app].get("bucket") + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("bucket", "") class CredentialRequestedEvent(BucketEvent): @@ -209,7 +212,7 @@ class S3CredentialEvents(CharmEvents): class S3Provider(Object): """A provider handler for communicating S3 credentials to consumers.""" - on = S3CredentialEvents() + on = S3CredentialEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__( self, @@ -232,7 +235,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: diff = self._diff(event) # emit on credential requested if bucket is provided by the requirer application if "bucket" in diff.added: - self.on.credentials_requested.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "credentials_requested").emit( + event.relation, app=event.app, unit=event.unit + ) def _load_relation_data(self, raw_relation_data: dict) -> dict: """Loads relation data from the relation data bag. @@ -242,7 +247,7 @@ def _load_relation_data(self, raw_relation_data: dict) -> dict: Returns: dict: Relation data in dict format. """ - connection_data = dict() + connection_data = {} for key in raw_relation_data: try: connection_data[key] = json.loads(raw_relation_data[key]) @@ -309,9 +314,11 @@ def fetch_relation_data(self) -> dict: """ data = {} for relation in self.relations: - data[relation.id] = { - key: value for key, value in relation.data[relation.app].items() if key != "data" - } + data[relation.id] = ( + {key: value for key, value in relation.data[relation.app].items() if key != "data"} + if relation.app + else {} + ) return data def update_connection_info(self, relation_id: int, connection_data: dict) -> None: @@ -493,46 +500,73 @@ class S3Event(RelationEvent): @property def bucket(self) -> Optional[str]: """Returns the bucket name.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("bucket") @property def access_key(self) -> Optional[str]: """Returns the access key.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("access-key") @property def secret_key(self) -> Optional[str]: """Returns the secret key.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("secret-key") @property def path(self) -> Optional[str]: """Returns the path where data can be stored.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("path") @property def endpoint(self) -> Optional[str]: """Returns the endpoint address.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("endpoint") @property def region(self) -> Optional[str]: """Returns the region.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("region") @property def s3_uri_style(self) -> Optional[str]: """Returns the s3 uri style.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("s3-uri-style") @property def storage_class(self) -> Optional[str]: """Returns the storage class name.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("storage-class") @property def tls_ca_chain(self) -> Optional[List[str]]: """Returns the 
TLS CA chain.""" + if not self.relation.app: + return None + tls_ca_chain = self.relation.data[self.relation.app].get("tls-ca-chain") if tls_ca_chain is not None: return json.loads(tls_ca_chain) @@ -541,11 +575,17 @@ def tls_ca_chain(self) -> Optional[List[str]]: @property def s3_api_version(self) -> Optional[str]: """Returns the S3 API version.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("s3-api-version") @property def attributes(self) -> Optional[List[str]]: """Returns the attributes.""" + if not self.relation.app: + return None + attributes = self.relation.data[self.relation.app].get("attributes") if attributes is not None: return json.loads(attributes) @@ -573,9 +613,11 @@ class S3CredentialRequiresEvents(ObjectEvents): class S3Requirer(Object): """Requires-side of the s3 relation.""" - on = S3CredentialRequiresEvents() + on = S3CredentialRequiresEvents() # pyright: ignore[reportGeneralTypeIssues] - def __init__(self, charm: ops.charm.CharmBase, relation_name: str, bucket_name: str = None): + def __init__( + self, charm: ops.charm.CharmBase, relation_name: str, bucket_name: Optional[str] = None + ): """Manager of the s3 client relations.""" super().__init__(charm, relation_name) @@ -658,7 +700,7 @@ def update_connection_info(self, relation_id: int, connection_data: dict) -> Non relation.data[self.local_app].update(updated_connection_data) logger.debug(f"Updated S3 credentials: {updated_connection_data}") - def _load_relation_data(self, raw_relation_data: dict) -> dict: + def _load_relation_data(self, raw_relation_data: RelationDataContent) -> Dict[str, str]: """Loads relation data from the relation data bag. Args: @@ -666,7 +708,7 @@ def _load_relation_data(self, raw_relation_data: dict) -> dict: Returns: dict: Relation data in dict format. """ - connection_data = dict() + connection_data = {} for key in raw_relation_data: try: connection_data[key] = json.loads(raw_relation_data[key]) @@ -700,22 +742,25 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: missing_options.append(configuration_option) # emit credential change event only if all mandatory fields are present if contains_required_options: - self.on.credentials_changed.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "credentials_changed").emit( + event.relation, app=event.app, unit=event.unit + ) else: logger.warning( f"Some mandatory fields: {missing_options} are not present, do not emit credential change event!" 
) - def get_s3_connection_info(self) -> Dict: + def get_s3_connection_info(self) -> Dict[str, str]: """Return the s3 credentials as a dictionary.""" - relation = self.charm.model.get_relation(self.relation_name) - if not relation: - return {} - return self._load_relation_data(relation.data[relation.app]) + for relation in self.relations: + if relation and relation.app: + return self._load_relation_data(relation.data[relation.app]) + + return {} def _on_relation_broken(self, event: RelationBrokenEvent) -> None: """Notify the charm about a broken S3 credential store relation.""" - self.on.credentials_gone.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "credentials_gone").emit(event.relation, app=event.app, unit=event.unit) @property def relations(self) -> List[Relation]: diff --git a/lib/charms/grafana_k8s/v0/grafana_dashboard.py b/lib/charms/grafana_k8s/v0/grafana_dashboard.py index 7e088d46e..1f1bc4f0c 100644 --- a/lib/charms/grafana_k8s/v0/grafana_dashboard.py +++ b/lib/charms/grafana_k8s/v0/grafana_dashboard.py @@ -219,7 +219,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 30 +LIBPATCH = 35 logger = logging.getLogger(__name__) @@ -525,7 +525,7 @@ def _validate_relation_by_interface_and_direction( relation = charm.meta.relations[relation_name] actual_relation_interface = relation.interface_name - if actual_relation_interface != expected_relation_interface: + if actual_relation_interface and actual_relation_interface != expected_relation_interface: raise RelationInterfaceMismatchError( relation_name, expected_relation_interface, actual_relation_interface ) @@ -665,14 +665,14 @@ def _template_panels( continue if not existing_templates: datasource = panel.get("datasource") - if type(datasource) == str: + if isinstance(datasource, str): if "loki" in datasource: panel["datasource"] = "${lokids}" elif "grafana" in datasource: continue else: panel["datasource"] = "${prometheusds}" - elif type(datasource) == dict: + elif isinstance(datasource, dict): # In dashboards exported by Grafana 9, datasource type is dict dstype = datasource.get("type", "") if dstype == "loki": @@ -686,7 +686,7 @@ def _template_panels( logger.error("Unknown datasource format: skipping") continue else: - if type(panel["datasource"]) == str: + if isinstance(panel["datasource"], str): if panel["datasource"].lower() in replacements.values(): # Already a known template variable continue @@ -701,7 +701,7 @@ def _template_panels( if replacement: used_replacements.append(ds) panel["datasource"] = replacement or panel["datasource"] - elif type(panel["datasource"]) == dict: + elif isinstance(panel["datasource"], dict): dstype = panel["datasource"].get("type", "") if panel["datasource"].get("uid", "").lower() in replacements.values(): # Already a known template variable @@ -790,7 +790,7 @@ def _inject_labels(content: str, topology: dict, transformer: "CosTool") -> str: # We need to use an index so we can insert the changed element back later for panel_idx, panel in enumerate(panels): - if type(panel) is not dict: + if not isinstance(panel, dict): continue # Use the index to insert it back in the same location @@ -831,11 +831,11 @@ def _modify_panel(panel: dict, topology: dict, transformer: "CosTool") -> dict: if "datasource" not in panel.keys(): continue - if type(panel["datasource"]) == str: + if isinstance(panel["datasource"], str): if panel["datasource"] not in known_datasources: 
continue querytype = known_datasources[panel["datasource"]] - elif type(panel["datasource"]) == dict: + elif isinstance(panel["datasource"], dict): if panel["datasource"]["uid"] not in known_datasources: continue querytype = known_datasources[panel["datasource"]["uid"]] @@ -955,7 +955,7 @@ def restore(self, snapshot): """Restore grafana source information.""" self.error_message = snapshot["error_message"] self.valid = snapshot["valid"] - self.errors = json.loads(snapshot["errors"]) + self.errors = json.loads(str(snapshot["errors"])) class GrafanaProviderEvents(ObjectEvents): @@ -968,7 +968,7 @@ class GrafanaDashboardProvider(Object): """An API to provide Grafana dashboards to a Grafana charm.""" _stored = StoredState() - on = GrafanaProviderEvents() + on = GrafanaProviderEvents() # pyright: ignore def __init__( self, @@ -1072,7 +1072,7 @@ def add_dashboard(self, content: str, inject_dropdowns: bool = True) -> None: """ # Update of storage must be done irrespective of leadership, so # that the stored state is there when this unit becomes leader. - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore encoded_dashboard = _encode_dashboard_content(content) @@ -1093,7 +1093,7 @@ def remove_non_builtin_dashboards(self) -> None: """Remove all dashboards to the relation added via :method:`add_dashboard`.""" # Update of storage must be done irrespective of leadership, so # that the stored state is there when this unit becomes leader. - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore for dashboard_id in list(stored_dashboard_templates.keys()): if dashboard_id.startswith("prog:"): @@ -1120,7 +1120,7 @@ def _update_all_dashboards_from_dir( # Ensure we do not leave outdated dashboards by removing from stored all # the encoded dashboards that start with "file/". 
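The `type(x) == str` to `isinstance(x, str)` conversions running through this file are not just lint appeasement: `isinstance` also accepts subclasses, which matters for values that arrive through subclassed containers or typed wrappers. A small demonstration:

```python
from collections import OrderedDict

datasource = OrderedDict(type="prometheus", uid="abc123")

assert isinstance(datasource, dict)   # True: OrderedDict is a dict subclass
assert not type(datasource) == dict   # the old equality check misses it

panel = {"datasource": datasource}
if isinstance(panel["datasource"], dict):
    dstype = panel["datasource"].get("type", "")
    assert dstype == "prometheus"
```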
if self._dashboards_path: - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore for dashboard_id in list(stored_dashboard_templates.keys()): if dashboard_id.startswith("file:"): @@ -1174,7 +1174,7 @@ def _reinitialize_dashboard_data(self, inject_dropdowns: bool = True) -> None: e.grafana_dashboards_absolute_path, e.message, ) - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore for dashboard_id in list(stored_dashboard_templates.keys()): if dashboard_id.startswith("file:"): @@ -1195,6 +1195,7 @@ def _on_grafana_dashboard_relation_created(self, event: RelationCreatedEvent) -> `grafana_dashboaard` relationship is joined """ if self._charm.unit.is_leader(): + self._update_all_dashboards_from_dir() self._upset_dashboards_on_relation(event.relation) def _on_grafana_dashboard_relation_changed(self, event: RelationChangedEvent) -> None: @@ -1212,16 +1213,18 @@ def _on_grafana_dashboard_relation_changed(self, event: RelationChangedEvent) -> valid = bool(data.get("valid", True)) errors = data.get("errors", []) if valid and not errors: - self.on.dashboard_status_changed.emit(valid=valid) + self.on.dashboard_status_changed.emit(valid=valid) # pyright: ignore else: - self.on.dashboard_status_changed.emit(valid=valid, errors=errors) + self.on.dashboard_status_changed.emit( # pyright: ignore + valid=valid, errors=errors + ) def _upset_dashboards_on_relation(self, relation: Relation) -> None: """Update the dashboards in the relation data bucket.""" # It's completely ridiculous to add a UUID, but if we don't have some # pseudo-random value, this never makes it across 'juju set-state' stored_data = { - "templates": _type_convert_stored(self._stored.dashboard_templates), + "templates": _type_convert_stored(self._stored.dashboard_templates), # pyright: ignore "uuid": str(uuid.uuid4()), } @@ -1256,7 +1259,7 @@ def dashboard_templates(self) -> List: class GrafanaDashboardConsumer(Object): """A consumer object for working with Grafana Dashboards.""" - on = GrafanaDashboardEvents() + on = GrafanaDashboardEvents() # pyright: ignore _stored = StoredState() def __init__( @@ -1348,13 +1351,13 @@ def _on_grafana_dashboard_relation_changed(self, event: RelationChangedEvent) -> changes = self._render_dashboards_and_signal_changed(event.relation) if changes: - self.on.dashboards_changed.emit() + self.on.dashboards_changed.emit() # pyright: ignore def _on_grafana_peer_changed(self, _: RelationChangedEvent) -> None: """Emit dashboard events on peer events so secondary charm data updates.""" if self._charm.unit.is_leader(): return - self.on.dashboards_changed.emit() + self.on.dashboards_changed.emit() # pyright: ignore def update_dashboards(self, relation: Optional[Relation] = None) -> None: """Re-establish dashboards on one or more relations. 
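The "completely ridiculous" UUID in `_upset_dashboards_on_relation` (and in `_update_remote_grafana` below) deserves a note: Juju only propagates relation data that actually changed, so re-publishing byte-identical dashboard templates would never re-fire the consumer's relation-changed hook. Salting the payload with a fresh UUID makes every write distinct:

```python
import json
import uuid

templates = {"file:my-dash": {"content": "<encoded dashboard>"}}

payload_a = json.dumps({"templates": templates, "uuid": str(uuid.uuid4())})
payload_b = json.dumps({"templates": templates, "uuid": str(uuid.uuid4())})

# Same templates, but the payloads differ, so the databag write is always
# "new" and the remote side's relation-changed hook fires again:
assert payload_a != payload_b
assert json.loads(payload_a)["templates"] == json.loads(payload_b)["templates"]
```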
@@ -1401,7 +1404,7 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # """ other_app = relation.app - raw_data = relation.data[other_app].get("dashboards", {}) # type: ignore + raw_data = relation.data[other_app].get("dashboards", "") # pyright: ignore if not raw_data: logger.warning( @@ -1509,12 +1512,12 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # def _manage_dashboard_uid(self, dashboard: str, template: dict) -> str: """Add an uid to the dashboard if it is not present.""" - dashboard = json.loads(dashboard) + dashboard_dict = json.loads(dashboard) - if not dashboard.get("uid", None) and "dashboard_alt_uid" in template: - dashboard["uid"] = template["dashboard_alt_uid"] + if not dashboard_dict.get("uid", None) and "dashboard_alt_uid" in template: + dashboard_dict["uid"] = template["dashboard_alt_uid"] - return json.dumps(dashboard) + return json.dumps(dashboard_dict) def _remove_all_dashboards_for_relation(self, relation: Relation) -> None: """If an errored dashboard is in stored data, remove it and trigger a deletion.""" @@ -1522,7 +1525,7 @@ def _remove_all_dashboards_for_relation(self, relation: Relation) -> None: stored_dashboards = self.get_peer_data("dashboards") stored_dashboards.pop(str(relation.id)) self.set_peer_data("dashboards", stored_dashboards) - self.on.dashboards_changed.emit() + self.on.dashboards_changed.emit() # pyright: ignore def _to_external_object(self, relation_id, dashboard): return { @@ -1604,7 +1607,7 @@ class GrafanaDashboardAggregator(Object): """ _stored = StoredState() - on = GrafanaProviderEvents() + on = GrafanaProviderEvents() # pyright: ignore def __init__( self, @@ -1669,7 +1672,7 @@ def _update_remote_grafana(self, _: Optional[RelationEvent] = None) -> None: """Push dashboards to the downstream Grafana relation.""" # It's still ridiculous to add a UUID here, but needed stored_data = { - "templates": _type_convert_stored(self._stored.dashboard_templates), + "templates": _type_convert_stored(self._stored.dashboard_templates), # pyright: ignore "uuid": str(uuid.uuid4()), } @@ -1690,7 +1693,7 @@ def remove_dashboards(self, event: RelationBrokenEvent) -> None: del self._stored.dashboard_templates[id] # type: ignore stored_data = { - "templates": _type_convert_stored(self._stored.dashboard_templates), + "templates": _type_convert_stored(self._stored.dashboard_templates), # pyright: ignore "uuid": str(uuid.uuid4()), } diff --git a/lib/charms/loki_k8s/v0/loki_push_api.py b/lib/charms/loki_k8s/v0/loki_push_api.py index 8fe2833a6..9f9372d26 100644 --- a/lib/charms/loki_k8s/v0/loki_push_api.py +++ b/lib/charms/loki_k8s/v0/loki_push_api.py @@ -32,7 +32,7 @@ This object may be used by any Charmed Operator which implements the `loki_push_api` interface. For instance, Loki or Grafana Agent. -For this purposes a charm needs to instantiate the `LokiPushApiProvider` object with one mandatory +For this purpose a charm needs to instantiate the `LokiPushApiProvider` object with one mandatory and three optional arguments. - `charm`: A reference to the parent (Loki) charm. @@ -43,12 +43,10 @@ If provided, this relation name must match a provided relation in metadata.yaml with the `loki_push_api` interface. - Typically `LokiPushApiConsumer` use "logging" as a relation_name and `LogProxyConsumer` use - "log_proxy". + The default relation name is "logging" for `LokiPushApiConsumer` and "log-proxy" for + `LogProxyConsumer`. - The default value of this arguments is "logging". 
- - An example of this in a `metadata.yaml` file should have the following section: + For example, a provider's `metadata.yaml` file may look as follows: ```yaml provides: @@ -56,7 +54,7 @@ interface: loki_push_api ``` - For example, a Loki charm may instantiate the `LokiPushApiProvider` in its constructor as + Subsequently, a Loki charm may instantiate the `LokiPushApiProvider` in its constructor as follows: from charms.loki_k8s.v0.loki_push_api import LokiPushApiProvider @@ -69,21 +67,20 @@ class LokiOperatorCharm(CharmBase): def __init__(self, *args): super().__init__(*args) ... - self._loki_ready() + external_url = urlparse(self._external_url) + self.loki_provider = LokiPushApiProvider( + self, + address=external_url.hostname or self.hostname, + port=external_url.port or 80, + scheme=external_url.scheme, + path=f"{external_url.path}/loki/api/v1/push", + ) ... - def _loki_ready(self): - try: - version = self._loki_server.version - self.loki_provider = LokiPushApiProvider(self) - logger.debug("Loki Provider is available. Loki version: %s", version) - except LokiServerNotReadyError as e: - self.unit.status = MaintenanceStatus(str(e)) - except LokiServerError as e: - self.unit.status = BlockedStatus(str(e)) - - - `port`: Loki Push Api endpoint port. Default value: 3100. - - `rules_dir`: Directory to store alert rules. Default value: "/loki/rules". + - `port`: Loki Push Api endpoint port. Default value: `3100`. + - `scheme`: Loki Push Api endpoint scheme (`HTTP` or `HTTPS`). Default value: `HTTP` + - `address`: Loki Push Api endpoint address. Default value: `localhost` + - `path`: Loki Push Api endpoint path. Default value: `loki/api/v1/push` The `LokiPushApiProvider` object has several responsibilities: @@ -92,7 +89,7 @@ def _loki_ready(self): must be unique to all instances (e.g. using a load balancer). 2. Set the Promtail binary URL (`promtail_binary_zip_url`) so clients that use - `LogProxyConsumer` object can downloaded and configure it. + `LogProxyConsumer` object could download and configure it. 3. Process the metadata of the consumer application, provided via the "metadata" field of the consumer data bag, which are used to annotate the @@ -222,14 +219,14 @@ def __init__(self, *args): ## LogProxyConsumer Library Usage -Let's say that we have a workload charm that produces logs and we need to send those logs to a +Let's say that we have a workload charm that produces logs, and we need to send those logs to a workload implementing the `loki_push_api` interface, such as `Loki` or `Grafana Agent`. Adopting this object in a Charmed Operator consist of two steps: -1. Use the `LogProxyConsumer` class by instanting it in the `__init__` method of the charmed - operator. There are two ways to get logs in to promtail. You can give it a list of files to read - or you can write to it using the syslog protocol. +1. Use the `LogProxyConsumer` class by instantiating it in the `__init__` method of the charmed + operator. There are two ways to get logs in to promtail. You can give it a list of files to + read, or you can write to it using the syslog protocol. For example: @@ -396,7 +393,7 @@ def _promtail_error(self, event): The Loki charm may be related to multiple Loki client charms. Without this, filter rules submitted by one provider charm will also result in corresponding alerts for other -provider charms. Hence every alert rule expression must include such a topology filter stub. +provider charms. Hence, every alert rule expression must include such a topology filter stub. 
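To make the topology-filter stub concrete, here is an illustrative sketch (in Python, not the library's own code path) of what scoping an alert expression to one charm's Juju topology looks like, so one requirer's rules cannot fire on logs shipped by another:

```python
# Illustrative values; the real topology comes from JujuTopology.from_charm().
topology = {
    "juju_model": "prod",
    "juju_model_uuid": "00000000-0000-4000-8000-000000000000",
    "juju_application": "my-app",
}

stub = ", ".join(f'{key}="{value}"' for key, value in topology.items())
selector = "{" + stub + "}"
expr = f'sum(rate({selector} |= "ERROR" [5m])) > 0'

assert 'juju_application="my-app"' in expr
```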
Gathering alert rules and generating rule files within the Loki charm is easily done using the `alerts()` method of `LokiPushApiProvider`. Alerts generated by Loki will automatically @@ -483,7 +480,7 @@ def _alert_rules_error(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 18 +LIBPATCH = 22 logger = logging.getLogger(__name__) @@ -1077,6 +1074,10 @@ def __init__( It is strongly advised not to change the default, so that people deploying your charm will have a consistent experience with all other charms that consume metrics endpoints. + port: an optional port of the Loki service (default is "3100"). + scheme: an optional scheme of the Loki API URL (default is "http"). + address: an optional address of the Loki service (default is "localhost"). + path: an optional path of the Loki API URL (default is "loki/api/v1/push") Raises: RelationNotFoundError: If there is no relation in the charm's metadata.yaml @@ -1186,7 +1187,7 @@ def _should_update_alert_rules(self, relation) -> bool: """Determine whether alert rules should be regenerated. If there are alert rules in the relation data bag, tell the charm - whether or not to regenerate them based on the boolean returned here. + whether to regenerate them based on the boolean returned here. """ if relation.data.get(relation.app).get("alert_rules", None) is not None: return True @@ -1207,7 +1208,7 @@ def _process_logging_relation_changed(self, relation: Relation) -> bool: relation: the `Relation` instance to update. Returns: - A boolean indicating whether an event should be emitted so we + A boolean indicating whether an event should be emitted, so we only emit one on lifecycle events """ relation.data[self._charm.unit]["public_address"] = socket.getfqdn() or "" @@ -1232,7 +1233,7 @@ def update_endpoint(self, url: str = "", relation: Optional[Relation] = None) -> This method should be used when the charm relying on this library needs to update the relation data in response to something occurring outside - of the `logging` relation lifecycle, e.g., in case of a + the `logging` relation lifecycle, e.g., in case of a host address change because the charmed operator becomes connected to an Ingress after the `logging` relation is established. @@ -1286,7 +1287,7 @@ def alerts(self) -> dict: # noqa: C901 separate alert rules file for each relation since the returned list of alert groups are indexed by relation ID. Also for each relation ID associated scrape metadata such as Juju model, UUID and application - name are provided so the a unique name may be generated for the rules + name are provided so a unique name may be generated for the rules file. For each relation the structure of data returned is a dictionary with four keys @@ -1528,24 +1529,27 @@ def __init__( ): """Construct a Loki charm client. - The `LokiPushApiConsumer` object provides configurations to a Loki client charm. - A charm instantiating this object needs Loki information, for instance the - Loki API endpoint to push logs. - The `LokiPushApiConsumer` can be instantiated as follows: + The `LokiPushApiConsumer` object provides configurations to a Loki client charm, such as + the Loki API endpoint to push logs. It is intended for workloads that can speak + loki_push_api (https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki), such + as grafana-agent. + (If you only need to forward a few workload log files, then use LogProxyConsumer.) 
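Putting the two client objects side by side, a hedged sketch assuming a charm whose metadata.yaml declares the default `logging` and `log-proxy` relations:

```python
from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer, LokiPushApiConsumer
from ops.charm import CharmBase


class MyCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        # The workload itself speaks loki_push_api (e.g. grafana-agent):
        self._loki_consumer = LokiPushApiConsumer(self)
        # ...or promtail is sidecarred in to tail a few files; note that
        # log_files may now be a single string or a list of strings:
        self._log_proxy = LogProxyConsumer(
            self,
            log_files="/var/log/myapp/error.log",
            relation_name="log-proxy",
        )
```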
+ + `LokiPushApiConsumer` can be instantiated as follows: self._loki_consumer = LokiPushApiConsumer(self) Args: charm: a `CharmBase` object that manages this `LokiPushApiConsumer` object. - Typically this is `self` in the instantiating class. + Typically, this is `self` in the instantiating class. relation_name: the string name of the relation interface to look up. If `charm` has exactly one relation with this interface, the relation's name is returned. If none or multiple relations with the provided interface - are found, this method will raise either an exception of type - NoRelationWithInterfaceFoundError or MultipleRelationsWithInterfaceFoundError, - respectively. + are found, this method will raise either a NoRelationWithInterfaceFoundError or + MultipleRelationsWithInterfaceFoundError exception, respectively. alert_rules_path: a string indicating a path where alert rules can be found - recursive: Whether or not to scan for rule files recursively. + recursive: Whether to scan for rule files recursively. + skip_alert_topology_labeling: whether to skip the alert topology labeling. Raises: RelationNotFoundError: If there is no relation in the charm's metadata.yaml @@ -1732,9 +1736,8 @@ class LogProxyConsumer(ConsumerBase): relation_name: the string name of the relation interface to look up. If `charm` has exactly one relation with this interface, the relation's name is returned. If none or multiple relations with the provided interface - are found, this method will raise either an exception of type - NoRelationWithInterfaceFoundError or MultipleRelationsWithInterfaceFoundError, - respectively. + are found, this method will raise either a NoRelationWithInterfaceFoundError or + MultipleRelationsWithInterfaceFoundError exception, respectively. enable_syslog: Whether to enable syslog integration. syslog_port: The port syslog is attached to. 
alert_rules_path: an optional path for the location of alert rules @@ -1762,7 +1765,7 @@ class LogProxyConsumer(ConsumerBase): def __init__( self, charm, - log_files: Optional[list] = None, + log_files: Optional[Union[List[str], str]] = None, relation_name: str = DEFAULT_LOG_PROXY_RELATION_NAME, enable_syslog: bool = False, syslog_port: int = 1514, @@ -1770,19 +1773,30 @@ def __init__( recursive: bool = False, container_name: str = "", promtail_resource_name: Optional[str] = None, + *, # TODO: In v1, move the star up so everything after 'charm' is a kwarg + insecure_skip_verify: bool = False, ): super().__init__(charm, relation_name, alert_rules_path, recursive) self._charm = charm self._relation_name = relation_name self._container = self._get_container(container_name) self._container_name = self._get_container_name(container_name) - self._log_files = log_files or [] + + if not log_files: + log_files = [] + elif isinstance(log_files, str): + log_files = [log_files] + elif not isinstance(log_files, list) or not all((isinstance(x, str) for x in log_files)): + raise TypeError("The 'log_files' argument must be a list of strings.") + self._log_files = log_files + self._syslog_port = syslog_port self._is_syslog = enable_syslog self.topology = JujuTopology.from_charm(charm) self._promtail_resource_name = promtail_resource_name or "promtail-bin" + self.insecure_skip_verify = insecure_skip_verify - # architechure used for promtail binary + # architecture used for promtail binary arch = platform.processor() self._arch = "amd64" if arch == "x86_64" else arch @@ -1981,7 +1995,9 @@ def _push_binary_to_workload(self, binary_path: str, workload_binary_path: str) workload_binary_path: path in workload container to which promtail binary is pushed. """ with open(binary_path, "rb") as f: - self._container.push(workload_binary_path, f, permissions=0o755, make_dirs=True) + self._container.push( + workload_binary_path, f, permissions=0o755, encoding=None, make_dirs=True + ) logger.debug("The promtail binary file has been pushed to the workload container.") @property @@ -1999,8 +2015,7 @@ def _promtail_attached_as_resource(self) -> bool: except NameError as e: if "invalid resource name" in str(e): return False - else: - raise + raise def _push_promtail_if_attached(self, workload_binary_path: str) -> bool: """Checks whether Promtail binary is attached to the charm or not. @@ -2041,7 +2056,7 @@ def _promtail_must_be_downloaded(self, promtail_info: dict) -> bool: return False def _sha256sums_matches(self, file_path: str, sha256sum: str) -> bool: - """Checks whether a file's sha256sum matches or not with an specific sha256sum. + """Checks whether a file's sha256sum matches or not with a specific sha256sum. Args: file_path: A string representing the files' patch. @@ -2049,7 +2064,7 @@ def _sha256sums_matches(self, file_path: str, sha256sum: str) -> bool: Returns: a boolean representing whether a file's sha256sum matches or not with - an specific sha256sum. + a specific sha256sum. """ try: with open(file_path, "rb") as f: @@ -2141,8 +2156,15 @@ def _current_config(self) -> dict: @property def _promtail_config(self) -> dict: - """Generates the config file for Promtail.""" + """Generates the config file for Promtail. 
+ + Reference: https://grafana.com/docs/loki/latest/send-data/promtail/configuration + """ config = {"clients": self._clients_list()} + if self.insecure_skip_verify: + for client in config["clients"]: + client["tls_config"] = {"insecure_skip_verify": True} + config.update(self._server_config()) config.update(self._positions()) config.update(self._scrape_configs()) diff --git a/lib/charms/mongodb/v0/mongodb.py b/lib/charms/mongodb/v0/mongodb.py index 25a65d7c0..d0464db4e 100644 --- a/lib/charms/mongodb/v0/mongodb.py +++ b/lib/charms/mongodb/v0/mongodb.py @@ -34,6 +34,9 @@ # path to store mongodb ketFile logger = logging.getLogger(__name__) +# List of system usernames needed for correct work on the charm. +CHARM_USERS = ["operator", "backup", "monitor"] + @dataclass class MongoDBConfiguration: diff --git a/lib/charms/mongodb/v0/mongodb_secrets.py b/lib/charms/mongodb/v0/mongodb_secrets.py index 77b7a0f21..6a1589c66 100644 --- a/lib/charms/mongodb/v0/mongodb_secrets.py +++ b/lib/charms/mongodb/v0/mongodb_secrets.py @@ -14,7 +14,7 @@ # The unique Charmhub library identifier, never change it # The unique Charmhub library identifier, never change it -LIBID = "87456e41c7594240b92b783a648592b5" +LIBID = "89cefc863fd747d7ace12cb508e7bec2" # Increment this major API version when introducing breaking changes LIBAPI = 0 @@ -63,7 +63,7 @@ def add_secret(self, content: Dict[str, str], scope: Scopes) -> Secret: "Secret is already defined with uri %s", self._secret_uri ) - if scope == Config.APP_SCOPE: + if scope == Config.Relations.APP_SCOPE: secret = self.charm.app.add_secret(content, label=self.label) else: secret = self.charm.unit.add_secret(content, label=self.label) diff --git a/lib/charms/mongodb/v0/mongodb_tls.py b/lib/charms/mongodb/v0/mongodb_tls.py index b9ead22cc..f87d2d9c2 100644 --- a/lib/charms/mongodb/v0/mongodb_tls.py +++ b/lib/charms/mongodb/v0/mongodb_tls.py @@ -39,8 +39,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 - +LIBPATCH = 6 logger = logging.getLogger(__name__) @@ -70,7 +69,7 @@ def __init__(self, charm, peer_relation, substrate): self.framework.observe(self.certs.on.certificate_expiring, self._on_certificate_expiring) def is_tls_enabled(self, scope: Scopes): - """Getting internal TLS flag (meaning).""" + """Returns a boolean indicating if TLS for a given `scope` is enabled.""" return self.charm.get_secret(scope, Config.TLS.SECRET_CERT_LABEL) is not None def _on_set_tls_private_key(self, event: ActionEvent) -> None: diff --git a/lib/charms/mongodb/v0/helpers.py b/lib/charms/mongodb/v1/helpers.py similarity index 85% rename from lib/charms/mongodb/v0/helpers.py rename to lib/charms/mongodb/v1/helpers.py index 1f6fa8860..57c50dda3 100644 --- a/lib/charms/mongodb/v0/helpers.py +++ b/lib/charms/mongodb/v1/helpers.py @@ -19,16 +19,17 @@ ) from pymongo.errors import AutoReconnect, ServerSelectionTimeoutError +from config import Config + # The unique Charmhub library identifier, never change it LIBID = "b9a7fe0c38d8486a9d1ce94c27d4758e" # Increment this major API version when introducing breaking changes -LIBAPI = 0 +LIBAPI = 1 # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 8 - +LIBPATCH = 1 # path to store mongodb ketFile KEY_FILE = "keyFile" @@ -40,6 +41,7 @@ MONGODB_COMMON_DIR = "/var/snap/charmed-mongodb/common" MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current" +MONGO_SHELL = 
"charmed-mongodb.mongosh" DATA_DIR = "/var/lib/mongodb" CONF_DIR = "/etc/mongod" @@ -48,9 +50,7 @@ # noinspection GrazieInspection -def get_create_user_cmd( - config: MongoDBConfiguration, mongo_path="charmed-mongodb.mongosh" -) -> List[str]: +def get_create_user_cmd(config: MongoDBConfiguration, mongo_path=MONGO_SHELL) -> List[str]: """Creates initial admin user for MongoDB. Initial admin user can be created only through localhost connection. @@ -80,10 +80,41 @@ def get_create_user_cmd( ] +def get_mongos_args( + config: MongoDBConfiguration, + snap_install: bool = False, +) -> str: + """Returns the arguments used for starting mongos on a config-server side application. + + Returns: + A string representing the arguments to be passed to mongos. + """ + # mongos running on the config server communicates through localhost + # use constant for port + config_server_uri = f"{config.replset}/localhost:27017" + + full_conf_dir = f"{MONGODB_SNAP_DATA_DIR}{CONF_DIR}" if snap_install else CONF_DIR + cmd = [ + # mongos on config server side should run on 0.0.0.0 so it can be accessed by other units + # in the sharded cluster + "--bind_ip_all", + f"--configdb {config_server_uri}", + # config server is already using 27017 + f"--port {Config.MONGOS_PORT}", + f"--keyFile={full_conf_dir}/{KEY_FILE}", + "\n", + ] + + # TODO Future PR: support TLS on mongos + + return " ".join(cmd) + + def get_mongod_args( config: MongoDBConfiguration, auth: bool = True, snap_install: bool = False, + role: str = "replication", ) -> str: """Construct the MongoDB startup command line. @@ -102,6 +133,9 @@ def get_mongod_args( f"--replSet={config.replset}", # db must be located within the snap common directory since the snap is strictly confined f"--dbpath={full_data_dir}", + # for simplicity we run the mongod daemon on shards, configsvrs, and replicas on the same + # port + f"--port={Config.MONGODB_PORT}", logging_options, ] if auth: @@ -137,6 +171,12 @@ def get_mongod_args( ] ) + if role == Config.Role.CONFIG_SERVER: + cmd.append("--configsvr") + + if role == Config.Role.SHARD: + cmd.append("--shardsvr") + cmd.append("\n") return " ".join(cmd) @@ -202,25 +242,6 @@ def copy_licenses_to_unit(): ) -_StrOrBytes = Union[str, bytes] - - -def process_pbm_error(error_string: Optional[_StrOrBytes]) -> str: - """Parses pbm error string and returns a user friendly message.""" - message = "couldn't configure s3 backup option" - if not error_string: - return message - if type(error_string) == bytes: - error_string = error_string.decode("utf-8") - if "status code: 403" in error_string: # type: ignore - message = "s3 credentials are incorrect." - elif "status code: 404" in error_string: # type: ignore - message = "s3 configurations are incompatible." - elif "status code: 301" in error_string: # type: ignore - message = "s3 configurations are incompatible." 
-    return message
-
-
 def current_pbm_op(pbm_status: str) -> str:
     """Parses pbm status for the operation that pbm is running."""
     pbm_status = json.loads(pbm_status)
@@ -246,3 +267,22 @@ def process_pbm_status(pbm_status: str) -> StatusBase:
         return WaitingStatus("waiting to sync s3 configurations.")

     return ActiveStatus()
+
+
+_StrOrBytes = Union[str, bytes]
+
+
+def process_pbm_error(error_string: Optional[_StrOrBytes]) -> str:
+    """Parses pbm error string and returns a user friendly message."""
+    message = "couldn't configure s3 backup option"
+    if not error_string:
+        return message
+    if type(error_string) == bytes:
+        error_string = error_string.decode("utf-8")
+    if "status code: 403" in error_string:  # type: ignore
+        message = "s3 credentials are incorrect."
+    elif "status code: 404" in error_string:  # type: ignore
+        message = "s3 configurations are incompatible."
+    elif "status code: 301" in error_string:  # type: ignore
+        message = "s3 configurations are incompatible."
+    return message
diff --git a/lib/charms/mongodb/v1/mongodb.py b/lib/charms/mongodb/v1/mongodb.py
new file mode 100644
index 000000000..730f02301
--- /dev/null
+++ b/lib/charms/mongodb/v1/mongodb.py
@@ -0,0 +1,440 @@
+"""Code for interactions with MongoDB."""
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+import logging
+import re
+from dataclasses import dataclass
+from typing import Dict, List, Optional, Set
+from urllib.parse import quote_plus
+
+from bson.json_util import dumps
+from pymongo import MongoClient
+from pymongo.errors import OperationFailure, PyMongoError
+from tenacity import (
+    RetryError,
+    Retrying,
+    before_log,
+    retry,
+    stop_after_attempt,
+    stop_after_delay,
+    wait_fixed,
+)
+
+# The unique Charmhub library identifier, never change it
+LIBID = "49c69d9977574dd7942eb7b54f43355b"
+
+# Increment this major API version when introducing breaking changes
+LIBAPI = 1
+
+# Increment this PATCH version before using `charmcraft publish-lib` or reset
+# to 0 if you are raising the major API version
+LIBPATCH = 0
+
+# path to store mongodb keyFile
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class MongoDBConfiguration:
+    """Class for MongoDB configuration.
+
+    - replset: name of replica set, needed for connection URI.
+    - database: database name.
+    - username: username.
+    - password: password.
+    - hosts: full list of hosts to connect to, needed for the URI.
+    - roles: list of roles requested for the user.
+    - tls_external: indicator for use of external TLS connection.
+    - tls_internal: indicator for use of internal TLS connection.
+    """
+
+    replset: str
+    database: Optional[str]
+    username: str
+    password: str
+    hosts: Set[str]
+    roles: Set[str]
+    tls_external: bool
+    tls_internal: bool
+
+    @property
+    def uri(self):
+        """Return URI concatenated from fields."""
+        hosts = ",".join(self.hosts)
+        # The auth DB must be specified when the user connects to an application DB.
+        auth_source = ""
+        if self.database != "admin":
+            auth_source = "&authSource=admin"
+        return (
+            f"mongodb://{quote_plus(self.username)}:"
+            f"{quote_plus(self.password)}@"
+            f"{hosts}/{quote_plus(self.database)}?"
+            f"replicaSet={quote_plus(self.replset)}"
+            f"{auth_source}"
+        )
+
+
+class NotReadyError(PyMongoError):
+    """Raised when not all replica set members are healthy or have finished initial sync."""
+
+
+class MongoDBConnection:
+    """Creates a connection object to MongoDB.
+
+    The real connection is created on the first call to MongoDB.
+
+    Delayed connectivity allows us to first check database readiness
+    and then reuse the same connection for the actual query later in the code.
+
+    The connection is automatically closed when the object is destroyed,
+    which keeps the calling code clean.
+
+    Note that using the connection may raise the following pymongo errors:
+    ConfigurationError, OperationFailure. It is suggested that the following
+    pattern be adopted when using MongoDBConnection:
+
+    with MongoDBConnection(self._mongodb_config) as mongo:
+        try:
+            mongo.<some operation from this class>
+        except (ConfigurationError, OperationFailure):
+            <error handling as needed>
+    """
+
+    def __init__(self, config: MongoDBConfiguration, uri=None, direct=False):
+        """A MongoDB client interface.
+
+        Args:
+            config: MongoDB Configuration object.
+            uri: allow using custom MongoDB URI, needed for replSet init.
+            direct: force a direct connection to a specific host, avoiding
+                reading replica set configuration and reconnection.
+        """
+        self.mongodb_config = config
+
+        if uri is None:
+            uri = config.uri
+
+        self.client = MongoClient(
+            uri,
+            directConnection=direct,
+            connect=False,
+            serverSelectionTimeoutMS=1000,
+            connectTimeoutMS=2000,
+        )
+
+    def __enter__(self):
+        """Return a reference to the new connection."""
+        return self
+
+    def __exit__(self, object_type, value, traceback):
+        """Disconnect from MongoDB client."""
+        self.client.close()
+        self.client = None
+
+    @property
+    def is_ready(self) -> bool:
+        """Is the MongoDB server ready for service requests.
+
+        Returns:
+            True if the service is ready, False otherwise. Retries over a period of 60 seconds
+            to allow the server time to start up.
+
+        Raises:
+            ConfigurationError, OperationFailure
+        """
+        try:
+            for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+                with attempt:
+                    # The ping command is cheap and does not require auth.
+                    self.client.admin.command("ping")
+        except RetryError:
+            return False
+
+        return True
+
+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_fixed(5),
+        reraise=True,
+        before=before_log(logger, logging.DEBUG),
+    )
+    def init_replset(self) -> None:
+        """Create replica set config the first time.
+
+        Raises:
+            ConfigurationError, OperationFailure
+        """
+        config = {
+            "_id": self.mongodb_config.replset,
+            "members": [{"_id": i, "host": h} for i, h in enumerate(self.mongodb_config.hosts)],
+        }
+        try:
+            self.client.admin.command("replSetInitiate", config)
+        except OperationFailure as e:
+            if e.code not in (13, 23):  # Unauthorized, AlreadyInitialized
+                # The Unauthorized error can be raised only if the initial user is
+                # created in the step after this one.
+                # The AlreadyInitialized error can be raised only if this step has
+                # already finished.
+                logger.error("Cannot initialize replica set. error=%r", e)
+                raise e
+
+    def get_replset_status(self) -> Dict:
+        """Get the replica set status as a dict.
+
+        Returns:
+            A dict of the replica set members along with their status.
+
+        Raises:
+            ConfigurationError, OperationFailure
+        """
+        rs_status = self.client.admin.command("replSetGetStatus")
+        rs_status_parsed = {}
+        for member in rs_status["members"]:
+            member_name = self._hostname_from_hostport(member["name"])
+            rs_status_parsed[member_name] = member["stateStr"]
+
+        return rs_status_parsed
+
+    def get_replset_members(self) -> Set[str]:
+        """Get the replica set members.
+
+        Returns:
+            A set of the replica set members as reported by mongod.
+
+        Raises:
+            ConfigurationError, OperationFailure
+        """
+        rs_status = self.client.admin.command("replSetGetStatus")
+        curr_members = [
+            self._hostname_from_hostport(member["name"]) for member in rs_status["members"]
+        ]
+        return set(curr_members)
+
+    def add_replset_member(self, hostname: str) -> None:
+        """Add a new member to the replica set config inside MongoDB.
+
+        Raises:
+            ConfigurationError, OperationFailure, NotReadyError
+        """
+        rs_config = self.client.admin.command("replSetGetConfig")
+        rs_status = self.client.admin.command("replSetGetStatus")
+
+        # When we add a new member, MongoDB transfers data from an existing member to the
+        # new one, which reduces the performance of the cluster. To avoid a major
+        # performance degradation, before adding new members we check that all other
+        # members have finished their initial sync.
+        if self._is_any_sync(rs_status):
+            # the sync can take a while, so we should defer
+            raise NotReadyError
+
+        # Avoid reusing IDs, according to the doc
+        # https://www.mongodb.com/docs/manual/reference/replica-configuration/
+        max_id = max([int(member["_id"]) for member in rs_config["config"]["members"]])
+        new_member = {"_id": int(max_id + 1), "host": hostname}
+
+        rs_config["config"]["version"] += 1
+        rs_config["config"]["members"].extend([new_member])
+        logger.debug("rs_config: %r", rs_config["config"])
+        self.client.admin.command("replSetReconfig", rs_config["config"])
+
+    @retry(
+        stop=stop_after_attempt(20),
+        wait=wait_fixed(3),
+        reraise=True,
+        before=before_log(logger, logging.DEBUG),
+    )
+    def remove_replset_member(self, hostname: str) -> None:
+        """Remove a member from the replica set config inside MongoDB.
+
+        Raises:
+            ConfigurationError, OperationFailure, NotReadyError
+        """
+        rs_config = self.client.admin.command("replSetGetConfig")
+        rs_status = self.client.admin.command("replSetGetStatus")
+
+        # To avoid issues when a majority of members is removed, we remove the next member
+        # only once MongoDB has forgotten the previously removed member.
+        if self._is_any_removing(rs_status):
+            # removal from the replica set is a fast operation, so @retry
+            # (20 attempts with a 3-second wait) before giving up.
+            raise NotReadyError
+
+        # To avoid downtime, we need to re-elect a new primary if the member being removed
+        # is the current primary.
+        logger.debug("primary: %r", self._is_primary(rs_status, hostname))
+        if self._is_primary(rs_status, hostname):
+            self.client.admin.command("replSetStepDown", {"stepDownSecs": "60"})
+
+        rs_config["config"]["version"] += 1
+        rs_config["config"]["members"][:] = [
+            member
+            for member in rs_config["config"]["members"]
+            if hostname != self._hostname_from_hostport(member["host"])
+        ]
+        logger.debug("rs_config: %r", dumps(rs_config["config"]))
+        self.client.admin.command("replSetReconfig", rs_config["config"])
+
+    def create_user(self, config: MongoDBConfiguration):
+        """Create a user.
+
+        Grants read and write privileges for the specified database.
+ """ + self.client.admin.command( + "createUser", + config.username, + pwd=config.password, + roles=self._get_roles(config), + mechanisms=["SCRAM-SHA-256"], + ) + + def update_user(self, config: MongoDBConfiguration): + """Update grants on database.""" + self.client.admin.command( + "updateUser", + config.username, + roles=self._get_roles(config), + ) + + def set_user_password(self, username, password: str): + """Update the password.""" + self.client.admin.command( + "updateUser", + username, + pwd=password, + ) + + def create_role(self, role_name: str, privileges: dict, roles: dict = []): + """Creates a new role. + + Args: + role_name: name of the role to be added. + privileges: privileges to be associated with the role. + roles: List of roles from which this role inherits privileges. + """ + try: + self.client.admin.command( + "createRole", role_name, privileges=[privileges], roles=roles + ) + except OperationFailure as e: + if not e.code == 51002: # Role already exists + logger.error("Cannot add role. error=%r", e) + raise e + + @staticmethod + def _get_roles(config: MongoDBConfiguration) -> List[dict]: + """Generate roles List.""" + supported_roles = { + "admin": [ + {"role": "userAdminAnyDatabase", "db": "admin"}, + {"role": "readWriteAnyDatabase", "db": "admin"}, + {"role": "userAdmin", "db": "admin"}, + ], + "monitor": [ + {"role": "explainRole", "db": "admin"}, + {"role": "clusterMonitor", "db": "admin"}, + {"role": "read", "db": "local"}, + ], + "backup": [ + {"db": "admin", "role": "readWrite", "collection": ""}, + {"db": "admin", "role": "backup"}, + {"db": "admin", "role": "clusterMonitor"}, + {"db": "admin", "role": "restore"}, + {"db": "admin", "role": "pbmAnyAction"}, + ], + "default": [ + {"role": "readWrite", "db": config.database}, + ], + } + return [role_dict for role in config.roles for role_dict in supported_roles[role]] + + def drop_user(self, username: str): + """Drop user.""" + self.client.admin.command("dropUser", username) + + def get_users(self) -> Set[str]: + """Add a new member to replica set config inside MongoDB.""" + users_info = self.client.admin.command("usersInfo") + return set( + [ + user_obj["user"] + for user_obj in users_info["users"] + if re.match(r"^relation-\d+$", user_obj["user"]) + ] + ) + + def get_databases(self) -> Set[str]: + """Return list of all non-default databases.""" + system_dbs = ("admin", "local", "config") + databases = self.client.list_database_names() + return set([db for db in databases if db not in system_dbs]) + + def drop_database(self, database: str): + """Drop a non-default database.""" + system_dbs = ("admin", "local", "config") + if database in system_dbs: + return + self.client.drop_database(database) + + def _is_primary(self, rs_status: Dict, hostname: str) -> bool: + """Returns True if passed host is the replica set primary. + + Args: + hostname: host of interest. + rs_status: current state of replica set as reported by mongod. 
+ """ + return any( + hostname == self._hostname_from_hostport(member["name"]) + and member["stateStr"] == "PRIMARY" + for member in rs_status["members"] + ) + + def primary(self) -> str: + """Returns primary replica host.""" + status = self.client.admin.command("replSetGetStatus") + + primary = None + # loop through all members in the replica set + for member in status["members"]: + # check replica's current state + if member["stateStr"] == "PRIMARY": + primary = self._hostname_from_hostport(member["name"]) + + return primary + + @staticmethod + def _is_any_sync(rs_status: Dict) -> bool: + """Returns true if any replica set members are syncing data. + + Checks if any members in replica set are syncing data. Note it is recommended to run only + one sync in the cluster to not have huge performance degradation. + + Args: + rs_status: current state of replica set as reported by mongod. + """ + return any( + member["stateStr"] in ["STARTUP", "STARTUP2", "ROLLBACK", "RECOVERING"] + for member in rs_status["members"] + ) + + @staticmethod + def _is_any_removing(rs_status: Dict) -> bool: + """Returns true if any replica set members are removing now. + + Checks if any members in replica set are getting removed. It is recommended to run only one + removal in the cluster at a time as to not have huge performance degradation. + + Args: + rs_status: current state of replica set as reported by mongod. + """ + return any(member["stateStr"] == "REMOVED" for member in rs_status["members"]) + + @staticmethod + def _hostname_from_hostport(hostname: str) -> str: + """Return hostname part from MongoDB returned. + + MongoDB typically returns a value that contains both, hostname and port. + e.g. input: mongodb-1:27015 + Return hostname without changes if the port is not passed. + e.g. 
input: mongodb-1 + """ + return hostname.split(":")[0] diff --git a/lib/charms/mongodb/v0/mongodb_backups.py b/lib/charms/mongodb/v1/mongodb_backups.py similarity index 90% rename from lib/charms/mongodb/v0/mongodb_backups.py rename to lib/charms/mongodb/v1/mongodb_backups.py index 715a7094b..c184e49ad 100644 --- a/lib/charms/mongodb/v0/mongodb_backups.py +++ b/lib/charms/mongodb/v1/mongodb_backups.py @@ -13,14 +13,10 @@ import re import subprocess import time -from typing import Dict, List +from typing import Dict, List, Optional, Union from charms.data_platform_libs.v0.s3 import CredentialsChangedEvent, S3Requirer -from charms.mongodb.v0.helpers import ( - current_pbm_op, - process_pbm_error, - process_pbm_status, -) +from charms.mongodb.v1.helpers import current_pbm_op, process_pbm_status from charms.operator_libs_linux.v1 import snap from ops.framework import Object from ops.model import BlockedStatus, MaintenanceStatus, StatusBase, WaitingStatus @@ -34,15 +30,17 @@ wait_fixed, ) +from config import Config + # The unique Charmhub library identifier, never change it -LIBID = "9f2b91c6128d48d6ba22724bf365da3b" +LIBID = "18c461132b824ace91af0d7abe85f40e" # Increment this major API version when introducing breaking changes -LIBAPI = 0 +LIBAPI = 1 # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 1 logger = logging.getLogger(__name__) @@ -63,6 +61,9 @@ BACKUP_RESTORE_ATTEMPT_COOLDOWN = 15 +_StrOrBytes = Union[str, bytes] + + class ResyncError(Exception): """Raised when pbm is resyncing configurations and is not ready to be used.""" @@ -316,7 +317,7 @@ def _configure_pbm_options(self, event) -> None: ), return except ExecError as e: - self.charm.unit.status = BlockedStatus(process_pbm_error(e.stdout)) + self.charm.unit.status = BlockedStatus(self.process_pbm_error(e.stdout)) return except subprocess.CalledProcessError as e: logger.error("Syncing configurations failed: %s", str(e)) @@ -418,7 +419,7 @@ def _wait_pbm_status(self) -> None: ) raise ResyncError except ExecError as e: - self.charm.unit.status = BlockedStatus(process_pbm_error(e.stdout)) + self.charm.unit.status = BlockedStatus(self.process_pbm_error(e.stdout)) def get_pbm_status(self) -> StatusBase: """Retrieve pbm status.""" @@ -428,15 +429,14 @@ def get_pbm_status(self) -> StatusBase: try: previous_pbm_status = self.charm.unit.status pbm_status = self.charm.run_pbm_command(PBM_STATUS_CMD) + + # pbm errors are outputted in json and do not raise CLI errors + pbm_error = self.process_pbm_error(pbm_status) + if pbm_error: + return BlockedStatus(pbm_error) + self._log_backup_restore_result(pbm_status, previous_pbm_status) return process_pbm_status(pbm_status) - except ExecError as e: - logger.error(f"Failed to get pbm status. {e}") - return BlockedStatus(process_pbm_error(e.stdout)) - except subprocess.CalledProcessError as e: - # pbm pipes a return code of 1, but its output shows the true error code so it is - # necessary to parse the output - return BlockedStatus(process_pbm_error(e.output)) except Exception as e: # pbm pipes a return code of 1, but its output shows the true error code so it is # necessary to parse the output @@ -505,7 +505,7 @@ def _try_to_restore(self, backup_id: str) -> None: If PBM is resyncing, the function will retry to create backup (up to BACKUP_RESTORE_MAX_ATTEMPTS times) with BACKUP_RESTORE_ATTEMPT_COOLDOWN - time between attepts. + time between attempts. 
         If PBM returns any other error, the function will raise RestoreError.
         """
@@ -523,7 +523,7 @@ def _try_to_restore(self, backup_id: str) -> None:
                 restore_cmd = restore_cmd + remapping_args.split(" ")
             self.charm.run_pbm_command(restore_cmd)
         except (subprocess.CalledProcessError, ExecError) as e:
-            if type(e) is subprocess.CalledProcessError:
+            if isinstance(e, subprocess.CalledProcessError):
                 error_message = e.output.decode("utf-8")
             else:
                 error_message = str(e.stderr)
@@ -541,7 +541,7 @@ def _try_to_backup(self):

         If PBM is resyncing, the function will retry to create backup
         (up to BACKUP_RESTORE_MAX_ATTEMPTS times)
-        with BACKUP_RESTORE_ATTEMPT_COOLDOWN time between attepts.
+        with BACKUP_RESTORE_ATTEMPT_COOLDOWN time between attempts.

         If PBM returns any other error, the function will raise BackupError.
         """
@@ -560,7 +560,7 @@
             )
             return backup_id_match.group("backup_id") if backup_id_match else "N/A"
         except (subprocess.CalledProcessError, ExecError) as e:
-            if type(e) is subprocess.CalledProcessError:
+            if isinstance(e, subprocess.CalledProcessError):
                 error_message = e.output.decode("utf-8")
             else:
                 error_message = str(e.stderr)
@@ -642,7 +642,7 @@ def _get_backup_restore_operation_result(self, current_pbm_status, previous_pbm_
             return f"Operation is still in progress: '{current_pbm_status.message}'"

         if (
-            type(previous_pbm_status) is MaintenanceStatus
+            isinstance(previous_pbm_status, MaintenanceStatus)
             and "backup id:" in previous_pbm_status.message
         ):
             backup_id = previous_pbm_status.message.split("backup id:")[-1].strip()
@@ -652,3 +652,48 @@
             return f"Backup {backup_id} completed successfully"

         return "Unknown operation result"
+
+    def retrieve_error_message(self, pbm_status: Dict) -> str:
+        """Parses pbm status for an error message from the current unit.
+
+        If pbm_agent is in the error state, the command `pbm status` does not raise an error.
+        Instead, it is in the log messages. pbm_agent also shows all the error messages for other
+        replicas in the set.
+        """
+        try:
+            clusters = pbm_status["cluster"]
+            for cluster in clusters:
+                if cluster["rs"] == self.charm.app.name:
+                    break
+
+            for host_info in cluster["nodes"]:
+                replica_info = (
+                    f"mongodb/{self.charm._unit_ip(self.charm.unit)}:{Config.MONGOS_PORT}"
+                )
+                if host_info["host"] == replica_info:
+                    break
+
+            return str(host_info["errors"])
+        except KeyError:
+            return ""
+
+    def process_pbm_error(self, pbm_status: Optional[_StrOrBytes]) -> str:
+        """Returns errors found in PBM status."""
+        if isinstance(pbm_status, bytes):
+            pbm_status = pbm_status.decode("utf-8")
+
+        try:
+            error_message = self.retrieve_error_message(json.loads(pbm_status))
+        except json.decoder.JSONDecodeError:
+            # if pbm status doesn't return a parsable dictionary it is an error message
+            # represented as a string
+            error_message = pbm_status
+
+        message = None
+        if "status code: 403" in error_message:
+            message = "s3 credentials are incorrect."
+        elif "status code: 404" in error_message:
+            message = "s3 configurations are incompatible."
+        elif "status code: 301" in error_message:
+            message = "s3 configurations are incompatible."
+        return message
diff --git a/lib/charms/mongodb/v0/mongodb_provider.py b/lib/charms/mongodb/v1/mongodb_provider.py
similarity index 89%
rename from lib/charms/mongodb/v0/mongodb_provider.py
rename to lib/charms/mongodb/v1/mongodb_provider.py
index b479b1926..00e052b1d 100644
--- a/lib/charms/mongodb/v0/mongodb_provider.py
+++ b/lib/charms/mongodb/v1/mongodb_provider.py
@@ -14,8 +14,8 @@
 from typing import Optional, Set

 from charms.data_platform_libs.v0.data_interfaces import DatabaseProvides
-from charms.mongodb.v0.helpers import generate_password
 from charms.mongodb.v0.mongodb import MongoDBConfiguration, MongoDBConnection
+from charms.mongodb.v1.helpers import generate_password
 from ops.charm import CharmBase, RelationBrokenEvent, RelationChangedEvent
 from ops.framework import Object
 from ops.model import ActiveStatus, BlockedStatus, MaintenanceStatus, Relation
@@ -25,11 +25,11 @@
 LIBID = "4067879ef7dd4261bf6c164bc29d94b1"

 # Increment this major API version when introducing breaking changes
-LIBAPI = 0
+LIBAPI = 1

 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 5
+LIBPATCH = 2

 logger = logging.getLogger(__name__)
 REL_NAME = "database"
@@ -62,8 +62,13 @@ def __init__(self, charm: CharmBase, substrate="k8s", relation_name: str = "data
         """
         self.relation_name = relation_name
         self.substrate = substrate
+        self.charm = charm
         super().__init__(charm, self.relation_name)

+        self.framework.observe(
+            charm.on[self.relation_name].relation_departed,
+            self.charm.check_relation_broken_or_scale_down,
+        )
         self.framework.observe(
             charm.on[self.relation_name].relation_broken, self._on_relation_event
         )
@@ -71,14 +76,34 @@
             charm.on[self.relation_name].relation_changed, self._on_relation_event
         )

-        self.charm = charm
-
         # Charm events defined in the database provides charm library.
         self.database_provides = DatabaseProvides(self.charm, relation_name=self.relation_name)
         self.framework.observe(
             self.database_provides.on.database_requested, self._on_relation_event
         )

+    def pass_hook_checks(self) -> bool:
+        """Runs the pre-hook checks for MongoDBProvider, returns True if all pass."""
+        if not self.charm.is_relation_feasible(self.relation_name):
+            logger.info("Skipping code for relations.")
+            return False
+
+        # legacy relations have auth disabled, which new relations require
+        if self.model.get_relation(LEGACY_REL_NAME):
+            self.charm.unit.status = BlockedStatus("cannot have both legacy and new relations")
+            logger.error("Auth disabled due to existing connections to legacy relations")
+            return False
+
+        if not self.charm.unit.is_leader():
+            return False
+
+        # We shouldn't try to create or update users if the database is not
+        # initialised. We will create users as part of initialisation.
+        if not self.charm.db_initialised:
+            return False
+
+        return True
+
     def _on_relation_event(self, event):
         """Handle relation joined events.

@@ -87,17 +112,8 @@
         data. As a result, related charm gets credentials for accessing the
         MongoDB database.
         """
-        if not self.charm.unit.is_leader():
-            return
-        # We shouldn't try to create or update users if the database is not
-        # initialised. We will create users as part of initialisation.
- if "db_initialised" not in self.charm.app_peer_data: - return - - # legacy relations have auth disabled, which new relations require - if self.model.get_relation(LEGACY_REL_NAME): - self.charm.unit.status = BlockedStatus("cannot have both legacy and new relations") - logger.error("Auth disabled due to existing connections to legacy relations") + if not self.pass_hook_checks(): + logger.info("Skipping %s: hook checks did not pass", type(event)) return # If auth is disabled but there are no legacy relation users, this means that legacy @@ -110,7 +126,25 @@ def _on_relation_event(self, event): departed_relation_id = None if type(event) is RelationBrokenEvent: + # Only relation_deparated events can check if scaling down departed_relation_id = event.relation.id + if not self.charm.has_departed_run(departed_relation_id): + logger.info( + "Deferring, must wait for relation departed hook to decide if relation should be removed." + ) + event.defer() + return + + # check if were scaling down and add a log message + if self.charm.is_scaling_down(departed_relation_id): + logger.info( + "Relation broken event occurring due to scale down, do not proceed to remove users." + ) + return + + logger.info( + "Relation broken event occurring due to relation removal, proceed to remove user." + ) try: self.oversee_users(departed_relation_id, event) @@ -186,6 +220,9 @@ def _diff(self, event: RelationChangedEvent) -> Diff: a Diff instance containing the added, deleted and changed keys from the event relation databag. """ + if not isinstance(event, RelationChangedEvent): + logger.info("Cannot compute diff of event type: %s", type(event)) + return # TODO import marvelous unit tests in a future PR # Retrieve the old data from the data key in the application relation databag. old_data = json.loads(event.relation.data[self.charm.model.app].get("data", "{}")) diff --git a/lib/charms/mongodb/v0/users.py b/lib/charms/mongodb/v1/users.py similarity index 90% rename from lib/charms/mongodb/v0/users.py rename to lib/charms/mongodb/v1/users.py index d48bbf61c..57883d730 100644 --- a/lib/charms/mongodb/v0/users.py +++ b/lib/charms/mongodb/v1/users.py @@ -1,17 +1,17 @@ """Users configuration for MongoDB.""" # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -from typing import List +from typing import Set # The unique Charmhub library identifier, never change it LIBID = "b74007eda21c453a89e4dcc6382aa2b3" # Increment this major API version when introducing breaking changes -LIBAPI = 0 +LIBAPI = 1 # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 1 +LIBPATCH = 0 class MongoDBUser: @@ -37,7 +37,7 @@ def get_database_name(self) -> str: """Returns the database of the user.""" return self._database_name - def get_roles(self) -> List[str]: + def get_roles(self) -> Set[str]: """Returns the role of the user.""" return self._roles @@ -81,14 +81,16 @@ class _MonitorUser(MongoDBUser): _username = "monitor" _password_key_name = f"{_username}-password" - _database_name = "" + _database_name = "admin" _roles = ["monitor"] _privileges = { "resource": {"db": "", "collection": ""}, "actions": ["listIndexes", "listCollections", "dbStats", "dbHash", "collStats", "find"], } _mongodb_role = "explainRole" - _hosts = [] + _hosts = [ + "127.0.0.1" + ] # MongoDB Exporter can only connect to one replica - not the entire set. 
class _BackupUser(MongoDBUser): @@ -108,4 +110,8 @@ class _BackupUser(MongoDBUser): BackupUser = _BackupUser() # List of system usernames needed for correct work on the charm. -CHARM_USERS = [OperatorUser.get_username(), BackupUser.get_username(), MonitorUser.get_username()] +CHARM_USERS = [ + OperatorUser.get_username(), + BackupUser.get_username(), + MonitorUser.get_username(), +] diff --git a/lib/charms/prometheus_k8s/v0/prometheus_scrape.py b/lib/charms/prometheus_k8s/v0/prometheus_scrape.py index 5e74edde0..e4297aa1c 100644 --- a/lib/charms/prometheus_k8s/v0/prometheus_scrape.py +++ b/lib/charms/prometheus_k8s/v0/prometheus_scrape.py @@ -18,13 +18,6 @@ Source code can be found on GitHub at: https://github.com/canonical/prometheus-k8s-operator/tree/main/lib/charms/prometheus_k8s -## Dependencies - -Using this library requires you to fetch the juju_topology library from -[observability-libs](https://charmhub.io/observability-libs/libraries/juju_topology). - -`charmcraft fetch-lib charms.observability_libs.v0.juju_topology` - ## Provider Library Usage This Prometheus charm interacts with its scrape targets using its @@ -343,12 +336,11 @@ def _on_scrape_targets_changed(self, event): from collections import defaultdict from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from urllib.error import HTTPError, URLError from urllib.parse import urlparse -from urllib.request import urlopen import yaml -from charms.observability_libs.v0.juju_topology import JujuTopology +from cosl import JujuTopology +from cosl.rules import AlertRules from ops.charm import CharmBase, RelationRole from ops.framework import ( BoundEvent, @@ -370,7 +362,9 @@ def _on_scrape_targets_changed(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 36 +LIBPATCH = 42 + +PYDEPS = ["cosl"] logger = logging.getLogger(__name__) @@ -391,6 +385,7 @@ def _on_scrape_targets_changed(self, event): "scheme", "basic_auth", "tls_config", + "authorization", } DEFAULT_JOB = { "metrics_path": "/metrics", @@ -601,15 +596,22 @@ def render_alertmanager_static_configs(alertmanagers: List[str]): # Create a mapping from paths to netlocs # Group alertmanager targets into a dictionary of lists: # {path: [netloc1, netloc2]} - paths = defaultdict(list) # type: Dict[str, List[str]] + paths = defaultdict(list) # type: Dict[Tuple[str, str], List[str]] for parsed in map(urlparse, sanitized): path = parsed.path or "/" - paths[path].append(parsed.netloc) + paths[(parsed.scheme, path)].append(parsed.netloc) return { "alertmanagers": [ - {"path_prefix": path_prefix, "static_configs": [{"targets": netlocs}]} - for path_prefix, netlocs in paths.items() + { + # For https we still do not render a `tls_config` section because + # certs are expected to be made available by the charm via the + # `update-ca-certificates` mechanism. + "scheme": scheme, + "path_prefix": path_prefix, + "static_configs": [{"targets": netlocs}], + } + for (scheme, path_prefix), netlocs in paths.items() ] } @@ -830,206 +832,6 @@ def _is_single_alert_rule_format(rules_dict: dict) -> bool: return set(rules_dict) >= {"alert", "expr"} -class AlertRules: - """Utility class for amalgamating prometheus alert rule files and injecting juju topology. - - An `AlertRules` object supports aggregating alert rules from files and directories in both - official and single rule file formats using the `add_path()` method. 
All the alert rules - read are annotated with Juju topology labels and amalgamated into a single data structure - in the form of a Python dictionary using the `as_dict()` method. Such a dictionary can be - easily dumped into JSON format and exchanged over relation data. The dictionary can also - be dumped into YAML format and written directly into an alert rules file that is read by - Prometheus. Note that multiple `AlertRules` objects must not be written into the same file, - since Prometheus allows only a single list of alert rule groups per alert rules file. - - The official Prometheus format is a YAML file conforming to the Prometheus documentation - (https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/). - The custom single rule format is a subsection of the official YAML, having a single alert - rule, effectively "one alert per file". - """ - - # This class uses the following terminology for the various parts of a rule file: - # - alert rules file: the entire groups[] yaml, including the "groups:" key. - # - alert groups (plural): the list of groups[] (a list, i.e. no "groups:" key) - it is a list - # of dictionaries that have the "name" and "rules" keys. - # - alert group (singular): a single dictionary that has the "name" and "rules" keys. - # - alert rules (plural): all the alerts in a given alert group - a list of dictionaries with - # the "alert" and "expr" keys. - # - alert rule (singular): a single dictionary that has the "alert" and "expr" keys. - - def __init__(self, topology: Optional[JujuTopology] = None): - """Build and alert rule object. - - Args: - topology: an optional `JujuTopology` instance that is used to annotate all alert rules. - """ - self.topology = topology - self.tool = CosTool(None) - self.alert_groups = [] # type: List[dict] - - def _from_file(self, root_path: Path, file_path: Path) -> List[dict]: - """Read a rules file from path, injecting juju topology. - - Args: - root_path: full path to the root rules folder (used only for generating group name) - file_path: full path to a *.rule file. - - Returns: - A list of dictionaries representing the rules file, if file is valid (the structure is - formed by `yaml.safe_load` of the file); an empty list otherwise. 
- """ - with file_path.open() as rf: - # Load a list of rules from file then add labels and filters - try: - rule_file = yaml.safe_load(rf) - - except Exception as e: - logger.error("Failed to read alert rules from %s: %s", file_path.name, e) - return [] - - if not rule_file: - logger.warning("Empty rules file: %s", file_path.name) - return [] - if not isinstance(rule_file, dict): - logger.error("Invalid rules file (must be a dict): %s", file_path.name) - return [] - if _is_official_alert_rule_format(rule_file): - alert_groups = rule_file["groups"] - elif _is_single_alert_rule_format(rule_file): - # convert to list of alert groups - # group name is made up from the file name - alert_groups = [{"name": file_path.stem, "rules": [rule_file]}] - else: - # invalid/unsupported - logger.error("Invalid rules file: %s", file_path.name) - return [] - - # update rules with additional metadata - for alert_group in alert_groups: - # update group name with topology and sub-path - alert_group["name"] = self._group_name( - str(root_path), - str(file_path), - alert_group["name"], - ) - - # add "juju_" topology labels - for alert_rule in alert_group["rules"]: - if "labels" not in alert_rule: - alert_rule["labels"] = {} - - if self.topology: - alert_rule["labels"].update(self.topology.label_matcher_dict) - # insert juju topology filters into a prometheus alert rule - alert_rule["expr"] = self.tool.inject_label_matchers( - re.sub(r"%%juju_topology%%,?", "", alert_rule["expr"]), - self.topology.label_matcher_dict, - ) - - return alert_groups - - def _group_name(self, root_path: str, file_path: str, group_name: str) -> str: - """Generate group name from path and topology. - - The group name is made up of the relative path between the root dir_path, the file path, - and topology identifier. - - Args: - root_path: path to the root rules dir. - file_path: path to rule file. - group_name: original group name to keep as part of the new augmented group name - - Returns: - New group name, augmented by juju topology and relative path. - """ - rel_path = os.path.relpath(os.path.dirname(file_path), root_path) - rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_") - - # Generate group name: - # - name, from juju topology - # - suffix, from the relative path of the rule file; - group_name_parts = [self.topology.identifier] if self.topology else [] - group_name_parts.extend([rel_path, group_name, "alerts"]) - # filter to remove empty strings - return "_".join(filter(None, group_name_parts)) - - @classmethod - def _multi_suffix_glob( - cls, dir_path: Path, suffixes: List[str], recursive: bool = True - ) -> list: - """Helper function for getting all files in a directory that have a matching suffix. - - Args: - dir_path: path to the directory to glob from. - suffixes: list of suffixes to include in the glob (items should begin with a period). - recursive: a flag indicating whether a glob is recursive (nested) or not. - - Returns: - List of files in `dir_path` that have one of the suffixes specified in `suffixes`. - """ - all_files_in_dir = dir_path.glob("**/*" if recursive else "*") - return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir)) - - def _from_dir(self, dir_path: Path, recursive: bool) -> List[dict]: - """Read all rule files in a directory. - - All rules from files for the same directory are loaded into a single - group. The generated name of this group includes juju topology. - By default, only the top directory is scanned; for nested scanning, pass `recursive=True`. 
- - Args: - dir_path: directory containing *.rule files (alert rules without groups). - recursive: flag indicating whether to scan for rule files recursively. - - Returns: - a list of dictionaries representing prometheus alert rule groups, each dictionary - representing an alert group (structure determined by `yaml.safe_load`). - """ - alert_groups = [] # type: List[dict] - - # Gather all alerts into a list of groups - for file_path in self._multi_suffix_glob( - dir_path, [".rule", ".rules", ".yml", ".yaml"], recursive - ): - alert_groups_from_file = self._from_file(dir_path, file_path) - if alert_groups_from_file: - logger.debug("Reading alert rule from %s", file_path) - alert_groups.extend(alert_groups_from_file) - - return alert_groups - - def add_path(self, path: str, *, recursive: bool = False) -> None: - """Add rules from a dir path. - - All rules from files are aggregated into a data structure representing a single rule file. - All group names are augmented with juju topology. - - Args: - path: either a rules file or a dir of rules files. - recursive: whether to read files recursively or not (no impact if `path` is a file). - - Returns: - True if path was added else False. - """ - path = Path(path) # type: Path - if path.is_dir(): - self.alert_groups.extend(self._from_dir(path, recursive)) - elif path.is_file(): - self.alert_groups.extend(self._from_file(path.parent, path)) - else: - logger.debug("Alert rules path does not exist: %s", path) - - def as_dict(self) -> dict: - """Return standard alert rules file in dict representation. - - Returns: - a dictionary containing a single list of alert rule groups. - The list of alert rule groups is provided as value of the - "groups" dictionary key. - """ - return {"groups": self.alert_groups} if self.alert_groups else {} - - class TargetsChangedEvent(EventBase): """Event emitted when Prometheus scrape targets change.""" @@ -1320,7 +1122,7 @@ def _inject_alert_expr_labels(self, rules: Dict[str, Any]) -> Dict[str, Any]: # Inject topology and put it back in the list rule["expr"] = self._tool.inject_label_matchers( re.sub(r"%%juju_topology%%,?", "", rule["expr"]), - topology.label_matcher_dict, + topology.alert_expression_dict, ) except KeyError: # Some required JujuTopology key is missing. Just move on. 
@@ -1352,29 +1154,31 @@ def _static_scrape_config(self, relation) -> list: if not relation.units: return [] - scrape_jobs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]")) + scrape_configs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]")) - if not scrape_jobs: + if not scrape_configs: return [] scrape_metadata = json.loads(relation.data[relation.app].get("scrape_metadata", "{}")) if not scrape_metadata: - return scrape_jobs + return scrape_configs topology = JujuTopology.from_dict(scrape_metadata) job_name_prefix = "juju_{}_prometheus_scrape".format(topology.identifier) - scrape_jobs = PrometheusConfig.prefix_job_names(scrape_jobs, job_name_prefix) - scrape_jobs = PrometheusConfig.sanitize_scrape_configs(scrape_jobs) + scrape_configs = PrometheusConfig.prefix_job_names(scrape_configs, job_name_prefix) + scrape_configs = PrometheusConfig.sanitize_scrape_configs(scrape_configs) hosts = self._relation_hosts(relation) - scrape_jobs = PrometheusConfig.expand_wildcard_targets_into_individual_jobs( - scrape_jobs, hosts, topology + scrape_configs = PrometheusConfig.expand_wildcard_targets_into_individual_jobs( + scrape_configs, hosts, topology ) - return scrape_jobs + # For https scrape targets we still do not render a `tls_config` section because certs + # are expected to be made available by the charm via the `update-ca-certificates` mechanism. + return scrape_configs def _relation_hosts(self, relation: Relation) -> Dict[str, Tuple[str, str]]: """Returns a mapping from unit names to (address, path) tuples, for the given relation.""" @@ -1721,7 +1525,7 @@ def set_scrape_job_spec(self, _=None): if not self._charm.unit.is_leader(): return - alert_rules = AlertRules(topology=self.topology) + alert_rules = AlertRules(query_type="promql", topology=self.topology) alert_rules.add_path(self._alert_rules_path, recursive=True) alert_rules_as_dict = alert_rules.as_dict() @@ -1792,10 +1596,10 @@ def _scrape_jobs(self) -> list: A list of dictionaries, where each dictionary specifies a single scrape job for Prometheus. """ - jobs = self._jobs if self._jobs else [DEFAULT_JOB] + jobs = self._jobs or [] if callable(self._lookaside_jobs): - return jobs + PrometheusConfig.sanitize_scrape_configs(self._lookaside_jobs()) - return jobs + jobs.extend(PrometheusConfig.sanitize_scrape_configs(self._lookaside_jobs())) + return jobs or [DEFAULT_JOB] @property def _scrape_metadata(self) -> dict: @@ -1868,7 +1672,7 @@ def _update_relation_data(self, _): if not self._charm.unit.is_leader(): return - alert_rules = AlertRules() + alert_rules = AlertRules(query_type="promql") alert_rules.add_path(self.dir_path, recursive=self._recursive) alert_rules_as_dict = alert_rules.as_dict() @@ -2248,16 +2052,7 @@ def _static_config_extra_labels(self, target: Dict[str, str]) -> Dict[str, str]: logger.debug("Could not perform DNS lookup for %s", target["hostname"]) dns_name = target["hostname"] extra_info["dns_name"] = dns_name - label_re = re.compile(r'(?P
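# --- Editor's illustration (not part of the diff) ---------------------------
# A hedged sketch of the _scrape_jobs fallback change above: previously, when
# no static jobs were configured, DEFAULT_JOB was used even if lookaside jobs
# existed; after the change, DEFAULT_JOB is only the fallback when both
# sources are empty. DEFAULT_JOB below is a stand-in for the library's value,
# and sanitization is omitted for brevity.
from typing import Callable, List, Optional

DEFAULT_JOB = {"metrics_path": "/metrics", "static_configs": [{"targets": ["*"]}]}


def scrape_jobs(
    static_jobs: Optional[List[dict]],
    lookaside_jobs: Optional[Callable[[], List[dict]]] = None,
) -> List[dict]:
    # Start from the statically configured jobs, if any.
    jobs = list(static_jobs or [])
    # Extend with callable-provided ("lookaside") jobs instead of returning early.
    if callable(lookaside_jobs):
        jobs.extend(lookaside_jobs())
    # Fall back to the default job only when no jobs at all were produced.
    return jobs or [DEFAULT_JOB]


# With only lookaside jobs present, the default job is no longer appended:
assert scrape_jobs(None, lambda: [{"metrics_path": "/custom"}]) == [
    {"metrics_path": "/custom"}
]
# With no jobs at all, the default job is still the fallback:
assert scrape_jobs(None, None) == [DEFAULT_JOB]
# ----------------------------------------------------------------------------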