Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions docs/examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"errors"
"fmt"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
"time"

rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

func main() {
Expand Down Expand Up @@ -44,7 +45,7 @@ func main() {
// this is valid for the connection lifecycle
amqpConnection.NotifyStatusChange(stateChanged)

rmq.Info("AMQP connection opened.\n")
rmq.Info("AMQP connection opened")
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
Expand Down Expand Up @@ -94,21 +95,21 @@ func main() {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
rmq.Info("[Consumer]", "consumer closed. Context", err)
rmq.Info("[Consumer] Consumer closed", "context", err)
return
}
if err != nil {
// An error occurred receiving the message
rmq.Error("[Consumer]", "Error receiving message", err)
rmq.Error("[Consumer] Error receiving message", "error", err)
return
}

rmq.Info("[Consumer]", "Received message",
rmq.Info("[Consumer] Received message", "message",
fmt.Sprintf("%s", deliveryContext.Message().Data))

err = deliveryContext.Accept(context.Background())
if err != nil {
rmq.Error("Error accepting message", err)
rmq.Error("[Consumer] Error accepting message", "error", err)
return
}
}
Expand Down Expand Up @@ -172,39 +173,39 @@ func main() {
err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
rmq.Error("Error unbinding: %v\n", err)
rmq.Error("Error unbinding", "error", err)
return
}

err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
rmq.Error("Error deleting exchange: %v\n", err)
rmq.Error("Error deleting exchange", "error", err)
return
}

// Purge the queue
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
rmq.Error("Error purging queue: %v\n", err)
rmq.Error("Error purging queue", "error", err)
return
}
rmq.Info("Purged %d messages from the queue.\n", purged)
rmq.Info("Purged messages from the queue", "count", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
rmq.Error("Error deleting queue: %v\n", err)
rmq.Error("Error deleting queue", "error", err)
return
}

// Close all the connections. but you can still use the environment
// to create new connections
err = env.CloseConnections(context.Background())
if err != nil {
rmq.Error("Error closing connection: %v\n", err)
rmq.Error("Error closing connection", "error", err)
return
}

rmq.Info("AMQP connection closed.\n")
rmq.Info("AMQP connection closed")
// not necessary. It waits for the status change to be printed
time.Sleep(100 * time.Millisecond)
close(stateChanged)
Expand Down
2 changes: 1 addition & 1 deletion pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (a *AmqpConnection) open(ctx context.Context, address string, connOptions *
return fmt.Errorf("failed to open TLS connection: %w", err)
}
if err != nil {
Error("Failed to open connection", ExtractWithoutPassword(address), err, "ID", connOptions.Id)
Error("Failed to open connection", "url", ExtractWithoutPassword(address), "error", err, "ID", connOptions.Id)
return fmt.Errorf("failed to open connection: %w", err)
}
a.properties = azureConnection.Properties()
Expand Down
5 changes: 3 additions & 2 deletions pkg/rabbitmqamqp/amqp_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package rabbitmqamqp
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"sync"
"sync/atomic"

"github.com/Azure/go-amqp"
)

type TEndPointStrategy int
Expand Down Expand Up @@ -83,7 +84,7 @@ func (e *Environment) NewConnection(ctx context.Context) (*AmqpConnection, error
}
connection, err := Dial(ctx, addr.Address, cloned)
if err != nil {
Error("Failed to open connection", ExtractWithoutPassword(addr.Address), err)
Error("Failed to open connection", "url", ExtractWithoutPassword(addr.Address), "error", err)
lastError = err
continue
}
Expand Down