Skip to content

Commit 1be4f1c

Browse files
committed
Add ha rabbit
1 parent 901ee0c commit 1be4f1c

File tree

10 files changed

+225
-55
lines changed

10 files changed

+225
-55
lines changed

services/rabbit/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
docker-compose.yml
2+
!erlang.cookie.secret.template
3+
rabbitmq.conf

services/rabbit/Makefile

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ include ${REPO_BASE_DIR}/scripts/common-services.Makefile
66
include ${REPO_BASE_DIR}/scripts/common.Makefile
77

88
.PHONY: up
9-
up: ${TEMP_COMPOSE} prune-docker-stack-configs ## Deploys metabase stack
9+
up: ${TEMP_COMPOSE} prune-docker-stack-configs prune-docker-stack-secrets ## Deploys metabase stack
1010
@docker stack deploy --with-registry-auth --prune --compose-file ${TEMP_COMPOSE} ${STACK_NAME}
1111

1212
up-aws: up
@@ -22,5 +22,13 @@ up-public: up
2222
${TEMP_COMPOSE}: docker-compose.yml .env
2323
@${REPO_BASE_DIR}/scripts/docker-stack-config.bash -e .env $< > $@
2424

25-
docker-compose.yml: docker-compose.yml.j2 .env venv
25+
erlang.cookie.secret: erlang.cookie.secret.template .env
26+
@set -a; source .env; set +a; \
27+
envsubst < $< > $@
28+
29+
rabbitmq.conf: rabbitmq.conf.j2 .env venv
30+
# generate $@
31+
@$(call jinja, $<, .env, $@)
32+
33+
docker-compose.yml: docker-compose.yml.j2 .env rabbitmq.conf erlang.cookie.secret venv
2634
@$(call jinja, $<, .env, $@)

services/rabbit/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
## Updating rabbitmq.conf / advanced.config (zero-downtime)
2+
3+
rabbitmq.conf and advanced.config changes take effect after a node restart. This can be performed with zero-downtime when RabbitMQ is clustered (have multiple nodes). This can be achieved by stopping and starting rabbitmq nodes one by one
4+
* `docker exec -it <container-id> bash`
5+
* (inside container) `rabbitmqctl stop_app` and wait some time until node is stopped (can be seen in management ui)
6+
* (inside container) `rabbitmqctl start_app`
7+
8+
Source: https://www.rabbitmq.com/docs/next/configure#config-changes-effects
9+
10+
## Enable node Maintenance mode
11+
12+
1. Get inside container's shell (`docker exec -it <container-id> bash`)
13+
2. (Inside container) execute `rabbitmq-upgrade drain`
14+
15+
Source: https://www.rabbitmq.com/docs/upgrade#maintenance-mode
16+
17+
## Rotating erlang cookie (zero-downtime)
18+
19+
https://github.com/rabbitmq/rabbitmq-server/issues/14390
20+
https://github.com/rabbitmq/rabbitmq-server/discussions/9784

services/rabbit/docker-compose.yml

Lines changed: 0 additions & 39 deletions
This file was deleted.
Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
{% set NODE_IXS = range(1, (RABBIT_CLUSTER_NODE_COUNT | int) + 1) -%}
2+
13
services:
2-
rabbit01:
4+
{% for ix in NODE_IXS %}
5+
rabbit0{{ ix }}:
36
image: itisfoundation/rabbitmq:4.1.2-management
47
init: true
58
# https://docs.docker.com/reference/cli/docker/service/create/#create-services-using-templates
@@ -8,32 +11,91 @@ services:
811
RABBITMQ_DEFAULT_USER: ${RABBIT_USER}
912
RABBITMQ_DEFAULT_PASS: ${RABBIT_PASSWORD}
1013
RABBITMQ_NODENAME: {% raw %}"rabbit@{{.Service.Name}}"{% endraw %}
14+
# https://docs.docker.com/reference/compose-file/services/#long-syntax-5
15+
# https://hub.docker.com/_/rabbitmq#erlang-cookie
16+
{% if ix == 1 %}
17+
# TODO: remove
18+
ports:
19+
- "15672:15672" # management UI
20+
{% endif %}
21+
secrets:
22+
# https://github.com/docker-library/rabbitmq/issues/279
23+
- source: rabbit_erlang_cookie
24+
target: /var/lib/rabbitmq/.erlang.cookie
25+
mode: 0600
26+
# as long as "default" user is used (no user explicitly specified)
27+
uid: "999"
28+
gid: "999"
29+
configs:
30+
- source: rabbitmq.conf
31+
target: /etc/rabbitmq/rabbitmq.conf
32+
mode: 0600
33+
uid: "999"
34+
gid: "999"
1135
volumes:
12-
- rabbit_data:/var/lib/rabbitmq
36+
- rabbit0{{ ix }}_data:/var/lib/rabbitmq
1337
# TODO: sync with existing rabbit attached networks
1438
networks:
1539
- rabbit
1640
# TODO: consider another healthcheck (e.g. check kubernetes operator)
1741
healthcheck:
18-
# see https://www.rabbitmq.com/monitoring.html#individual-checks for info about health-checks available in rabbitmq
19-
test: rabbitmq-diagnostics -q status
20-
interval: 5s
21-
timeout: 30s
22-
retries: 5
23-
start_period: 5s
42+
# see https://hub.docker.com/_/rabbitmq#healthlivenessreadiness-checking
43+
# https://www.rabbitmq.com/docs/clustering#restarting-readiness-probes
44+
# we must have a healthcheck that does not require node to be fully booted (i.e. joined a cluster)
45+
# because it creates a deadlock: docker swarm will not route to the node until it is healthy
46+
# node is not healthy until it is part of a cluster (other node can talk to it)
47+
test: rabbitmq-diagnostics ping
48+
interval: 60s
49+
timeout: 10s
50+
retries: 2
51+
start_period: 30s
52+
start_interval: 10s
53+
{% endfor %}
2454

25-
busybox:
26-
image: ubuntu:22.04
27-
command: /bin/sh -c "sleep infinity"
55+
subscriber:
56+
image: python:3.11
57+
command: sh -c "pip install pika && python /app/sub.py"
58+
environment:
59+
- RABBIT_HOSTS=rabbit_rabbit01,rabbit_rabbit02,rabbit_rabbit03
60+
- RABBIT_USER=${RABBIT_USER}
61+
- RABBIT_PASS=${RABBIT_PASSWORD}
62+
networks:
63+
- rabbit
64+
volumes:
65+
- ./test/sub.py:/app/sub.py
66+
67+
publisher:
68+
image: python:3.11
69+
command: sh -c "pip install pika && python /app/pub.py"
70+
environment:
71+
- RABBIT_HOSTS=rabbit_rabbit01,rabbit_rabbit02,rabbit_rabbit03
72+
- RABBIT_USER=${RABBIT_USER}
73+
- RABBIT_PASS=${RABBIT_PASSWORD}
2874
networks:
2975
- rabbit
76+
volumes:
77+
- ./test/pub.py:/app/pub.py
78+
79+
volumes:
80+
{% for ix in NODE_IXS %}
81+
rabbit0{{ ix }}_data:
82+
name: ${STACK_NAME}0{{ ix }}_data
83+
{%- endfor %}
84+
3085
networks:
3186
# TODO: consider creating it externally for better control over subnets size and ip ranges
3287
rabbit:
3388
name: ${RABBIT_DOCKER_SWARM_NETWORK_NAME}
3489
driver: overlay
3590
attachable: true
3691

37-
volumes:
38-
rabbit_data:
39-
name: ${STACK_NAME}_rabbit_data
92+
configs:
93+
rabbitmq.conf:
94+
file: ./rabbitmq.conf
95+
name: ${STACK_NAME}_rabbitmq_conf_{{ "./rabbitmq.conf" | sha256file | substring(0,10) }}
96+
97+
secrets:
98+
rabbit_erlang_cookie:
99+
# TODO: rolling secrets update?
100+
name: ${STACK_NAME}_erlang_cookie
101+
file: ./erlang.cookie.secret
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
${RABBIT_ERLANG_COOKIE}

services/rabbit/rabbitmq.conf.j2

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{% set NODE_IXS = range(1, (RABBIT_CLUSTER_NODE_COUNT | int) + 1) -%}
2+
3+
# https://www.rabbitmq.com/docs/cluster-formation#peer-discovery-configuring-mechanism
4+
cluster_formation.peer_discovery_backend = classic_config
5+
6+
{% for ix in NODE_IXS %}
7+
cluster_formation.classic_config.nodes.{{ ix }} = rabbit@rabbit_rabbit0{{ ix }}
8+
{%- endfor %}
9+
10+
## Sets the initial quorum queue replica count for newly declared quorum queues.
11+
## This value can be overridden using the 'x-quorum-initial-group-size' queue argument
12+
## at declaration time.
13+
# https://www.rabbitmq.com/docs/quorum-queues#quorum-requirements
14+
quorum_queue.initial_cluster_size = {{ RABBIT_QUORUM_QUEUE_DEFAULT_REPLICA_COUNT }}

services/rabbit/template.env

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
STACK_NAME=${STACK_NAME}
22

3+
RABBIT_CLUSTER_NODE_COUNT=${RABBIT_CLUSTER_NODE_COUNT}
4+
RABBIT_QUORUM_QUEUE_DEFAULT_REPLICA_COUNT=${RABBIT_QUORUM_QUEUE_DEFAULT_REPLICA_COUNT}
5+
36
RABBIT_USER=${RABBIT_USER}
47
RABBIT_PASSWORD=${RABBIT_PASSWORD}
8+
RABBIT_ERLANG_COOKIE=${RABBIT_ERLANG_COOKIE}
59

610
RABBIT_DOCKER_SWARM_NETWORK_NAME=rabbit_rabbit_subnet

services/rabbit/test/pub.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import logging
2+
import os
3+
import random
4+
import time
5+
6+
import pika
7+
8+
logging.basicConfig(
9+
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
10+
)
11+
logger = logging.getLogger(__name__)
12+
13+
hosts = os.environ["RABBIT_HOSTS"].split(",")
14+
credentials = pika.PlainCredentials(os.getenv("RABBIT_USER"), os.getenv("RABBIT_PASS"))
15+
16+
endpoints = [
17+
pika.URLParameters(
18+
f"amqp://{credentials.username}:{credentials.password}@{h.strip()}:5672/"
19+
)
20+
for h in hosts
21+
]
22+
23+
while True:
24+
logger.info(f"Verbinde zu RabbitMQ Hosts: {hosts}")
25+
26+
random.shuffle(endpoints)
27+
connection = pika.BlockingConnection(endpoints)
28+
try:
29+
channel = connection.channel()
30+
31+
logger.info("Queue 'hello' (quorum) deklarieren...")
32+
channel.queue_declare(
33+
queue="hello", durable=True, arguments={"x-queue-type": "quorum"}
34+
)
35+
36+
for i in range(10_000):
37+
msg = f"Nachricht {i}"
38+
channel.basic_publish(
39+
exchange="",
40+
routing_key="hello",
41+
body=msg,
42+
properties=pika.BasicProperties(delivery_mode=2),
43+
)
44+
logger.info(f"Gesendet: {msg}")
45+
time.sleep(3)
46+
except pika.exceptions.ConnectionClosedByBroker:
47+
logger.error(
48+
"Verbindung zum RabbitMQ Broker wurde geschlossen. Versuche erneut zu verbinden..."
49+
)

services/rabbit/test/sub.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import logging
2+
import os
3+
import random
4+
5+
import pika
6+
7+
logging.basicConfig(
8+
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
9+
)
10+
logger = logging.getLogger(__name__)
11+
12+
hosts = os.environ["RABBIT_HOSTS"].split(",")
13+
credentials = pika.PlainCredentials(os.getenv("RABBIT_USER"), os.getenv("RABBIT_PASS"))
14+
15+
hosts = os.environ["RABBIT_HOSTS"].split(",")
16+
credentials = pika.PlainCredentials(os.getenv("RABBIT_USER"), os.getenv("RABBIT_PASS"))
17+
18+
endpoints = [
19+
pika.URLParameters(
20+
f"amqp://{credentials.username}:{credentials.password}@{h.strip()}:5672/"
21+
)
22+
for h in hosts
23+
]
24+
25+
while True:
26+
logger.info(f"Verbinde zu RabbitMQ Hosts: {hosts}")
27+
28+
random.shuffle(endpoints)
29+
try:
30+
connection = pika.BlockingConnection(endpoints)
31+
channel = connection.channel()
32+
33+
logger.info("Queue 'hello' (quorum) deklarieren...")
34+
channel.queue_declare(
35+
queue="hello", durable=True, arguments={"x-queue-type": "quorum"}
36+
)
37+
38+
def callback(ch, method, properties, body):
39+
logger.info(f"Empfangen: {body.decode()}")
40+
ch.basic_ack(delivery_tag=method.delivery_tag)
41+
42+
channel.basic_consume(queue="hello", on_message_callback=callback)
43+
logger.info("Warte auf Nachrichten...")
44+
channel.start_consuming()
45+
except pika.exceptions.ConnectionClosedByBroker:
46+
logger.error(
47+
"Verbindung zum RabbitMQ Broker wurde geschlossen. Versuche erneut zu verbinden..."
48+
)

0 commit comments

Comments
 (0)