Skip to content

Commit 05f7cd9

Browse files
authored
First implementation for Exchanges and bindings (#2)
* First implementation for Exchanges and bindings --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent fa7d5d9 commit 05f7cd9

File tree

13 files changed

+435
-64
lines changed

13 files changed

+435
-64
lines changed

examples/getting_started/main.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
88
"os"
9+
"time"
910
)
1011

1112
func main() {
@@ -30,28 +31,64 @@ func main() {
3031
management := amqpConnection.Management()
3132
queueSpec := management.Queue("getting_started_queue").
3233
QueueType(mq.QueueType{Type: mq.Quorum}).
33-
MaxLengthBytes(mq.CapacityGB(1)).
34-
DeadLetterExchange("dead-letter-exchange").
35-
DeadLetterRoutingKey("dead-letter-routing-key")
34+
MaxLengthBytes(mq.CapacityGB(1))
35+
exchangeSpec := management.Exchange("getting_started_exchange").
36+
ExchangeType(mq.ExchangeType{Type: mq.Topic})
37+
3638
queueInfo, err := queueSpec.Declare(context.Background())
3739
if err != nil {
40+
fmt.Printf("Error declaring queue %s\n", err)
3841
return
3942
}
4043
fmt.Printf("Queue %s created.\n", queueInfo.GetName())
41-
err = queueSpec.Delete(context.Background())
44+
45+
exchangeInfo, err := exchangeSpec.Declare(context.Background())
4246
if err != nil {
47+
fmt.Printf("Error declaring exchange %s\n", err)
4348
return
4449
}
45-
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())
50+
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName())
4651

47-
fmt.Println("Press any key to stop ")
52+
bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key")
53+
54+
err = bindingSpec.Bind(context.Background())
55+
if err != nil {
56+
fmt.Printf("Error binding %s\n", err)
57+
return
58+
}
59+
60+
fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName())
61+
62+
fmt.Println("Press any key to cleanup and exit")
4863
reader := bufio.NewReader(os.Stdin)
4964
_, _ = reader.ReadString('\n')
5065

66+
err = bindingSpec.Unbind(context.Background())
67+
if err != nil {
68+
fmt.Printf("Error unbinding %s\n", err)
69+
return
70+
}
71+
72+
fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName())
73+
74+
err = exchangeSpec.Delete(context.Background())
75+
if err != nil {
76+
fmt.Printf("Error deleting exchange %s\n", err)
77+
return
78+
}
79+
80+
err = queueSpec.Delete(context.Background())
81+
if err != nil {
82+
return
83+
}
84+
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName())
85+
5186
err = amqpConnection.Close(context.Background())
5287
if err != nil {
5388
return
5489
}
5590
fmt.Printf("AMQP Connection closed.\n")
91+
// Wait for the status change to be printed
92+
time.Sleep(500 * time.Millisecond)
5693
close(chStatusChanged)
5794
}

rabbitmq_amqp/amqp_binding.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package rabbitmq_amqp
2+
3+
import "context"
4+
5+
type AMQPBindingInfo struct {
6+
}
7+
8+
type AMQPBinding struct {
9+
sourceExchangeName string
10+
destinationQueue string
11+
bindingKey string
12+
management *AmqpManagement
13+
}
14+
15+
func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
16+
return &AMQPBinding{management: management}
17+
}
18+
19+
func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification {
20+
b.bindingKey = bindingKey
21+
return b
22+
}
23+
24+
func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification {
25+
b.sourceExchangeName = exchangeName
26+
return b
27+
}
28+
29+
func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification {
30+
b.destinationQueue = queueName
31+
return b
32+
}
33+
34+
func (b *AMQPBinding) Bind(ctx context.Context) error {
35+
36+
path := bindingPath()
37+
kv := make(map[string]any)
38+
kv["binding_key"] = b.bindingKey
39+
kv["source"] = b.sourceExchangeName
40+
kv["destination_queue"] = b.destinationQueue
41+
kv["arguments"] = make(map[string]any)
42+
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
43+
return err
44+
45+
}
46+
47+
func (b *AMQPBinding) Unbind(ctx context.Context) error {
48+
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey)
49+
_, err := b.management.Request(ctx, nil, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204})
50+
return err
51+
}

rabbitmq_amqp/amqp_binding_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package rabbitmq_amqp
2+
3+
import (
4+
"context"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
)
8+
9+
var _ = Describe("AMQP Bindings test ", func() {
10+
11+
var connection IConnection
12+
var management IManagement
13+
BeforeEach(func() {
14+
connection = NewAmqpConnection()
15+
Expect(connection).NotTo(BeNil())
16+
Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{}))
17+
connectionSettings := NewConnectionSettings()
18+
Expect(connectionSettings).NotTo(BeNil())
19+
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
20+
err := connection.Open(context.TODO(), connectionSettings)
21+
Expect(err).To(BeNil())
22+
management = connection.Management()
23+
})
24+
25+
AfterEach(func() {
26+
Expect(connection.Close(context.Background())).To(BeNil())
27+
})
28+
29+
It("AMQP Bindings between Exchange and Queue Should success ", func() {
30+
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success"
31+
const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success"
32+
exchangeSpec := management.Exchange(exchangeName)
33+
exchangeInfo, err := exchangeSpec.Declare(context.TODO())
34+
Expect(err).To(BeNil())
35+
Expect(exchangeInfo).NotTo(BeNil())
36+
Expect(exchangeInfo.GetName()).To(Equal(exchangeName))
37+
38+
queueSpec := management.Queue(queueName)
39+
queueInfo, err := queueSpec.Declare(context.TODO())
40+
Expect(err).To(BeNil())
41+
Expect(queueInfo).NotTo(BeNil())
42+
Expect(queueInfo.GetName()).To(Equal(queueName))
43+
44+
bindingSpec := management.Binding().SourceExchange(exchangeName).
45+
DestinationQueue(queueName).
46+
Key("routing-key")
47+
err = bindingSpec.Bind(context.TODO())
48+
Expect(err).To(BeNil())
49+
err = bindingSpec.Unbind(context.TODO())
50+
Expect(err).To(BeNil())
51+
err = exchangeSpec.Delete(context.TODO())
52+
Expect(err).To(BeNil())
53+
err = queueSpec.Delete(context.TODO())
54+
Expect(err).To(BeNil())
55+
56+
})
57+
58+
})

rabbitmq_amqp/amqp_connection.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,25 @@ import (
88
)
99

1010
type ConnectionSettings struct {
11-
host string
12-
port int
13-
user string
14-
password string
15-
virtualHost string
16-
scheme string
17-
containerId string
18-
useSsl bool
19-
tlsConfig *tls.Config
11+
host string
12+
port int
13+
user string
14+
password string
15+
virtualHost string
16+
scheme string
17+
containerId string
18+
useSsl bool
19+
tlsConfig *tls.Config
20+
saslMechanism TSaslMechanism
21+
}
22+
23+
func (c *ConnectionSettings) GetSaslMechanism() TSaslMechanism {
24+
return c.saslMechanism
25+
}
26+
27+
func (c *ConnectionSettings) SaslMechanism(mechanism SaslMechanism) IConnectionSettings {
28+
c.saslMechanism = mechanism.Type
29+
return c
2030
}
2131

2232
func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings {
@@ -138,8 +148,13 @@ func NewAmqpConnection() IConnection {
138148
}
139149

140150
func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error {
141-
// TODO: add support for other SASL types
142151
sASLType := amqp.SASLTypeAnonymous()
152+
switch connectionSettings.GetSaslMechanism() {
153+
case Plain:
154+
sASLType = amqp.SASLTypePlain(connectionSettings.GetUser(), connectionSettings.GetPassword())
155+
case External:
156+
sASLType = amqp.SASLTypeExternal("")
157+
}
143158

144159
conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{
145160
ContainerID: connectionSettings.GetContainerId(),

rabbitmq_amqp/amqp_connection_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,32 @@ import (
88
)
99

1010
var _ = Describe("AMQP Connection Test", func() {
11-
It("AMQP Connection should success", func() {
11+
It("AMQP SASLTypeAnonymous Connection should success", func() {
1212
amqpConnection := NewAmqpConnection()
1313
Expect(amqpConnection).NotTo(BeNil())
1414
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
1515

1616
connectionSettings := NewConnectionSettings()
1717
Expect(connectionSettings).NotTo(BeNil())
18+
connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous})
1819
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
1920
err := amqpConnection.Open(context.TODO(), connectionSettings)
2021
Expect(err).To(BeNil())
2122
})
2223

24+
It("AMQP SASLTypePlain Connection should success", func() {
25+
amqpConnection := NewAmqpConnection()
26+
Expect(amqpConnection).NotTo(BeNil())
27+
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))
28+
29+
connectionSettings := NewConnectionSettings()
30+
Expect(connectionSettings).NotTo(BeNil())
31+
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
32+
connectionSettings.SaslMechanism(SaslMechanism{Type: Plain})
33+
err := amqpConnection.Open(context.TODO(), connectionSettings)
34+
Expect(err).To(BeNil())
35+
})
36+
2337
It("AMQP Connection should fail due of wrong port", func() {
2438
amqpConnection := NewAmqpConnection()
2539
Expect(amqpConnection).NotTo(BeNil())

rabbitmq_amqp/amqp_exchange.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package rabbitmq_amqp
2+
3+
import "context"
4+
5+
type AmqpExchangeInfo struct {
6+
name string
7+
}
8+
9+
func newAmqpExchangeInfo(name string) IExchangeInfo {
10+
return &AmqpExchangeInfo{name: name}
11+
}
12+
13+
func (a *AmqpExchangeInfo) GetName() string {
14+
return a.name
15+
}
16+
17+
type AmqpExchange struct {
18+
name string
19+
management *AmqpManagement
20+
arguments map[string]any
21+
isAutoDelete bool
22+
exchangeType ExchangeType
23+
}
24+
25+
func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
26+
return &AmqpExchange{management: management,
27+
name: name,
28+
arguments: make(map[string]any),
29+
exchangeType: ExchangeType{Type: Direct},
30+
}
31+
}
32+
33+
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
34+
35+
path := exchangePath(e.name)
36+
kv := make(map[string]any)
37+
kv["auto_delete"] = e.isAutoDelete
38+
kv["durable"] = true
39+
kv["type"] = e.exchangeType.String()
40+
kv["arguments"] = e.arguments
41+
_, err := e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409})
42+
if err != nil {
43+
return nil, err
44+
}
45+
return newAmqpExchangeInfo(e.name), nil
46+
}
47+
48+
func (e *AmqpExchange) AutoDelete(isAutoDelete bool) IExchangeSpecification {
49+
e.isAutoDelete = isAutoDelete
50+
return e
51+
}
52+
53+
func (e *AmqpExchange) IsAutoDelete() bool {
54+
return e.isAutoDelete
55+
}
56+
57+
func (e *AmqpExchange) Delete(ctx context.Context) error {
58+
path := exchangePath(e.name)
59+
_, err := e.management.Request(ctx, nil, path, commandDelete, []int{responseCode204})
60+
return err
61+
}
62+
63+
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) IExchangeSpecification {
64+
e.exchangeType = exchangeType
65+
return e
66+
}
67+
68+
func (e *AmqpExchange) GetExchangeType() TExchangeType {
69+
return e.exchangeType.Type
70+
}
71+
72+
func (e *AmqpExchange) GetName() string {
73+
return e.name
74+
}

0 commit comments

Comments
 (0)