Skip to content

Commit 5a4a443

Browse files
committed
add e2e test for pubsub.
Signed-off-by: Morven Cao <lcao@redhat.com>
1 parent 0c2c7c6 commit 5a4a443

File tree

11 files changed

+369
-1
lines changed

11 files changed

+369
-1
lines changed

.github/workflows/e2e.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,33 @@ jobs:
128128
SERVER_REPLICAS: 2
129129
MESSAGE_DRIVER_TYPE: grpc
130130
ENABLE_MAESTRO_TLS: true
131+
e2e-pubsub:
132+
runs-on: ubuntu-22.04
133+
steps:
134+
- name: Check initial disk usage
135+
run: df -h /
136+
- name: Free disk space (optional cleanup)
137+
run: |
138+
sudo rm -rf /usr/local/lib/android
139+
sudo rm -rf /opt/hostedtoolcache
140+
docker system prune -af
141+
df -h /
142+
- name: Checkout
143+
uses: actions/checkout@v4
144+
- name: Setup Go
145+
uses: actions/setup-go@v5
146+
with:
147+
go-version: ${{ env.GO_VERSION }}
148+
- name: install ginkgo
149+
run: go install github.com/onsi/ginkgo/v2/ginkgo@v2.15.0
150+
- name: Test E2E
151+
run: |
152+
make e2e-test
153+
env:
154+
container_tool: docker
155+
SERVER_REPLICAS: 2
156+
MESSAGE_DRIVER_TYPE: pubsub
157+
ENABLE_MAESTRO_TLS: true
131158
upgrade:
132159
runs-on: ubuntu-22.04
133160
steps:
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
{{- if eq .Values.messageBroker.type "pubsub" }}
2+
apiVersion: batch/v1
3+
kind: Job
4+
metadata:
5+
name: {{ include "maestro-agent.fullname" . }}-pubsub-init
6+
namespace: {{ .Release.Namespace }}
7+
labels:
8+
{{- include "maestro-agent.labels" . | nindent 4 }}
9+
app.kubernetes.io/component: pubsub-init
10+
annotations:
11+
"helm.sh/hook": pre-install,pre-upgrade
12+
"helm.sh/hook-weight": "-5"
13+
"helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded
14+
spec:
15+
template:
16+
metadata:
17+
labels:
18+
{{- include "maestro-agent.selectorLabels" . | nindent 8 }}
19+
app.kubernetes.io/component: pubsub-init
20+
spec:
21+
restartPolicy: Never
22+
containers:
23+
- name: init
24+
image: registry.access.redhat.com/ubi9/python-311
25+
imagePullPolicy: IfNotPresent
26+
env:
27+
- name: PUBSUB_EMULATOR_HOST
28+
value: "{{ .Values.messageBroker.pubsub.endpoint }}"
29+
- name: PUBSUB_PROJECT_ID
30+
value: "{{ .Values.messageBroker.pubsub.projectID }}"
31+
- name: CONSUMER_NAME
32+
value: "{{ .Values.consumerName }}"
33+
command:
34+
- /bin/bash
35+
- -c
36+
- |
37+
set -e
38+
echo "Installing google-cloud-pubsub..."
39+
pip install --quiet google-cloud-pubsub==2.34.0
40+
41+
echo "Initializing Pub/Sub agent subscriptions for consumer: ${CONSUMER_NAME}..."
42+
python3 <<'PYTHON'
43+
import os
44+
from google.cloud import pubsub_v1
45+
from google.api_core import exceptions
46+
47+
project_id = os.environ['PUBSUB_PROJECT_ID']
48+
consumer_name = os.environ['CONSUMER_NAME']
49+
50+
subscriber = pubsub_v1.SubscriberClient()
51+
publisher = pubsub_v1.PublisherClient()
52+
53+
# Extract topic names from full resource paths
54+
# If using full paths like "projects/my-project/topics/sourceevents"
55+
# we need to extract just the topic name
56+
def extract_topic_name(topic_path):
57+
if topic_path.startswith('projects/'):
58+
# Full resource path: projects/PROJECT_ID/topics/TOPIC_NAME
59+
return topic_path.split('/')[-1]
60+
return topic_path
61+
62+
subscriptions = [
63+
(
64+
f'sourceevents-{consumer_name}',
65+
'sourceevents',
66+
f'attributes.ce-clustername="{consumer_name}"'
67+
),
68+
(
69+
f'sourcebroadcast-{consumer_name}',
70+
'sourcebroadcast',
71+
''
72+
)
73+
]
74+
75+
for sub_name, topic_name, filter_expr in subscriptions:
76+
subscription_path = subscriber.subscription_path(project_id, sub_name)
77+
topic_path = publisher.topic_path(project_id, topic_name)
78+
79+
try:
80+
if filter_expr:
81+
subscriber.create_subscription(
82+
request={
83+
"name": subscription_path,
84+
"topic": topic_path,
85+
"filter": filter_expr
86+
}
87+
)
88+
print(f"Created subscription: {subscription_path} with filter: {filter_expr}")
89+
else:
90+
subscriber.create_subscription(
91+
request={
92+
"name": subscription_path,
93+
"topic": topic_path
94+
}
95+
)
96+
print(f"Created subscription: {subscription_path}")
97+
except exceptions.AlreadyExists:
98+
print(f"Subscription already exists: {subscription_path}")
99+
except Exception as e:
100+
print(f"Error creating subscription {subscription_path}: {e}")
101+
raise
102+
103+
print(f"Pub/Sub agent subscriptions for '{consumer_name}' initialized successfully!")
104+
PYTHON
105+
{{- end }}

charts/maestro-agent/templates/secret.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ stringData:
6262
{{- if .Values.messageBroker.pubsub.projectID }}
6363
projectID: {{ .Values.messageBroker.pubsub.projectID }}
6464
{{- end }}
65+
{{- if .Values.messageBroker.pubsub.endpoint }}
66+
endpoint: {{ .Values.messageBroker.pubsub.endpoint }}
67+
{{- end }}
68+
disableTLS: {{ .Values.messageBroker.pubsub.disableTLS }}
6569
{{- if .Values.messageBroker.pubsub.credentialsJSON }}
6670
credentialsFile: {{ .Values.messageBroker.pubsub.credentialsFile }}
6771
{{- end }}

charts/maestro-agent/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ messageBroker:
5858
# Google Cloud Pub/Sub configuration
5959
pubsub:
6060
projectID: ""
61+
endpoint: ""
62+
disableTLS: false
6163
credentialsJSON: ""
6264
credentialsFile: /secrets/pubsub/credentials.json
6365
# Topics configuration (use full GCP resource names)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
{{- if .Values.pubsubEmulator.enabled -}}
2+
---
3+
apiVersion: v1
4+
kind: Service
5+
metadata:
6+
name: {{ .Values.pubsubEmulator.service.name }}
7+
labels:
8+
app.kubernetes.io/component: pubsub-emulator
9+
annotations:
10+
template.openshift.io/expose-uri: "tcp://{.spec.clusterIP}:{.spec.ports[?(.name=='pubsub')].port}"
11+
spec:
12+
type: ClusterIP
13+
ports:
14+
- name: pubsub
15+
protocol: TCP
16+
port: {{ .Values.pubsubEmulator.service.port }}
17+
targetPort: 8085
18+
selector:
19+
app.kubernetes.io/component: pubsub-emulator
20+
---
21+
apiVersion: apps/v1
22+
kind: Deployment
23+
metadata:
24+
name: {{ .Values.pubsubEmulator.service.name }}
25+
labels:
26+
app.kubernetes.io/component: pubsub-emulator
27+
spec:
28+
replicas: 1
29+
strategy:
30+
type: Recreate
31+
selector:
32+
matchLabels:
33+
app.kubernetes.io/component: pubsub-emulator
34+
template:
35+
metadata:
36+
labels:
37+
app.kubernetes.io/component: pubsub-emulator
38+
spec:
39+
serviceAccountName: {{ .Values.serviceAccount.name }}
40+
containers:
41+
- name: pubsub-emulator
42+
image: {{ .Values.pubsubEmulator.image }}
43+
imagePullPolicy: IfNotPresent
44+
command:
45+
- gcloud
46+
- beta
47+
- emulators
48+
- pubsub
49+
- start
50+
- --host-port=0.0.0.0:8085
51+
- --project={{ .Values.pubsubEmulator.projectID }}
52+
ports:
53+
- containerPort: 8085
54+
name: pubsub
55+
protocol: TCP
56+
volumeMounts:
57+
- name: pubsub-persistent-storage
58+
mountPath: /data
59+
volumes:
60+
- name: pubsub-persistent-storage
61+
emptyDir: {}
62+
{{- end }}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
{{- if .Values.pubsubEmulator.enabled }}
2+
apiVersion: batch/v1
3+
kind: Job
4+
metadata:
5+
name: {{ .Values.pubsubEmulator.service.name }}-init
6+
namespace: {{ .Release.Namespace }}
7+
labels:
8+
app.kubernetes.io/component: pubsub-init
9+
annotations:
10+
"helm.sh/hook": post-install,post-upgrade
11+
"helm.sh/hook-weight": "-5"
12+
"helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded
13+
spec:
14+
template:
15+
metadata:
16+
labels:
17+
app.kubernetes.io/component: pubsub-init
18+
spec:
19+
restartPolicy: Never
20+
serviceAccountName: {{ .Values.serviceAccount.name }}
21+
containers:
22+
- name: init
23+
image: registry.access.redhat.com/ubi9/python-311
24+
imagePullPolicy: IfNotPresent
25+
env:
26+
- name: PUBSUB_EMULATOR_HOST
27+
value: "{{ .Values.pubsubEmulator.service.name }}:{{ .Values.pubsubEmulator.service.port }}"
28+
- name: PUBSUB_PROJECT_ID
29+
value: "{{ .Values.pubsubEmulator.projectID }}"
30+
command:
31+
- /bin/bash
32+
- -c
33+
- |
34+
set -e
35+
echo "Installing google-cloud-pubsub..."
36+
pip install --quiet google-cloud-pubsub==2.34.0
37+
38+
echo "Initializing Pub/Sub topics and subscriptions..."
39+
python3 <<'PYTHON'
40+
import os
41+
from google.cloud import pubsub_v1
42+
from google.api_core import exceptions
43+
44+
project_id = os.environ['PUBSUB_PROJECT_ID']
45+
46+
# Create publisher and subscriber clients
47+
publisher = pubsub_v1.PublisherClient()
48+
subscriber = pubsub_v1.SubscriberClient()
49+
50+
# Topics to create
51+
topics = ['sourceevents', 'sourcebroadcast', 'agentevents', 'agentbroadcast']
52+
53+
# Create topics
54+
for topic_name in topics:
55+
topic_path = publisher.topic_path(project_id, topic_name)
56+
try:
57+
publisher.create_topic(request={"name": topic_path})
58+
print(f"Created topic: {topic_path}")
59+
except exceptions.AlreadyExists:
60+
print(f"Topic already exists: {topic_path}")
61+
except Exception as e:
62+
print(f"Error creating topic {topic_path}: {e}")
63+
raise
64+
65+
# Subscriptions to create (name:topic:filter)
66+
subscriptions = [
67+
('agentevents-maestro', 'agentevents', 'attributes.ce-originalsource="maestro"'),
68+
('agentbroadcast-maestro', 'agentbroadcast', '')
69+
]
70+
71+
# Create subscriptions
72+
for sub_name, topic_name, filter_expr in subscriptions:
73+
subscription_path = subscriber.subscription_path(project_id, sub_name)
74+
topic_path = publisher.topic_path(project_id, topic_name)
75+
try:
76+
if filter_expr:
77+
subscriber.create_subscription(
78+
request={"name": subscription_path, "topic": topic_path, "filter": filter_expr}
79+
)
80+
print(f"Created subscription: {subscription_path} with filter: {filter_expr}")
81+
else:
82+
subscriber.create_subscription(
83+
request={"name": subscription_path, "topic": topic_path}
84+
)
85+
print(f"Created subscription: {subscription_path}")
86+
except exceptions.AlreadyExists:
87+
print(f"Subscription already exists: {subscription_path}")
88+
except Exception as e:
89+
print(f"Error creating subscription {subscription_path}: {e}")
90+
raise
91+
92+
print("Pub/Sub initialization complete!")
93+
PYTHON
94+
{{- end }}

charts/maestro-server/templates/pubsub.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ stringData:
1111
{{- if .Values.messageBroker.pubsub.projectID }}
1212
projectID: {{ .Values.messageBroker.pubsub.projectID }}
1313
{{- end }}
14+
{{- if .Values.messageBroker.pubsub.endpoint }}
15+
endpoint: {{ .Values.messageBroker.pubsub.endpoint }}
16+
{{- end }}
17+
disableTLS: {{ .Values.messageBroker.pubsub.disableTLS }}
1418
{{- if .Values.messageBroker.pubsub.topics }}
1519
topics:
1620
{{- if .Values.messageBroker.pubsub.topics.sourceEvents }}

charts/maestro-server/values.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ messageBroker:
8585
# Google Cloud Pub/Sub configuration
8686
pubsub:
8787
projectID: ""
88+
endpoint: ""
89+
disableTLS: false
8890
# Topics configuration (use full GCP resource names)
8991
topics:
9092
sourceEvents: "" # e.g., projects/my-project/topics/sourceevents
@@ -94,6 +96,15 @@ messageBroker:
9496
agentEvents: "" # e.g., projects/my-project/subscriptions/agentevents-maestro
9597
agentBroadcast: "" # e.g., projects/my-project/subscriptions/agentbroadcast-maestro
9698

99+
# Pub/Sub emulator deployment (optional, for development)
100+
pubsubEmulator:
101+
enabled: false
102+
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
103+
projectID: maestro-test
104+
service:
105+
name: maestro-pubsub
106+
port: 8085
107+
97108

98109
# Server configuration
99110
server:

test/e2e/pkg/cert_rotation_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,27 @@ var _ = Describe("Certificate Rotation", Ordered, Label("e2e-tests-cert-rotation
3030
var deployName string
3131
var originalMQTTCerts map[string][]byte
3232
var originalGRPCBrokerCerts map[string][]byte
33+
var skip bool
3334

3435
BeforeAll(func() {
36+
// Check if any CA secrets exist (MQTT or gRPC)
37+
// Certificate rotation is only applicable for MQTT and gRPC brokers
38+
// Pub/Sub emulator does not use client certificates
39+
_, mqttErr := agentTestOpts.kubeClientSet.CoreV1().Secrets(agentTestOpts.agentNamespace).Get(ctx, "maestro-mqtt-ca", metav1.GetOptions{})
40+
_, grpcErr := agentTestOpts.kubeClientSet.CoreV1().Secrets(agentTestOpts.agentNamespace).Get(ctx, "maestro-grpc-broker-ca", metav1.GetOptions{})
41+
42+
if errors.IsNotFound(mqttErr) && errors.IsNotFound(grpcErr) {
43+
skip = true
44+
Skip("Skipping certificate rotation tests: no CA secrets found")
45+
}
46+
47+
if mqttErr != nil && !errors.IsNotFound(mqttErr) {
48+
Expect(mqttErr).ShouldNot(HaveOccurred())
49+
}
50+
if grpcErr != nil && !errors.IsNotFound(grpcErr) {
51+
Expect(grpcErr).ShouldNot(HaveOccurred())
52+
}
53+
3554
workName = fmt.Sprintf("cert-rotation-%s", k8srand.String(5))
3655
deployName = fmt.Sprintf("nginx-%s", k8srand.String(5))
3756

@@ -102,6 +121,10 @@ var _ = Describe("Certificate Rotation", Ordered, Label("e2e-tests-cert-rotation
102121
})
103122

104123
AfterAll(func() {
124+
if skip {
125+
return
126+
}
127+
105128
By("restoring original MQTT certificate")
106129
if len(originalMQTTCerts) > 0 {
107130
secret, err := agentTestOpts.kubeClientSet.CoreV1().Secrets(agentTestOpts.agentNamespace).Get(ctx, "maestro-agent-certs", metav1.GetOptions{})

0 commit comments

Comments
 (0)