@@ -214,14 +214,7 @@ def get_range(self, model, keys, start, end, rollup=None):
214214 return dict (results_by_key )
215215
216216 def merge (self , model , destination , sources , timestamp = None ):
217- rollups = {}
218- for rollup , samples in self .rollups .items ():
219- _ , series = self .get_optimal_rollup_series (
220- to_datetime (self .get_earliest_timestamp (rollup , timestamp = timestamp )),
221- end = None ,
222- rollup = rollup ,
223- )
224- rollups [rollup ] = map (to_datetime , series )
217+ rollups = self .get_active_series (timestamp = timestamp )
225218
226219 with self .cluster .map () as client :
227220 data = {}
@@ -265,6 +258,24 @@ def merge(self, model, destination, sources, timestamp=None):
265258 ),
266259 )
267260
261+ def delete (self , models , keys , start = None , end = None , timestamp = None ):
262+ rollups = self .get_active_series (start , end , timestamp )
263+
264+ with self .cluster .map () as client :
265+ for rollup , series in rollups .items ():
266+ for timestamp in series :
267+ for model in models :
268+ for key in keys :
269+ model_key = self .get_model_key (key )
270+ client .hdel (
271+ self .make_counter_key (
272+ model ,
273+ self .normalize_to_rollup (timestamp , rollup ),
274+ model_key ,
275+ ),
276+ model_key ,
277+ )
278+
268279 def record (self , model , key , values , timestamp = None ):
269280 self .record_multi (((model , key , values ),), timestamp )
270281
@@ -434,14 +445,7 @@ def merge_aggregates(values):
434445 )
435446
436447 def merge_distinct_counts (self , model , destination , sources , timestamp = None ):
437- rollups = {}
438- for rollup , samples in self .rollups .items ():
439- _ , series = self .get_optimal_rollup_series (
440- to_datetime (self .get_earliest_timestamp (rollup , timestamp = timestamp )),
441- end = None ,
442- rollup = rollup ,
443- )
444- rollups [rollup ] = map (to_datetime , series )
448+ rollups = self .get_active_series (timestamp = timestamp )
445449
446450 temporary_id = uuid .uuid1 ().hex
447451
@@ -499,6 +503,23 @@ def make_temporary_key(key):
499503 ),
500504 )
501505
506+ def delete_distinct_counts (self , models , keys , start = None , end = None , timestamp = None ):
507+ rollups = self .get_active_series (start , end , timestamp )
508+
509+ with self .cluster .map () as client :
510+ for rollup , series in rollups .items ():
511+ for timestamp in series :
512+ for model in models :
513+ for key in keys :
514+ client .delete (
515+ self .make_key (
516+ model ,
517+ rollup ,
518+ to_timestamp (timestamp ),
519+ key ,
520+ )
521+ )
522+
502523 def make_frequency_table_keys (self , model , rollup , timestamp , key ):
503524 prefix = self .make_key (model , rollup , timestamp , key )
504525 return map (
@@ -697,3 +718,14 @@ def merge_frequencies(self, model, destination, sources, timestamp=None):
697718 self .cluster .execute_commands ({
698719 destination : imports ,
699720 })
721+
722+ def delete_frequencies (self , models , keys , start = None , end = None , timestamp = None ):
723+ rollups = self .get_active_series (start , end , timestamp )
724+
725+ with self .cluster .map () as client :
726+ for rollup , series in rollups .items ():
727+ for timestamp in series :
728+ for model in models :
729+ for key in keys :
730+ for k in self .make_frequency_table_keys (model , rollup , to_timestamp (timestamp ), key ):
731+ client .delete (k )
0 commit comments