@@ -96,5 +96,341 @@ def test_subscribe_to_attrs(self):
9696 self .client .unsubscribe_from_attribute (sub_id_2 )
9797
9898
class TestSplitMessageVariants(unittest.TestCase):
    """Checks on ``TBDeviceMqttClient._split_message``.

    Every test calls ``_split_message(message, datapoints_max_count,
    max_payload_size)`` and inspects the returned list, whose items are
    indexed with ``['data']`` to reach the individual entries.
    """

    def test_empty_input(self):
        # An empty message list is expected to yield no chunks at all.
        chunks = TBDeviceMqttClient._split_message([], 10, 100)
        self.assertEqual(chunks, [])

    def test_rpc_payload(self):
        # A bare dict (not a list) should come back as one chunk whose
        # 'data' is the dict itself.
        payload = {'device': 'dev1'}
        chunks = TBDeviceMqttClient._split_message(payload, 10, 100)
        self.assertEqual(len(chunks), 1)
        self.assertEqual(chunks[0]['data'], payload)

    def test_single_value_message(self):
        # One timestamped entry keeps its values untouched.
        batch = [{'ts': 1, 'values': {'a': 1}}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        first_entry = chunks[0]['data'][0]
        self.assertEqual(first_entry['values'], {'a': 1})

    def test_timestamp_change_split(self):
        # Two entries with different timestamps are expected in two chunks.
        batch = [
            {'ts': 1, 'values': {'a': 1}},
            {'ts': 2, 'values': {'b': 2}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        self.assertEqual(len(chunks), 2)

    def test_exceeding_datapoint_limit(self):
        # Ten datapoints with a limit of five must produce at least two chunks.
        many_values = {f'k{n} ': n for n in range(10)}
        batch = [{'ts': 1, 'values': many_values}]
        chunks = TBDeviceMqttClient._split_message(batch, 5, 1000)
        self.assertGreaterEqual(len(chunks), 2)

    def test_message_with_metadata(self):
        # Metadata supplied alongside ts/values must survive the split.
        batch = [{'ts': 1, 'values': {'a': 1}, 'metadata': {'unit': 'C'}}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        first_entry = chunks[0]['data'][0]
        self.assertIn('metadata', first_entry)

    def test_large_payload_split(self):
        # Five 50-character values cannot fit a 100-unit payload limit.
        bulky_values = {f'key{n} ': 'v' * 50 for n in range(5)}
        batch = [{'ts': 1, 'values': bulky_values}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        self.assertGreater(len(chunks), 1)

    def test_metadata_present_with_ts(self):
        # With a timestamp present, metadata is kept and unchanged.
        batch = [{
            'ts': 123456789,
            'values': {'temperature': 25},
            'metadata': {'unit': 'C'},
        }]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        first_entry = chunks[0]['data'][0]
        self.assertIn('metadata', first_entry)
        self.assertEqual(first_entry['metadata'], {'unit': 'C'})

    def test_metadata_ignored_without_ts(self):
        # Without a timestamp, no emitted entry may carry a 'metadata' key.
        batch = [{'values': {'temperature': 25}, 'metadata': {'unit': 'C'}}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        entries = [entry for chunk in chunks for entry in chunk['data']]
        self.assertTrue(all('metadata' not in entry for entry in entries))

    def test_grouping_same_ts_exceeds_datapoint_limit(self):
        # Five datapoints sharing one ts with a limit of three: more than
        # one chunk, and no key may be lost.
        batch = [
            {'ts': 1, 'values': {'a': 1, 'b': 2, 'c': 3}},
            {'ts': 1, 'values': {'d': 4, 'e': 5}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=3, max_payload_size=1000)
        self.assertGreater(len(chunks), 1)
        seen_keys = {
            key
            for chunk in chunks
            for entry in chunk['data']
            for key in entry['values'].keys()
        }
        self.assertEqual(seen_keys, {'a', 'b', 'c', 'd', 'e'})

    def test_grouping_same_ts_exceeds_payload_limit(self):
        # Three 30-character values sharing one ts with a 64-unit payload
        # limit: more than one chunk, and no key may be lost.
        batch = [
            {'ts': 1, 'values': {name: filler * 30}}
            for name, filler in (('a', 'x'), ('b', 'y'), ('c', 'z'))
        ]
        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=10, max_payload_size=64)
        self.assertGreater(len(chunks), 1)
        seen_keys = {
            key
            for chunk in chunks
            for entry in chunk['data']
            for key in entry['values'].keys()
        }
        self.assertEqual(seen_keys, {'a', 'b', 'c'})

    def test_individual_messages_not_grouped_if_payload_limit_exceeded(self):
        # Each 100-character value nearly fills the 110-unit limit on its
        # own, so every message is expected in a separate chunk.
        batch = [
            {'ts': 1, 'values': {name: filler * 100}}
            for name, filler in (('a', 'x'), ('b', 'y'), ('c', 'z'))
        ]
        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=10, max_payload_size=110)
        self.assertEqual(len(chunks), 3)

        entries = [entry for chunk in chunks for entry in chunk['data']]
        seen_keys = [key for entry in entries for key in entry['values'].keys()]

        self.assertEqual(set(seen_keys), {'a', 'b', 'c'})
        for entry in entries:
            # Size is approximated the same way for every entry:
            # key length plus the length of the stringified value.
            entry_size = sum(len(key) + len(str(value)) for key, value in entry['values'].items())
            self.assertLessEqual(entry_size, 110)

    def test_partial_grouping_due_to_payload_limit(self):
        batch = [
            # expected to stay together
            {'ts': 1, 'values': {'a': 'x' * 10, 'b': 'y' * 10}},
            # expected to stand alone because of its size
            {'ts': 1, 'values': {'c': 'z' * 100}},
            # expected to be grouped again
            {'ts': 1, 'values': {'d': 'w' * 10, 'e': 'q' * 10}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=10, max_payload_size=64)
        self.assertEqual(len(chunks), 3)

        # One key set per chunk, so the grouping itself can be checked.
        keys_per_chunk = [
            {key for entry in chunk['data'] for key in entry['values'].keys()}
            for chunk in chunks
        ]

        self.assertIn({'a', 'b'}, keys_per_chunk)
        self.assertIn({'c'}, keys_per_chunk)
        self.assertIn({'d', 'e'}, keys_per_chunk)

    def test_partial_grouping_due_to_datapoint_limit(self):
        batch = [
            {'ts': 1, 'values': {'a': 1, 'b': 2}},  # grouped
            {'ts': 1, 'values': {'c': 3, 'd': 4}},  # grouped
            {'ts': 1, 'values': {'e': 5}},  # expected in the next group
            {'ts': 1, 'values': {'f': 6}},  # grouped with the one above
        ]
        # With datapoints_max_count=5 the test expects 4 datapoints per chunk.
        # NOTE(review): the original author attributes this to the
        # implementation subtracting 1 -- not visible here, confirm.
        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=5, max_payload_size=1000)
        self.assertEqual(len(chunks), 2)

        keys_per_chunk = [
            {key for entry in chunk['data'] for key in entry['values'].keys()}
            for chunk in chunks
        ]

        # First group: a, b, c, d (4 datapoints).
        self.assertIn({'a', 'b', 'c', 'd'}, keys_per_chunk)
        # Second group: e, f (2 datapoints).
        self.assertIn({'e', 'f'}, keys_per_chunk)

    def test_values_included_only_when_ts_present(self):
        # Without 'ts' the entry is expected to be the bare values dict.
        batch = [{'values': {'a': 1, 'b': 2}}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        first_entry = chunks[0]['data'][0]
        self.assertEqual(first_entry, {'a': 1, 'b': 2})

    def test_missing_values_field_uses_whole_message(self):
        # With 'ts' but no 'values', the whole message (ts included) is
        # expected under 'values'.
        batch = [{'ts': 123, 'a': 1, 'b': 2}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        first_entry = chunks[0]['data'][0]
        self.assertIn('values', first_entry)
        self.assertEqual(first_entry['values'], {'a': 1, 'b': 2, 'ts': 123})

    def test_metadata_conflict_same_ts_no_grouping(self):
        # Same ts but different metadata: the two messages must not merge.
        batch = [
            {'ts': 1, 'values': {'a': 1}, 'metadata': {'unit': 'C'}},
            {'ts': 1, 'values': {'b': 2}, 'metadata': {'unit': 'F'}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)

        self.assertEqual(len(chunks), 2)

        first_entries = [chunk['data'][0] for chunk in chunks]

        seen_metadata = [entry.get('metadata') for entry in first_entries]
        self.assertIn({'unit': 'C'}, seen_metadata)
        self.assertIn({'unit': 'F'}, seen_metadata)

        seen_value_keys = [set(entry['values'].keys()) for entry in first_entries]
        self.assertIn({'a'}, seen_value_keys)
        self.assertIn({'b'}, seen_value_keys)

    def test_non_dict_message_is_skipped(self):
        # The string in the middle is dropped; its neighbours share a ts
        # and are expected to merge into one entry.
        batch = [
            {'ts': 1, 'values': {'a': 1}},
            'this_is_not_a_dict',
            {'ts': 1, 'values': {'b': 2}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        self.assertEqual(len(chunks), 1)
        merged_values = chunks[0]['data'][0]['values']
        self.assertEqual(set(merged_values.keys()), {'a', 'b'})

    def test_multiple_dicts_without_ts_values_metadata(self):
        # Raw dicts (no ts/values/metadata) that fit the limits end up in
        # a single chunk.
        batch = [{'a': 1}, {'b': 2}, {'c': 3}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 1000)
        self.assertEqual(len(chunks), 1)  # grouped
        merged = {}
        for entry in chunks[0]['data']:
            merged.update(entry)
        self.assertEqual(set(merged.keys()), {'a', 'b', 'c'})

    def test_multiple_dicts_without_ts_split_by_payload(self):
        # Each raw dict is too large to share a 64-unit chunk.
        batch = [
            {name: filler * 60}
            for name, filler in (('a', 'x'), ('b', 'y'), ('c', 'z'))
        ]
        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=10, max_payload_size=64)
        self.assertEqual(len(chunks), 3)  # each too large to group
        first_keys = [list(chunk['data'][0].keys())[0] for chunk in chunks]
        self.assertEqual(set(first_keys), {'a', 'b', 'c'})

    def test_mixed_dicts_with_and_without_ts(self):
        batch = [
            {'ts': 1, 'values': {'a': 1}},
            {'b': 2},
            {'ts': 1, 'values': {'c': 3}},
            {'d': 4},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 1000)
        # At least two chunks: one for ts=1 and one for the entries without ts.
        self.assertGreaterEqual(len(chunks), 2)

        entries = [entry for chunk in chunks for entry in chunk['data']]
        timestamped = [entry for entry in entries if 'ts' in entry]
        raw = [entry for entry in entries if 'ts' not in entry]

        ts_keys = {key for entry in timestamped for key in entry['values'].keys()}
        raw_keys = {key for entry in raw for key in entry.keys()}

        self.assertEqual(ts_keys, {'a', 'c'})
        self.assertEqual(raw_keys, {'b', 'd'})

    def test_complex_mixed_messages(self):
        # One batch mixing: same-ts entries with and without metadata,
        # same-ts entries over the datapoint limit, same-ts entries over
        # the payload limit, raw dicts, a non-dict item, and a late entry
        # that reuses ts=1.
        batch = [
            {'ts': 1, 'values': {'a': 1}},
            {'ts': 1, 'values': {'b': 2}, 'metadata': {'unit': 'C'}},

            {'ts': 2, 'values': {'c1': 1, 'c2': 2, 'c3': 3}},
            {'ts': 2, 'values': {'c4': 4, 'c5': 5}},

            {'ts': 3, 'values': {'x': 'x' * 60}},
            {'ts': 3, 'values': {'y': 'y' * 60}},

            {'m1': 1, 'm2': 2},

            123,

            {'m3': 'a' * 100},
            {'m4': 'b' * 100},

            {'k1': 1, 'k2': 2, 'k3': 3},
            {'k4': 4, 'k5': 5},

            {'ts': 1, 'values': {'z': 99}},
        ]

        chunks = TBDeviceMqttClient._split_message(batch, datapoints_max_count=4, max_payload_size=64)

        keys_by_ts = {}
        raw_entries = []
        for chunk in chunks:
            for entry in chunk['data']:
                # Anything that is not a timestamped dict counts as raw.
                if not (isinstance(entry, dict) and 'ts' in entry):
                    raw_entries.append(entry)
                    continue
                keys_by_ts.setdefault(entry['ts'], set()).update(entry.get('values', {}).keys())
                if 'metadata' in entry:
                    self.assertIsInstance(entry['metadata'], dict)

        self.assertIn(1, keys_by_ts)
        self.assertEqual(keys_by_ts[1], {'a', 'b', 'z'})

        self.assertIn(2, keys_by_ts)
        self.assertEqual(keys_by_ts[2], {'c1', 'c2', 'c3', 'c4', 'c5'})

        self.assertIn(3, keys_by_ts)
        self.assertEqual(keys_by_ts[3], {'x', 'y'})

        raw_key_sets = [set(entry.keys()) for entry in raw_entries]

        expected_raw_key_sets = [
            {'m1', 'm2'},
            {'m3'},
            {'m4'},
            {'k1', 'k2', 'k3'},
            {'k4', 'k5'},
        ]

        for expected in expected_raw_key_sets:
            self.assertIn(expected, raw_key_sets)

        for raw in raw_entries:
            self.assertLessEqual(len(raw), 4)  # Max datapoints = 4

            total_size = sum(len(key) + len(str(value)) for key, value in raw.items())

            # A multi-key raw entry must fit the payload limit; an
            # oversized one is only acceptable when it holds a single key.
            if len(raw) > 1:
                self.assertLessEqual(total_size, 64)
            if total_size > 64:
                self.assertEqual(len(raw), 1)

        self.assertGreaterEqual(len(chunks), 8)

    def test_empty_values_should_skip_or_include_empty(self):
        # An entry whose 'values' is empty is expected to produce nothing.
        batch = [{'ts': 1, 'values': {}}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        self.assertEqual(chunks, [])

    def test_duplicate_keys_within_same_ts(self):
        # The same key sent twice for one ts: replaying the first chunk's
        # entries in order must end on the later value.
        batch = [
            {'ts': 1, 'values': {'a': 1}},
            {'ts': 1, 'values': {'a': 2}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        replayed = {}
        for entry in chunks[0]['data']:
            replayed.update(entry['values'])
        self.assertEqual(replayed['a'], 2)  # Last value wins

    def test_partial_metadata_presence(self):
        # Only one of two same-ts messages carries metadata. The assertion
        # fires only for an entry whose values hold exactly 'a' and 'b'.
        batch = [
            {'ts': 1, 'values': {'a': 1}, 'metadata': {'unit': 'C'}},
            {'ts': 1, 'values': {'b': 2}},
        ]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        for entry in chunks[0]['data']:
            if entry['values'].keys() == {'a', 'b'}:
                self.assertIn('metadata', entry)

    def test_non_dict_metadata_should_be_ignored(self):
        # Every emitted entry either has no metadata or has dict metadata;
        # the `{}` default makes a missing key count as acceptable.
        batch = [{'ts': 1, 'values': {'a': 1}, 'metadata': ['invalid']}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        entries = [entry for chunk in chunks for entry in chunk['data']]
        self.assertTrue(all(
            isinstance(entry.get('metadata', {}), dict)
            for entry in entries
        ))

    def test_non_list_message_pack_single_dict_raw(self):
        # A plain dict passed instead of a list comes back as the first
        # entry of the first chunk.
        payload = {'a': 1, 'b': 2}
        chunks = TBDeviceMqttClient._split_message(payload, 10, 100)
        self.assertEqual(chunks[0]['data'][0], payload)

    def test_nested_value_object_should_count_size_correctly(self):
        # A nested dict value must still reach the output with non-zero
        # size (key length plus stringified value length).
        batch = [{'ts': 1, 'values': {'a': {'nested': 'structure'}}}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 1000)
        pairs = [
            (key, value)
            for chunk in chunks
            for entry in chunk['data']
            for key, value in entry['values'].items()
        ]
        total_size = sum(len(key) + len(str(value)) for key, value in pairs)
        self.assertGreater(total_size, 0)

    def test_raw_duplicate_keys_overwrite_behavior(self):
        # The same raw key sent twice: replaying all entries in order must
        # end on the later value.
        batch = [{'a': 1}, {'a': 2}]
        chunks = TBDeviceMqttClient._split_message(batch, 10, 100)
        replayed = {}
        for chunk in chunks:
            for entry in chunk['data']:
                replayed.update(entry)
        self.assertEqual(replayed['a'], 2)  # Last value wins
433+
434+
if __name__ == '__main__':
    # When run as a script, load and run the tests from the named module.
    unittest.main(module='tb_device_mqtt_client_tests')
0 commit comments