Skip to content

Commit 3b439fd

Browse files
committed
add integration test for pubsub.
Signed-off-by: Morven Cao <lcao@redhat.com>
1 parent 0adb891 commit 3b439fd

File tree

9 files changed

+434
-55
lines changed

9 files changed

+434
-55
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
secrets/db.password
4848
secrets/mqtt.password
4949
secrets/mqtt.config
50+
secrets/pubsub.config
5051
hack/mosquitto-passwd.txt
5152

5253
# Ignore image environment file

Makefile

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ mqtt_root_cert ?= ""
6262
mqtt_client_cert ?= ""
6363
mqtt_client_key ?= ""
6464

65+
# Pub/Sub emulator configuration
66+
pubsub_host ?= maestro-pubsub.$(namespace)
67+
pubsub_port ?= 8085
68+
pubsub_project_id ?= maestro-test
69+
pubsub_config_file ?= ${PWD}/secrets/pubsub.config
70+
6571
# Log verbosity level
6672
klog_v:=2
6773

@@ -79,7 +85,7 @@ ENABLE_GRPC_BROKER ?= false
7985
# Enable TLS
8086
ENABLE_TLS ?= false
8187

82-
# message driver type, mqtt or grpc, default is mqtt.
88+
# message driver type, mqtt, grpc or pubsub, default is mqtt.
8389
MESSAGE_DRIVER_TYPE ?= mqtt
8490

8591
# default replicas for maestro server
@@ -92,6 +98,7 @@ MQTT_IMAGE ?= quay.io/maestro/eclipse-mosquitto:2.0.18
9298
# Test output files
9399
unit_test_json_output ?= ${PWD}/unit-test-results.json
94100
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
101+
pubsub_integration_test_json_output ?= ${PWD}/pubsub-integration-test-results.json
95102
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json
96103

97104
# maestro services config
@@ -130,6 +137,13 @@ help:
130137
@echo "make deploy deploy via templates to local openshift instance"
131138
@echo "make undeploy undeploy from local openshift instance"
132139
@echo "make project create and use an Example project"
140+
@echo "make db/setup setup local PostgreSQL database"
141+
@echo "make db/teardown teardown local PostgreSQL database"
142+
@echo "make mqtt/setup setup local MQTT broker"
143+
@echo "make mqtt/teardown teardown local MQTT broker"
144+
@echo "make pubsub/setup setup local Pub/Sub emulator"
145+
@echo "make pubsub/init initialize Pub/Sub topics and subscriptions"
146+
@echo "make pubsub/teardown teardown local Pub/Sub emulator"
133147
@echo "make clean delete temporary generated files"
134148
@echo "$(fake)"
135149
.PHONY: help
@@ -233,16 +247,21 @@ test:
233247
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
234248
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
235249
# make test-integration TESTFLAGS="-short" skips long-run tests
236-
test-integration: test-integration-mqtt test-integration-grpc
250+
test-integration: test-integration-mqtt test-integration-pubsub test-integration-grpc
237251
.PHONY: test-integration
238252

239253
test-integration-mqtt:
240-
BROKER=mqtt MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
254+
MESSAGE_BROKER_TYPE=mqtt MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
241255
./test/integration
242256
.PHONY: test-integration-mqtt
243257

258+
test-integration-pubsub:
259+
MESSAGE_BROKER_TYPE=pubsub MESSAGE_BROKER_CONFIG=$(PWD)/secrets/pubsub.config MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(pubsub_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
260+
./test/integration
261+
.PHONY: test-integration-pubsub
262+
244263
test-integration-grpc:
245-
BROKER=grpc MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -count=1 -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
264+
MESSAGE_BROKER_TYPE=grpc MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -count=1 -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
246265
./test/integration
247266
.PHONY: test-integration-grpc
248267

@@ -427,6 +446,22 @@ mqtt/teardown:
427446
$(container_tool) stop mqtt-maestro
428447
$(container_tool) rm mqtt-maestro
429448

449+
.PHONY: pubsub/setup
450+
pubsub/setup:
451+
@mkdir -p ${PWD}/secrets
452+
@echo '{"projectID":"$(pubsub_project_id)","endpoint":"localhost:$(pubsub_port)","insecure":true,"topics":{"sourceEvents":"projects/$(pubsub_project_id)/topics/sourceevents","sourceBroadcast":"projects/$(pubsub_project_id)/topics/sourcebroadcast"},"subscriptions":{"agentEvents":"projects/$(pubsub_project_id)/subscriptions/agentevents-maestro","agentBroadcast":"projects/$(pubsub_project_id)/subscriptions/agentbroadcast-maestro"}}' > $(pubsub_config_file)
453+
$(container_tool) run --name pubsub-maestro -p $(pubsub_port):8085 -e PUBSUB_PROJECT_ID=$(pubsub_project_id) -d gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=$(pubsub_project_id)
454+
455+
.PHONY: pubsub/teardown
456+
pubsub/teardown:
457+
$(container_tool) stop pubsub-maestro
458+
$(container_tool) rm pubsub-maestro
459+
460+
.PHONY: pubsub/init
461+
pubsub/init:
462+
@echo "Initializing Pub/Sub emulator topics and subscriptions..."
463+
@PUBSUB_EMULATOR_HOST=localhost:$(pubsub_port) PUBSUB_PROJECT_ID=$(pubsub_project_id) python3 hack/init-pubsub-emulator.py
464+
430465
crc/login:
431466
@echo "Logging into CRC"
432467
@crc console --credentials -ojson | jq -r .clusterConfig.adminCredentials.password | oc login --username kubeadmin --insecure-skip-tls-verify=true https://api.crc.testing:6443

README.md

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,30 +42,40 @@ reducing the need for direct access to clusters.
4242

4343
## Run in Local Environment
4444

45-
### Make a build, run postgres and mqtt broker
45+
### Make a build, run postgres and message broker (MQTT or Pub/Sub)
4646

4747
```shell
4848

4949
# 1. build the project
5050

51-
$ go install gotest.tools/gotestsum@latest
51+
$ go install gotest.tools/gotestsum@latest
5252
$ make binary
5353

54-
# 2. run a postgres database locally in docker
54+
# 2. run a postgres database locally in docker
5555

5656
$ make db/setup
5757
$ make db/login
58-
58+
5959
root@f076ddf94520:/# psql -h localhost -U maestro maestro
6060
psql (14.4 (Debian 14.4-1.pgdg110+1))
6161
Type "help" for help.
62-
62+
6363
maestro=# \dt
6464
Did not find any relations.
6565

66-
# 3. run a mqtt broker locally in docker
66+
# 3a. run a MQTT broker locally in docker
6767

6868
$ make mqtt/setup
69+
70+
# OR
71+
72+
# 3b. run a Pub/Sub emulator locally in docker
73+
74+
$ make pubsub/setup
75+
76+
# Initialize topics and subscriptions in the emulator
77+
# Note: Requires google-cloud-pubsub Python package (pip3 install google-cloud-pubsub)
78+
$ make pubsub/init
6979
```
7080

7181
### Run database migrations
@@ -101,7 +111,13 @@ maestro=# \dt
101111
### Running the Service
102112

103113
```shell
114+
# Run with MQTT broker (default)
104115
$ make run
116+
117+
# OR run with Pub/Sub emulator
118+
# First, ensure the Pub/Sub emulator is running and configured
119+
$ ./maestro migration
120+
$ ./maestro server --message-broker-type=pubsub --message-broker-config-file=./secrets/pubsub.config
105121
```
106122

107123
#### List the consumers

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/openshift-online/maestro
33
go 1.25.0
44

55
require (
6+
cloud.google.com/go/pubsub/v2 v2.3.0
67
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0
78
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0
89
github.com/Masterminds/squirrel v1.5.4
@@ -42,6 +43,7 @@ require (
4243
go.opentelemetry.io/otel/sdk v1.39.0
4344
go.opentelemetry.io/otel/trace v1.39.0
4445
golang.org/x/oauth2 v0.32.0
46+
google.golang.org/api v0.255.0
4547
google.golang.org/grpc v1.77.0
4648
google.golang.org/protobuf v1.36.10
4749
gopkg.in/resty.v1 v1.12.0
@@ -68,7 +70,6 @@ require (
6870
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
6971
cloud.google.com/go/compute/metadata v0.9.0 // indirect
7072
cloud.google.com/go/iam v1.5.2 // indirect
71-
cloud.google.com/go/pubsub/v2 v2.3.0 // indirect
7273
filippo.io/edwards25519 v1.1.0 // indirect
7374
github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 // indirect
7475
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
@@ -184,7 +185,6 @@ require (
184185
golang.org/x/time v0.14.0 // indirect
185186
golang.org/x/tools v0.39.0 // indirect
186187
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
187-
google.golang.org/api v0.255.0 // indirect
188188
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
189189
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect
190190
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -833,8 +833,6 @@ open-cluster-management.io/api v1.1.1-0.20251215032811-ee922fbb996c h1:LWZ+5dwVS
833833
open-cluster-management.io/api v1.1.1-0.20251215032811-ee922fbb996c/go.mod h1:Hk/3c114t6Ba5qhpqw+RoA93yEbE2CosG+JzzBZ6aCo=
834834
open-cluster-management.io/ocm v1.1.1-0.20251211014758-deb61b0a60d5 h1:02XVQtUt8v/fw6n3Rbf7Tu4FHJVVW2MiQLSpThWLwro=
835835
open-cluster-management.io/ocm v1.1.1-0.20251211014758-deb61b0a60d5/go.mod h1:kaFjiXsxtUUwWChYIy/ojANeMMrDW1KUrliuKA55z/Q=
836-
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac h1:Wt7rzenZqrtyYI58+lpe9tmf9e5Ft8Wwd0MyDwuJ4ck=
837-
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU=
838836
open-cluster-management.io/sdk-go v1.1.1-0.20251218031856-08bb1caedf74 h1:Sf+w+8ZzJgUQez/ADk2FLhoVDl1hD4rJqLQfvCFuM1o=
839837
open-cluster-management.io/sdk-go v1.1.1-0.20251218031856-08bb1caedf74/go.mod h1:3xQf3gISaZ3377vFnwjH3QH8EF2UNaf8D9igLPUBChk=
840838
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=

hack/init-pubsub-emulator.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Initialize Google Cloud Pub/Sub emulator with topics and subscriptions for Maestro.
4+
5+
This script creates the necessary topics and subscriptions for the Maestro server
6+
to communicate with agents using Pub/Sub.
7+
8+
Environment Variables:
9+
PUBSUB_EMULATOR_HOST: The emulator host (default: localhost:8085)
10+
PUBSUB_PROJECT_ID: The GCP project ID (default: maestro-test)
11+
"""
12+
13+
import os
14+
import sys
15+
from google.cloud import pubsub_v1
16+
from google.api_core import exceptions
17+
18+
19+
def init_server_topics_and_subscriptions(project_id: str):
20+
"""Initialize topics and subscriptions for the Maestro server."""
21+
publisher = pubsub_v1.PublisherClient()
22+
subscriber = pubsub_v1.SubscriberClient()
23+
24+
# Topics to create
25+
topics = ['sourceevents', 'sourcebroadcast', 'agentevents', 'agentbroadcast']
26+
27+
print("Creating topics...")
28+
for topic_name in topics:
29+
topic_path = publisher.topic_path(project_id, topic_name)
30+
try:
31+
publisher.create_topic(request={"name": topic_path})
32+
print(f" ✓ Created topic: {topic_name}")
33+
except exceptions.AlreadyExists:
34+
print(f" - Topic already exists: {topic_name}")
35+
except Exception as e:
36+
print(f" ✗ Error creating topic {topic_name}: {e}", file=sys.stderr)
37+
return False
38+
39+
# Server subscriptions to create (name:topic:filter)
40+
subscriptions = [
41+
('agentevents-maestro', 'agentevents', 'attributes.ce-originalsource="maestro"'),
42+
('agentbroadcast-maestro', 'agentbroadcast', '')
43+
]
44+
45+
print("\nCreating server subscriptions...")
46+
for sub_name, topic_name, filter_expr in subscriptions:
47+
subscription_path = subscriber.subscription_path(project_id, sub_name)
48+
topic_path = publisher.topic_path(project_id, topic_name)
49+
try:
50+
if filter_expr:
51+
subscriber.create_subscription(
52+
request={"name": subscription_path, "topic": topic_path, "filter": filter_expr}
53+
)
54+
print(f" ✓ Created subscription: {sub_name} (filtered by {filter_expr})")
55+
else:
56+
subscriber.create_subscription(
57+
request={"name": subscription_path, "topic": topic_path}
58+
)
59+
print(f" ✓ Created subscription: {sub_name}")
60+
except exceptions.AlreadyExists:
61+
print(f" - Subscription already exists: {sub_name}")
62+
except Exception as e:
63+
print(f" ✗ Error creating subscription {sub_name}: {e}", file=sys.stderr)
64+
return False
65+
66+
return True
67+
68+
69+
def init_agent_subscriptions(project_id: str, consumer_name: str):
70+
"""Initialize subscriptions for a Maestro agent."""
71+
publisher = pubsub_v1.PublisherClient()
72+
subscriber = pubsub_v1.SubscriberClient()
73+
74+
# Agent subscriptions to create: (subscription_name, topic_name, filter)
75+
subscriptions = [
76+
(
77+
f'sourceevents-{consumer_name}',
78+
'sourceevents',
79+
f'attributes.ce-clustername="{consumer_name}"'
80+
),
81+
(
82+
f'sourcebroadcast-{consumer_name}',
83+
'sourcebroadcast',
84+
'' # No filter for broadcast
85+
)
86+
]
87+
88+
print(f"\nCreating agent subscriptions for consumer '{consumer_name}'...")
89+
for sub_name, topic_name, filter_expr in subscriptions:
90+
subscription_path = subscriber.subscription_path(project_id, sub_name)
91+
topic_path = publisher.topic_path(project_id, topic_name)
92+
93+
try:
94+
if filter_expr:
95+
subscriber.create_subscription(
96+
request={
97+
"name": subscription_path,
98+
"topic": topic_path,
99+
"filter": filter_expr
100+
}
101+
)
102+
print(f" ✓ Created subscription: {sub_name} (filtered)")
103+
else:
104+
subscriber.create_subscription(
105+
request={
106+
"name": subscription_path,
107+
"topic": topic_path
108+
}
109+
)
110+
print(f" ✓ Created subscription: {sub_name}")
111+
except exceptions.AlreadyExists:
112+
print(f" - Subscription already exists: {sub_name}")
113+
except Exception as e:
114+
print(f" ✗ Error creating subscription {sub_name}: {e}", file=sys.stderr)
115+
return False
116+
117+
return True
118+
119+
120+
def main():
121+
project_id = os.getenv('PUBSUB_PROJECT_ID', 'maestro-test')
122+
emulator_host = os.getenv('PUBSUB_EMULATOR_HOST', 'localhost:8085')
123+
consumer_name = os.getenv('CONSUMER_NAME', '')
124+
125+
print(f"Initializing Pub/Sub emulator at {emulator_host}")
126+
print(f"Project ID: {project_id}")
127+
128+
# Initialize server topics and subscriptions
129+
if not init_server_topics_and_subscriptions(project_id):
130+
print("\n✗ Failed to initialize server topics and subscriptions", file=sys.stderr)
131+
sys.exit(1)
132+
133+
# Initialize agent subscriptions if consumer name is provided
134+
if consumer_name:
135+
if not init_agent_subscriptions(project_id, consumer_name):
136+
print(f"\n✗ Failed to initialize agent subscriptions for {consumer_name}", file=sys.stderr)
137+
sys.exit(1)
138+
139+
print("\n✓ Pub/Sub emulator initialized successfully!")
140+
141+
142+
if __name__ == '__main__':
143+
try:
144+
main()
145+
except Exception as e:
146+
print(f"✗ Unexpected error: {e}", file=sys.stderr)
147+
sys.exit(1)

0 commit comments

Comments
 (0)