Skip to content

Commit 1bf3af7

Browse files
authored
Merge pull request #65 from freifunkMUC/fix_reconnect_handler
Fix reconnect handler
2 parents 9ee59e6 + 7cdae32 commit 1bf3af7

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

wgkex/worker/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def main():
6767
if not domains:
6868
raise DomainsNotInConfig("Could not locate domains in configuration.")
6969
clean_up_worker(domains)
70-
mqtt.connect(domains)
70+
mqtt.connect()
7171

7272

7373
if __name__ == "__main__":

wgkex/worker/mqtt.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,29 +38,47 @@ def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]:
3838
return config.get(var)
3939

4040

41-
def connect(domains: List[str]) -> None:
41+
def connect() -> None:
4242
"""Connect to MQTT for the given domains.
4343
4444
Argument:
4545
domains: The domains to connect to.
4646
"""
47-
if not domains:
48-
logging.error("No domains were passed: %s", domains)
4947
base_config = fetch_from_config("mqtt")
5048
broker_address = base_config.get("broker_url")
5149
broker_port = base_config.get("broker_port")
5250
broker_keepalive = base_config.get("keepalive")
5351
# TODO(ruairi): Move the hostname to a global variable.
5452
client = mqtt.Client(socket.gethostname())
53+
54+
# Register handlers
55+
client.on_connect = on_connect
5556
client.on_message = on_message
5657
logging.info("connecting to broker %s", broker_address)
57-
client.max_inflight_messages_set(200)
58+
5859
client.connect(broker_address, port=broker_port, keepalive=broker_keepalive)
60+
client.loop_forever()
61+
62+
63+
# The callback for when the client receives a CONNACK response from the server.
64+
def on_connect(client: mqtt.Client, userdata: Any, flags, rc) -> None:
65+
"""Handles MQTT connect and subscribes to topics on connect
66+
67+
Arguments:
68+
client: the client instance for this callback.
69+
userdata: the private user data.
70+
flags: The MQTT flags.
71+
rc: The MQTT rc.
72+
"""
73+
logging.debug("Connected with result code " + str(rc))
74+
domains = load_config().get("domains")
75+
76+
# Subscribing in on_connect() means that if we lose the connection and
77+
# reconnect then subscriptions will be renewed.
5978
for domain in domains:
6079
topic = f"wireguard/{domain}/+"
6180
logging.info(f"Subscribing to topic {topic}")
6281
client.subscribe(topic)
63-
client.loop_forever()
6482

6583

6684
def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> None:
@@ -83,4 +101,4 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) ->
83101
)
84102
logging.info(f"Received node create message for key {client.public_key}")
85103
# TODO(ruairi): Verify return type here.
86-
logging.info(link_handler(client))
104+
logging.debug(link_handler(client))

0 commit comments

Comments
 (0)