diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index b7d74df..46a2dc6 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -6,6 +6,7 @@ import ( "fmt" mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" "os" + "time" ) func main() { @@ -30,28 +31,64 @@ func main() { management := amqpConnection.Management() queueSpec := management.Queue("getting_started_queue"). QueueType(mq.QueueType{Type: mq.Quorum}). - MaxLengthBytes(mq.CapacityGB(1)). - DeadLetterExchange("dead-letter-exchange"). - DeadLetterRoutingKey("dead-letter-routing-key") + MaxLengthBytes(mq.CapacityGB(1)) + exchangeSpec := management.Exchange("getting_started_exchange"). + ExchangeType(mq.ExchangeType{Type: mq.Topic}) + queueInfo, err := queueSpec.Declare(context.Background()) if err != nil { + fmt.Printf("Error declaring queue %s\n", err) return } fmt.Printf("Queue %s created.\n", queueInfo.GetName()) - err = queueSpec.Delete(context.Background()) + + exchangeInfo, err := exchangeSpec.Declare(context.Background()) if err != nil { + fmt.Printf("Error declaring exchange %s\n", err) return } - fmt.Printf("Queue %s deleted.\n", queueInfo.GetName()) + fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName()) - fmt.Println("Press any key to stop ") + bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key") + + err = bindingSpec.Bind(context.Background()) + if err != nil { + fmt.Printf("Error binding %s\n", err) + return + } + + fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName()) + + fmt.Println("Press any key to cleanup and exit") reader := bufio.NewReader(os.Stdin) _, _ = reader.ReadString('\n') + err = bindingSpec.Unbind(context.Background()) + if err != nil { + fmt.Printf("Error unbinding %s\n", err) + return + } + + fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName()) + + err = exchangeSpec.Delete(context.Background()) + if err != nil { + fmt.Printf("Error deleting exchange %s\n", err) + return + } + + err = queueSpec.Delete(context.Background()) + if err != nil { + return + } + fmt.Printf("Queue %s deleted.\n", queueInfo.GetName()) + err = amqpConnection.Close(context.Background()) if err != nil { return } fmt.Printf("AMQP Connection closed.\n") + // Wait for the status change to be printed + time.Sleep(500 * time.Millisecond) close(chStatusChanged) } diff --git a/rabbitmq_amqp/amqp_binding.go b/rabbitmq_amqp/amqp_binding.go new file mode 100644 index 0000000..353c419 --- /dev/null +++ b/rabbitmq_amqp/amqp_binding.go @@ -0,0 +1,51 @@ +package rabbitmq_amqp + +import "context" + +type AMQPBindingInfo struct { +} + +type AMQPBinding struct { + sourceExchangeName string + destinationQueue string + bindingKey string + management *AmqpManagement +} + +func newAMQPBinding(management *AmqpManagement) *AMQPBinding { + return &AMQPBinding{management: management} +} + +func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification { + b.bindingKey = bindingKey + return b +} + +func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification { + b.sourceExchangeName = exchangeName + return b +} + +func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification { + b.destinationQueue = queueName + return b +} + +func (b *AMQPBinding) Bind(ctx context.Context) error { + + path := bindingPath() + kv := make(map[string]any) + kv["binding_key"] = b.bindingKey + kv["source"] = b.sourceExchangeName + kv["destination_queue"] = b.destinationQueue + kv["arguments"] = make(map[string]any) + _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) + return err + +} + +func (b *AMQPBinding) Unbind(ctx context.Context) error { + bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey) + _, err := b.management.Request(ctx, nil, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204}) + return err +} diff --git a/rabbitmq_amqp/amqp_binding_test.go b/rabbitmq_amqp/amqp_binding_test.go new file mode 100644 index 0000000..f84dcfa --- /dev/null +++ b/rabbitmq_amqp/amqp_binding_test.go @@ -0,0 +1,58 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("AMQP Bindings test ", func() { + + var connection IConnection + var management IManagement + BeforeEach(func() { + connection = NewAmqpConnection() + Expect(connection).NotTo(BeNil()) + Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := connection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + management = connection.Management() + }) + + AfterEach(func() { + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Bindings between Exchange and Queue Should success ", func() { + const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success" + const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success" + exchangeSpec := management.Exchange(exchangeName) + exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(exchangeInfo).NotTo(BeNil()) + Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) + + queueSpec := management.Queue(queueName) + queueInfo, err := queueSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.GetName()).To(Equal(queueName)) + + bindingSpec := management.Binding().SourceExchange(exchangeName). + DestinationQueue(queueName). + Key("routing-key") + err = bindingSpec.Bind(context.TODO()) + Expect(err).To(BeNil()) + err = bindingSpec.Unbind(context.TODO()) + Expect(err).To(BeNil()) + err = exchangeSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + err = queueSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + + }) + +}) diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index 2cf2907..e251710 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -8,15 +8,25 @@ import ( ) type ConnectionSettings struct { - host string - port int - user string - password string - virtualHost string - scheme string - containerId string - useSsl bool - tlsConfig *tls.Config + host string + port int + user string + password string + virtualHost string + scheme string + containerId string + useSsl bool + tlsConfig *tls.Config + saslMechanism TSaslMechanism +} + +func (c *ConnectionSettings) GetSaslMechanism() TSaslMechanism { + return c.saslMechanism +} + +func (c *ConnectionSettings) SaslMechanism(mechanism SaslMechanism) IConnectionSettings { + c.saslMechanism = mechanism.Type + return c } func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings { @@ -138,8 +148,13 @@ func NewAmqpConnection() IConnection { } func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error { - // TODO: add support for other SASL types sASLType := amqp.SASLTypeAnonymous() + switch connectionSettings.GetSaslMechanism() { + case Plain: + sASLType = amqp.SASLTypePlain(connectionSettings.GetUser(), connectionSettings.GetPassword()) + case External: + sASLType = amqp.SASLTypeExternal("") + } conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{ ContainerID: connectionSettings.GetContainerId(), diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go index 34b2d7e..a3f987a 100644 --- a/rabbitmq_amqp/amqp_connection_test.go +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -8,18 +8,32 @@ import ( ) var _ = Describe("AMQP Connection Test", func() { - It("AMQP Connection should success", func() { + It("AMQP SASLTypeAnonymous Connection should success", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) connectionSettings := NewConnectionSettings() Expect(connectionSettings).NotTo(BeNil()) + connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous}) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) err := amqpConnection.Open(context.TODO(), connectionSettings) Expect(err).To(BeNil()) }) + It("AMQP SASLTypePlain Connection should success", func() { + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) + + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + connectionSettings.SaslMechanism(SaslMechanism{Type: Plain}) + err := amqpConnection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + }) + It("AMQP Connection should fail due of wrong port", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) diff --git a/rabbitmq_amqp/amqp_exchange.go b/rabbitmq_amqp/amqp_exchange.go new file mode 100644 index 0000000..d652b2b --- /dev/null +++ b/rabbitmq_amqp/amqp_exchange.go @@ -0,0 +1,74 @@ +package rabbitmq_amqp + +import "context" + +type AmqpExchangeInfo struct { + name string +} + +func newAmqpExchangeInfo(name string) IExchangeInfo { + return &AmqpExchangeInfo{name: name} +} + +func (a *AmqpExchangeInfo) GetName() string { + return a.name +} + +type AmqpExchange struct { + name string + management *AmqpManagement + arguments map[string]any + isAutoDelete bool + exchangeType ExchangeType +} + +func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange { + return &AmqpExchange{management: management, + name: name, + arguments: make(map[string]any), + exchangeType: ExchangeType{Type: Direct}, + } +} + +func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) { + + path := exchangePath(e.name) + kv := make(map[string]any) + kv["auto_delete"] = e.isAutoDelete + kv["durable"] = true + kv["type"] = e.exchangeType.String() + kv["arguments"] = e.arguments + _, err := e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409}) + if err != nil { + return nil, err + } + return newAmqpExchangeInfo(e.name), nil +} + +func (e *AmqpExchange) AutoDelete(isAutoDelete bool) IExchangeSpecification { + e.isAutoDelete = isAutoDelete + return e +} + +func (e *AmqpExchange) IsAutoDelete() bool { + return e.isAutoDelete +} + +func (e *AmqpExchange) Delete(ctx context.Context) error { + path := exchangePath(e.name) + _, err := e.management.Request(ctx, nil, path, commandDelete, []int{responseCode204}) + return err +} + +func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) IExchangeSpecification { + e.exchangeType = exchangeType + return e +} + +func (e *AmqpExchange) GetExchangeType() TExchangeType { + return e.exchangeType.Type +} + +func (e *AmqpExchange) GetName() string { + return e.name +} diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go new file mode 100644 index 0000000..3d4fc35 --- /dev/null +++ b/rabbitmq_amqp/amqp_exchange_test.go @@ -0,0 +1,62 @@ +package rabbitmq_amqp + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("AMQP Exchange test ", func() { + + var connection IConnection + var management IManagement + BeforeEach(func() { + connection = NewAmqpConnection() + Expect(connection).NotTo(BeNil()) + Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) + connectionSettings := NewConnectionSettings() + Expect(connectionSettings).NotTo(BeNil()) + Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + err := connection.Open(context.TODO(), connectionSettings) + Expect(err).To(BeNil()) + management = connection.Management() + }) + + AfterEach(func() { + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Exchange Declare with Default and Delete should success ", func() { + const exchangeName = "AMQP Exchange Declare and Delete with Default should success" + exchangeSpec := management.Exchange(exchangeName) + exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(exchangeInfo).NotTo(BeNil()) + Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) + err = exchangeSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Exchange Declare with Topic and Delete should success ", func() { + const exchangeName = "AMQP Exchange Declare with Topic and Delete should success" + exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic}) + exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(exchangeInfo).NotTo(BeNil()) + Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) + err = exchangeSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + + It("AMQP Exchange Declare with FanOut and Delete should success ", func() { + const exchangeName = "AMQP Exchange Declare with FanOut and Delete should success" + exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut}) + exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + Expect(err).To(BeNil()) + Expect(exchangeInfo).NotTo(BeNil()) + Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) + err = exchangeSpec.Delete(context.TODO()) + Expect(err).To(BeNil()) + }) + +}) diff --git a/rabbitmq_amqp/amqp_managent.go b/rabbitmq_amqp/amqp_managent.go index 2994191..67a4f85 100644 --- a/rabbitmq_amqp/amqp_managent.go +++ b/rabbitmq_amqp/amqp_managent.go @@ -20,11 +20,18 @@ type AmqpManagement struct { cancel context.CancelFunc } +func (a *AmqpManagement) Binding() IBindingSpecification { + return newAMQPBinding(a) +} + +func (a *AmqpManagement) Exchange(exchangeName string) IExchangeSpecification { + return newAmqpExchange(a, exchangeName) +} + func NewAmqpManagement() *AmqpManagement { return &AmqpManagement{ lifeCycle: NewLifeCycle(), } - } func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { @@ -116,17 +123,6 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error if err != nil { return err } - //if ctx.Err() != nil { - // // start processing messages. Here we pass a context that will be closed - // // when the receiver session is closed. - // // we won't expose To the user since the user will call Close - // // and the processing _must_ be running in the background - // // for the management session life. - // //err = a.processMessages(context.Background()) - // //if err != nil { - // // return err - // //} - //} a.lifeCycle.SetStatus(Open) return ctx.Err() } @@ -148,11 +144,16 @@ func (a *AmqpManagement) Request(ctx context.Context, body any, path string, met func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error { + if responseCode == responseCode409 { + return PreconditionFailed + } + for _, code := range expectedResponseCodes { if code == responseCode { return nil } } + return PreconditionFailed } diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go index 9199347..36b5ac7 100644 --- a/rabbitmq_amqp/amqp_queue.go +++ b/rabbitmq_amqp/amqp_queue.go @@ -61,38 +61,38 @@ func (a *AmqpQueueInfo) GetArguments() map[string]any { } type AmqpQueue struct { - management *AmqpManagement - queueArguments map[string]any - isExclusive bool - isAutoDelete bool - name string + management *AmqpManagement + arguments map[string]any + isExclusive bool + isAutoDelete bool + name string } func (a *AmqpQueue) DeadLetterExchange(dlx string) IQueueSpecification { - a.queueArguments["x-dead-letter-exchange"] = dlx + a.arguments["x-dead-letter-exchange"] = dlx return a } func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) IQueueSpecification { - a.queueArguments["x-dead-letter-routing-key"] = dlrk + a.arguments["x-dead-letter-routing-key"] = dlrk return a } func (a *AmqpQueue) MaxLengthBytes(length int64) IQueueSpecification { - a.queueArguments["max-length-bytes"] = length + a.arguments["max-length-bytes"] = length return a } func (a *AmqpQueue) QueueType(queueType QueueType) IQueueSpecification { - a.queueArguments["x-queue-type"] = queueType.String() + a.arguments["x-queue-type"] = queueType.String() return a } func (a *AmqpQueue) GetQueueType() TQueueType { - if a.queueArguments["x-queue-type"] == nil { + if a.arguments["x-queue-type"] == nil { return Classic } - return TQueueType(a.queueArguments["x-queue-type"].(string)) + return TQueueType(a.arguments["x-queue-type"].(string)) } func (a *AmqpQueue) Exclusive(isExclusive bool) IQueueSpecification { @@ -115,15 +115,15 @@ func (a *AmqpQueue) IsAutoDelete() bool { func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecification { return &AmqpQueue{management: management, - name: queueName, - queueArguments: make(map[string]any)} + name: queueName, + arguments: make(map[string]any)} } func (a *AmqpQueue) validate() error { - if a.queueArguments["max-length-bytes"] != nil { + if a.arguments["max-length-bytes"] != nil { - err := validatePositive("max length", a.queueArguments["max-length-bytes"].(int64)) + err := validatePositive("max length", a.arguments["max-length-bytes"].(int64)) if err != nil { return err } @@ -151,8 +151,8 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { kv["durable"] = true kv["auto_delete"] = a.isAutoDelete kv["exclusive"] = a.isExclusive - kv["arguments"] = a.queueArguments - response, err := a.management.Request(ctx, kv, path, commandPut, []int{200}) + kv["arguments"] = a.arguments + response, err := a.management.Request(ctx, kv, path, commandPut, []int{responseCode200, responseCode409}) if err != nil { return nil, err } @@ -161,11 +161,8 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { func (a *AmqpQueue) Delete(ctx context.Context) error { path := queuePath(a.name) - _, err := a.management.Request(ctx, nil, path, commandDelete, []int{200}) - if err != nil { - return err - } - return nil + _, err := a.management.Request(ctx, nil, path, commandDelete, []int{responseCode200}) + return err } func (a *AmqpQueue) Name(queueName string) IQueueSpecification { diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go index 89ce280..a85b8d8 100644 --- a/rabbitmq_amqp/common.go +++ b/rabbitmq_amqp/common.go @@ -78,6 +78,22 @@ func queuePath(queueName string) string { return "/" + Queues + "/" + encodePathSegments(queueName) } +func exchangePath(exchangeName string) string { + return "/" + Exchanges + "/" + encodePathSegments(exchangeName) +} + +func bindingPath() string { + return "/" + Bindings +} + +func bindingPathWithExchangeQueueKey(exchangeName, queueName, key string) string { + //string path = + //$"/{Consts.Bindings}/src={Utils.EncodePathSegment(_sourceName)};{($"{destinationCharacter}={Utils.EncodePathSegment(_destinationName)};key={Utils.EncodePathSegment(_routingKey)};args=")}"; + + return fmt.Sprintf("/%s/src=%s;dstq=%s;key=%s;args=", Bindings, encodePathSegments(exchangeName), encodePathSegments(queueName), encodePathSegments(key)) + +} + func validatePositive(label string, value int64) error { if value < 0 { return fmt.Errorf("value for %s must be positive, got %d", label, value) @@ -85,18 +101,6 @@ func validatePositive(label string, value int64) error { return nil } -//internal static string GenerateName(string prefix) -//{ -//string uuidStr = Guid.NewGuid().ToString(); -//byte[] uuidBytes = Encoding.ASCII.GetBytes(uuidStr); -//var md5 = MD5.Create(); -//byte[] digest = md5.ComputeHash(uuidBytes); -//return prefix + Convert.ToBase64String(digest) -//.Replace('+', '-') -//.Replace('/', '_') -//.Replace("=", ""); -//} - func GenerateNameWithDefaultPrefix() string { return GenerateName("client.gen-") } diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go index 39b1923..d5513c2 100644 --- a/rabbitmq_amqp/connection.go +++ b/rabbitmq_amqp/connection.go @@ -5,6 +5,18 @@ import ( "crypto/tls" ) +type TSaslMechanism string + +const ( + Plain TSaslMechanism = "plain" + External TSaslMechanism = "external" + Anonymous TSaslMechanism = "anonymous" +) + +type SaslMechanism struct { + Type TSaslMechanism +} + type IConnectionSettings interface { GetHost() string Host(hostName string) IConnectionSettings @@ -24,6 +36,8 @@ type IConnectionSettings interface { BuildAddress() string TlsConfig(config *tls.Config) IConnectionSettings GetTlsConfig() *tls.Config + GetSaslMechanism() TSaslMechanism + SaslMechanism(mechanism SaslMechanism) IConnectionSettings } type IConnection interface { diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go index e373ee7..6d24211 100644 --- a/rabbitmq_amqp/entities.go +++ b/rabbitmq_amqp/entities.go @@ -34,12 +34,14 @@ type IQueueSpecification interface { IEntityInfoSpecification[IQueueInfo] QueueType(queueType QueueType) IQueueSpecification GetQueueType() TQueueType - MaxLengthBytes(length int64) IQueueSpecification DeadLetterExchange(dlx string) IQueueSpecification DeadLetterRoutingKey(dlrk string) IQueueSpecification } +// IQueueInfo represents the information of a queue +// It is returned by the Declare method of IQueueSpecification +// The information come from the server type IQueueInfo interface { GetName() string IsDurable() bool @@ -50,3 +52,43 @@ type IQueueInfo interface { GetReplicas() []string GetArguments() map[string]any } + +type TExchangeType string + +const ( + Direct TExchangeType = "direct" + Topic TExchangeType = "topic" + FanOut TExchangeType = "fanout" +) + +type ExchangeType struct { + Type TExchangeType +} + +func (e ExchangeType) String() string { + return string(e.Type) +} + +// IExchangeInfo represents the information of an exchange +// It is empty at the moment because the server does not return any information +// We leave it here for future use. In case the server returns information about an exchange +type IExchangeInfo interface { + GetName() string +} + +type IExchangeSpecification interface { + GetName() string + AutoDelete(isAutoDelete bool) IExchangeSpecification + IsAutoDelete() bool + IEntityInfoSpecification[IExchangeInfo] + ExchangeType(exchangeType ExchangeType) IExchangeSpecification + GetExchangeType() TExchangeType +} + +type IBindingSpecification interface { + SourceExchange(exchangeName string) IBindingSpecification + DestinationQueue(queueName string) IBindingSpecification + Key(bindingKey string) IBindingSpecification + Bind(ctx context.Context) error + Unbind(ctx context.Context) error +} diff --git a/rabbitmq_amqp/management.go b/rabbitmq_amqp/management.go index a6130ec..0f2226e 100644 --- a/rabbitmq_amqp/management.go +++ b/rabbitmq_amqp/management.go @@ -8,6 +8,8 @@ type IManagement interface { Open(ctx context.Context, connection IConnection) error Close(ctx context.Context) error Queue(queueName string) IQueueSpecification + Exchange(exchangeName string) IExchangeSpecification + Binding() IBindingSpecification QueueClientName() IQueueSpecification GetStatus() int NotifyStatusChange(channel chan *StatusChanged)