Skip to content

Commit 2fb003a

Browse files
unicod3mdelapenya
andauthored
feat: add Solace pubsub+ module (#3230)
* feat: Add solace pubsub+ module * chore: Update docs * Remove coverage file * Upgrade the version * Apply PR suggestions * Apply PR suggestions * Encapsulate internal variables * Introduce options pattern * Update solace.md * Use wait package for the container script execution * Update solace script * Apply PR suggestions * Apply PR suggestions * Make use of `CopyToContainer` method * Replace exposed ports to WithServices * Update solace.md * Render solace script from a template * Apply PR suggestions * Fix linter issues * Switch assert to require for tests * Fix image name * docs: refinements * chore: add a comment for the testable example's output * chore: move default services to a variable * Remove sleep from the test * fix: typo --------- Co-authored-by: Manuel de la Peña <[email protected]>
1 parent f5c1309 commit 2fb003a

File tree

14 files changed

+1287
-0
lines changed

14 files changed

+1287
-0
lines changed

.github/dependabot.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ updates:
6868
- /modules/registry
6969
- /modules/scylladb
7070
- /modules/socat
71+
- /modules/solace
7172
- /modules/surrealdb
7273
- /modules/toxiproxy
7374
- /modules/valkey

.vscode/.testcontainers-go.code-workspace

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,10 @@
221221
"name": "module / socat",
222222
"path": "../modules/socat"
223223
},
224+
{
225+
"name": "module / solace",
226+
"path": "../modules/solace"
227+
},
224228
{
225229
"name": "module / surrealdb",
226230
"path": "../modules/surrealdb"

docs/modules/solace.md

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# Solace Pubsub+
2+
3+
Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
4+
5+
## Introduction
6+
7+
The Testcontainers module for Solace Pubsub+.
8+
9+
## Adding this module to your project dependencies
10+
11+
Please run the following command to add the Solace Pubsub+ module to your Go dependencies:
12+
13+
```
14+
go get github.com/testcontainers/testcontainers-go/modules/solace
15+
```
16+
17+
## Usage example
18+
19+
<!--codeinclude-->
20+
[Creating a Solace Pubsub+ container](../../modules/solace/examples_test.go) inside_block:runSolaceContainer
21+
<!--/codeinclude-->
22+
23+
## Module Reference
24+
25+
### Run function
26+
27+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
28+
29+
The Solace Pubsub+ module exposes one entrypoint function to create the Solace Pubsub+ container, and this function receives three parameters:
30+
31+
```golang
32+
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*SolaceContainer, error)
33+
```
34+
35+
- `context.Context`, the Go context.
36+
- `string`, the Docker image to use.
37+
- `testcontainers.ContainerCustomizer`, a variadic argument for passing options.
38+
39+
#### Image
40+
41+
Use the second argument in the `Run` function to set a valid Docker image.
42+
In example: `Run(context.Background(), "solace/solace-pubsub-standard:latest")`.
43+
44+
### Container Options
45+
46+
When starting the Solace Pubsub+ container, you can pass options in a variadic way to configure it.
47+
48+
#### WithCredentials
49+
50+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
51+
52+
`WithCredentials(username, password string)`: sets the client credentials for authentication
53+
54+
#### WithVpn
55+
56+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
57+
58+
`WithVpn(vpn string)`: sets the VPN name (defaults to "default")
59+
60+
#### WithQueue
61+
62+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
63+
64+
`WithQueue(queueName, topic string)`: subscribes a given topic to a queue (for SMF/AMQP testing)
65+
66+
#### WithShmSize
67+
68+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
69+
70+
`WithShmSize(size int64)`: sets the shared memory size (defaults to 1 GiB)
71+
72+
#### WithServices
73+
74+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
75+
76+
The `WithServices` option is the recommended way to configure which Solace services should be exposed and made available in your container. This option automatically handles port exposure and sets up wait strategies for each specified service.
77+
78+
Available services:
79+
80+
- `ServiceAMQP` - AMQP service (port 5672)
81+
- `ServiceMQTT` - MQTT service (port 1883)
82+
- `ServiceREST` - REST service (port 9000)
83+
- `ServiceManagement` - Management service (port 8080)
84+
- `ServiceSMF` - SMF service (port 55555)
85+
- `ServiceSMFSSL` - SMF SSL service (port 55443)
86+
87+
By default, when no `WithServices` option is specified, the container will expose AMQP, SMF, REST, and MQTT services.
88+
89+
{% include "../features/common_functional_options_list.md" %}
90+
91+
### Container Methods
92+
93+
The Solace Pubsub+ container exposes the following methods:
94+
95+
#### BrokerURLFor
96+
97+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
98+
99+
`BrokerURLFor(ctx context.Context, service Service) (string, error)` - returns the connection URL for a given Solace service.
100+
101+
This method allows you to retrieve the connection URL for specific Solace services. The available services are:
102+
103+
- `ServiceAMQP` - AMQP service (port 5672, protocol: amqp)
104+
- `ServiceMQTT` - MQTT service (port 1883, protocol: tcp)
105+
- `ServiceREST` - REST service (port 9000, protocol: http)
106+
- `ServiceManagement` - Management service (port 8080, protocol: http)
107+
- `ServiceSMF` - SMF service (port 55555, protocol: tcp)
108+
- `ServiceSMFSSL` - SMF SSL service (port 55443, protocol: tcps)
109+
110+
```go
111+
// Get the AMQP connection URL
112+
amqpURL, err := container.BrokerURLFor(ctx, solace.ServiceAMQP)
113+
if err != nil {
114+
log.Fatal(err)
115+
}
116+
// amqpURL will be something like: amqp://localhost:32768
117+
118+
// Get the management URL
119+
mgmtURL, err := container.BrokerURLFor(ctx, solace.ServiceManagement)
120+
if err != nil {
121+
log.Fatal(err)
122+
}
123+
// mgmtURL will be something like: http://localhost:32769
124+
```
125+
126+
#### Username
127+
128+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
129+
130+
`Username() string` - returns the configured username for authentication
131+
132+
#### Password
133+
134+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
135+
136+
`Password() string` - returns the configured password for authentication
137+
138+
#### VPN
139+
140+
- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>
141+
142+
`VPN() string` - returns the configured VPN name

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ nav:
119119
- modules/registry.md
120120
- modules/scylladb.md
121121
- modules/socat.md
122+
- modules/solace.md
122123
- modules/surrealdb.md
123124
- modules/toxiproxy.md
124125
- modules/valkey.md

modules/solace/Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
include ../../commons-test.mk
2+
3+
.PHONY: test
4+
test:
5+
$(MAKE) test-solace

modules/solace/examples_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package solace_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"solace.dev/go/messaging"
10+
"solace.dev/go/messaging/pkg/solace"
11+
"solace.dev/go/messaging/pkg/solace/config"
12+
"solace.dev/go/messaging/pkg/solace/message"
13+
"solace.dev/go/messaging/pkg/solace/resource"
14+
15+
"github.com/testcontainers/testcontainers-go"
16+
sc "github.com/testcontainers/testcontainers-go/modules/solace"
17+
)
18+
19+
func ExampleRun() {
20+
// runSolaceContainer {
21+
ctx := context.Background()
22+
ctr, err := sc.Run(ctx, "solace/solace-pubsub-standard:latest",
23+
sc.WithCredentials("admin", "admin"),
24+
sc.WithServices(sc.ServiceAMQP, sc.ServiceManagement),
25+
testcontainers.WithEnv(map[string]string{
26+
"username_admin_globalaccesslevel": "admin",
27+
"username_admin_password": "admin",
28+
}),
29+
sc.WithShmSize(1<<30),
30+
)
31+
defer func() {
32+
if err := testcontainers.TerminateContainer(ctr); err != nil {
33+
log.Printf("failed to terminate container: %s", err)
34+
}
35+
}()
36+
fmt.Println(err)
37+
// }
38+
39+
// Output:
40+
// <nil>
41+
}
42+
43+
func ExampleRun_withTopicAndQueue() {
44+
ctx := context.Background()
45+
46+
ctr, err := sc.Run(ctx, "solace/solace-pubsub-standard:latest",
47+
sc.WithCredentials("admin", "admin"),
48+
sc.WithVPN("test-vpn"),
49+
sc.WithServices(sc.ServiceAMQP, sc.ServiceManagement, sc.ServiceSMF),
50+
testcontainers.WithEnv(map[string]string{
51+
"username_admin_globalaccesslevel": "admin",
52+
"username_admin_password": "admin",
53+
}),
54+
sc.WithShmSize(1<<30),
55+
sc.WithQueue("TestQueue", "Topic/MyTopic"),
56+
)
57+
defer func() {
58+
if err := testcontainers.TerminateContainer(ctr); err != nil {
59+
log.Printf("failed to terminate container: %s", err)
60+
}
61+
}()
62+
fmt.Println(err)
63+
// the [testMessagePublishAndConsume] function is responsible for printing the output
64+
// to the console, so be aware of that when adding it to other examples.
65+
err = testMessagePublishAndConsume(ctr, "TestQueue", "Topic/MyTopic")
66+
fmt.Println(err)
67+
68+
// Output:
69+
// <nil>
70+
// Published message to topic: Topic/MyTopic
71+
// Received message: Hello from Solace testcontainers!
72+
// Successfully received message from queue: TestQueue
73+
// <nil>
74+
}
75+
76+
func testMessagePublishAndConsume(ctr *sc.Container, queueName, topicName string) error {
77+
// Get the SMF service URL from the container
78+
smfURL, err := ctr.BrokerURLFor(context.Background(), sc.ServiceSMF)
79+
if err != nil {
80+
return fmt.Errorf("failed to get SMF URL: %w", err)
81+
}
82+
83+
// Configure connection properties
84+
brokerConfig := config.ServicePropertyMap{
85+
config.TransportLayerPropertyHost: smfURL,
86+
config.ServicePropertyVPNName: ctr.VPN(),
87+
config.AuthenticationPropertyScheme: config.AuthenticationSchemeBasic,
88+
config.AuthenticationPropertySchemeBasicUserName: ctr.Username(),
89+
config.AuthenticationPropertySchemeBasicPassword: ctr.Password(),
90+
config.TransportLayerPropertyReconnectionAttempts: 0,
91+
}
92+
93+
// Build messaging service
94+
messagingService, err := messaging.NewMessagingServiceBuilder().
95+
FromConfigurationProvider(brokerConfig).
96+
Build()
97+
if err != nil {
98+
return fmt.Errorf("failed to build messaging service: %w", err)
99+
}
100+
101+
// Connect to the messaging service
102+
if err := messagingService.Connect(); err != nil {
103+
return fmt.Errorf("failed to connect to messaging service: %w", err)
104+
}
105+
defer func() {
106+
if err := messagingService.Disconnect(); err != nil {
107+
log.Printf("Error disconnecting from messaging service: %v", err)
108+
}
109+
}()
110+
111+
// Test message publishing
112+
err = publishTestMessage(messagingService, topicName)
113+
if err != nil {
114+
return fmt.Errorf("failed to publish message: %w", err)
115+
}
116+
117+
// Test message consumption from queue
118+
err = consumeTestMessage(messagingService, queueName)
119+
if err != nil {
120+
return fmt.Errorf("failed to consume message: %w", err)
121+
}
122+
123+
return nil
124+
}
125+
126+
func publishTestMessage(messagingService solace.MessagingService, topicName string) error {
127+
// Build a direct message publisher
128+
directPublisher, err := messagingService.CreateDirectMessagePublisherBuilder().Build()
129+
if err != nil {
130+
return fmt.Errorf("failed to build publisher: %w", err)
131+
}
132+
133+
// Start the publisher
134+
if err := directPublisher.Start(); err != nil {
135+
return fmt.Errorf("failed to start publisher: %w", err)
136+
}
137+
defer func() {
138+
if err := directPublisher.Terminate(1 * time.Second); err != nil {
139+
log.Printf("Error terminating direct publisher: %v", err)
140+
}
141+
}()
142+
143+
// Create a message
144+
messageBuilder := messagingService.MessageBuilder()
145+
message, err := messageBuilder.
146+
WithProperty("custom-property", "test-value").
147+
BuildWithStringPayload("Hello from Solace testcontainers!")
148+
if err != nil {
149+
return fmt.Errorf("failed to build message: %w", err)
150+
}
151+
152+
// Create topic resource
153+
topic := resource.TopicOf(topicName)
154+
155+
// Publish the message
156+
if err := directPublisher.Publish(message, topic); err != nil {
157+
return fmt.Errorf("failed to publish message: %w", err)
158+
}
159+
160+
fmt.Printf("Published message to topic: %s\n", topicName)
161+
return nil
162+
}
163+
164+
func consumeTestMessage(messagingService solace.MessagingService, queueName string) error {
165+
// Build a persistent message receiver
166+
persistentReceiver, err := messagingService.CreatePersistentMessageReceiverBuilder().
167+
Build(resource.QueueDurableExclusive(queueName))
168+
if err != nil {
169+
return fmt.Errorf("failed to build receiver: %w", err)
170+
}
171+
172+
// Set up message handler
173+
messageReceived := make(chan message.InboundMessage, 1)
174+
errorChan := make(chan error, 1)
175+
176+
messageHandler := func(msg message.InboundMessage) {
177+
payload, ok := msg.GetPayloadAsString()
178+
if ok {
179+
fmt.Printf("Received message: %s\n", payload)
180+
}
181+
messageReceived <- msg
182+
}
183+
184+
// Start the receiver
185+
if err := persistentReceiver.Start(); err != nil {
186+
return fmt.Errorf("failed to start receiver: %w", err)
187+
}
188+
defer func() {
189+
if err := persistentReceiver.Terminate(1 * time.Second); err != nil {
190+
log.Printf("Error terminating persistent receiver: %v", err)
191+
}
192+
}()
193+
194+
// Receive messages asynchronously
195+
if err := persistentReceiver.ReceiveAsync(messageHandler); err != nil {
196+
return fmt.Errorf("failed to start async receive: %w", err)
197+
}
198+
199+
// Wait for message with timeout
200+
select {
201+
case <-messageReceived:
202+
fmt.Printf("Successfully received message from queue: %s\n", queueName)
203+
// For persistent messages, acknowledgment is typically handled automatically
204+
// or through the receiver's configuration
205+
return nil
206+
case err := <-errorChan:
207+
return fmt.Errorf("error receiving message: %w", err)
208+
case <-time.After(15 * time.Second):
209+
return fmt.Errorf("timeout waiting for message from queue: %s", queueName)
210+
}
211+
}

0 commit comments

Comments
 (0)