Skip to content

Commit 8ecb325

Browse files
committed
Handle timestamp collision while pushing data to redistimeseries
- If a datapoint already exists in that millisecond, try to add it in the next
1 parent 2908019 commit 8ecb325

File tree

1 file changed

+35
-27
lines changed

1 file changed

+35
-27
lines changed

redis_benchmarks_specification/__common__/timeseries.py

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -495,39 +495,47 @@ def push_data_to_redistimeseries(rts, time_series_dict: dict, expire_msecs=0):
495495
)
496496
for timeseries_name, time_series in time_series_dict.items():
497497
exporter_create_ts(rts, time_series, timeseries_name)
498-
for timestamp, value in time_series["data"].items():
499-
try:
500-
if timestamp is None:
501-
logging.warning("The provided timestamp is null. Using auto-ts")
502-
rts.ts().add(
503-
timeseries_name,
504-
value,
505-
duplicate_policy="last",
506-
)
507-
else:
498+
for orig_timestamp, value in time_series["data"].items():
499+
if orig_timestamp is None:
500+
logging.warning("The provided timestamp is null. Using auto-ts")
501+
timestamp = "*"
502+
else:
503+
timestamp = orig_timestamp
504+
505+
try_to_insert = True
506+
retry_count = 0
507+
while try_to_insert and retry_count < 100:
508+
# (try to) insert the datapoint in given timestamp
509+
try_to_insert = False
510+
511+
try:
508512
rts.ts().add(
509513
timeseries_name,
510514
timestamp,
511515
value,
512-
duplicate_policy="last",
516+
duplicate_policy="block",
513517
)
514-
datapoint_inserts += 1
515-
except redis.exceptions.DataError:
516-
logging.warning(
517-
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
518-
timestamp, value, timeseries_name
518+
datapoint_inserts += 1
519+
except redis.exceptions.DataError:
520+
logging.warning(
521+
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
522+
timestamp, value, timeseries_name
523+
)
519524
)
520-
)
521-
datapoint_errors += 1
522-
pass
523-
except redis.exceptions.ResponseError:
524-
logging.warning(
525-
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
526-
timestamp, value, timeseries_name
527-
)
528-
)
529-
datapoint_errors += 1
530-
pass
525+
datapoint_errors += 1
526+
except redis.exceptions.ResponseError as e:
527+
if "DUPLICATE_POLICY" in e.__str__():
528+
# duplicate timestamp: try to insert again, but in the next milisecond
529+
timestamp += 1
530+
try_to_insert = True
531+
retry_count += 1
532+
else:
533+
logging.warning(
534+
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
535+
timestamp, value, timeseries_name
536+
)
537+
)
538+
datapoint_errors += 1
531539
if expire_msecs > 0:
532540
rts.pexpire(timeseries_name, expire_msecs)
533541
progress.update()

0 commit comments

Comments
 (0)