Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
# RabbitMQ AMQP 1.0 .Golang Client

This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
This library is meant to be used with RabbitMQ 4.0.
Suitable for testing in pre-production environments.

## How to Run

- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
- `make test` to run the tests
- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop`

## Getting Started

You can find an example in: `docs/examples/getting_started`
- [Getting_started](docs/examples/getting_started)
- [Examples](docs/examples)

## Examples
## Build from source

You can find more examples in: `docs/examples`
- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
- `make test` to run the tests
- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop`


8 changes: 8 additions & 0 deletions docs/examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### AMQP 1.0 .Golang Client Examples


- [Getting Started](getting_started) - A simple example to get you started.
- [Reliable](reliable) - An example of how to deal with reconnections and error handling.
- [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0
- [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions)
- [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties.
34 changes: 21 additions & 13 deletions docs/examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

func main() {
exchangeName := "getting-started-exchange"
queueName := "getting-started-queue"
exchangeName := "getting-started-go-exchange"
queueName := "getting-started-go-queue"
routingKey := "routing-key"

rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client")
Expand All @@ -24,16 +24,22 @@ func main() {
}
}(stateChanged)

// Open a connection to the AMQP 1.0 server
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil)
// rabbitmq_amqp.NewEnvironment setups the environment.
// The environment is used to create connections
// given the same parameters
env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)

// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rabbitmq_amqp.Error("Error opening connection", err)
return
}
// Register the channel to receive status change notifications
// this is valid for the connection lifecycle
amqpConnection.NotifyStatusChange(stateChanged)

fmt.Printf("AMQP connection opened.\n")
rabbitmq_amqp.Info("AMQP connection opened.\n")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
Expand Down Expand Up @@ -165,37 +171,39 @@ func main() {
err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
fmt.Printf("Error unbinding: %v\n", err)
rabbitmq_amqp.Error("Error unbinding: %v\n", err)
return
}

err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
fmt.Printf("Error deleting exchange: %v\n", err)
rabbitmq_amqp.Error("Error deleting exchange: %v\n", err)
return
}

// Purge the queue
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error purging queue: %v\n", err)
rabbitmq_amqp.Error("Error purging queue: %v\n", err)
return
}
fmt.Printf("Purged %d messages from the queue.\n", purged)
rabbitmq_amqp.Info("Purged %d messages from the queue.\n", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error deleting queue: %v\n", err)
rabbitmq_amqp.Error("Error deleting queue: %v\n", err)
return
}

err = amqpConnection.Close(context.Background())
// Close all the connections. but you can still use the environment
// to create new connections
err = env.CloseConnections(context.Background())
if err != nil {
fmt.Printf("Error closing connection: %v\n", err)
rabbitmq_amqp.Error("Error closing connection: %v\n", err)
return
}

fmt.Printf("AMQP connection closed.\n")
rabbitmq_amqp.Info("AMQP connection closed.\n")
// not necessary. It waits for the status change to be printed
time.Sleep(100 * time.Millisecond)
close(stateChanged)
Expand Down
71 changes: 71 additions & 0 deletions docs/examples/publisher_msg_targets/publisher_msg_targets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"context"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
)

func checkError(err error) {
if err != nil {
rabbitmq_amqp.Error("Error", err)
// it should not happen for the example
// so panic just to make sure we catch it
panic(err)
}
}
func main() {

rabbitmq_amqp.Info("Define the publisher message targets")

env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
amqpConnection, err := env.NewConnection(context.Background())
checkError(err)
queues := []string{"queue1", "queue2", "queue3"}
management := amqpConnection.Management()
for _, queue := range queues {
_, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
Name: queue,
})
checkError(err)
}

// create a publisher without a target
publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, "stream-publisher")
checkError(err)

// publish messages to the stream
for i := 0; i < 12; i++ {

// with this helper function we create a message with a target
// that is the same to create a message with:
// msg := amqp.NewMessage([]byte("hello"))
// MessageToAddressHelper(msg, &QueueAddress{Queue: qName})
// same like:
// msg := amqp.NewMessage([]byte("hello"))
// msg.Properties = &amqp.MessageProperties{}
// msg.Properties.To = &address
// NewMessageToAddress and MessageToAddressHelper helpers are provided to make the
// code more readable and easier to use
msg, err := rabbitmq_amqp.NewMessageToAddress([]byte("Hello World"),
&rabbitmq_amqp.QueueAddress{Queue: queues[i%3]})
checkError(err)
publishResult, err := publisher.Publish(context.Background(), msg)
checkError(err)
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
default:
rabbitmq_amqp.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
}
}

// check the UI, you should see 4 messages in each queue

// Close the publisher
err = publisher.Close(context.Background())
checkError(err)
err = env.CloseConnections(context.Background())
checkError(err)
}
98 changes: 98 additions & 0 deletions docs/examples/streams/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"context"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
"time"
)

func checkError(err error) {
if err != nil {
rabbitmq_amqp.Error("Error", err)
// it should not happen for the example
// so panic just to make sure we catch it
panic(err)
}
}

func main() {

rabbitmq_amqp.Info("Golang AMQP 1.0 Streams example")
queueStream := "stream-go-queue-" + time.Now().String()
env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
amqpConnection, err := env.NewConnection(context.Background())
checkError(err)
management := amqpConnection.Management()
// define a stream queue
_, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.StreamQueueSpecification{
Name: queueStream,
// it is a best practice to set the max length of the stream
// to avoid the stream to grow indefinitely
// the value here is low just for the example
MaxLengthBytes: rabbitmq_amqp.CapacityGB(5),
})
checkError(err)

// create a stream publisher. In this case we use the QueueAddress to make the example
// simple. So we use the default exchange here.
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rabbitmq_amqp.QueueAddress{Queue: queueStream}, "stream-publisher")
checkError(err)

// publish messages to the stream
for i := 0; i < 10; i++ {
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello World")))
checkError(err)

// check the outcome of the publishResult
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
case *amqp.StateReleased:
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *amqp.StateRejected:
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
}

}
// create a stream consumer
consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rabbitmq_amqp.StreamConsumerOptions{
// the offset is set to the first chunk of the stream
// so here it starts from the beginning
Offset: &rabbitmq_amqp.OffsetFirst{},
})
checkError(err)

// receive messages from the stream
for i := 0; i < 10; i++ {
deliveryContext, err := consumer.Receive(context.Background())
checkError(err)
rabbitmq_amqp.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0])
// accept the message
err = deliveryContext.Accept(context.Background())
checkError(err)
}

// close the consumer
err = consumer.Close(context.Background())
checkError(err)

err = amqpConnection.Management().DeleteQueue(context.Background(), queueStream)
checkError(err)

err = env.CloseConnections(context.Background())
checkError(err)

rabbitmq_amqp.Info("Example completed")
}
Loading