@@ -1411,9 +1411,7 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
14111411 message_pack = [message_pack ]
14121412
14131413 def _get_metadata_repr (metadata ):
1414- if isinstance (metadata , dict ):
1415- return tuple (sorted (metadata .items ()))
1416- return None
1414+ return tuple (sorted (metadata .items ())) if isinstance (metadata , dict ) else None
14171415
14181416 def estimate_chunk_size (chunk ):
14191417 if isinstance (chunk , dict ) and "values" in chunk :
@@ -1427,47 +1425,46 @@ def estimate_chunk_size(chunk):
14271425 else :
14281426 return len (str (chunk )) + 20
14291427
1430-
14311428 ts_group_cache = {}
14321429 current_message = {"data" : [], "datapoints" : 0 }
1433- current_datapoints = 0
14341430 current_size = 0
1431+ current_datapoints = 0
14351432
14361433 def flush_current_message ():
1437- nonlocal current_message , current_datapoints , current_size
1434+ nonlocal current_message , current_size , current_datapoints
14381435 if current_message ["data" ]:
14391436 split_messages .append (current_message )
14401437 current_message = {"data" : [], "datapoints" : 0 }
1441- current_datapoints = 0
14421438 current_size = 0
1439+ current_datapoints = 0
14431440
14441441 def split_and_add_chunk (chunk , chunk_datapoints ):
1445- nonlocal current_message , current_datapoints , current_size
1442+ nonlocal current_message , current_size , current_datapoints
1443+
14461444 chunk_size = estimate_chunk_size (chunk )
14471445
1448- if (datapoints_max_count > 0 and current_datapoints + chunk_datapoints > datapoints_max_count ) or \
1446+ if (0 < datapoints_max_count <= current_datapoints + chunk_datapoints ) or \
14491447 (current_size + chunk_size > max_payload_size ):
14501448 flush_current_message ()
14511449
14521450 if chunk_datapoints > datapoints_max_count > 0 or chunk_size > max_payload_size :
1453- keys = list (chunk [ "values" ] .keys ()) if "values" in chunk else list (chunk .keys ())
1451+ keys = list (chunk . get ( "values" , {}) .keys ()) if isinstance ( chunk , dict ) else list (chunk .keys ())
14541452 if len (keys ) == 1 :
1453+ flush_current_message ()
14551454 current_message ["data" ].append (chunk )
14561455 current_message ["datapoints" ] += chunk_datapoints
14571456 current_size += chunk_size
1457+ current_datapoints += chunk_datapoints
1458+ flush_current_message ()
14581459 return
14591460
1460- max_step = int (datapoints_max_count ) if datapoints_max_count > 0 else len (keys )
1461- if max_step < 1 :
1462- max_step = 1
1463-
1461+ max_step = max (1 , datapoints_max_count if datapoints_max_count > 0 else len (keys ))
14641462 for i in range (0 , len (keys ), max_step ):
14651463 sub_values = (
1466- {k : chunk ["values" ][k ] for k in keys [i :i + max_step ]}
1467- if "values" in chunk else
1468- {k : chunk [k ] for k in keys [i :i + max_step ]}
1464+ {k : chunk ["values" ][k ] for k in keys [i :i + max_step ]} if "values" in chunk
1465+ else {k : chunk [k ] for k in keys [i :i + max_step ]}
14691466 )
1470-
1467+ sub_chunk = {}
14711468 if "ts" in chunk :
14721469 sub_chunk = {"ts" : chunk ["ts" ], "values" : sub_values }
14731470 if "metadata" in chunk :
@@ -1478,46 +1475,55 @@ def split_and_add_chunk(chunk, chunk_datapoints):
14781475 sub_datapoints = len (sub_values )
14791476 sub_size = estimate_chunk_size (sub_chunk )
14801477
1481- if sub_size > max_payload_size :
1478+ if sub_size > max_payload_size or (0 < datapoints_max_count <= sub_datapoints ):
1479+ flush_current_message ()
14821480 current_message ["data" ].append (sub_chunk )
14831481 current_message ["datapoints" ] += sub_datapoints
14841482 current_size += sub_size
1485- continue
1486-
1487- split_and_add_chunk (sub_chunk , sub_datapoints )
1483+ current_datapoints += sub_datapoints
1484+ flush_current_message ()
1485+ else :
1486+ split_and_add_chunk (sub_chunk , sub_datapoints )
14881487 return
14891488
14901489 current_message ["data" ].append (chunk )
14911490 current_message ["datapoints" ] += chunk_datapoints
14921491 current_size += chunk_size
1492+ current_datapoints += chunk_datapoints
1493+
1494+ if 0 < datapoints_max_count == current_datapoints :
1495+ flush_current_message ()
14931496
14941497 def add_chunk_to_current_message (chunk , chunk_datapoints ):
1495- nonlocal current_message , current_datapoints , current_size
1498+ nonlocal current_message , current_size , current_datapoints
1499+
14961500 chunk_size = estimate_chunk_size (chunk )
14971501
1498- if (datapoints_max_count > 0 and chunk_datapoints > datapoints_max_count ) or chunk_size > max_payload_size :
1502+ if (0 < datapoints_max_count <= chunk_datapoints ) or chunk_size > max_payload_size :
14991503 split_and_add_chunk (chunk , chunk_datapoints )
15001504 return
15011505
1502- if (datapoints_max_count > 0 and current_datapoints + chunk_datapoints > datapoints_max_count ) or \
1506+ if (0 < datapoints_max_count <= current_datapoints + chunk_datapoints ) or \
15031507 (current_size + chunk_size > max_payload_size ):
15041508 flush_current_message ()
15051509
15061510 current_message ["data" ].append (chunk )
15071511 current_message ["datapoints" ] += chunk_datapoints
15081512 current_size += chunk_size
1513+ current_datapoints += chunk_datapoints
15091514
1510- if datapoints_max_count > 0 and current_message [ "datapoints" ] == datapoints_max_count :
1515+ if 0 < datapoints_max_count == current_datapoints :
15111516 flush_current_message ()
15121517
15131518 def flush_ts_group (ts_key , ts , metadata_repr ):
1519+ nonlocal current_message , current_size , current_datapoints
15141520 if ts_key not in ts_group_cache :
15151521 return
1522+
15161523 values , _ , metadata = ts_group_cache .pop (ts_key )
15171524 keys = list (values .keys ())
1518- step = int (datapoints_max_count ) if datapoints_max_count > 0 else len (keys )
1519- if step < 1 :
1520- step = 1
1525+
1526+ step = max (1 , datapoints_max_count if datapoints_max_count > 0 else len (keys ))
15211527 for i in range (0 , len (keys ), step ):
15221528 chunk_values = {k : values [k ] for k in keys [i :i + step ]}
15231529 if ts is not None :
@@ -1526,13 +1532,25 @@ def flush_ts_group(ts_key, ts, metadata_repr):
15261532 chunk ["metadata" ] = metadata
15271533 else :
15281534 chunk = chunk_values .copy ()
1529- add_chunk_to_current_message (chunk , len (chunk_values ))
1535+
1536+ chunk_datapoints = len (chunk_values )
1537+ chunk_size = estimate_chunk_size (chunk )
1538+
1539+ if chunk_size > max_payload_size or (0 < datapoints_max_count <= chunk_datapoints ):
1540+ flush_current_message ()
1541+ current_message ["data" ].append (chunk )
1542+ current_message ["datapoints" ] += chunk_datapoints
1543+ current_size += chunk_size
1544+ current_datapoints += chunk_datapoints
1545+ flush_current_message ()
1546+ else :
1547+ add_chunk_to_current_message (chunk , chunk_datapoints )
15301548
15311549 for message in message_pack :
15321550 if not isinstance (message , dict ):
15331551 continue
15341552
1535- ts = message .get ("ts" , None )
1553+ ts = message .get ("ts" )
15361554 metadata = message .get ("metadata" ) if isinstance (message .get ("metadata" ), dict ) else None
15371555 values = message .get ("values" ) if isinstance (message .get ("values" ), dict ) else \
15381556 message if isinstance (message , dict ) else {}
0 commit comments