Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 74 additions & 25 deletions examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
Expand Down Expand Up @@ -67,48 +68,97 @@ func main() {
return
}

addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)

publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
// Create a consumer to receive messages from the queue
// you need to build the address of the queue, but you can use the helper function
addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName)
consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer")
if err != nil {
rabbitmq_amqp.Error("Error creating publisher", err)
rabbitmq_amqp.Error("Error creating consumer", err)
return
}

// Publish a message to the exchange
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
consumerContext, cancel := context.WithCancel(context.Background())

// Consume messages from the queue
go func(ctx context.Context) {
for {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err)
return
}
if err != nil {
// An error occurred receiving the message
rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
return
}

rabbitmq_amqp.Info("[Consumer]", "Received message",
fmt.Sprintf("%s", deliveryContext.Message().Data))

err = deliveryContext.Accept(context.Background())
if err != nil {
rabbitmq_amqp.Error("Error accepting message", err)
return
}
}
}(consumerContext)

addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
if err != nil {
rabbitmq_amqp.Error("Error publishing message", err)
rabbitmq_amqp.Error("Error creating publisher", err)
return
}
switch publishResult.Outcome {
case &amqp.StateAccepted{}:
rabbitmq_amqp.Info("Message accepted")
case &amqp.StateReleased{}:
rabbitmq_amqp.Warn("Message was not routed")
case &amqp.StateRejected{}:
rabbitmq_amqp.Warn("Message rejected")
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error)

for i := 0; i < 10; i++ {

// Publish a message to the exchange
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
rabbitmq_amqp.Error("Error publishing message", err)
return
}
switch publishResult.Outcome.(type) {
case *amqp.StateAccepted:
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
break
case *amqp.StateReleased:
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
break
case *amqp.StateRejected:
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
break
default:
// these status are not supported. Leave it for AMQP 1.0 compatibility
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
}
default:
// these status are not supported
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
}

println("press any key to close the connection")

var input string
_, _ = fmt.Scanln(&input)

cancel()
//Close the consumer
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Consumer]", err)
}
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
return
}
// Unbind the queue from the exchange

// Unbind the queue from the exchange
err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
Expand Down Expand Up @@ -143,8 +193,7 @@ func main() {
}

fmt.Printf("AMQP Connection closed.\n")
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)

close(stateChangeds)
// not necessary. It waits for the status change to be printed
time.Sleep(100 * time.Millisecond)
close(stateChanged)
}
13 changes: 12 additions & 1 deletion rabbitmq_amqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,24 @@ func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, l
if !validateAddress(destinationAdd) {
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}

sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
return nil, err
}
return newPublisher(sender), nil
}

func (a *AmqpConnection) Consumer(ctx context.Context, destinationAdd string, linkName string) (*Consumer, error) {
if !validateAddress(destinationAdd) {
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}
receiver, err := a.session.NewReceiver(ctx, destinationAdd, createReceiverLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
return nil, err
}
return newConsumer(receiver), nil
}

// Dial connect to the AMQP 1.0 server using the provided connectionSettings
// Returns a pointer to the new AmqpConnection if successful else an error.
// addresses is a list of addresses to connect to. It picks one randomly.
Expand Down Expand Up @@ -93,6 +103,7 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq
}
a.Connection = conn
a.session, err = a.Connection.NewSession(ctx, nil)

if err != nil {
return err
}
Expand Down
82 changes: 82 additions & 0 deletions rabbitmq_amqp/amqp_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package rabbitmq_amqp

import (
"context"
"github.com/Azure/go-amqp"
)

type DeliveryContext struct {
receiver *amqp.Receiver
message *amqp.Message
}

func (dc *DeliveryContext) Message() *amqp.Message {
return dc.message
}

func (dc *DeliveryContext) Accept(ctx context.Context) error {
return dc.receiver.AcceptMessage(ctx, dc.message)
}

func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error {
return dc.receiver.RejectMessage(ctx, dc.message, e)
}

func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
if err := validateMessageAnnotations(annotations); err != nil {
return err
}
// copy the rabbitmq annotations to amqp annotations
destination := make(amqp.Annotations)
for key, value := range annotations {
destination[key] = value

}

return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{
DeliveryFailed: true,
UndeliverableHere: true,
Annotations: destination,
})
}

func (dc *DeliveryContext) Requeue(ctx context.Context) error {
return dc.receiver.ReleaseMessage(ctx, dc.message)
}

func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
if err := validateMessageAnnotations(annotations); err != nil {
return err
}
// copy the rabbitmq annotations to amqp annotations
destination := make(amqp.Annotations)
for key, value := range annotations {
destination[key] = value

}
return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{
DeliveryFailed: false,
UndeliverableHere: false,
Annotations: destination,
})
}

type Consumer struct {
receiver *amqp.Receiver
}

func newConsumer(receiver *amqp.Receiver) *Consumer {
return &Consumer{receiver: receiver}
}

func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
msg, err := c.receiver.Receive(ctx, nil)
if err != nil {
return nil, err
}
return &DeliveryContext{receiver: c.receiver, message: msg}, nil
}

func (c *Consumer) Close(ctx context.Context) error {
return c.receiver.Close(ctx)
}
Loading
Loading