Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,28 @@ import (
)

func main() {

exchangeName := "getting-started-exchange"
queueName := "getting-started-queue"
routingKey := "routing-key"

fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client")

/// Create a channel to receive status change notifications
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
go func(ch chan *rabbitmq_amqp.StatusChanged) {
/// Create a channel to receive state change notifications
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
go func(ch chan *rabbitmq_amqp.StateChanged) {
for statusChanged := range ch {
fmt.Printf("%s\n", statusChanged)
rabbitmq_amqp.Info("[Connection]", "Status changed", statusChanged)
}
}(chStatusChanged)
}(stateChanged)

// Open a connection to the AMQP 1.0 server
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil)
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil)
if err != nil {
fmt.Printf("Error opening connection: %v\n", err)
rabbitmq_amqp.Error("Error opening connection", err)
return
}
// Register the channel to receive status change notifications
amqpConnection.NotifyStatusChange(chStatusChanged)
amqpConnection.NotifyStatusChange(stateChanged)

fmt.Printf("AMQP Connection opened.\n")
// Create the management interface for the connection
Expand All @@ -41,7 +40,7 @@ func main() {
Name: exchangeName,
})
if err != nil {
fmt.Printf("Error declaring exchange: %v\n", err)
rabbitmq_amqp.Error("Error declaring exchange", err)
return
}

Expand All @@ -52,7 +51,7 @@ func main() {
})

if err != nil {
fmt.Printf("Error declaring queue: %v\n", err)
rabbitmq_amqp.Error("Error declaring queue", err)
return
}

Expand All @@ -64,24 +63,39 @@ func main() {
})

if err != nil {
fmt.Printf("Error binding: %v\n", err)
rabbitmq_amqp.Error("Error binding", err)
return
}

addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)

publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
if err != nil {
fmt.Printf("Error creating publisher: %v\n", err)
rabbitmq_amqp.Error("Error creating publisher", err)
return
}

// Publish a message to the exchange
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
if err != nil {
fmt.Printf("Error publishing message: %v\n", err)
rabbitmq_amqp.Error("Error publishing message", err)
return
}
switch publishResult.Outcome {
case &amqp.StateAccepted{}:
rabbitmq_amqp.Info("Message accepted")
case &amqp.StateReleased{}:
rabbitmq_amqp.Warn("Message was not routed")
case &amqp.StateRejected{}:
rabbitmq_amqp.Warn("Message rejected")
stateType := publishResult.Outcome.(*amqp.StateRejected)
if stateType.Error != nil {
rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error)
}
default:
// these status are not supported
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
}

println("press any key to close the connection")

Expand Down Expand Up @@ -132,5 +146,5 @@ func main() {
// Wait for the status change to be printed
time.Sleep(500 * time.Millisecond)

close(chStatusChanged)
close(stateChangeds)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client
go 1.22.0

require (
github.com/Azure/go-amqp v1.2.0
github.com/Azure/go-amqp v1.4.0-beta.1
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A=
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I=
github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.4.0-beta.1 h1:BjZM/308FpfsQjX0gXtYK8Vx+WgQ1eng3oVQDEeXMmA=
github.com/Azure/go-amqp v1.4.0-beta.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down
24 changes: 13 additions & 11 deletions rabbitmq_amqp/address_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rabbitmq_amqp
import (
"errors"
"fmt"
"net/url"
"strings"
)

Expand Down Expand Up @@ -77,16 +76,16 @@ func encodePathSegments(input string) string {
return encoded.String()
}

// Decode takes a percent-encoded string and returns its decoded representation.
func decode(input string) (string, error) {
// Use url.QueryUnescape which properly decodes percent-encoded strings
decoded, err := url.QueryUnescape(input)
if err != nil {
return "", err
}

return decoded, nil
}
//// Decode takes a percent-encoded string and returns its decoded representation.
//func decode(input string) (string, error) {
// // Use url.QueryUnescape which properly decodes percent-encoded strings
// decoded, err := url.QueryUnescape(input)
// if err != nil {
// return "", err
// }
//
// return decoded, nil
//}

// isUnreserved checks if a character is an unreserved character in percent encoding
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
Expand All @@ -111,5 +110,8 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName,
}
format := "/%s/src=%s;%s=%s;key=%s;args="
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}

func validateAddress(address string) bool {
return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues))
}
6 changes: 3 additions & 3 deletions rabbitmq_amqp/amqp_binding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

var _ = Describe("AMQP Bindings test ", func() {
var connection IConnection
var management IManagement
var connection *AmqpConnection
var management *AmqpManagement
BeforeEach(func() {
conn, err := Dial(context.TODO(), "amqp://", nil)
conn, err := Dial(context.TODO(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
connection = conn
management = connection.Management()
Expand Down
72 changes: 48 additions & 24 deletions rabbitmq_amqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,50 @@ import (

type AmqpConnection struct {
Connection *amqp.Conn
management IManagement
management *AmqpManagement
lifeCycle *LifeCycle
session *amqp.Session
}

func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (IPublisher, error) {
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName))
func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (*Publisher, error) {
if !validateAddress(destinationAdd) {
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}

sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
return nil, err
}
return newPublisher(sender), nil
}

// Management returns the management interface for the connection.
// See IManagement interface.
func (a *AmqpConnection) Management() IManagement {
return a.management
}

// Dial creates a new AmqpConnection
// with a new AmqpManagement and a new LifeCycle.
// Returns a pointer to the new AmqpConnection
func Dial(ctx context.Context, addr string, connOptions *amqp.ConnOptions) (IConnection, error) {
// Dial connect to the AMQP 1.0 server using the provided connectionSettings
// Returns a pointer to the new AmqpConnection if successful else an error.
// addresses is a list of addresses to connect to. It picks one randomly.
// It is enough that one of the addresses is reachable.
func Dial(ctx context.Context, addresses []string, connOptions *amqp.ConnOptions) (*AmqpConnection, error) {
conn := &AmqpConnection{
management: NewAmqpManagement(),
lifeCycle: NewLifeCycle(),
}
err := conn.open(ctx, addr, connOptions)
if err != nil {
return nil, err
tmp := make([]string, len(addresses))
copy(tmp, addresses)

// random pick and extract one address to use for connection
for len(tmp) > 0 {
idx := random(len(tmp))
addr := tmp[idx]
// remove the index from the tmp list
tmp = append(tmp[:idx], tmp[idx+1:]...)
err := conn.open(ctx, addr, connOptions)
if err != nil {
Error("Failed to open connection", ExtractWithoutPassword(addr), err)
continue
}
Debug("Connected to", ExtractWithoutPassword(addr))
return conn, nil
}
return conn, nil
return nil, fmt.Errorf("no address to connect to")
}

// Open opens a connection to the AMQP 1.0 server.
Expand Down Expand Up @@ -84,30 +96,42 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq
if err != nil {
return err
}
err = a.Management().Open(ctx, a)
err = a.management.Open(ctx, a)
if err != nil {
// TODO close connection?
return err
}

a.lifeCycle.SetStatus(Open)
a.lifeCycle.SetState(&StateOpen{})
return nil
}

func (a *AmqpConnection) Close(ctx context.Context) error {
err := a.Management().Close(ctx)
err := a.management.Close(ctx)
if err != nil {
return err
}
err = a.Connection.Close()
a.lifeCycle.SetStatus(Closed)
a.lifeCycle.SetState(&StateClosed{})
return err
}

func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) {
// NotifyStatusChange registers a channel to receive getState change notifications
// from the connection.
func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged) {
a.lifeCycle.chStatusChanged = channel
}

func (a *AmqpConnection) Status() int {
return a.lifeCycle.Status()
func (a *AmqpConnection) State() LifeCycleState {
return a.lifeCycle.State()
}

// *** management section ***

// Management returns the management interface for the connection.
// The management interface is used to declare and delete exchanges, queues, and bindings.
func (a *AmqpConnection) Management() *AmqpManagement {
return a.management
}

//*** end management section ***
29 changes: 20 additions & 9 deletions rabbitmq_amqp/amqp_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
var _ = Describe("AMQP Connection Test", func() {
It("AMQP SASLTypeAnonymous Connection should succeed", func() {

connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{
SASLType: amqp.SASLTypeAnonymous()})
Expect(err).To(BeNil())
err = connection.Close(context.Background())
Expand All @@ -20,43 +20,54 @@ var _ = Describe("AMQP Connection Test", func() {

It("AMQP SASLTypePlain Connection should succeed", func() {

connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("guest", "guest")})

Expect(err).To(BeNil())
err = connection.Close(context.Background())
Expect(err).To(BeNil())
})

It("AMQP Connection connect to the one correct uri and fails the others", func() {
conn, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://"}, nil)
Expect(err).To(BeNil())
Expect(conn.Close(context.Background()))
})

It("AMQP Connection should fail due of wrong Port", func() {
_, err := Dial(context.Background(), "amqp://localhost:1234", nil)
_, err := Dial(context.Background(), []string{"amqp://localhost:1234"}, nil)
Expect(err).NotTo(BeNil())
})

It("AMQP Connection should fail due of wrong Host", func() {
_, err := Dial(context.Background(), "amqp://wrong_host:5672", nil)
_, err := Dial(context.Background(), []string{"amqp://wrong_host:5672"}, nil)
Expect(err).NotTo(BeNil())
})

It("AMQP Connection should fails with all the wrong uris", func() {
_, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://nono"}, nil)
Expect(err).NotTo(BeNil())
})

It("AMQP Connection should fail due to context cancellation", func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
cancel()
_, err := Dial(ctx, "amqp://", nil)
_, err := Dial(ctx, []string{"amqp://"}, nil)
Expect(err).NotTo(BeNil())
})

It("AMQP Connection should receive events", func() {
ch := make(chan *StatusChanged, 1)
connection, err := Dial(context.Background(), "amqp://", nil)
ch := make(chan *StateChanged, 1)
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
connection.NotifyStatusChange(ch)
err = connection.Close(context.Background())
Expect(err).To(BeNil())

recv := <-ch
Expect(recv).NotTo(BeNil())
Expect(recv.From).To(Equal(Open))
Expect(recv.To).To(Equal(Closed))
Expect(recv.From).To(Equal(&StateOpen{}))
Expect(recv.To).To(Equal(&StateClosed{}))
})

//It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() {
Expand Down
4 changes: 2 additions & 2 deletions rabbitmq_amqp/amqp_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type AmqpExchangeInfo struct {
name string
}

func newAmqpExchangeInfo(name string) IExchangeInfo {
func newAmqpExchangeInfo(name string) *AmqpExchangeInfo {
return &AmqpExchangeInfo{name: name}
}

Expand All @@ -33,7 +33,7 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
}
}

func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error) {
path, err := ExchangeAddress(&e.name, nil)
if err != nil {
return nil, err
Expand Down
Loading
Loading