Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions tests/templates/kuttl/aas-user-info/10-install-opa.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ commands:
- script: |
kubectl apply -n $NAMESPACE -f - <<EOF
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test
labels:
opa.stackable.tech/bundle: "true"
data:
test.rego: |
package test

import data.stackable.opa.userinfo.v1 as userinfo

currentUserInfoByUsername := userinfo.userInfoByUsername(input.username)
currentUserInfoById := userinfo.userInfoById(input.id)
---
apiVersion: opa.stackable.tech/v1alpha1
kind: OpaCluster
metadata:
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/kuttl/aas-user-info/30-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ kind: TestAssert
metadata:
name: test-regorule
commands:
- script: kubectl exec -n $NAMESPACE test-regorule-0 -- python /tmp/test-regorule.py -u 'http://test-opa-server-default:8081/v1/data/stackable/opa/userinfo/v1'
- script: kubectl exec -n $NAMESPACE test-regorule-0 -- python /tmp/test-regorule.py -u 'http://test-opa-server-default:8081/v1/data/test'
46 changes: 32 additions & 14 deletions tests/templates/kuttl/aas-user-info/test-regorule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,65 @@
import json


def assertions(username, response, opa_attribute, expected_groups, expected_attributes={}):
def assertions(
username, response, opa_attribute, expected_groups, expected_attributes={}
):
assert "result" in response
assert opa_attribute in response["result"]
result = response["result"]
assert opa_attribute in result, f"expected {opa_attribute} in {result}"

# repeated the right hand side for better output on error
assert "customAttributes" in response["result"][opa_attribute]
assert "groups" in response["result"][opa_attribute]
assert "id" in response["result"][opa_attribute]
assert "username" in response["result"][opa_attribute]
assert "customAttributes" in result[opa_attribute]
assert "groups" in result[opa_attribute]
assert "id" in result[opa_attribute]
assert "username" in result[opa_attribute]

# todo: split out group assertions
print(f"Testing for {username} in groups {expected_groups}")
groups = sorted(response["result"][opa_attribute]["groups"])
groups = sorted(result[opa_attribute]["groups"])
expected_groups = sorted(expected_groups)
assert groups == expected_groups, f"got {groups}, expected: {expected_groups}"

# todo: split out customAttribute assertions
print(f"Testing for {username} with customAttributes {expected_attributes}")
custom_attributes = response["result"][opa_attribute]["customAttributes"]
assert custom_attributes == expected_attributes, f"got {custom_attributes}, expected: {expected_attributes}"
custom_attributes = result[opa_attribute]["customAttributes"]
assert (
custom_attributes == expected_attributes
), f"got {custom_attributes}, expected: {expected_attributes}"


if __name__ == "__main__":
all_args = argparse.ArgumentParser()
all_args.add_argument("-u", "--url", required=True, help="OPA service url")
args = vars(all_args.parse_args())
params = {'strict-builtin-errors': 'true'}
params = {"strict-builtin-errors": "true"}

def make_request(payload):
return requests.post(args['url'], data=json.dumps(payload), params=params).json()
response = requests.post(args["url"], data=json.dumps(payload), params=params)
expected_status_code = 200
assert (
response.status_code == expected_status_code
), f"got {response.status_code}, expected: {expected_status_code}"
return response.json()

for subject_id in ["alice", "bob"]:
try:
# todo: try this out locally until it works
# url = 'http://test-opa-svc:8081/v1/data'
payload = {'input': {'id': subject_id}}
payload = {"input": {"id": subject_id}}
response = make_request(payload)
assertions(subject_id, response, "currentUserInfoById", [], {"e-mail": f"{subject_id}@example.com", "company": "openid"})
assertions(
subject_id,
response,
"currentUserInfoById",
[],
{"e-mail": f"{subject_id}@example.com", "company": "openid"},
)
except Exception as e:
print(f"exception: {e}")
if response is not None:
print(f"something went wrong. last response: {response}")
print(f"request body: {payload}")
print(f"response body: {response}")
raise e

print("Test successful!")
15 changes: 15 additions & 0 deletions tests/templates/kuttl/keycloak-user-info/10-install-opa.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ commands:
- script: |
kubectl apply -n $NAMESPACE -f - <<EOF
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test
labels:
opa.stackable.tech/bundle: "true"
data:
test.rego: |
package test

import data.stackable.opa.userinfo.v1 as userinfo

currentUserInfoByUsername := userinfo.userInfoByUsername(input.username)
currentUserInfoById := userinfo.userInfoById(input.id)
---
apiVersion: opa.stackable.tech/v1alpha1
kind: OpaCluster
metadata:
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/kuttl/keycloak-user-info/30-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ kind: TestAssert
metadata:
name: test-regorule
commands:
- script: kubectl exec -n $NAMESPACE test-regorule-0 -- python /tmp/test-regorule.py -u 'http://test-opa-server-default:8081/v1/data/stackable/opa/userinfo/v1'
- script: kubectl exec -n $NAMESPACE test-regorule-0 -- python /tmp/test-regorule.py -u 'http://test-opa-server-default:8081/v1/data/test'
40 changes: 26 additions & 14 deletions tests/templates/kuttl/keycloak-user-info/test-regorule.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,65 @@
}


def assertions(username, response, opa_attribute, expected_groups, expected_attributes={}):
def assertions(
username, response, opa_attribute, expected_groups, expected_attributes={}
):
assert "result" in response
assert opa_attribute in response["result"]
result = response["result"]
assert opa_attribute in result, f"expected {opa_attribute} in {result}"

# repeated the right hand side for better output on error
assert "customAttributes" in response["result"][opa_attribute]
assert "groups" in response["result"][opa_attribute]
assert "id" in response["result"][opa_attribute]
assert "username" in response["result"][opa_attribute]
assert "customAttributes" in result[opa_attribute]
assert "groups" in result[opa_attribute]
assert "id" in result[opa_attribute]
assert "username" in result[opa_attribute]

# todo: split out group assertions
print(f"Testing for {username} in groups {expected_groups}")
groups = sorted(response["result"][opa_attribute]["groups"])
groups = sorted(result[opa_attribute]["groups"])
expected_groups = sorted(expected_groups)
assert groups == expected_groups, f"got {groups}, expected: {expected_groups}"

# todo: split out customAttribute assertions
print(f"Testing for {username} with customAttributes {expected_attributes}")
custom_attributes = response["result"][opa_attribute]["customAttributes"]
assert custom_attributes == expected_attributes, f"got {custom_attributes}, expected: {expected_attributes}"
custom_attributes = result[opa_attribute]["customAttributes"]
assert (
custom_attributes == expected_attributes
), f"got {custom_attributes}, expected: {expected_attributes}"


if __name__ == "__main__":
all_args = argparse.ArgumentParser()
all_args.add_argument("-u", "--url", required=True, help="OPA service url")
args = vars(all_args.parse_args())
params = {'strict-builtin-errors': 'true'}
params = {"strict-builtin-errors": "true"}

def make_request(payload):
return requests.post(args['url'], data=json.dumps(payload), params=params).json()
response = requests.post(args["url"], data=json.dumps(payload), params=params)
expected_status_code = 200
assert (
response.status_code == expected_status_code
), f"got {response.status_code}, expected: {expected_status_code}"
return response.json()

for username, groups in users_and_groups.items():
try:
# todo: try this out locally until it works
# url = 'http://test-opa-svc:8081/v1/data'
payload = {'input': {'username': username}}
payload = {"input": {"username": username}}
response = make_request(payload)
assertions(username, response, "currentUserInfoByUsername", groups, {})

# do the reverse lookup
user_id = response["result"]["currentUserInfoByUsername"]["id"]
payload = {'input': {'id': user_id}}
payload = {"input": {"id": user_id}}
response = make_request(payload)
assertions(username, response, "currentUserInfoById", groups, {})
except Exception as e:
print(f"exception: {e}")
if response is not None:
print(f"something went wrong. last response: {response}")
print(f"request body: {payload}")
print(f"response body: {response}")
raise e

print("Test successful!")
Loading