@@ -2144,19 +2144,34 @@ impl AggregationUpdateQueue {
21442144 TaskDataCategory :: Meta ,
21452145 ) ;
21462146 let state = get_mut_or_insert_with ! ( task, Activeness , || ActivenessState :: new( task_id) ) ;
2147+ let is_new = state. is_empty ( ) ;
21472148 let is_zero = state. decrement_active_counter ( ) ;
21482149 let is_empty = state. is_empty ( ) ;
21492150 if is_empty {
21502151 task. remove ( & CachedDataItemKey :: Activeness { } ) ;
21512152 }
2152- if is_zero {
2153+ debug_assert ! (
2154+ !( is_new && is_zero) ,
2155+ // This allows us to but the `if is_zero` block in the else branch of the `if is_new`
2156+ // block below for fewer checks and less problems with the borrow checker.
2157+ "A new Activeness will never be zero after decrementing"
2158+ ) ;
2159+ if is_new {
2160+ // A task is considered "active" purely by the existence of an `Activeness` item, even
2161+ // if that item has an negative active counter. So we need to make sure to
2162+ // schedule it here. That case is pretty rare and only happens under extreme race
2163+ // conditions.
2164+ self . find_and_schedule_dirty_internal ( task_id, task, ctx) ;
2165+ } else if is_zero {
21532166 let followers = get_followers ( & task) ;
21542167 drop ( task) ;
21552168 if !followers. is_empty ( ) {
21562169 self . push ( AggregationUpdateJob :: DecreaseActiveCounts {
21572170 task_ids : followers,
21582171 } ) ;
21592172 }
2173+ } else {
2174+ drop ( task) ;
21602175 }
21612176 }
21622177
@@ -2173,16 +2188,27 @@ impl AggregationUpdateQueue {
21732188 TaskDataCategory :: Meta ,
21742189 ) ;
21752190 let state = get_mut_or_insert_with ! ( task, Activeness , || ActivenessState :: new( task_id) ) ;
2191+ let is_new = state. is_empty ( ) ;
21762192 let is_positive_now = state. increment_active_counter ( ) ;
21772193 let is_empty = state. is_empty ( ) ;
21782194 // This can happen if active count was negative before
21792195 if is_empty {
21802196 task. remove ( & CachedDataItemKey :: Activeness { } ) ;
21812197 }
2198+ debug_assert ! (
2199+ !is_new || is_positive_now,
2200+ // This allows us to nest the `if is_new` block below `if is_positive_now` for fewer
2201+ // checks.
2202+ "A new Activeness will always be positive after incrementing"
2203+ ) ;
21822204 if is_positive_now {
21832205 let followers = get_followers ( & task) ;
2184- // Fast path to schedule
2185- self . find_and_schedule_dirty_internal ( task_id, task, ctx) ;
2206+ if is_new {
2207+ // Fast path to schedule
2208+ self . find_and_schedule_dirty_internal ( task_id, task, ctx) ;
2209+ } else {
2210+ drop ( task) ;
2211+ }
21862212
21872213 if !followers. is_empty ( ) {
21882214 self . push ( AggregationUpdateJob :: IncreaseActiveCounts {
0 commit comments