@@ -975,10 +975,11 @@ class CountTrigger(Trigger[T, CountWindow]):
975975 A Trigger that fires once the count of elements in a pane reaches the given count.
976976 """
977977
978- def __init__ (self , window_size : int ):
978+ def __init__ (self , window_size : int , purge_on_fire : bool = False ):
979979 self ._window_size = window_size
980980 self ._count_state_descriptor = ReducingStateDescriptor (
981981 "count" , lambda a , b : a + b , Types .LONG ())
982+ self ._purge_on_fire = purge_on_fire
982983
983984 @staticmethod
984985 def of (window_size : int ) -> 'CountTrigger' :
@@ -993,9 +994,9 @@ def on_element(self,
993994 count_state .add (1 )
994995 if count_state .get () >= self ._window_size :
995996 # On FIRE, the window is evaluated and results are emitted. The window is not purged
996- # though, all elements are retained.
997+ # though, all elements are retained if _purge_on_fire is False .
997998 count_state .clear ()
998- return TriggerResult .FIRE
999+ return TriggerResult .FIRE_AND_PURGE if self . _purge_on_fire else TriggerResult . FIRE
9991000 else :
10001001 # No action is taken on the window.
10011002 return TriggerResult .CONTINUE
@@ -1091,7 +1092,7 @@ def assign_windows(self,
10911092 return [CountWindow (current_count // self ._window_size )]
10921093
10931094 def get_default_trigger (self , env ) -> Trigger [T , CountWindow ]:
1094- return CountTrigger (self ._window_size )
1095+ return CountTrigger (self ._window_size , purge_on_fire = True )
10951096
10961097 def get_window_serializer (self ) -> TypeSerializer [CountWindow ]:
10971098 return CountWindowSerializer ()
0 commit comments