Skip to content

Commit 8532685

Browse files
committed
Updated rate limits processing
1 parent 69844fc commit 8532685

File tree

3 files changed

+86
-85
lines changed

3 files changed

+86
-85
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
with open(path.join(this_directory, 'README.md')) as f:
2222
long_description = f.read()
2323

24-
VERSION = "1.13.1"
24+
VERSION = "1.13.4"
2525

2626
setup(
2727
version=VERSION,

tb_device_mqtt.py

Lines changed: 77 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,13 @@ def get(self):
185185

186186
class GreedyTokenBucket:
187187
def __init__(self, capacity, duration_sec):
188-
self.capacity = capacity
189-
self.duration = duration_sec
190-
self.tokens = capacity
191-
self.last_updated = int(monotonic())
188+
self.capacity = float(capacity)
189+
self.duration = float(duration_sec)
190+
self.tokens = float(capacity)
191+
self.last_updated = monotonic()
192192

193193
def refill(self):
194-
now = int(monotonic())
194+
now = monotonic()
195195
elapsed = now - self.last_updated
196196
refill_rate = self.capacity / self.duration
197197
refill_amount = elapsed * refill_rate
@@ -200,7 +200,7 @@ def refill(self):
200200

201201
def can_consume(self, amount=1):
202202
self.refill()
203-
return self.tokens >= amount
203+
return round(self.tokens, 6) >= round(amount, 6)
204204

205205
def consume(self, amount=1):
206206
self.refill()
@@ -307,10 +307,9 @@ def check_limit_reached(self, amount=1):
307307

308308
for duration, bucket in self._rate_buckets.items():
309309
log.debug("%s left tokens: %.2f per %r seconds",
310-
self.name,
311-
bucket.get_remaining_tokens(),
312-
duration)
313-
bucket.consume(amount)
310+
self.name,
311+
bucket.get_remaining_tokens(),
312+
duration)
314313

315314
return False
316315

@@ -378,7 +377,7 @@ def reach_limit(self):
378377

379378
with self.__lock:
380379
durations = sorted(self._rate_buckets.keys())
381-
current_monotonic = int(monotonic())
380+
current_monotonic = monotonic()
382381
if self.__reached_limit_index_time >= current_monotonic - self._rate_buckets[durations[-1]].duration:
383382
self.__reached_limit_index = 0
384383
self.__reached_limit_index_time = current_monotonic
@@ -878,7 +877,17 @@ def on_service_configuration(self, _, response, *args, **kwargs):
878877
if max_inflight_messages < 1:
879878
max_inflight_messages = 1
880879
self.max_inflight_messages_set(max_inflight_messages)
881-
self.max_queued_messages_set(max_inflight_messages)
880+
881+
if (not self._messages_rate_limit.has_limit() and
882+
not self._telemetry_rate_limit.has_limit() and
883+
not self._telemetry_dp_rate_limit.has_limit() and
884+
not kwargs.get("gateway_limits_present", False)):
885+
log.debug("No rate limits for device, setting max_queued_messages to 50000")
886+
self.max_queued_messages_set(50000)
887+
else:
888+
log.debug("Rate limits for device, setting max_queued_messages to %r", max_inflight_messages)
889+
self.max_queued_messages_set(max_inflight_messages)
890+
882891
if service_config.get('maxPayloadSize'):
883892
self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * DEFAULT_RATE_LIMIT_PERCENTAGE / 100)
884893
log.info("Service configuration was successfully retrieved and applied.")
@@ -968,9 +977,6 @@ def _wait_until_current_queued_messages_processed(self):
968977
)
969978
previous_notification_time = int(monotonic())
970979

971-
if not self.is_connected():
972-
with self._client._out_message_mutex:
973-
self._client._out_messages.clear()
974980
connection_was_lost = True
975981

976982
if current_out_messages >= max_inflight_messages:
@@ -1098,11 +1104,10 @@ def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_
10981104
while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
10991105
error_appear_counter += 1
11001106
if error_appear_counter > 78: # 78 tries ~ totally 300 seconds for sleep 0.1
1101-
# Clearing the queue and trying to send the message again
1102-
log.warning("!!! Queue size exceeded, clearing the paho out queue and trying to send message again !!!") # Possible data loss, due to issue with paho queue clearing! # noqa
1103-
with self._client._out_message_mutex:
1104-
self._client._out_packet.clear()
1105-
self._client._out_messages.clear()
1107+
log.warning("Cannot send message to platform in %i seconds, queue size exceeded, current max inflight messages: %r, max queued messages: %r.", # noqa
1108+
int(error_appear_counter * sleep_time),
1109+
self._client._max_inflight_messages,
1110+
self._client._max_queued_messages)
11061111
if int(monotonic()) - self.__error_logged > 10:
11071112
log.debug("Queue size exceeded, waiting for messages to be processed by paho client.")
11081113
self.__error_logged = int(monotonic())
@@ -1333,10 +1338,12 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
13331338

13341339
split_messages = []
13351340

1336-
# Handle RPC message case
13371341
if isinstance(message_pack, dict) and message_pack.get('device') and len(message_pack) in [1, 2]:
1338-
return [{'data': message_pack, 'datapoints': TBDeviceMqttClient._count_datapoints_in_message(message_pack),
1339-
'message': message_pack}]
1342+
return [{
1343+
'data': message_pack,
1344+
'datapoints': TBDeviceMqttClient._count_datapoints_in_message(message_pack),
1345+
'message': message_pack
1346+
}]
13401347

13411348
if not isinstance(message_pack, list):
13421349
message_pack = [message_pack]
@@ -1345,80 +1352,67 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
13451352

13461353
append_split_message = split_messages.append
13471354

1348-
final_message_item = {'data': [], 'datapoints': 0}
1355+
# Group cache key = (ts, metadata_repr or None)
1356+
ts_group_cache = {}
1357+
1358+
def _get_metadata_repr(metadata):
1359+
if isinstance(metadata, dict):
1360+
return tuple(sorted(metadata.items()))
1361+
return None
1362+
1363+
def flush_ts_group(ts_key):
1364+
if ts_key in ts_group_cache:
1365+
ts, metadata_repr = ts_key
1366+
values, size, metadata = ts_group_cache.pop(ts_key)
1367+
if ts is not None:
1368+
chunk = {"ts": ts, "values": values}
1369+
if metadata:
1370+
chunk["metadata"] = metadata
1371+
else:
1372+
chunk = values # Raw mode, no ts
13491373

1350-
message_item_values_with_allowed_size = {}
1351-
ts = None
1352-
current_size = 0
1374+
message = {
1375+
"data": [chunk],
1376+
"datapoints": len(values)
1377+
}
1378+
append_split_message(message)
13531379

1354-
for (message_index, message) in enumerate(message_pack):
1380+
for message_index, message in enumerate(message_pack):
13551381
if not isinstance(message, dict):
13561382
log.error("Message is not a dictionary!")
13571383
log.debug("Message: %s", message)
13581384
continue
1359-
old_ts = ts if ts is not None else message.get("ts")
1360-
ts = message.get("ts")
1361-
ts_changed = ts is not None and old_ts != ts
13621385

1386+
ts = message.get("ts")
13631387
values = message.get("values", message)
1364-
values_data_keys = list(values)
1388+
metadata = message.get("metadata") if "metadata" in message and isinstance(message["metadata"],
1389+
dict) else None
1390+
metadata_repr = _get_metadata_repr(metadata)
13651391

1366-
values_length = len(values_data_keys)
1367-
1368-
if values_length == 1:
1369-
single_data = {'ts': ts, 'values': values} if ts else values
1370-
if not isinstance(single_data, list):
1371-
single_data = [single_data]
1372-
append_split_message({'data': single_data, 'datapoints': 1})
1373-
continue
1392+
ts_key = (ts, metadata_repr)
13741393

1375-
for current_data_key_index, data_key in enumerate(values_data_keys):
1376-
value = values[data_key]
1394+
for data_key, value in values.items():
13771395
data_key_size = len(data_key) + len(str(value))
13781396

1379-
if ((datapoints_max_count == 0 or len(message_item_values_with_allowed_size) < datapoints_max_count)
1380-
and current_size + data_key_size < max_payload_size) and not ts_changed:
1381-
message_item_values_with_allowed_size[data_key] = value
1382-
current_size += data_key_size
1383-
1384-
if ((TBDeviceMqttClient._datapoints_limit_reached(datapoints_max_count, len(message_item_values_with_allowed_size), current_size)) # noqa
1385-
or TBDeviceMqttClient._payload_size_limit_reached(max_payload_size, current_size, data_key_size)) or ts_changed: # noqa
1386-
if ts:
1387-
ts_to_write = ts
1388-
if old_ts is not None and old_ts != ts:
1389-
ts_to_write = old_ts
1390-
old_ts = ts
1391-
message_chunk = {"ts": ts_to_write, "values": message_item_values_with_allowed_size.copy()}
1392-
if 'metadata' in message:
1393-
message_chunk['metadata'] = message['metadata']
1394-
final_message_item['data'].append(message_chunk)
1395-
else:
1396-
final_message_item['data'].append(message_item_values_with_allowed_size.copy())
1397-
1398-
final_message_item['datapoints'] = len(message_item_values_with_allowed_size)
1399-
append_split_message(final_message_item.copy())
1400-
final_message_item = {'data': [], 'datapoints': 0}
1401-
1402-
message_item_values_with_allowed_size.clear()
1403-
if ts_changed:
1404-
message_item_values_with_allowed_size[data_key] = value
1405-
current_size += data_key_size
1406-
ts_changed = False
1407-
current_size = 0
1408-
1409-
if (message_index == len(message_pack) - 1
1410-
and len(message_item_values_with_allowed_size) > 0):
1411-
if ts:
1412-
message_chunk = {"ts": ts, "values": message_item_values_with_allowed_size.copy()}
1413-
if 'metadata' in message:
1414-
message_chunk['metadata'] = message['metadata']
1415-
final_message_item['data'].append(message_chunk)
1397+
if ts_key not in ts_group_cache:
1398+
ts_group_cache[ts_key] = ({}, 0, metadata)
1399+
1400+
ts_values, current_size, current_metadata = ts_group_cache[ts_key]
1401+
1402+
can_add = (
1403+
(datapoints_max_count == 0 or len(ts_values) < datapoints_max_count)
1404+
and (current_size + data_key_size < max_payload_size)
1405+
)
1406+
1407+
if can_add:
1408+
ts_values[data_key] = value
1409+
ts_group_cache[ts_key] = (ts_values, current_size + data_key_size, metadata)
14161410
else:
1417-
final_message_item['data'].append(message_item_values_with_allowed_size.copy())
1411+
flush_ts_group(ts_key)
1412+
ts_group_cache[ts_key] = ({data_key: value}, data_key_size, metadata)
14181413

1419-
final_message_item['datapoints'] = len(message_item_values_with_allowed_size)
1420-
if final_message_item['data']:
1421-
append_split_message(final_message_item.copy())
1414+
for ts_key in list(ts_group_cache.keys()):
1415+
flush_ts_group(ts_key)
14221416

14231417
return split_messages
14241418

tb_gateway_mqtt.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,15 @@ def __on_service_configuration(self, _, response, *args, **kwargs):
333333
else:
334334
self._devices_connected_through_gateway_telemetry_datapoints_rate_limit.set_limit('0:0,')
335335

336+
gateway_limits_present = any(
337+
[self._devices_connected_through_gateway_messages_rate_limit.has_limit(),
338+
self._devices_connected_through_gateway_telemetry_messages_rate_limit.has_limit(),
339+
self._devices_connected_through_gateway_telemetry_datapoints_rate_limit.has_limit()]
340+
)
341+
336342
super().on_service_configuration(_,
337343
{'rateLimits': gateway_device_itself_rate_limit_config, **service_config},
338344
*args,
339-
**kwargs)
345+
**kwargs,
346+
gateway_limits_present=gateway_limits_present)
340347
log.info("Current limits for devices connected through the gateway: %r", gateway_devices_rate_limit_config)

0 commit comments

Comments
 (0)