Skip to content

Commit 0c3ed6b

Browse files
committed
Maintenance: Add type hints across the board
1 parent 40e4d7c commit 0c3ed6b

31 files changed

+247
-156
lines changed

mqttwarn/commands.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
# (c) 2014-2019 The mqttwarn developers
2+
# (c) 2014-2023 The mqttwarn developers
33
from __future__ import print_function
44

55
import codecs
@@ -8,6 +8,7 @@
88
import os
99
import signal
1010
import sys
11+
import typing as t
1112

1213
from docopt import docopt
1314

@@ -92,7 +93,13 @@ def run():
9293
run_mqttwarn()
9394

9495

95-
def launch_plugin_standalone(plugin, options, data, configfile=None, config_more=None):
96+
def launch_plugin_standalone(
97+
plugin: str,
98+
options: t.Dict,
99+
data: t.Dict,
100+
configfile: t.Optional[str] = None,
101+
config_more: t.Optional[t.Dict] = None,
102+
):
96103

97104
# Optionally load configuration file
98105
does_not_exist = False
@@ -148,7 +155,7 @@ def run_mqttwarn():
148155
subscribe_forever()
149156

150157

151-
def setup_logging(config):
158+
def setup_logging(config: Config):
152159
LOGLEVEL = config.loglevelnumber
153160
LOGFILE = config.logfile
154161
LOGFORMAT = config.logformat

mqttwarn/configuration.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# -*- coding: utf-8 -*-
2-
# (c) 2014-2022 The mqttwarn developers
2+
# (c) 2014-2023 The mqttwarn developers
33
import ast
44
import codecs
55
import logging
66
import os
77
import sys
8+
import typing as t
89
from configparser import NoOptionError, RawConfigParser
910

1011
from mqttwarn.util import load_functions
@@ -21,13 +22,13 @@
2122

2223
class Config(RawConfigParser):
2324

24-
specials = {
25+
specials: t.Dict[str, t.Union[bool, None]] = {
2526
"TRUE": True,
2627
"FALSE": False,
2728
"NONE": None,
2829
}
2930

30-
def __init__(self, configuration_file=None, defaults=None):
31+
def __init__(self, configuration_file: t.Optional[str] = None, defaults: t.Optional[t.Dict] = None):
3132

3233
defaults = defaults or {}
3334

@@ -48,6 +49,7 @@ def __init__(self, configuration_file=None, defaults=None):
4849
self.password = None
4950
self.clientid = None
5051
self.lwt = None
52+
self.lwt_alive = None
5153
self.skipretained = False
5254
self.cleansession = False
5355
self.protocol = 3
@@ -110,7 +112,7 @@ def __init__(self, configuration_file=None, defaults=None):
110112

111113
self.functions = load_functions(functions_file)
112114

113-
def level2number(self, level):
115+
def level2number(self, level: str) -> int:
114116

115117
levels = {
116118
"CRITICAL": 50,
@@ -125,7 +127,8 @@ def level2number(self, level):
125127

126128
return levels.get(level.upper(), levels["DEBUG"])
127129

128-
def g(self, section, key, default=None):
130+
def g(self, section: str, key: str, default=None) -> t.Any:
131+
val = None
129132
try:
130133
val = self.get(section, key)
131134
if isinstance(val, str) and val.upper() in self.specials:
@@ -139,29 +142,29 @@ def g(self, section, key, default=None):
139142
return val
140143
except:
141144
raise
142-
return val
143145

144-
def getlist(self, section, key):
146+
def getlist(self, section: str, key: str) -> t.Union[t.List, None]:
145147
"""Return a list, fail if it isn't a list"""
146148

147149
val = None
148150
try:
149-
val = self.get(section, key)
150-
val = [s.strip() for s in val.split(",")]
151+
val_str = self.get(section, key)
152+
val = [s.strip() for s in val_str.split(",")]
151153
except Exception as e:
152154
logger.warning("Expecting a list in section `%s', key `%s' (%s)" % (section, key, e))
153155

154156
return val
155157

156-
def getdict(self, section, key):
158+
# TODO: Add return type annotation.
159+
def getdict(self, section: str, key: str):
157160
val = self.g(section, key)
158161

159162
try:
160163
return dict(val)
161164
except:
162165
return None
163166

164-
def config(self, section):
167+
def config(self, section: str) -> t.Dict:
165168
"""Convert a whole section's options (except the options specified
166169
explicitly below) into a dict, turning
167170
@@ -183,17 +186,17 @@ def config(self, section):
183186
return d
184187

185188

186-
def load_configuration(configfile=None, name="mqttwarn"):
189+
def load_configuration(configfile: t.Optional[str] = None, name: str = "mqttwarn") -> Config:
187190

188191
if configfile is None:
189-
configfile = os.getenv(name.upper() + "INI", name + ".ini")
192+
configfile = str(os.getenv(name.upper() + "INI", name + ".ini"))
190193

191194
if not os.path.exists(configfile):
192195
raise FileNotFoundError('Configuration file "{}" does not exist'.format(configfile))
193196

194197
# TODO: There should be a factory method which creates a `Config` instance,
195198
# including defaults, but without loading a configuration file.
196-
defaults = {
199+
defaults: t.Dict[str, str] = {
197200
"clientid": name,
198201
"lwt": "clients/{}".format(name),
199202
"lwt_alive": "1",

mqttwarn/context.py

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
# -*- coding: utf-8 -*-
2-
# (c) 2014-2022 The mqttwarn developers
2+
# (c) 2014-2023 The mqttwarn developers
33
import logging
44
import typing as t
55

66
import attr
77

88
from mqttwarn.configuration import Config
9+
from mqttwarn.model import Service, TdataType, TopicTargetType
910
from mqttwarn.util import load_function, sanitize_function_name
1011

1112
logger = logging.getLogger(__name__)
@@ -20,9 +21,9 @@ class RuntimeContext:
2021
"""
2122

2223
config: Config = attr.ib()
23-
invoker: t.Optional["FunctionInvoker"] = attr.ib()
24+
invoker: "FunctionInvoker" = attr.ib()
2425

25-
def get_sections(self):
26+
def get_sections(self) -> t.List[str]:
2627
sections = []
2728
for section in self.config.sections():
2829
if section == "defaults":
@@ -39,57 +40,57 @@ def get_sections(self):
3940
logger.warning("Section `%s' has no targets defined" % section)
4041
return sections
4142

42-
def get_topic(self, section):
43+
def get_topic(self, section: str) -> str:
4344
if self.config.has_option(section, "topic"):
4445
return self.config.get(section, "topic")
4546
return section
4647

47-
def get_qos(self, section):
48+
def get_qos(self, section: str) -> int:
4849
qos = 0
4950
if self.config.has_option(section, "qos"):
5051
qos = int(self.config.get(section, "qos"))
5152
return qos
5253

53-
def get_config(self, section, name):
54+
def get_config(self, section: str, name: str) -> t.Any:
5455
value = None
5556
if self.config.has_option(section, name):
5657
value = self.config.get(section, name)
5758
return value
5859

59-
def is_filtered(self, section, topic, payload):
60+
def is_filtered(self, section: str, topic: str, payload: t.AnyStr) -> bool:
6061
if self.config.has_option(section, "filter"):
61-
filterfunc = sanitize_function_name(self.config.get(section, "filter"))
6262
try:
63-
return self.invoker.filter(filterfunc, topic, payload, section)
63+
name = sanitize_function_name(self.config.get(section, "filter"))
64+
return self.invoker.filter(name, topic, payload, section)
6465
except Exception as e:
65-
logger.exception("Cannot invoke filter function '%s' defined in '%s': %s" % (filterfunc, section, e))
66+
logger.exception("Cannot invoke filter function '%s' defined in '%s': %s" % (name, section, e))
6667
return False
6768

68-
def get_topic_data(self, section, topic):
69+
def get_topic_data(self, section: str, data: TdataType) -> t.Optional[TdataType]:
6970
if self.config.has_option(section, "datamap"):
70-
name = sanitize_function_name(self.config.get(section, "datamap"))
7171
try:
72-
return self.invoker.datamap(name, topic)
72+
name = sanitize_function_name(self.config.get(section, "datamap"))
73+
return self.invoker.datamap(name, data)
7374
except Exception as e:
7475
logger.exception("Cannot invoke datamap function '%s' defined in '%s': %s" % (name, section, e))
7576
return None
7677

77-
def get_all_data(self, section, topic, data):
78+
def get_all_data(self, section: str, topic: str, data: TdataType) -> t.Optional[TdataType]:
7879
if self.config.has_option(section, "alldata"):
79-
name = sanitize_function_name(self.config.get(section, "alldata"))
8080
try:
81+
name = sanitize_function_name(self.config.get(section, "alldata"))
8182
return self.invoker.alldata(name, topic, data)
8283
except Exception as e:
8384
logger.exception("Cannot invoke alldata function '%s' defined in '%s': %s" % (name, section, e))
8485
return None
8586

86-
def get_topic_targets(self, section, topic, data):
87+
def get_topic_targets(self, section: str, topic: str, data: TdataType) -> TopicTargetType:
8788
"""
8889
Topic targets function invoker.
8990
"""
9091
if self.config.has_option(section, "targets"):
91-
name = sanitize_function_name(self.config.get(section, "targets"))
9292
try:
93+
name = sanitize_function_name(self.config.get(section, "targets"))
9394
return self.invoker.topic_target_list(name, topic, data)
9495
except Exception as ex:
9596
error = repr(ex)
@@ -99,20 +100,31 @@ def get_topic_targets(self, section, topic, data):
99100
)
100101
return None
101102

102-
def get_service_config(self, service):
103+
def get_service_config(self, service: str) -> t.Dict[str, t.Any]:
103104
config = self.config.config("config:" + service)
104105
if config is None:
105106
return {}
106107
return dict(config)
107108

108-
def get_service_targets(self, service):
109-
# Be more graceful with jobs w/o any target address information (2021-10-18 [amo]).
109+
def get_service_targets(self, service: str) -> t.List[TopicTargetType]:
110+
"""
111+
Resolve target address descriptor.
112+
113+
2021-10-18 [amo]: Be more graceful with jobs w/o any target address information.
114+
"""
115+
targets: t.List[TopicTargetType] = []
110116
try:
111-
targets = self.config.getdict("config:" + service, "targets") or [None]
112-
return targets
117+
targets = self.config.getdict("config:" + service, "targets")
113118
except:
114119
logger.exception("Unable to access targets for service `%s'" % service)
115120

121+
# TODO: The target address descriptor may be of any type these days,
122+
# and not necessarily a list.
123+
# TODO: Currently, this makes sure to always return one element.
124+
# Verify if this is really needed.
125+
targets = targets or [None]
126+
return targets
127+
116128

117129
@attr.s
118130
class FunctionInvoker:
@@ -121,31 +133,32 @@ class FunctionInvoker:
121133
functions from a configured Python source code file.
122134
"""
123135

124-
config = attr.ib()
125-
srv = attr.ib()
136+
config: Config = attr.ib()
137+
srv: Service = attr.ib()
126138

127-
def datamap(self, name, topic):
139+
def datamap(self, name: str, data: TdataType) -> TdataType:
128140
"""
129141
Invoke function "name" loaded from the "functions" Python module.
130142
131143
:param name: Function name to invoke
132-
:param topic: Topic to pass to the invoked function
144+
:param data: Data to pass to the invoked function
133145
:return: Return value of function invocation
134146
"""
135147

136148
val = None
149+
137150
try:
138151
func = load_function(name=name, py_mod=self.config.functions)
139152
try:
140-
val = func(topic, self.srv) # new version
153+
val = func(data, self.srv) # new version
141154
except TypeError:
142-
val = func(topic) # legacy
155+
val = func(data) # legacy
143156
except:
144157
raise
145158

146159
return val
147160

148-
def alldata(self, name, topic, data):
161+
def alldata(self, name: str, topic: str, data: TdataType) -> TdataType:
149162
"""
150163
Invoke function "name" loaded from the "functions" Python module.
151164
@@ -164,7 +177,7 @@ def alldata(self, name, topic, data):
164177

165178
return val
166179

167-
def topic_target_list(self, name, topic, data):
180+
def topic_target_list(self, name: str, topic: str, data: TdataType) -> TopicTargetType:
168181
"""
169182
Invoke function "name" loaded from the "functions" Python module.
170183
Computes dynamic topic subscription targets.
@@ -185,7 +198,7 @@ def topic_target_list(self, name, topic, data):
185198

186199
return val
187200

188-
def filter(self, name, topic, payload, section=None): # noqa:A003
201+
def filter(self, name: str, topic: str, payload: t.AnyStr, section: t.Optional[str] = None) -> bool: # noqa:A003
189202
"""
190203
Invoke function "name" loaded from the "functions" Python module.
191204
Return that function's True/False.
@@ -199,15 +212,17 @@ def filter(self, name, topic, payload, section=None): # noqa:A003
199212
# Filtering currently only works on text.
200213
# TODO: To let filtering also work on binary data, this line would need to go elsewhere. But where?
201214
if isinstance(payload, bytes):
202-
payload = payload.decode("utf-8")
215+
payload_decoded = payload.decode("utf-8")
216+
else:
217+
payload_decoded = payload
203218

204219
rc = False
205220
try:
206221
func = load_function(name=name, py_mod=self.config.functions)
207222
try:
208-
rc = func(topic, payload, section, self.srv) # new version
223+
rc = func(topic, payload_decoded, section, self.srv) # new version
209224
except TypeError:
210-
rc = func(topic, payload) # legacy signature
225+
rc = func(topic, payload_decoded) # legacy signature
211226
except:
212227
raise
213228

0 commit comments

Comments
 (0)