Skip to content

Commit 8bbc798

Browse files
committed
add e2e test for pubsub.
Signed-off-by: Morven Cao <lcao@redhat.com>
1 parent 0adb891 commit 8bbc798

18 files changed

+754
-13
lines changed

.github/workflows/e2e.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,33 @@ jobs:
116116
SERVER_REPLICAS: 2
117117
MESSAGE_DRIVER_TYPE: grpc
118118
ENABLE_MAESTRO_TLS: true
119+
e2e-pubsub:
120+
runs-on: ubuntu-22.04
121+
steps:
122+
- name: Check initial disk usage
123+
run: df -h /
124+
- name: Free disk space (optional cleanup)
125+
run: |
126+
sudo rm -rf /usr/local/lib/android
127+
sudo rm -rf /opt/hostedtoolcache
128+
docker system prune -af
129+
df -h /
130+
- name: Checkout
131+
uses: actions/checkout@v4
132+
- name: Setup Go
133+
uses: actions/setup-go@v5
134+
with:
135+
go-version: ${{ env.GO_VERSION }}
136+
- name: install ginkgo
137+
run: go install github.com/onsi/ginkgo/v2/ginkgo@v2.15.0
138+
- name: Test E2E
139+
run: |
140+
make e2e-test
141+
env:
142+
container_tool: docker
143+
SERVER_REPLICAS: 2
144+
MESSAGE_DRIVER_TYPE: pubsub
145+
ENABLE_MAESTRO_TLS: true
119146
upgrade:
120147
runs-on: ubuntu-22.04
121148
steps:

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
secrets/db.password
4848
secrets/mqtt.password
4949
secrets/mqtt.config
50+
secrets/pubsub.config
5051
hack/mosquitto-passwd.txt
5152

5253
# Ignore image environment file

Makefile

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ mqtt_root_cert ?= ""
6262
mqtt_client_cert ?= ""
6363
mqtt_client_key ?= ""
6464

65+
# Pub/Sub emulator configuration
66+
pubsub_host ?= maestro-pubsub.$(namespace)
67+
pubsub_port ?= 8085
68+
pubsub_project_id ?= maestro-test
69+
pubsub_config_file ?= ${PWD}/secrets/pubsub.config
70+
6571
# Log verbosity level
6672
klog_v:=2
6773

@@ -79,7 +85,7 @@ ENABLE_GRPC_BROKER ?= false
7985
# Enable TLS
8086
ENABLE_TLS ?= false
8187

82-
# message driver type, mqtt or grpc, default is mqtt.
88+
# message driver type, mqtt, grpc or pubsub, default is mqtt.
8389
MESSAGE_DRIVER_TYPE ?= mqtt
8490

8591
# default replicas for maestro server
@@ -130,6 +136,13 @@ help:
130136
@echo "make deploy deploy via templates to local openshift instance"
131137
@echo "make undeploy undeploy from local openshift instance"
132138
@echo "make project create and use an Example project"
139+
@echo "make db/setup setup local PostgreSQL database"
140+
@echo "make db/teardown teardown local PostgreSQL database"
141+
@echo "make mqtt/setup setup local MQTT broker"
142+
@echo "make mqtt/teardown teardown local MQTT broker"
143+
@echo "make pubsub/setup setup local Pub/Sub emulator"
144+
@echo "make pubsub/init initialize Pub/Sub topics and subscriptions"
145+
@echo "make pubsub/teardown teardown local Pub/Sub emulator"
133146
@echo "make clean delete temporary generated files"
134147
@echo "$(fake)"
135148
.PHONY: help
@@ -315,6 +328,9 @@ cmds:
315328
--param="MQTT_CLIENT_CERT=$(mqtt_client_cert)" \
316329
--param="MQTT_CLIENT_KEY=$(mqtt_client_key)" \
317330
--param="MQTT_IMAGE=$(MQTT_IMAGE)" \
331+
--param="PUBSUB_HOST=$(pubsub_host)" \
332+
--param="PUBSUB_PORT=$(pubsub_port)" \
333+
--param="PUBSUB_PROJECT_ID=$(pubsub_project_id)" \
318334
--param="IMAGE_REGISTRY=$(internal_image_registry)" \
319335
--param="IMAGE_REPOSITORY=$(image_repository)" \
320336
--param="IMAGE_TAG=$(image_tag)" \
@@ -427,6 +443,22 @@ mqtt/teardown:
427443
$(container_tool) stop mqtt-maestro
428444
$(container_tool) rm mqtt-maestro
429445

446+
.PHONY: pubsub/setup
447+
pubsub/setup:
448+
@mkdir -p ${PWD}/secrets
449+
@echo '{"projectID":"$(pubsub_project_id)","endpoint":"localhost:$(pubsub_port)","topics":{"sourceEvents":"projects/$(pubsub_project_id)/topics/sourceevents","sourceBroadcast":"projects/$(pubsub_project_id)/topics/sourcebroadcast"},"subscriptions":{"agentEvents":"projects/$(pubsub_project_id)/subscriptions/agentevents-maestro","agentBroadcast":"projects/$(pubsub_project_id)/subscriptions/agentbroadcast-maestro"}}' > $(pubsub_config_file)
450+
$(container_tool) run --name pubsub-maestro -p $(pubsub_port):8085 -e PUBSUB_PROJECT_ID=$(pubsub_project_id) -d gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=$(pubsub_project_id)
451+
452+
.PHONY: pubsub/teardown
453+
pubsub/teardown:
454+
$(container_tool) stop pubsub-maestro
455+
$(container_tool) rm pubsub-maestro
456+
457+
.PHONY: pubsub/init
458+
pubsub/init:
459+
@echo "Initializing Pub/Sub emulator topics and subscriptions..."
460+
@PUBSUB_EMULATOR_HOST=localhost:$(pubsub_port) PUBSUB_PROJECT_ID=$(pubsub_project_id) python3 hack/init-pubsub-emulator.py
461+
430462
crc/login:
431463
@echo "Logging into CRC"
432464
@crc console --credentials -ojson | jq -r .clusterConfig.adminCredentials.password | oc login --username kubeadmin --insecure-skip-tls-verify=true https://api.crc.testing:6443

README.md

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,30 +42,40 @@ reducing the need for direct access to clusters.
4242

4343
## Run in Local Environment
4444

45-
### Make a build, run postgres and mqtt broker
45+
### Make a build, run postgres and message broker (MQTT or Pub/Sub)
4646

4747
```shell
4848

4949
# 1. build the project
5050

51-
$ go install gotest.tools/gotestsum@latest
51+
$ go install gotest.tools/gotestsum@latest
5252
$ make binary
5353

54-
# 2. run a postgres database locally in docker
54+
# 2. run a postgres database locally in docker
5555

5656
$ make db/setup
5757
$ make db/login
58-
58+
5959
root@f076ddf94520:/# psql -h localhost -U maestro maestro
6060
psql (14.4 (Debian 14.4-1.pgdg110+1))
6161
Type "help" for help.
62-
62+
6363
maestro=# \dt
6464
Did not find any relations.
6565

66-
# 3. run a mqtt broker locally in docker
66+
# 3a. run a MQTT broker locally in docker
6767

6868
$ make mqtt/setup
69+
70+
# OR
71+
72+
# 3b. run a Pub/Sub emulator locally in docker
73+
74+
$ make pubsub/setup
75+
76+
# Initialize topics and subscriptions in the emulator
77+
# Note: Requires google-cloud-pubsub Python package (pip install google-cloud-pubsub)
78+
$ make pubsub/init
6979
```
7080

7181
### Run database migrations
@@ -101,7 +111,14 @@ maestro=# \dt
101111
### Running the Service
102112

103113
```shell
114+
# Run with MQTT broker (default)
104115
$ make run
116+
117+
# OR run with Pub/Sub emulator
118+
# First, ensure the Pub/Sub emulator is running and configured
119+
$ export PUBSUB_EMULATOR_HOST=localhost:8085
120+
$ ./maestro migration
121+
$ ./maestro server --message-broker-type=pubsub --message-broker-config-file=./secrets/pubsub.config
105122
```
106123

107124
#### List the consumers
@@ -341,7 +358,11 @@ Now you can create a resource bundle with the `MaestroGRPCSourceWorkClient`, che
341358
You can also run the maestro in a KinD cluster locally. The simplest way is to use the provided script to create a KinD cluster and deploy the maestro in the cluster. It creates a KinD cluster with name `maestro`, and deploys the maestro server and agent in the cluster.
342359

343360
```shell
361+
# Run with MQTT broker (default)
344362
$ make test-env
363+
364+
# OR run with Pub/Sub emulator
365+
$ MESSAGE_DRIVER_TYPE=pubsub make test-env
345366
```
346367
The Kubeconfig of the KinD cluster is in `./test/_output/.kubeconfig`.
347368
```shell

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,3 +205,5 @@ require (
205205
sigs.k8s.io/randfill v1.0.0 // indirect
206206
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
207207
)
208+
209+
replace open-cluster-management.io/sdk-go => github.com/morvencao/ocm-sdk-go v0.0.0-20251225091636-0a4a95a2cd88

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
390390
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
391391
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8=
392392
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
393+
github.com/morvencao/ocm-sdk-go v0.0.0-20251225091636-0a4a95a2cd88 h1:sfA27VQ5mDDPV3CDOYAPbX7vKQ3rjf1fssOkPK1Ovaw=
394+
github.com/morvencao/ocm-sdk-go v0.0.0-20251225091636-0a4a95a2cd88/go.mod h1:3xQf3gISaZ3377vFnwjH3QH8EF2UNaf8D9igLPUBChk=
393395
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
394396
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
395397
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -833,10 +835,6 @@ open-cluster-management.io/api v1.1.1-0.20251215032811-ee922fbb996c h1:LWZ+5dwVS
833835
open-cluster-management.io/api v1.1.1-0.20251215032811-ee922fbb996c/go.mod h1:Hk/3c114t6Ba5qhpqw+RoA93yEbE2CosG+JzzBZ6aCo=
834836
open-cluster-management.io/ocm v1.1.1-0.20251211014758-deb61b0a60d5 h1:02XVQtUt8v/fw6n3Rbf7Tu4FHJVVW2MiQLSpThWLwro=
835837
open-cluster-management.io/ocm v1.1.1-0.20251211014758-deb61b0a60d5/go.mod h1:kaFjiXsxtUUwWChYIy/ojANeMMrDW1KUrliuKA55z/Q=
836-
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac h1:Wt7rzenZqrtyYI58+lpe9tmf9e5Ft8Wwd0MyDwuJ4ck=
837-
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU=
838-
open-cluster-management.io/sdk-go v1.1.1-0.20251218031856-08bb1caedf74 h1:Sf+w+8ZzJgUQez/ADk2FLhoVDl1hD4rJqLQfvCFuM1o=
839-
open-cluster-management.io/sdk-go v1.1.1-0.20251218031856-08bb1caedf74/go.mod h1:3xQf3gISaZ3377vFnwjH3QH8EF2UNaf8D9igLPUBChk=
840838
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=
841839
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
842840
sigs.k8s.io/controller-runtime v0.22.4 h1:GEjV7KV3TY8e+tJ2LCTxUTanW4z/FmNB7l327UfMq9A=

hack/init-pubsub-emulator.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Initialize Google Cloud Pub/Sub emulator with topics and subscriptions for Maestro.
4+
5+
This script creates the necessary topics and subscriptions for the Maestro server
6+
to communicate with agents using Pub/Sub.
7+
8+
Environment Variables:
9+
PUBSUB_EMULATOR_HOST: The emulator host (default: localhost:8085)
10+
PUBSUB_PROJECT_ID: The GCP project ID (default: maestro-test)
11+
"""
12+
13+
import os
14+
import sys
15+
from google.cloud import pubsub_v1
16+
from google.api_core import exceptions
17+
18+
19+
def init_server_topics_and_subscriptions(project_id: str):
20+
"""Initialize topics and subscriptions for the Maestro server."""
21+
publisher = pubsub_v1.PublisherClient()
22+
subscriber = pubsub_v1.SubscriberClient()
23+
24+
# Topics to create
25+
topics = ['sourceevents', 'sourcebroadcast', 'agentevents', 'agentbroadcast']
26+
27+
print("Creating topics...")
28+
for topic_name in topics:
29+
topic_path = publisher.topic_path(project_id, topic_name)
30+
try:
31+
publisher.create_topic(request={"name": topic_path})
32+
print(f" ✓ Created topic: {topic_name}")
33+
except exceptions.AlreadyExists:
34+
print(f" - Topic already exists: {topic_name}")
35+
except Exception as e:
36+
print(f" ✗ Error creating topic {topic_name}: {e}", file=sys.stderr)
37+
return False
38+
39+
# Server subscriptions to create (name:topic:filter)
40+
subscriptions = [
41+
('agentevents-maestro', 'agentevents', 'attributes.ce-originalsource="maestro"'),
42+
('agentbroadcast-maestro', 'agentbroadcast', '')
43+
]
44+
45+
print("\nCreating server subscriptions...")
46+
for sub_name, topic_name, filter_expr in subscriptions:
47+
subscription_path = subscriber.subscription_path(project_id, sub_name)
48+
topic_path = publisher.topic_path(project_id, topic_name)
49+
try:
50+
if filter_expr:
51+
subscriber.create_subscription(
52+
request={"name": subscription_path, "topic": topic_path, "filter": filter_expr}
53+
)
54+
print(f" ✓ Created subscription: {sub_name} (filtered by {filter_expr})")
55+
else:
56+
subscriber.create_subscription(
57+
request={"name": subscription_path, "topic": topic_path}
58+
)
59+
print(f" ✓ Created subscription: {sub_name}")
60+
except exceptions.AlreadyExists:
61+
print(f" - Subscription already exists: {sub_name}")
62+
except Exception as e:
63+
print(f" ✗ Error creating subscription {sub_name}: {e}", file=sys.stderr)
64+
return False
65+
66+
return True
67+
68+
69+
def init_agent_subscriptions(project_id: str, consumer_name: str):
70+
"""Initialize subscriptions for a Maestro agent."""
71+
publisher = pubsub_v1.PublisherClient()
72+
subscriber = pubsub_v1.SubscriberClient()
73+
74+
# Agent subscriptions to create: (subscription_name, topic_name, filter)
75+
subscriptions = [
76+
(
77+
f'sourceevents-{consumer_name}',
78+
'sourceevents',
79+
f'attributes.ce-clustername="{consumer_name}"'
80+
),
81+
(
82+
f'sourcebroadcast-{consumer_name}',
83+
'sourcebroadcast',
84+
'' # No filter for broadcast
85+
)
86+
]
87+
88+
print(f"\nCreating agent subscriptions for consumer '{consumer_name}'...")
89+
for sub_name, topic_name, filter_expr in subscriptions:
90+
subscription_path = subscriber.subscription_path(project_id, sub_name)
91+
topic_path = publisher.topic_path(project_id, topic_name)
92+
93+
try:
94+
if filter_expr:
95+
subscriber.create_subscription(
96+
request={
97+
"name": subscription_path,
98+
"topic": topic_path,
99+
"filter": filter_expr
100+
}
101+
)
102+
print(f" ✓ Created subscription: {sub_name} (filtered)")
103+
else:
104+
subscriber.create_subscription(
105+
request={
106+
"name": subscription_path,
107+
"topic": topic_path
108+
}
109+
)
110+
print(f" ✓ Created subscription: {sub_name}")
111+
except exceptions.AlreadyExists:
112+
print(f" - Subscription already exists: {sub_name}")
113+
except Exception as e:
114+
print(f" ✗ Error creating subscription {sub_name}: {e}", file=sys.stderr)
115+
return False
116+
117+
return True
118+
119+
120+
def main():
121+
project_id = os.getenv('PUBSUB_PROJECT_ID', 'maestro-test')
122+
emulator_host = os.getenv('PUBSUB_EMULATOR_HOST', 'localhost:8085')
123+
consumer_name = os.getenv('CONSUMER_NAME', '')
124+
125+
print(f"Initializing Pub/Sub emulator at {emulator_host}")
126+
print(f"Project ID: {project_id}")
127+
128+
# Initialize server topics and subscriptions
129+
if not init_server_topics_and_subscriptions(project_id):
130+
print("\n✗ Failed to initialize server topics and subscriptions", file=sys.stderr)
131+
sys.exit(1)
132+
133+
# Initialize agent subscriptions if consumer name is provided
134+
if consumer_name:
135+
if not init_agent_subscriptions(project_id, consumer_name):
136+
print(f"\n✗ Failed to initialize agent subscriptions for {consumer_name}", file=sys.stderr)
137+
sys.exit(1)
138+
139+
print("\n✓ Pub/Sub emulator initialized successfully!")
140+
141+
142+
if __name__ == '__main__':
143+
try:
144+
main()
145+
except Exception as e:
146+
print(f"✗ Unexpected error: {e}", file=sys.stderr)
147+
sys.exit(1)

templates/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,29 @@ This template deploys a simple postgresl-9.4 database deployment with a TLS-enab
3030

3131
This template deploys a simple mosquitto-2.0.18 mqtt broker deployment.
3232

33+
## Pub/Sub Emulator template
34+
35+
`templates/pubsub-template.yml`
36+
37+
This template deploys a Google Cloud Pub/Sub emulator for local development and e2e testing. The emulator runs in a container and provides a compatible Pub/Sub API endpoint without requiring actual GCP credentials.
38+
39+
`templates/pubsub-init-job-template.yml`
40+
41+
This template creates a Kubernetes Job that initializes the Pub/Sub emulator with the required topics and subscriptions for the Maestro server. It uses the Python Pub/Sub client library to create:
42+
- Topics: `sourceevents`, `sourcebroadcast`, `agentevents`, `agentbroadcast`
43+
- Subscriptions:
44+
- `agentevents-maestro` - filtered by `ce-originalsource="maestro"`
45+
- `agentbroadcast-maestro` - receives all broadcast messages
46+
47+
`templates/pubsub-agent-init-job-template.yml`
48+
49+
This template creates a Kubernetes Job that initializes agent-specific Pub/Sub subscriptions. It must be run before deploying each agent and creates:
50+
- Subscription: `sourceevents-{consumer_name}` - filtered by `ce-clustername` attribute
51+
- Subscription: `sourcebroadcast-{consumer_name}` - receives all broadcast messages
52+
53+
For production GCP deployments, use the GCP-specific templates:
54+
- `templates/service-template-gcp.yml` - Maestro server with Pub/Sub integration
55+
- `templates/agent-template-gcp.yml` - Maestro agent with Pub/Sub integration
3356

3457
## Agent template
3558

0 commit comments

Comments
 (0)