|
18 | 18 | from time import sleep |
19 | 19 | from importlib import metadata |
20 | 20 | from utils import install_package |
| 21 | +from os import environ |
21 | 22 |
|
22 | 23 | def check_tb_paho_mqtt_installed(): |
23 | 24 | try: |
@@ -213,8 +214,18 @@ def get_remaining_tokens(self): |
213 | 214 | return self.tokens |
214 | 215 |
|
215 | 216 |
|
| 217 | +DEFAULT_RATE_LIMIT_PERCENTAGE = environ.get('TB_DEFAULT_RATE_LIMIT_PERCENTAGE') |
| 218 | +if DEFAULT_RATE_LIMIT_PERCENTAGE is None: |
| 219 | + DEFAULT_RATE_LIMIT_PERCENTAGE = 80 |
| 220 | +else: |
| 221 | + try: |
| 222 | + DEFAULT_RATE_LIMIT_PERCENTAGE = int(DEFAULT_RATE_LIMIT_PERCENTAGE) |
| 223 | + except ValueError: |
| 224 | + log.warning("Invalid value for TB_DEFAULT_RATE_LIMIT_PERCENTAGE, using default value of 80%%") |
| 225 | + DEFAULT_RATE_LIMIT_PERCENTAGE = 80 |
| 226 | + |
216 | 227 | class RateLimit: |
217 | | - def __init__(self, rate_limit, name=None, percentage=80): |
| 228 | + def __init__(self, rate_limit, name=None, percentage=DEFAULT_RATE_LIMIT_PERCENTAGE): |
218 | 229 | self.__reached_limit_index = 0 |
219 | 230 | self.__reached_limit_index_time = 0 |
220 | 231 | self._no_limit = False |
@@ -313,7 +324,7 @@ def get_minimal_timeout(self): |
313 | 324 | def has_limit(self): |
314 | 325 | return not self._no_limit |
315 | 326 |
|
316 | | - def set_limit(self, rate_limit, percentage=80): |
| 327 | + def set_limit(self, rate_limit, percentage=DEFAULT_RATE_LIMIT_PERCENTAGE): |
317 | 328 | with self.__lock: |
318 | 329 | self._minimal_timeout = DEFAULT_TIMEOUT |
319 | 330 | self._minimal_limit = float("inf") |
@@ -853,23 +864,23 @@ def on_service_configuration(self, _, response, *args, **kwargs): |
853 | 864 | if use_messages_rate_limit_factor and use_telemetry_rate_limit_factor: |
854 | 865 | max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(), |
855 | 866 | self._telemetry_rate_limit.get_minimal_limit(), |
856 | | - service_config_inflight_messages) * 80 / 100) |
| 867 | + service_config_inflight_messages) * DEFAULT_RATE_LIMIT_PERCENTAGE / 100) |
857 | 868 | elif use_messages_rate_limit_factor: |
858 | 869 | max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(), |
859 | | - service_config_inflight_messages) * 80 / 100) |
| 870 | + service_config_inflight_messages) * DEFAULT_RATE_LIMIT_PERCENTAGE / 100) |
860 | 871 | elif use_telemetry_rate_limit_factor: |
861 | 872 | max_inflight_messages = int(min(self._telemetry_rate_limit.get_minimal_limit(), |
862 | | - service_config_inflight_messages) * 80 / 100) |
| 873 | + service_config_inflight_messages) * DEFAULT_RATE_LIMIT_PERCENTAGE / 100) |
863 | 874 | else: |
864 | | - max_inflight_messages = int(service_config.get('maxInflightMessages', 100) * 80 / 100) |
| 875 | + max_inflight_messages = int(service_config.get('maxInflightMessages', 100) * DEFAULT_RATE_LIMIT_PERCENTAGE / 100) |
865 | 876 | if max_inflight_messages == 0: |
866 | 877 | max_inflight_messages = 10_000 # No limitation on device queue on transport level |
867 | 878 | if max_inflight_messages < 1: |
868 | 879 | max_inflight_messages = 1 |
869 | 880 | self.max_inflight_messages_set(max_inflight_messages) |
870 | 881 | self.max_queued_messages_set(max_inflight_messages) |
871 | 882 | if service_config.get('maxPayloadSize'): |
872 | | - self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * 80 / 100) |
| 883 | + self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * DEFAULT_RATE_LIMIT_PERCENTAGE / 100) |
873 | 884 | log.info("Service configuration was successfully retrieved and applied.") |
874 | 885 | log.info("Current device limits: %r", service_config) |
875 | 886 | self.rate_limits_received = True |
|
0 commit comments