Skip to content

Commit 1420594

Browse files
committed
feat: add support for mTLS EventStreams
This depends on - Gateway PR which adds mTLS Routes - Platform collection which can be used by the installer - Installer changes to create a mTLS route
1 parent b6c03d4 commit 1420594

File tree

5 files changed

+292
-2
lines changed

5 files changed

+292
-2
lines changed

src/aap_eda/api/views/eda_credential.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ def partial_update(self, request, pk):
203203
setattr(eda_credential, key, value)
204204

205205
with transaction.atomic():
206+
eda_credential._request = request
206207
eda_credential.save()
207208
check_related_permissions(
208209
request.user,

src/aap_eda/api/views/event_stream.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030

3131
from aap_eda.api import exceptions as api_exc, filters, serializers
3232
from aap_eda.core import models
33-
from aap_eda.core.enums import ResourceType
33+
from aap_eda.core.enums import EventStreamAuthType, ResourceType
34+
from aap_eda.core.exceptions import GatewayAPIError, MissingCredentials
3435
from aap_eda.core.utils import logging_utils
36+
from aap_eda.services.sync_certs import SyncCertificates
3537

3638
logger = logging.getLogger(__name__)
3739

@@ -110,6 +112,7 @@ def destroy(self, request, *args, **kwargs):
110112
f"Event stream '{event_stream.name}' is being referenced by "
111113
f"{ref_count} activation(s) and cannot be deleted"
112114
)
115+
self._sync_certificates(event_stream, "destroy")
113116
self.perform_destroy(event_stream)
114117

115118
logger.info(
@@ -182,6 +185,7 @@ def create(self, request, *args, **kwargs):
182185
RoleDefinition.objects.give_creator_permissions(
183186
request.user, serializer.instance
184187
)
188+
self._sync_certificates(response, "create")
185189

186190
logger.info(
187191
logging_utils.generate_simple_audit_log(
@@ -307,3 +311,21 @@ def activations(self, request, id):
307311
)
308312
)
309313
return self.get_paginated_response(serializer.data)
314+
315+
def _sync_certificates(
316+
self,
317+
event_stream: models.EventStream,
318+
action: str,
319+
):
320+
if (
321+
event_stream.eda_credential.credential_type.kind
322+
== EventStreamAuthType.MTLS
323+
):
324+
try:
325+
obj = SyncCertificates(event_stream.eda_credential.id)
326+
if action == "destroy":
327+
obj.delete(event_stream.id)
328+
else:
329+
obj.update()
330+
except (GatewayAPIError, MissingCredentials) as ex:
331+
logger.error("Could not %s certificates %s", action, str(ex))

src/aap_eda/core/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,11 @@ class InvalidEnvKeyError(Exception):
4747

4848
class GracefulExit(Exception):
4949
pass
50+
51+
52+
class GatewayAPIError(Exception):
53+
pass
54+
55+
56+
class MissingCredentials(Exception):
57+
pass

src/aap_eda/core/management/commands/create_initial_data.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
AUTH_TYPE_LABEL = "Event Stream Authentication Type"
4848
SIGNATURE_ENCODING_LABEL = "Signature Encoding"
4949
HTTP_HEADER_LABEL = "HTTP Header Key"
50-
DEPRECATED_CREDENTIAL_KINDS = ["mtls"]
50+
DEPRECATED_CREDENTIAL_KINDS = [""]
5151
# FIXME(cutwater): Role descriptions were taken from the RBAC design document
5252
# and must be updated.
5353
ORG_ROLES = [
@@ -1041,6 +1041,53 @@
10411041
"template.postgres_sslkey": "{{ postgres_sslkey }}",
10421042
},
10431043
}
1044+
1045+
EVENT_STREAM_MTLS_INPUTS = {
1046+
"fields": [
1047+
{
1048+
"id": "auth_type",
1049+
"label": AUTH_TYPE_LABEL,
1050+
"type": "string",
1051+
"default": "mtls",
1052+
"hidden": True,
1053+
},
1054+
{
1055+
"id": "certificate",
1056+
"label": "Certificate",
1057+
"type": "string",
1058+
"multiline": True,
1059+
"help_text": (
1060+
"The Certificate collection in PEM format. You can have "
1061+
"multiple certificates in this field separated by "
1062+
"-----BEGIN CERTIFICATE----- "
1063+
"and ending in -----END CERTIFICATE-----"
1064+
"If a certificate is provided it will be transferred "
1065+
"to the Gateway, otherwise its assumed that the Gateway "
1066+
"already has the CA certificates in place to validate "
1067+
"the incoming client certificate."
1068+
),
1069+
},
1070+
{
1071+
"id": "subject",
1072+
"label": "Certificate Subject",
1073+
"type": "string",
1074+
"help_text": (
1075+
"The Subject from Certificate compliant with RFC 2253."
1076+
"This is optional and can be used to check the subject "
1077+
"defined in the certificate."
1078+
),
1079+
},
1080+
{
1081+
"id": "http_header_key",
1082+
"label": HTTP_HEADER_LABEL,
1083+
"type": "string",
1084+
"default": "Subject",
1085+
"hidden": True,
1086+
},
1087+
],
1088+
"required": ["auth_type", "http_header_key"],
1089+
}
1090+
10441091
CREDENTIAL_TYPES = [
10451092
{
10461093
"name": enums.DefaultCredentialType.SOURCE_CONTROL,
@@ -1244,6 +1291,21 @@
12441291
"Credential for analytics that use for authentication."
12451292
),
12461293
},
1294+
{
1295+
"name": enums.EventStreamCredentialType.MTLS,
1296+
"namespace": "event_stream",
1297+
"kind": "mtls",
1298+
"inputs": EVENT_STREAM_MTLS_INPUTS,
1299+
"injectors": {},
1300+
"managed": True,
1301+
"description": (
1302+
"Credential for Event Streams that use mutual TLS. "
1303+
"If CA Certificates are defined in the UI it will "
1304+
"be transferred to the Gateway proxy for validation "
1305+
"of incoming requests. We can optionally validate the "
1306+
"Subject defined in the inbound Certificate."
1307+
),
1308+
},
12471309
]
12481310

12491311

src/aap_eda/services/sync_certs.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# Copyright 2025 Red Hat, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Synchronize Certificates with Gateway."""
15+
import hashlib
16+
import logging
17+
from typing import Optional
18+
from urllib.parse import urljoin
19+
20+
import requests
21+
import yaml
22+
from ansible_base.resource_registry import resource_server
23+
from django.conf import settings
24+
from django.db.models.signals import post_save
25+
from django.dispatch import receiver
26+
from rest_framework import status
27+
28+
from aap_eda.core import enums, models
29+
from aap_eda.core.exceptions import GatewayAPIError, MissingCredentials
30+
31+
LOGGER = logging.getLogger(__name__)
32+
SLUG = "api/gateway/v1/ca_certificates/"
33+
DEFAULT_TIMEOUT = 30
34+
SERVICE_TOKEN_HEADER = "X-ANSIBLE-SERVICE-AUTH"
35+
36+
37+
class SyncCertificates:
38+
"""This class synchronizes the certificates with Gateway."""
39+
40+
def __init__(self, eda_credential_id: int):
41+
self.eda_credential_id = eda_credential_id
42+
self.gateway_url = settings.RESOURCE_SERVER["URL"]
43+
self.gateway_ssl_verify = settings.RESOURCE_SERVER.get(
44+
"VALIDATE_HTTPS", True
45+
)
46+
47+
self.eda_credential = models.EdaCredential.objects.get(
48+
id=self.eda_credential_id
49+
)
50+
51+
def update(self):
52+
"""Handle creating and updating the certificate in Gateway."""
53+
inputs = yaml.safe_load(self.eda_credential.inputs.get_secret_value())
54+
existing_object = self._fetch_from_gateway()
55+
56+
# If the user had a certificate and then they deleted it
57+
# remove it from Gateway
58+
if existing_object and not inputs["certificate"]:
59+
return self.delete()
60+
61+
# If the user has not provided any certificate nothing to do
62+
if not inputs["certificate"]:
63+
return
64+
65+
sha256 = hashlib.sha256(
66+
inputs["certificate"].encode("utf-8")
67+
).hexdigest()
68+
69+
if existing_object.get("sha256", "") != sha256:
70+
data = {
71+
"name": self.eda_credential.name,
72+
"pem_data": inputs["certificate"],
73+
"sha256": sha256,
74+
"eda_credential_id": self._get_remote_id(),
75+
}
76+
headers = self._prep_headers()
77+
if existing_object:
78+
slug = f"{SLUG}/{existing_object['id']}/"
79+
url = urljoin(self.gateway_url, slug)
80+
response = requests.patch(
81+
url,
82+
json=data,
83+
headers=headers,
84+
verify=self.gateway_ssl_verify,
85+
timeout=DEFAULT_TIMEOUT,
86+
)
87+
else:
88+
url = urljoin(self.gateway_url, SLUG)
89+
response = requests.post(
90+
url,
91+
json=data,
92+
headers=headers,
93+
verify=self.gateway_ssl_verify,
94+
timeout=DEFAULT_TIMEOUT,
95+
)
96+
97+
if response.status_code in [
98+
status.HTTP_200_OK,
99+
status.HTTP_201_CREATED,
100+
]:
101+
LOGGER.debug("Certificate updated")
102+
elif response.status_code == status.HTTP_400_BAD_REQUEST:
103+
LOGGER.error("Update failed")
104+
else:
105+
LOGGER.error("Couldn't update certificate")
106+
107+
else:
108+
LOGGER.debug("No changes detected")
109+
110+
def delete(self, event_stream_id: Optional[int]):
111+
"""Delete the Certificate from Gateway."""
112+
existing_object = self._fetch_from_gateway()
113+
if not existing_object:
114+
return
115+
116+
objects = models.EventStream.objects.filter(
117+
eda_credential_id=self.eda_credential
118+
)
119+
if not event_stream_id:
120+
self._delete_from_gateway(existing_object)
121+
elif len(objects) == 1 and event_stream_id == objects[0].id:
122+
self._delete_from_gateway(existing_object)
123+
124+
def _delete_from_gateway(self, existing_object: dict):
125+
slug = f"{SLUG}/{existing_object['id']}/"
126+
url = urljoin(self.gateway_url, slug)
127+
headers = self._prep_headers()
128+
response = requests.delete(
129+
url,
130+
headers=headers,
131+
verify=self.gateway_ssl_verify,
132+
timeout=DEFAULT_TIMEOUT,
133+
)
134+
if response.status_code == status.HTTP_200_OK:
135+
LOGGER.debug("Certificate object deleted")
136+
if response.status_code == status.HTTP_404_NOT_FOUND:
137+
LOGGER.warning("Certificate object missing during delete")
138+
else:
139+
LOGGER.error("Couldn't delete certificate object in gateway")
140+
raise GatewayAPIError
141+
142+
def _fetch_from_gateway(self):
143+
slug = f"{SLUG}/?eda_credential_id={self._get_remote_id()}"
144+
url = urljoin(self.gateway_url, slug)
145+
headers = self._prep_headers()
146+
response = requests.get(
147+
url,
148+
headers=headers,
149+
verify=self.gateway_ssl_verify,
150+
timeout=DEFAULT_TIMEOUT,
151+
)
152+
153+
if response.status_code == status.HTTP_200_OK:
154+
LOGGER.debug("Certificate object exists in gateway")
155+
data = response.json()
156+
if data["count"] > 0:
157+
return data["results"][0]
158+
else:
159+
return {}
160+
if response.status_code == status.HTTP_404_NOT_FOUND:
161+
LOGGER.debug("Certificate object does not exist in gateway")
162+
return {}
163+
164+
LOGGER.error("Error fetching certificate object")
165+
raise GatewayAPIError
166+
167+
def _get_remote_id(self) -> str:
168+
return f"eda_{self.eda_credential_id}"
169+
170+
def _prep_headers(self) -> dict:
171+
token = resource_server.get_service_token()
172+
if token:
173+
return {SERVICE_TOKEN_HEADER: token}
174+
175+
LOGGER.error("Cannot connect to gateway service token")
176+
raise MissingCredentials
177+
178+
179+
@receiver(post_save, sender=models.EdaCredential)
180+
def gw_handler(sender, instance, **kwargs):
181+
"""Handle updates to EdaCredential object and force a certificate sync."""
182+
if (
183+
instance.credential_type is not None
184+
and instance.credential_type.name
185+
== enums.EventStreamCredentialType.MTLS
186+
and hasattr(instance, "_request")
187+
):
188+
try:
189+
objects = models.EventStream.objects.filter(
190+
eda_credential_id=instance.id
191+
)
192+
if len(objects) > 0:
193+
SyncCertificates(instance.id).update()
194+
except (GatewayAPIError, MissingCredentials) as ex:
195+
LOGGER.error(
196+
"Couldn't trigger gateway certificate updates %s", str(ex)
197+
)

0 commit comments

Comments
 (0)