Skip to content

Commit 8ca30d7

Browse files
committed
Further configuration
1 parent cf8bbfa commit 8ca30d7

File tree

6 files changed

+59
-40
lines changed

6 files changed

+59
-40
lines changed

services/rabbit/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ rabbitmq.conf and advanced.config changes take effect after a node restart. This
77

88
Source: https://www.rabbitmq.com/docs/next/configure#config-changes-effects
99

10+
## How to add / remove nodes
11+
12+
The only supported way, is to completely shutdown the cluster (docker stack and most likely rabbit node volumes) and start brand new.
13+
14+
With manual effort this can be done on the running cluster, by adding 1 more rabbit node manually (as a separate docker stack or new service) and manually executing rabbitmqctl commands (some hints can be found here https://www.rabbitmq.com/docs/clustering#creating)
15+
1016
## Enable node Maintenance mode
1117

1218
1. Get inside container's shell (`docker exec -it <container-id> bash`)

services/rabbit/docker-compose.yml.j2

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
services:
44
loadbalancer:
55
image: haproxy:3.2
6-
ports:
7-
- "5672:5672"
8-
- "15672:15672"
6+
deploy:
7+
# https://discourse.haproxy.org/t/haproxy-high-availability-configuration/11983
8+
replicas: 2
9+
# necessary to preserve client ip
10+
# otherwise we see overlay network lb ip
11+
# (rabbitmq management dashboard connection section)
12+
endpoint_mode: dnsrr
13+
# healthcheck: https://github.com/haproxy/haproxy/issues/3091
914
networks:
1015
- rabbit
1116
configs:

services/rabbit/haproxy.cfg.j2

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,27 @@ global
33
maxconn 4096
44
pidfile /tmp/haproxy.pid
55

6+
# haproxy by default resolves server hostname only once
7+
# this breaks if container restarts. By using resolvers
8+
# we tell haproxy to re-resolve the hostname (so container
9+
# restarts are handled properly)
10+
resolvers dockerdns
11+
nameserver dns1 127.0.0.11:53
12+
resolve_retries 3
13+
timeout resolve 1s
14+
timeout retry 1s
15+
hold other 10s
16+
hold refused 10s
17+
hold nx 10s
18+
hold timeout 10s
19+
hold valid 10s
20+
hold obsolete 10s
21+
622
defaults
723
log global
824
mode tcp
925
option tcplog
26+
1027
timeout connect 5s
1128
timeout client 30s
1229
timeout server 30s
@@ -21,16 +38,20 @@ frontend rabbit_dashboard
2138

2239
backend rabbit_backends
2340
balance roundrobin
24-
server rabbit01 rabbit_rabbit01:5672 check inter 5s rise 2 fall 3
25-
server rabbit02 rabbit_rabbit02:5672 check inter 5s rise 2 fall 3
26-
server rabbit03 rabbit_rabbit03:5672 check inter 5s rise 2 fall 3
41+
42+
# init-addrs libc,none - start even if there aren’t any running
43+
server rabbit01 rabbit_rabbit01:5672 check resolvers dockerdns init-addr libc,none inter 5s rise 2 fall 3 send-proxy
44+
server rabbit02 rabbit_rabbit02:5672 check resolvers dockerdns init-addr libc,none inter 5s rise 2 fall 3 send-proxy
45+
server rabbit03 rabbit_rabbit03:5672 check resolvers dockerdns init-addr libc,none inter 5s rise 2 fall 3 send-proxy
2746

2847
backend rabbit_dashboard_backends
2948
mode http
3049
balance roundrobin
50+
3151
option forwardfor
3252
http-request set-header X-Forwarded-Port %[dst_port]
3353
http-request add-header X-Forwarded-Proto https if { ssl_fc }
34-
server rabbit01 rabbit_rabbit01:15672 check inter 5s rise 2 fall 3
35-
server rabbit02 rabbit_rabbit02:15672 check inter 5s rise 2 fall 3
36-
server rabbit03 rabbit_rabbit03:15672 check inter 5s rise 2 fall 3
54+
55+
server rabbit01 rabbit_rabbit01:15672 check resolvers dockerdns init-addr libc,none inter 5s rise 2 fall 3
56+
server rabbit02 rabbit_rabbit02:15672 check resolvers dockerdns init-addr libc,none inter 5s rise 2 fall 3
57+
server rabbit03 rabbit_rabbit03:15672 check resolvers dockerdns init-addr libc,none inter 5s rise 2 fall 3

services/rabbit/rabbitmq.conf.j2

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ cluster_formation.classic_config.nodes.{{ ix }} = rabbit@rabbit_rabbit0{{ ix }}
1212
## at declaration time.
1313
# https://www.rabbitmq.com/docs/quorum-queues#quorum-requirements
1414
quorum_queue.initial_cluster_size = {{ RABBIT_QUORUM_QUEUE_DEFAULT_REPLICA_COUNT }}
15+
16+
# Extract proper client ip when behind a proxy (e.g. haproxy)
17+
# https://www.rabbitmq.com/docs/networking#proxy-protocol
18+
proxy_protocol = true

services/rabbit/test/pub.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,12 @@
1010
)
1111
logger = logging.getLogger(__name__)
1212

13-
hosts = os.environ["RABBIT_HOSTS"].split(",")
14-
credentials = pika.PlainCredentials(os.getenv("RABBIT_USER"), os.getenv("RABBIT_PASS"))
15-
16-
endpoints = [
17-
pika.URLParameters(
18-
f"amqp://{credentials.username}:{credentials.password}@{h.strip()}:5672/"
19-
)
20-
for h in hosts
21-
]
13+
host = os.environ["RABBIT_HOSTS"].strip()
14+
credentials = pika.PlainCredentials(os.environ["RABBIT_USER"], os.environ["RABBIT_PASS"])
15+
parameters = pika.ConnectionParameters(host=host, port=5672, credentials=credentials, client_properties={"connection_name": "publisher"})
2216

2317
while True:
24-
logger.info(f"Verbinde zu RabbitMQ Hosts: {hosts}")
25-
26-
random.shuffle(endpoints)
27-
connection = pika.BlockingConnection(endpoints)
18+
connection = pika.BlockingConnection(parameters)
2819
try:
2920
channel = connection.channel()
3021

@@ -43,7 +34,8 @@
4334
)
4435
logger.info(f"Gesendet: {msg}")
4536
time.sleep(3)
46-
except pika.exceptions.ConnectionClosedByBroker:
37+
except Exception:
4738
logger.error(
4839
"Verbindung zum RabbitMQ Broker wurde geschlossen. Versuche erneut zu verbinden..."
4940
)
41+
time.sleep(5)

services/rabbit/test/sub.py

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,23 @@
11
import logging
22
import os
33
import random
4-
4+
import time
55
import pika
66

77
logging.basicConfig(
88
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
99
)
1010
logger = logging.getLogger(__name__)
1111

12-
hosts = os.environ["RABBIT_HOSTS"].split(",")
13-
credentials = pika.PlainCredentials(os.getenv("RABBIT_USER"), os.getenv("RABBIT_PASS"))
14-
15-
hosts = os.environ["RABBIT_HOSTS"].split(",")
16-
credentials = pika.PlainCredentials(os.getenv("RABBIT_USER"), os.getenv("RABBIT_PASS"))
17-
18-
endpoints = [
19-
pika.URLParameters(
20-
f"amqp://{credentials.username}:{credentials.password}@{h.strip()}:5672/"
21-
)
22-
for h in hosts
23-
]
12+
host = os.environ["RABBIT_HOSTS"].strip()
13+
credentials = pika.PlainCredentials(os.environ["RABBIT_USER"], os.environ["RABBIT_PASS"])
14+
parameters = pika.ConnectionParameters(host=host, port=5672, credentials=credentials, client_properties={"connection_name": "subscriber"})
2415

2516
while True:
26-
logger.info(f"Verbinde zu RabbitMQ Hosts: {hosts}")
17+
logger.info(f"Verbinde zu RabbitMQ Host: {host}")
2718

28-
random.shuffle(endpoints)
2919
try:
30-
connection = pika.BlockingConnection(endpoints)
20+
connection = pika.BlockingConnection(parameters)
3121
channel = connection.channel()
3222

3323
logger.info("Queue 'hello' (quorum) deklarieren...")
@@ -42,7 +32,8 @@ def callback(ch, method, properties, body):
4232
channel.basic_consume(queue="hello", on_message_callback=callback)
4333
logger.info("Warte auf Nachrichten...")
4434
channel.start_consuming()
45-
except pika.exceptions.ConnectionClosedByBroker:
35+
except Exception:
4636
logger.error(
4737
"Verbindung zum RabbitMQ Broker wurde geschlossen. Versuche erneut zu verbinden..."
4838
)
39+
time.sleep(5)

0 commit comments

Comments
 (0)