|
27 | 27 | CONFIG_FILE = "config.yml" |
28 | 28 |
|
29 | 29 | class IAMPolicyComplianceChecker: |
| 30 | + |
| 31 | + def is_project_service_account_email(self, email: Optional[str]) -> bool: |
| 32 | + """ |
| 33 | + Returns True if the email is not a service account, or if it is a service account and the email contains the project_id. |
| 34 | + """ |
| 35 | + if email and email.endswith('.gserviceaccount.com'): |
| 36 | + return self.project_id in email |
| 37 | + return True |
| 38 | + |
30 | 39 | def __init__(self, project_id: str, users_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None): |
31 | 40 | self.project_id = project_id |
32 | 41 | self.users_file = users_file |
@@ -94,6 +103,10 @@ def _export_project_iam(self) -> List[Dict]: |
94 | 103 | for member_str in binding.members: |
95 | 104 | if member_str not in members_data: |
96 | 105 | username, email_address, member_type = self._parse_member(member_str) |
| 106 | + # Skip service accounts not matching the project_id |
| 107 | + if member_type == "serviceAccount" and not self.is_project_service_account_email(email_address): |
| 108 | + self.logger.debug(f"Skipping service account not matching project_id ({self.project_id}): {email_address}") |
| 109 | + continue |
97 | 110 | if member_type == "unknown": |
98 | 111 | self.logger.warning(f"Skipping member {member_str} with no email address") |
99 | 112 | continue # Skip if no email address is found, probably a malformed member |
@@ -190,8 +203,9 @@ def check_compliance(self) -> List[str]: |
190 | 203 | Returns: |
191 | 204 | A list of strings describing any compliance issues found. |
192 | 205 | """ |
193 | | - current_users = {user['email']: user for user in self._export_project_iam()} |
194 | | - existing_users = {user['email']: user for user in self._read_project_iam_file()} |
| 206 | + |
| 207 | + current_users = {user['email']: user for user in self._export_project_iam() if self.is_project_service_account_email(user.get('email'))} |
| 208 | + existing_users = {user['email']: user for user in self._read_project_iam_file() if self.is_project_service_account_email(user.get('email'))} |
195 | 209 |
|
196 | 210 | if not existing_users: |
197 | 211 | error_msg = f"No IAM policy found in the {self.users_file}." |
|
0 commit comments