Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
all: format vet test
all: test

format:
go fmt ./...

vet:
go vet ./rabbitmq_amqp
go vet ./pkg/rabbitmq_amqp

test:
cd rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \
test: format vet
cd ./pkg/rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \
--randomize-all --randomize-suites \
--cover --coverprofile=coverage.txt --covermode=atomic \
--race
Expand Down
34 changes: 17 additions & 17 deletions docs/examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
"time"
)

Expand All @@ -20,7 +20,7 @@ func main() {
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
go func(ch chan *rabbitmq_amqp.StateChanged) {
for statusChanged := range ch {
rabbitmq_amqp.Info("[Connection]", "Status changed", statusChanged)
rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged)
}
}(stateChanged)

Expand All @@ -33,7 +33,7 @@ func main() {
// Register the channel to receive status change notifications
amqpConnection.NotifyStatusChange(stateChanged)

fmt.Printf("AMQP Connection opened.\n")
fmt.Printf("AMQP connection opened.\n")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
Expand Down Expand Up @@ -86,16 +86,16 @@ func main() {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err)
rabbitmq_amqp.Info("[NewConsumer]", "consumer closed. Context", err)
return
}
if err != nil {
// An error occurred receiving the message
rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
rabbitmq_amqp.Error("[NewConsumer]", "Error receiving message", err)
return
}

rabbitmq_amqp.Info("[Consumer]", "Received message",
rabbitmq_amqp.Info("[NewConsumer]", "Received message",
fmt.Sprintf("%s", deliveryContext.Message().Data))

err = deliveryContext.Accept(context.Background())
Expand All @@ -115,26 +115,26 @@ func main() {
return
}

for i := 0; i < 10; i++ {

for i := 0; i < 1_000; i++ {
// Publish a message to the exchange
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
rabbitmq_amqp.Error("Error publishing message", err)
return
rabbitmq_amqp.Error("Error publishing message", "error", err)
time.Sleep(1 * time.Second)
continue
}
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
rabbitmq_amqp.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
break
case *amqp.StateReleased:
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
rabbitmq_amqp.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *amqp.StateRejected:
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
Expand All @@ -153,13 +153,13 @@ func main() {
//Close the consumer
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Consumer]", err)
rabbitmq_amqp.Error("[NewConsumer]", err)
return
}
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Publisher]", err)
rabbitmq_amqp.Error("[NewPublisher]", err)
return
}

Expand Down Expand Up @@ -197,7 +197,7 @@ func main() {
return
}

fmt.Printf("AMQP Connection closed.\n")
fmt.Printf("AMQP connection closed.\n")
// not necessary. It waits for the status change to be printed
time.Sleep(100 * time.Millisecond)
close(stateChanged)
Expand Down
213 changes: 213 additions & 0 deletions docs/examples/reliable/reliable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package main

import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp"
"sync"
"sync/atomic"
"time"
)

func main() {
queueName := "reliable-amqp10-go-queue"
var stateAccepted int32
var stateReleased int32
var stateRejected int32

var received int32
var failed int32

startTime := time.Now()
go func() {
for {
time.Sleep(5 * time.Second)
total := stateAccepted + stateReleased + stateRejected
messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
rabbitmq_amqp.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)

}
}()

rabbitmq_amqp.Info("How to deal with network disconnections")
signalBlock := sync.Cond{L: &sync.Mutex{}}
/// Create a channel to receive state change notifications
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
go func(ch chan *rabbitmq_amqp.StateChanged) {
for statusChanged := range ch {
rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged)
switch statusChanged.To.(type) {
case *rabbitmq_amqp.StateOpen:
signalBlock.Broadcast()
}
}
}(stateChanged)

// Open a connection to the AMQP 1.0 server
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, &rabbitmq_amqp.AmqpConnOptions{
SASLType: amqp.SASLTypeAnonymous(),
ContainerID: "reliable-amqp10-go",
RecoveryConfiguration: &rabbitmq_amqp.RecoveryConfiguration{
ActiveRecovery: true,
BackOffReconnectInterval: 2 * time.Second, // we reduce the reconnect interval to speed up the test. The default is 5 seconds
// In production, you should avoid BackOffReconnectInterval with low values since it can cause a high number of reconnection attempts
MaxReconnectAttempts: 5,
},
})
if err != nil {
rabbitmq_amqp.Error("Error opening connection", err)
return
}
// Register the channel to receive status change notifications
amqpConnection.NotifyStatusChange(stateChanged)

fmt.Printf("AMQP connection opened.\n")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()

// Declare a Quorum queue
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
Name: queueName,
})
if err != nil {
rabbitmq_amqp.Error("Error declaring queue", err)
return
}

consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "reliable-consumer")
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
}

consumerContext, cancel := context.WithCancel(context.Background())

// Consume messages from the queue
go func(ctx context.Context) {
for {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
return
}
if err != nil {
// An error occurred receiving the message
// here the consumer could be disconnected from the server due to a network error
signalBlock.L.Lock()
rabbitmq_amqp.Info("[Consumer]", "Consumer is blocked, queue", queueName, "error", err)
signalBlock.Wait()
rabbitmq_amqp.Info("[Consumer]", "Consumer is unblocked, queue", queueName)

signalBlock.L.Unlock()
continue
}

atomic.AddInt32(&received, 1)
err = deliveryContext.Accept(context.Background())
if err != nil {
// same here the delivery could not be accepted due to a network error
// we wait for 2_500 ms and try again
time.Sleep(2500 * time.Millisecond)
continue
}
}
}(consumerContext)

publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "reliable-publisher")
if err != nil {
rabbitmq_amqp.Error("Error creating publisher", err)
return
}

wg := &sync.WaitGroup{}
for i := 0; i < 1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500_000; i++ {
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
// then retry to send the message as soon as the connection is reestablished

atomic.AddInt32(&failed, 1)
// block signalBlock until the connection is reestablished
signalBlock.L.Lock()
rabbitmq_amqp.Info("[Publisher]", "Publisher is blocked, queue", queueName, "error", err)
signalBlock.Wait()
rabbitmq_amqp.Info("[Publisher]", "Publisher is unblocked, queue", queueName)
signalBlock.L.Unlock()

} else {
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
atomic.AddInt32(&stateAccepted, 1)
break
case *amqp.StateReleased:
atomic.AddInt32(&stateReleased, 1)
break
case *amqp.StateRejected:
atomic.AddInt32(&stateRejected, 1)
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
}
}
}
}()
}
wg.Wait()

println("press any key to close the connection")

var input string
_, _ = fmt.Scanln(&input)

cancel()
//Close the consumer
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[NewConsumer]", err)
return
}
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[NewPublisher]", err)
return
}

// Purge the queue
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error purging queue: %v\n", err)
return
}
fmt.Printf("Purged %d messages from the queue.\n", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
fmt.Printf("Error deleting queue: %v\n", err)
return
}

err = amqpConnection.Close(context.Background())
if err != nil {
fmt.Printf("Error closing connection: %v\n", err)
return
}

fmt.Printf("AMQP connection closed.\n")
// not necessary. It waits for the status change to be printed
time.Sleep(100 * time.Millisecond)
close(stateChanged)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading