Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ format:
go fmt ./...

vet:
go vet ./pkg/rabbitmq_amqp
go vet ./pkg/rabbitmqamqp

test: format vet
cd ./pkg/rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \
cd ./pkg/rabbitmqamqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \
--randomize-all --randomize-suites \
--cover --coverprofile=coverage.txt --covermode=atomic \
--race
Expand Down
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
# RabbitMQ AMQP 1.0 .Golang Client
# RabbitMQ AMQP 1.0 Golang Client

This library is meant to be used with RabbitMQ 4.0.
Suitable for testing in pre-production environments.


## Getting Started

- [Getting_started](docs/examples/getting_started)
- [Getting Started](docs/examples/getting_started)
- [Examples](docs/examples)


# Packages

The rabbitmq amqp client is a wrapper around the azure amqp client.</b>
You need the following packages to use the rabbitmq amqp client:

- `rabbitmqamqp` - The main package for the rabbitmq amqp client.
- `amqp` - The azure amqp client (You may not need to use this package directly).


## Build from source

- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
Expand Down
85 changes: 42 additions & 43 deletions docs/examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
"time"
)

Expand All @@ -14,62 +13,62 @@ func main() {
queueName := "getting-started-go-queue"
routingKey := "routing-key"

rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client")
rmq.Info("Getting started with AMQP Go AMQP 1.0 Client")

/// Create a channel to receive state change notifications
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
go func(ch chan *rabbitmq_amqp.StateChanged) {
stateChanged := make(chan *rmq.StateChanged, 1)
go func(ch chan *rmq.StateChanged) {
for statusChanged := range ch {
rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged)
rmq.Info("[connection]", "Status changed", statusChanged)
}
}(stateChanged)

// rabbitmq_amqp.NewEnvironment setups the environment.
// rmq.NewEnvironment setups the environment.
// The environment is used to create connections
// given the same parameters
env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
env := rmq.NewEnvironment([]string{"amqp://"}, nil)

// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rabbitmq_amqp.Error("Error opening connection", err)
rmq.Error("Error opening connection", err)
return
}
// Register the channel to receive status change notifications
// this is valid for the connection lifecycle
amqpConnection.NotifyStatusChange(stateChanged)

rabbitmq_amqp.Info("AMQP connection opened.\n")
rmq.Info("AMQP connection opened.\n")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.TopicExchangeSpecification{
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{
Name: exchangeName,
})
if err != nil {
rabbitmq_amqp.Error("Error declaring exchange", err)
rmq.Error("Error declaring exchange", err)
return
}

// Declare a Quorum queue
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
Name: queueName,
})

if err != nil {
rabbitmq_amqp.Error("Error declaring queue", err)
rmq.Error("Error declaring queue", err)
return
}

// Bind the queue to the exchange
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.ExchangeToQueueBindingSpecification{
bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})

if err != nil {
rabbitmq_amqp.Error("Error binding", err)
rmq.Error("Error binding", err)
return
}

Expand All @@ -78,7 +77,7 @@ func main() {

consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
rmq.Error("Error creating consumer", err)
return
}

Expand All @@ -90,61 +89,61 @@ func main() {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
rabbitmq_amqp.Info("[NewConsumer]", "consumer closed. Context", err)
rmq.Info("[NewConsumer]", "consumer closed. Context", err)
return
}
if err != nil {
// An error occurred receiving the message
rabbitmq_amqp.Error("[NewConsumer]", "Error receiving message", err)
rmq.Error("[NewConsumer]", "Error receiving message", err)
return
}

rabbitmq_amqp.Info("[NewConsumer]", "Received message",
rmq.Info("[NewConsumer]", "Received message",
fmt.Sprintf("%s", deliveryContext.Message().Data))

err = deliveryContext.Accept(context.Background())
if err != nil {
rabbitmq_amqp.Error("Error accepting message", err)
rmq.Error("Error accepting message", err)
return
}
}
}(consumerContext)

publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
Exchange: exchangeName,
Key: routingKey,
}, "getting-started-publisher")
if err != nil {
rabbitmq_amqp.Error("Error creating publisher", err)
rmq.Error("Error creating publisher", err)
return
}

for i := 0; i < 100; i++ {
// Publish a message to the exchange
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
rabbitmq_amqp.Error("Error publishing message", "error", err)
rmq.Error("Error publishing message", "error", err)
time.Sleep(1 * time.Second)
continue
}
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rabbitmq_amqp.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
case *rmq.StateAccepted:
rmq.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
break
case *amqp.StateReleased:
rabbitmq_amqp.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
case *rmq.StateReleased:
rmq.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *amqp.StateRejected:
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*amqp.StateRejected)
case *rmq.StateRejected:
rmq.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
rmq.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
rmq.Warn("Message state: %v", publishResult.Outcome)
}
}

Expand All @@ -157,53 +156,53 @@ func main() {
//Close the consumer
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[NewConsumer]", err)
rmq.Error("[NewConsumer]", err)
return
}
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[NewPublisher]", err)
rmq.Error("[NewPublisher]", err)
return
}

// Unbind the queue from the exchange
err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
rabbitmq_amqp.Error("Error unbinding: %v\n", err)
rmq.Error("Error unbinding: %v\n", err)
return
}

err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
rabbitmq_amqp.Error("Error deleting exchange: %v\n", err)
rmq.Error("Error deleting exchange: %v\n", err)
return
}

// Purge the queue
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
rabbitmq_amqp.Error("Error purging queue: %v\n", err)
rmq.Error("Error purging queue: %v\n", err)
return
}
rabbitmq_amqp.Info("Purged %d messages from the queue.\n", purged)
rmq.Info("Purged %d messages from the queue.\n", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
rabbitmq_amqp.Error("Error deleting queue: %v\n", err)
rmq.Error("Error deleting queue: %v\n", err)
return
}

// Close all the connections. but you can still use the environment
// to create new connections
err = env.CloseConnections(context.Background())
if err != nil {
rabbitmq_amqp.Error("Error closing connection: %v\n", err)
rmq.Error("Error closing connection: %v\n", err)
return
}

rabbitmq_amqp.Info("AMQP connection closed.\n")
rmq.Info("AMQP connection closed.\n")
// not necessary. It waits for the status change to be printed
time.Sleep(100 * time.Millisecond)
close(stateChanged)
Expand Down
22 changes: 11 additions & 11 deletions docs/examples/publisher_msg_targets/publisher_msg_targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@ package main
import (
"context"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

func checkError(err error) {
if err != nil {
rabbitmq_amqp.Error("Error", err)
rmq.Error("Error", err)
// it should not happen for the example
// so panic just to make sure we catch it
panic(err)
}
}
func main() {

rabbitmq_amqp.Info("Define the publisher message targets")
rmq.Info("Define the publisher message targets")

env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil)
env := rmq.NewEnvironment([]string{"amqp://"}, nil)
amqpConnection, err := env.NewConnection(context.Background())
checkError(err)
queues := []string{"queue1", "queue2", "queue3"}
management := amqpConnection.Management()
for _, queue := range queues {
_, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
_, err = management.DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
Name: queue,
})
checkError(err)
Expand All @@ -40,24 +40,24 @@ func main() {
// with this helper function we create a message with a target
// that is the same to create a message with:
// msg := amqp.NewMessage([]byte("hello"))
// MessageToAddressHelper(msg, &QueueAddress{Queue: qName})
// MessagePropertyToAddress(msg, &QueueAddress{Queue: qName})
// same like:
// msg := amqp.NewMessage([]byte("hello"))
// msg.Properties = &amqp.MessageProperties{}
// msg.Properties.To = &address
// NewMessageToAddress and MessageToAddressHelper helpers are provided to make the
// NewMessageWithAddress and MessagePropertyToAddress helpers are provided to make the
// code more readable and easier to use
msg, err := rabbitmq_amqp.NewMessageToAddress([]byte("Hello World"),
&rabbitmq_amqp.QueueAddress{Queue: queues[i%3]})
msg, err := rmq.NewMessageWithAddress([]byte("Hello World"),
&rmq.QueueAddress{Queue: queues[i%3]})
checkError(err)
publishResult, err := publisher.Publish(context.Background(), msg)
checkError(err)
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
default:
rabbitmq_amqp.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
rmq.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
}
}

Expand Down
Loading