Skip to content

Commit e377d1e

Browse files
committed
Fix candidate for messages processing with rate limits
1 parent fc7592f commit e377d1e

File tree

1 file changed

+53
-19
lines changed

1 file changed

+53
-19
lines changed

tb_device_mqtt.py

Lines changed: 53 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@
6060
2: "invalid client identifier",
6161
3: "server unavailable",
6262
4: "bad username or password",
63-
5: "not authorised",
63+
5: "not authorized",
6464
}
6565

6666

@@ -235,12 +235,17 @@ def has_limit(self):
235235

236236
def set_limit(self, rate_limit, percentage=80):
237237
with self.__lock:
238+
self._minimal_timeout = DEFAULT_TIMEOUT
239+
self._minimal_limit = 1000000000
238240
old_rate_limit_dict = deepcopy(self._rate_limit_dict)
239241
self._rate_limit_dict = {}
240-
self.percentage = percentage if percentage != 0 else self.percentage
242+
self.percentage = percentage if percentage > 0 else self.percentage
241243
rate_configs = rate_limit.split(";")
242244
if "," in rate_limit:
243245
rate_configs = rate_limit.split(",")
246+
if len(rate_configs) == 2 and rate_configs[0] == "0:0":
247+
self._no_limit = True
248+
return
244249
for rate in rate_configs:
245250
if rate == "":
246251
continue
@@ -718,18 +723,23 @@ def on_service_configuration(self, _, response, *args, **kwargs):
718723
if service_config.get('maxInflightMessages'):
719724
use_messages_rate_limit_factor = self._messages_rate_limit.has_limit()
720725
use_telemetry_rate_limit_factor = self._telemetry_rate_limit.has_limit()
726+
service_config_inflight_messages = int(service_config.get('maxInflightMessages', 100))
721727
if use_messages_rate_limit_factor and use_telemetry_rate_limit_factor:
722728
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
723729
self._telemetry_rate_limit.get_minimal_limit(),
724-
service_config.get('maxInflightMessages', 100)) * 80 / 100)
730+
service_config_inflight_messages) * 80 / 100)
725731
elif use_messages_rate_limit_factor:
726732
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
727-
service_config.get('maxInflightMessages', 100)) * 80 / 100)
733+
service_config_inflight_messages) * 80 / 100)
728734
elif use_telemetry_rate_limit_factor:
729735
max_inflight_messages = int(min(self._telemetry_rate_limit.get_minimal_limit(),
730-
service_config.get('maxInflightMessages', 100)) * 80 / 100)
736+
service_config_inflight_messages) * 80 / 100)
731737
else:
732738
max_inflight_messages = int(service_config.get('maxInflightMessages', 100) * 80 / 100)
739+
if max_inflight_messages == 0:
740+
max_inflight_messages = 10_000 # No limitation on device queue on transport level
741+
if max_inflight_messages < 1:
742+
max_inflight_messages = 1
733743
self.max_inflight_messages_set(max_inflight_messages)
734744
self.max_queued_messages_set(max_inflight_messages)
735745
if service_config.get('maxPayloadSize'):
@@ -773,8 +783,8 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
773783
return TBPublishInfo(paho.MQTTMessageInfo(None))
774784
if not log_posted and limit_reached_check:
775785
if isinstance(limit_reached_check, int):
776-
log.warning("Rate limit reached for %i seconds, waiting for rate limit to be released...",
777-
limit_reached_check)
786+
log.debug("Rate limit reached for %i seconds, waiting for rate limit to be released...",
787+
limit_reached_check)
778788
waited = True
779789
else:
780790
log.debug("Waiting for rate limit to be released...")
@@ -787,28 +797,42 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
787797
def _wait_until_current_queued_messages_processed(self):
788798
previous_notification_time = 0
789799
current_out_messages = len(self._client._out_messages) * 2
790-
inflight_messages = self._client._max_inflight_messages or 5
800+
max_inflight_messages = self._client._max_inflight_messages if self._client._max_inflight_messages > 0 else 5
791801
logger = None
792802
waiting_started = int(monotonic())
793803
connection_was_lost = False
794-
timeout_for_break = 600
804+
timeout_for_break = 300
795805

796806
if current_out_messages > 0:
797-
while current_out_messages >= inflight_messages and not self.stopped:
807+
while current_out_messages >= max_inflight_messages and not self.stopped:
798808
current_out_messages = len(self._client._out_messages)
799-
if int(monotonic()) - previous_notification_time > 5 and current_out_messages > inflight_messages:
809+
elapsed = monotonic() - waiting_started
810+
remaining = timeout_for_break - elapsed
811+
812+
if int(monotonic()) - previous_notification_time > 5 and current_out_messages > max_inflight_messages:
800813
if logger is None:
801814
logger = logging.getLogger('tb_connection')
802-
logger.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r", # noqa
803-
current_out_messages, inflight_messages)
815+
logger.debug(
816+
"Waiting for messages to be processed: current queue size: %r, max inflight: %r. "
817+
"Elapsed time: %.2f seconds, remaining timeout: %.2f seconds",
818+
current_out_messages, max_inflight_messages, elapsed, remaining
819+
)
804820
previous_notification_time = int(monotonic())
821+
805822
if not self.is_connected():
823+
with self._client._out_message_mutex:
824+
self._client._out_messages.clear()
806825
connection_was_lost = True
807-
if current_out_messages >= inflight_messages:
808-
sleep(.001)
809-
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost or self.stopped:
826+
827+
if current_out_messages >= max_inflight_messages:
828+
sleep(.01)
829+
830+
if (elapsed > timeout_for_break and not connection_was_lost) or self.stopped:
831+
logger.debug("Breaking wait loop after %.2f seconds due to timeout or stop signal.", elapsed)
810832
break
811833

834+
sleep(.001)
835+
812836
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
813837
msg_rate_limit=None, dp_rate_limit=None):
814838
if msg_rate_limit is None:
@@ -901,7 +925,8 @@ def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_
901925
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
902926
msg_rate_limit.increase_rate_limit_counter()
903927
kwargs["payload"] = dumps(part['message'])
904-
self._wait_until_current_queued_messages_processed()
928+
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
929+
self._wait_until_current_queued_messages_processed()
905930
if not self.stopped:
906931
if device is not None:
907932
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
@@ -919,11 +944,20 @@ def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_
919944
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
920945
result = self._client.publish(**kwargs)
921946
if result.rc == MQTT_ERR_QUEUE_SIZE:
947+
error_appear_counter = 1
948+
sleep_time = 0.1 # 100 ms, in case of change - change max tries in while loop
922949
while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
950+
error_appear_counter += 1
951+
if error_appear_counter > 78: # 78 tries * 0.1 s sleep ~ 7.8 seconds of sleeping in total (not ~300 seconds)
952+
# Clearing the queue and trying to send the message again
953+
log.warning("!!! Queue size exceeded, clearing the paho out queue and trying to send message again !!!") # Possible data loss, due to issue with paho queue clearing! # noqa
954+
with self._client._out_message_mutex:
955+
self._client._out_packet.clear()
956+
self._client._out_messages.clear()
923957
if int(monotonic()) - self.__error_logged > 10:
924-
log.warning("Queue size exceeded, waiting for messages to be processed by paho client.")
958+
log.debug("Queue size exceeded, waiting for messages to be processed by paho client.")
925959
self.__error_logged = int(monotonic())
926-
sleep(.01) # Give some time for paho to process messages
960+
sleep(sleep_time) # Give some time for paho to process messages
927961
result = self._client.publish(**kwargs)
928962
results.append(result)
929963

0 commit comments

Comments
 (0)