@@ -111,7 +111,10 @@ where
111111
112112 let sorted_count = sorted_trackers. len ( ) ;
113113 let new_tracker = match sorted_trackers. entry ( sorted_attrs) {
114- Entry :: Occupied ( occupied_entry) => occupied_entry. get ( ) . clone ( ) ,
114+ Entry :: Occupied ( occupied_entry) => {
115+ // do not return early, because collection phase might clear `all_trackers` multiple times
116+ occupied_entry. get ( ) . clone ( )
117+ }
115118 Entry :: Vacant ( vacant_entry) => {
116119 if !is_under_cardinality_limit ( sorted_count) {
117120 sorted_trackers. entry ( STREAM_OVERFLOW_ATTRIBUTES . clone ( ) )
@@ -197,8 +200,28 @@ where
197200 ) ) ;
198201 }
199202
200- for ( attrs, tracker) in to_collect. drain ( ) {
201- let tracker = Arc :: into_inner ( tracker) . expect ( "the only instance" ) ;
203+ for ( attrs, mut tracker) in to_collect. drain ( ) {
204+ // Handles special case:
205+ // measure-thread: get inserted tracker from `sorted_attribs` (holds tracker)
206+ // collect-thread: replace sorted_attribs (clears sorted_attribs)
207+ // collect-thread: clear all_attribs
208+ // collect_thread: THIS-LOOP: loop until measure-thread still holds a tracker
209+ // measure-thread: insert tracker into `all_attribs``
210+ // collect_thread: exits this loop after clearing trackers
211+ let tracker = loop {
212+ match Arc :: try_unwrap ( tracker) {
213+ Ok ( inner) => {
214+ break inner;
215+ }
216+ Err ( reinserted) => {
217+ tracker = reinserted;
218+ match self . all_attribs . write ( ) {
219+ Ok ( mut all_trackers) => all_trackers. clear ( ) ,
220+ Err ( _) => return ,
221+ } ;
222+ }
223+ } ;
224+ } ;
202225 dest. push ( map_fn ( attrs, tracker) ) ;
203226 }
204227 }
0 commit comments