Skip to content

Commit d3885f7

Browse files
authored
Merge pull request #106 from freifunkMUC/multithread_worker
Add queues for netlink messages
2 parents 4a9436d + 7a4cbcd commit d3885f7

File tree

5 files changed

+82
-18
lines changed

5 files changed

+82
-18
lines changed

wgkex/worker/BUILD

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ py_library(
1616
],
1717
)
1818

19+
1920
py_test(
2021
name = "netlink_test",
2122
srcs = ["netlink_test.py"],
@@ -36,6 +37,7 @@ py_library(
3637
"//wgkex/common:utils",
3738
"//wgkex/common:logger",
3839
"//wgkex/config:config",
40+
":msg_queue",
3941
":netlink",
4042
],
4143
)
@@ -45,6 +47,7 @@ py_test(
4547
srcs = ["mqtt_test.py"],
4648
deps = [
4749
":mqtt",
50+
":msg_queue",
4851
requirement("mock"),
4952
],
5053
)
@@ -54,6 +57,7 @@ py_binary(
5457
srcs = ["app.py"],
5558
deps = [
5659
":mqtt",
60+
":msg_queue",
5761
"//wgkex/config:config",
5862
"//wgkex/common:logger",
5963
],
@@ -64,6 +68,16 @@ py_test(
6468
srcs = ["app_test.py"],
6569
deps = [
6670
":app",
71+
":msg_queue",
6772
requirement("mock"),
6873
],
6974
)
75+
76+
py_library(
77+
name = "msg_queue",
78+
srcs = ["msg_queue.py"],
79+
visibility = ["//visibility:public"],
80+
deps = [
81+
"//wgkex/common:logger",
82+
],
83+
)

wgkex/worker/app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import wgkex.config.config as config
44
from wgkex.worker import mqtt
5+
from wgkex.worker.msg_queue import watch_queue
56
from wgkex.worker.netlink import wg_flush_stale_peers
6-
import threading
77
import time
8+
import threading
89
from wgkex.common import logger
910
from typing import List, Text
1011

@@ -60,6 +61,7 @@ def main():
6061
if not domains:
6162
raise DomainsNotInConfig("Could not locate domains in configuration.")
6263
clean_up_worker(domains)
64+
watch_queue()
6365
mqtt.connect()
6466

6567

wgkex/worker/mqtt.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@
77
from wgkex.config.config import load_config
88
import socket
99
import re
10-
from wgkex.worker.netlink import link_handler
11-
from wgkex.worker.netlink import WireGuardClient
12-
from typing import Optional, Dict, List, Any, Union
10+
from typing import Optional, Dict, Any, Union
1311
from wgkex.common import logger
12+
from wgkex.worker.msg_queue import q
1413

1514

1615
def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]:
@@ -93,13 +92,8 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) ->
9392
)
9493
domain = domain.group(1)
9594
logger.debug("Found domain %s", domain)
96-
client = WireGuardClient(
97-
public_key=str(message.payload.decode("utf-8")),
98-
domain=domain,
99-
remove=False,
100-
)
95+
10196
logger.info(
102-
f"Received create message for key {client.public_key} on domain {domain} with lladdr {client.lladdr}"
97+
f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain} adding to queue"
10398
)
104-
# TODO(ruairi): Verify return type here.
105-
logger.debug(link_handler(client))
99+
q.put((domain, str(message.payload.decode("utf-8"))))

wgkex/worker/mqtt_test.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import unittest
33
import mock
44
import mqtt
5+
import msg_queue
56

67

78
class MQTTTest(unittest.TestCase):
@@ -40,10 +41,10 @@ def test_connect_fails_mqtt_error(self, config_mock, mqtt_mock):
4041
with self.assertRaises(ValueError):
4142
mqtt.connect()
4243

43-
@mock.patch.object(mqtt, "link_handler")
44+
45+
""" @mock.patch.object(msg_queue, "link_handler")
4446
@mock.patch.object(mqtt, "load_config")
4547
def test_on_message_success(self, config_mock, link_mock):
46-
"""Tests on_message for success."""
4748
config_mock.return_value = {"domain_prefix": "_ffmuc_"}
4849
link_mock.return_value = dict(WireGuard="result")
4950
mqtt_msg = mock.patch.object(mqtt.mqtt, "MQTTMessage")
@@ -53,18 +54,17 @@ def test_on_message_success(self, config_mock, link_mock):
5354
link_mock.assert_has_calls(
5455
[
5556
mock.call(
56-
mqtt.WireGuardClient(
57+
msg_queue.WireGuardClient(
5758
public_key="PUB_KEY", domain="domain1", remove=False
5859
)
5960
)
6061
],
6162
any_order=True,
6263
)
6364
64-
@mock.patch.object(mqtt, "link_handler")
65+
@mock.patch.object(msg_queue, "link_handler")
6566
@mock.patch.object(mqtt, "load_config")
6667
def test_on_message_fails_no_domain(self, config_mock, link_mock):
67-
"""Tests on_message for failure to parse domain."""
6868
config_mock.return_value = {
6969
"domain_prefix": "ffmuc_",
7070
"log_level": "DEBUG",
@@ -83,7 +83,7 @@ def test_on_message_fails_no_domain(self, config_mock, link_mock):
8383
mqtt_msg.topic = "bad_domain_match"
8484
with self.assertRaises(ValueError):
8585
mqtt.on_message(None, None, mqtt_msg)
86-
86+
"""
8787

8888
if __name__ == "__main__":
8989
unittest.main()

wgkex/worker/msg_queue.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#!/usr/bin/env python3
2+
import threading
3+
from queue import Queue
4+
from time import sleep
5+
from wgkex.common import logger
6+
from wgkex.worker.netlink import link_handler
7+
from wgkex.worker.netlink import WireGuardClient
8+
9+
10+
class UniqueQueue(Queue):
11+
def put(self, item, block=True, timeout=None):
12+
if item not in self.queue:
13+
Queue.put(self, item, block, timeout)
14+
15+
def _init(self, maxsize):
16+
self.queue = set()
17+
18+
def _put(self, item):
19+
self.queue.add(item)
20+
21+
def _get(self):
22+
return self.queue.pop()
23+
24+
25+
q = UniqueQueue()
26+
27+
28+
def watch_queue() -> None:
29+
"""Watches the queue for new messages."""
30+
logger.debug("Starting queue watcher")
31+
threading.Thread(target=pick_from_queue, daemon=True).start()
32+
33+
34+
def pick_from_queue() -> None:
35+
"""Picks a message from the queue and processes it."""
36+
logger.debug("Starting queue processor")
37+
while True:
38+
if not q.empty():
39+
logger.debug("Queue is not empty current size is %i", q.qsize())
40+
domain, message = q.get()
41+
logger.debug("Processing queue item %s for domain %s", message, domain)
42+
client = WireGuardClient(
43+
public_key=message,
44+
domain=domain,
45+
remove=False,
46+
)
47+
logger.info(
48+
f"Processing queue for key {client.public_key} on domain {domain} with lladdr {client.lladdr}"
49+
)
50+
logger.debug(link_handler(client))
51+
q.task_done()
52+
else:
53+
logger.debug("Queue is empty")
54+
sleep(1)

0 commit comments

Comments
 (0)