Skip to content

Commit 18f5901

Browse files
committed
Adapt check pertaining to scs-0116-key-manager
Signed-off-by: Matthias Büchse <[email protected]>
1 parent 6415b7f commit 18f5901

File tree

4 files changed

+132
-2
lines changed

4 files changed

+132
-2
lines changed

Tests/iaas/openstack_test.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
compute_volume_type_lookup, compute_scs_0114_syntax_check, compute_scs_0114_aspect_type
3939
from scs_0115_security_groups.security_groups import \
4040
compute_scs_0115_default_rules
41+
from scs_0116_key_manager.key_manager import \
42+
compute_services_lookup, compute_scs_0116_presence, compute_scs_0116_permissions
4143

4244

4345
logger = logging.getLogger(__name__)
@@ -160,11 +162,20 @@ def make_container(cloud):
160162
c.add_function('scs_0114_replicated_type', lambda c: compute_scs_0114_aspect_type(c.volume_type_lookup, 'replicated'))
161163
c.add_function('volume_types_check', lambda c: all((
162164
c.scs_0114_syntax_check,
165+
# the following is recommended only, but we treat this whole monolithic testcase as recommended
163166
c.scs_0114_encrypted_type, c.scs_0114_replicated_type,
164167
)))
165168
# scs_0115_security_groups
166169
c.add_function('scs_0115_default_rules', lambda c: compute_scs_0115_default_rules(c.conn))
167170
c.add_function('security_groups_default_rules_check', lambda c: c.scs_0115_default_rules)
171+
# scs_0115_key_manager
172+
c.add_function('services_lookup', lambda c: compute_services_lookup(c.conn))
173+
c.add_function('scs_0116_presence', lambda c: compute_scs_0116_presence(c.services_lookup))
174+
c.add_function('scs_0116_permissions', lambda c: compute_scs_0116_permissions(c.conn, c.services_lookup))
175+
c.add_function('key_manager_check', lambda c: all((
176+
# recommended only: c.scs_0116_presence,
177+
c.scs_0116_permissions,
178+
)))
168179
return c
169180

170181

File renamed without changes.
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
from collections import defaultdict
2+
import logging
3+
4+
import openstack
5+
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
def fetch_roles(conn: openstack.connection.Connection) -> None:
11+
"""Checks whether the current user has at maximum privileges of the member role.
12+
13+
:param conn: connection to an OpenStack cloud.
14+
:returns: boolean, when role with most privileges is member
15+
"""
16+
role_names = set(conn.session.auth.get_access(conn.session).role_names)
17+
if role_names & {"admin", "manager"}:
18+
return False
19+
if "reader" in role_names:
20+
logger.info("User has reader role.")
21+
custom_roles = sorted(role_names - {"reader", "member"})
22+
if custom_roles:
23+
logger.info(f"User has custom roles {', '.join(custom_roles)}.")
24+
return role_names
25+
26+
27+
def compute_services_lookup(conn: openstack.connection.Connection) -> dict:
28+
try:
29+
services = conn.service_catalog
30+
except Exception:
31+
logger.critical("Could not access Catalog endpoint.")
32+
raise
33+
34+
result = defaultdict(list)
35+
for svc in services:
36+
svc_type = svc["type"]
37+
result[svc_type].append(svc)
38+
return result
39+
40+
41+
def compute_scs_0116_presence(services_lookup):
42+
services = services_lookup.get("key-manager", ())
43+
if not services:
44+
logger.error("key-manager service not found")
45+
return bool(services)
46+
47+
48+
def _find_secrets(conn: openstack.connection.Connection, secret_name_or_id: str) -> list:
49+
"""Replacement method for finding secrets.
50+
51+
Mimicks the behavior of Connection.key_manager.find_secret()
52+
but fixes an issue with the internal implementation raising an
53+
exception due to an unexpected microversion parameter.
54+
Unlike find_secret(), we return a list with all secrets that match.
55+
"""
56+
secrets = conn.key_manager.secrets()
57+
return [s for s in secrets if s.name == secret_name_or_id or s.id == secret_name_or_id]
58+
59+
60+
def _delete_secret(conn: openstack.connection.Connection, secret: openstack.key_manager.v1.secret.Secret):
61+
"""Replacement method for deleting secrets
62+
_delete_secret(connection, secret object)
63+
64+
Workaround for SDK bugs:
65+
- The id field in reality is a href (containg the UUID at the end)
66+
- The delete_secret() function contrary to the documentation does
67+
not accept openstack.key_manager.v1.secret.Secret objects nor the
68+
hrefs, just plain UUIDs.
69+
- It does not return an error when I try to delete a secret passing
70+
an object or href, just silently does nothing.
71+
The code here assumes that the SDK (when fixed) will continue to
72+
accept UUIDs as argument for delete_secret() in the future.
73+
Code is robust against those being passed directly in the .id attr
74+
of the objects. (It would be even more robust to try to pass the
75+
object first, then the href, then the UUID extracted from the href,
76+
each time checking whether it was effective. But that's three delete
77+
plus list calls and very ugly.)
78+
"""
79+
uuid = secret.id.rsplit('/', 1)[-1]
80+
conn.key_manager.delete_secret(uuid)
81+
82+
83+
def compute_scs_0116_permissions(conn: openstack.connection.Connection, services_lookup) -> None:
84+
"""
85+
After checking that the current user only has the member and maybe the
86+
reader role, this method verifies that the user with a member role
87+
has sufficient access to the Key Manager API functionality.
88+
"""
89+
if "member" not in fetch_roles(conn):
90+
raise RuntimeError("Cannot test key-manager permissions. User has wrong roles")
91+
if not services_lookup.get("key-manager", ()):
92+
# this testcase only applies when a key manager is present
93+
return True
94+
secret_name = "scs-member-role-test-secret"
95+
try:
96+
existing_secrets = _find_secrets(conn, secret_name)
97+
for secret in existing_secrets:
98+
_delete_secret(conn, secret)
99+
100+
if existing_secrets:
101+
logger.debug(f'Deleted {len(existing_secrets)} secrets')
102+
103+
conn.key_manager.create_secret(
104+
name=secret_name,
105+
payload_content_type="text/plain",
106+
secret_type="opaque",
107+
payload="foo",
108+
)
109+
try:
110+
new_secret = _find_secrets(conn, secret_name)
111+
if not new_secret:
112+
raise RuntimeError(f"Secret '{secret_name}' was not discoverable by the user")
113+
finally:
114+
_delete_secret(conn, new_secret[0])
115+
except (RuntimeError, openstack.exceptions.ForbiddenException):
116+
logger.debug('exception details', exc_info=True)
117+
logger.error("Unsuccessful at using Key Manager API")
118+
return False
119+
return True

Tests/scs-compatible-iaas.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ modules:
143143
name: Key manager
144144
url: https://docs.scs.community/standards/scs-0116-v1-key-manager-standard
145145
run:
146-
- executable: ./iaas/key-manager/check-for-key-manager.py
147-
args: --os-cloud {os_cloud} --debug
146+
- executable: ./iaas/openstack_test.py
147+
args: -c {os_cloud} key-manager-check
148148
testcases:
149149
- id: key-manager-check
150150
tags: [mandatory]

0 commit comments

Comments
 (0)