Skip to content

Commit 0002725

Browse files
authored
fix: potential deadlock in async data movement window manager implementation (#2755)
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
1 parent 79edd72 commit 0002725

File tree

3 files changed

+116
-97
lines changed

3 files changed

+116
-97
lines changed

rust/numaflow-core/src/reduce/reducer/aligned/windower/fixed.rs

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,14 @@ impl FixedWindowManager {
7070
// Window exists, append message
7171
AlignedWindowOperation::Append {
7272
message: msg,
73-
window: window.clone(),
73+
window,
7474
}
7575
} else {
7676
// New window, insert it
7777
active_windows.insert(window.clone());
7878
AlignedWindowOperation::Open {
7979
message: msg,
80-
window: window.clone(),
80+
window,
8181
}
8282
};
8383

@@ -93,36 +93,41 @@ impl FixedWindowManager {
9393
pub(crate) fn close_windows(&self, watermark: DateTime<Utc>) -> Vec<AlignedWindowMessage> {
9494
let mut result = Vec::new();
9595

96-
let mut active_windows = self
97-
.active_windows
98-
.write()
99-
.expect("Poisoned lock for active_windows");
100-
let mut closed_windows = self
101-
.closed_windows
102-
.write()
103-
.expect("Poisoned lock for closed_windows");
104-
105-
let mut windows_to_close = Vec::new();
106-
107-
for window in active_windows.iter() {
108-
if window.end_time <= watermark {
109-
// Create close message
110-
let window_msg = AlignedWindowMessage {
111-
operation: AlignedWindowOperation::Close {
112-
window: window.clone(),
113-
},
114-
pnf_slot: window_pnf_slot(window),
115-
};
96+
let windows_to_close = {
97+
let mut active_windows = self
98+
.active_windows
99+
.write()
100+
.expect("Poisoned lock for active_windows");
101+
102+
let mut windows_to_close = Vec::new();
103+
active_windows.retain(|window| {
104+
if window.end_time <= watermark {
105+
windows_to_close.push(window.clone());
106+
false
107+
} else {
108+
true
109+
}
110+
});
111+
112+
windows_to_close
113+
};
116114

117-
result.push(window_msg);
118-
windows_to_close.push(window.clone());
115+
// add the windows to closed_windows
116+
{
117+
let mut closed_windows = self
118+
.closed_windows
119+
.write()
120+
.expect("Poisoned lock for closed_windows");
121+
for window in &windows_to_close {
122+
closed_windows.insert(window.clone());
119123
}
120124
}
121125

122-
// Move windows from active to closed
123126
for window in windows_to_close {
124-
active_windows.remove(&window);
125-
closed_windows.insert(window);
127+
result.push(AlignedWindowMessage {
128+
pnf_slot: window_pnf_slot(&window),
129+
operation: AlignedWindowOperation::Close { window },
130+
});
126131
}
127132

128133
result
@@ -142,20 +147,22 @@ impl FixedWindowManager {
142147
// get the oldest window from closed_windows, if closed_windows is empty, get the oldest
143148
// from active_windows
144149
// NOTE: closed windows will always have a lower end time than active_windows
145-
self.closed_windows
150+
{
151+
let closed_windows = self
152+
.closed_windows
153+
.read()
154+
.expect("Poisoned lock for closed_windows");
155+
if let Some(window) = closed_windows.iter().next() {
156+
return Some(window.clone());
157+
}
158+
}
159+
160+
self.active_windows
146161
.read()
147-
.expect("Poisoned lock for closed_windows")
162+
.expect("Poisoned lock for active_windows")
148163
.iter()
149164
.next()
150165
.cloned()
151-
.or_else(|| {
152-
self.active_windows
153-
.read()
154-
.expect("Poisoned lock for active_windows")
155-
.iter()
156-
.next()
157-
.cloned()
158-
})
159166
}
160167
}
161168

rust/numaflow-core/src/reduce/reducer/aligned/windower/sliding.rs

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -156,33 +156,41 @@ impl SlidingWindowManager {
156156
pub(crate) fn close_windows(&self, watermark: DateTime<Utc>) -> Vec<AlignedWindowMessage> {
157157
let mut result = Vec::new();
158158

159-
// Find windows that need to be closed
160-
let mut active_windows = self
161-
.active_windows
162-
.write()
163-
.expect("Poisoned lock in active_windows");
164-
let mut closed_windows = self
165-
.closed_windows
166-
.write()
167-
.expect("Poisoned lock in active_windows");
159+
let windows_to_close = {
160+
let mut active_windows = self
161+
.active_windows
162+
.write()
163+
.expect("Poisoned lock for active_windows");
168164

169-
// get all the windows that have end time less than the watermark
170-
let windows_to_close: Vec<_> = active_windows
171-
.iter()
172-
.filter(|window| window.end_time <= watermark) // window end time is exclusive, hence <=
173-
.cloned()
174-
.collect();
165+
let mut windows_to_close = Vec::new();
166+
active_windows.retain(|window| {
167+
if window.end_time <= watermark {
168+
windows_to_close.push(window.clone());
169+
false
170+
} else {
171+
true
172+
}
173+
});
174+
175+
windows_to_close
176+
};
177+
178+
// add the windows to closed_windows
179+
{
180+
let mut closed_windows = self
181+
.closed_windows
182+
.write()
183+
.expect("Poisoned lock for closed_windows");
184+
for window in &windows_to_close {
185+
closed_windows.insert(window.clone());
186+
}
187+
}
175188

176-
// Move windows from active to closed
177189
for window in windows_to_close {
178190
result.push(AlignedWindowMessage {
179-
operation: AlignedWindowOperation::Close {
180-
window: window.clone(),
181-
},
182191
pnf_slot: window_pnf_slot(&window),
192+
operation: AlignedWindowOperation::Close { window },
183193
});
184-
active_windows.remove(&window);
185-
closed_windows.insert(window);
186194
}
187195

188196
result
@@ -206,20 +214,22 @@ impl SlidingWindowManager {
206214
// get the oldest window from closed_windows, if closed_windows is empty, get the oldest
207215
// from active_windows
208216
// NOTE: closed windows will always have a lower end time than active_windows
209-
self.closed_windows
217+
{
218+
let closed_windows = self
219+
.closed_windows
220+
.read()
221+
.expect("Poisoned lock for closed_windows");
222+
if let Some(window) = closed_windows.iter().next() {
223+
return Some(window.clone());
224+
}
225+
}
226+
227+
self.active_windows
210228
.read()
211-
.expect("Poisoned lock for closed_windows")
229+
.expect("Poisoned lock for active_windows")
212230
.iter()
213231
.next()
214232
.cloned()
215-
.or_else(|| {
216-
self.active_windows
217-
.read()
218-
.expect("Poisoned lock for active_windows")
219-
.iter()
220-
.next()
221-
.cloned()
222-
})
223233
}
224234

225235
/// Helper method to format sorted window information for logging.
@@ -722,7 +732,7 @@ mod tests {
722732
#[test]
723733
fn test_assign_windows_with_small_slide() {
724734
// prepopulate active windows
725-
let active_windows = vec![
735+
let active_windows = [
726736
Window::new(
727737
Utc.timestamp_millis_opt(90000).unwrap(),
728738
Utc.timestamp_millis_opt(150000).unwrap(),
@@ -771,7 +781,7 @@ mod tests {
771781
// Window 4: [70000, 130000) - event falls in this window
772782
// Window 5: [60000, 120000) - event falls in this window
773783
// Window 6: [50000, 110000) - event falls in this window
774-
let expected_windows = vec![
784+
let expected_windows = [
775785
Window::new(
776786
Utc.timestamp_millis_opt(100000).unwrap(),
777787
Utc.timestamp_millis_opt(160000).unwrap(),

rust/numaflow-core/src/reduce/reducer/unaligned/windower/session.rs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,16 @@ impl SessionWindowManager {
149149
/// window.end_time might grow overtime and overlap. At the time of close, we merge these windows
150150
/// those have overlapping end times.
151151
pub(crate) fn close_windows(&self, watermark: DateTime<Utc>) -> Vec<UnalignedWindowMessage> {
152-
let mut active_windows = self.active_windows.write().expect("Poisoned lock");
153-
154152
// Extract and remove expired windows from active windows
155-
let closed_windows_by_key = Self::extract_expired_windows(&mut active_windows, watermark);
153+
let closed_windows_by_key = {
154+
let mut active_windows = self.active_windows.write().expect("Poisoned lock");
155+
Self::extract_expired_windows(&mut active_windows, watermark)
156+
};
156157

157158
// Process each key's closed windows
158159
closed_windows_by_key
159160
.into_iter()
160-
.flat_map(|(key, windows)| {
161-
self.process_closed_windows(&mut active_windows, &key, windows)
162-
})
161+
.flat_map(|(key, windows)| self.process_closed_windows(&key, windows))
163162
.collect()
164163
}
165164

@@ -198,20 +197,18 @@ impl SessionWindowManager {
198197
/// and then close the windows that cannot be merged.
199198
fn process_closed_windows(
200199
&self,
201-
active_windows: &mut HashMap<String, BTreeSet<Window>>,
202200
key: &str,
203201
closed_windows: Vec<Window>,
204202
) -> Vec<UnalignedWindowMessage> {
205203
Self::windows_that_can_be_merged(&closed_windows)
206204
.into_iter()
207-
.filter_map(|group| self.process_closing_window_group(active_windows, key, group))
205+
.filter_map(|group| self.process_closing_window_group(key, group))
208206
.collect()
209207
}
210208

211209
/// Process a group of windows that can be merged
212210
fn process_closing_window_group(
213211
&self,
214-
active_windows: &mut HashMap<String, BTreeSet<Window>>,
215212
key: &str,
216213
closing_group: Vec<Window>,
217214
) -> Option<UnalignedWindowMessage> {
@@ -223,7 +220,12 @@ impl SessionWindowManager {
223220
let window_to_close = Self::merge_windows(&closing_group);
224221

225222
// Try to merge with active windows
226-
match Self::try_merge_with_active(active_windows, key, &window_to_close) {
223+
let merge_result = {
224+
let mut active_windows = self.active_windows.write().expect("Poisoned lock");
225+
Self::try_merge_with_active(&mut active_windows, key, &window_to_close)
226+
};
227+
228+
match merge_result {
227229
Some((old_active, new_merged)) => Some(UnalignedWindowMessage {
228230
operation: UnalignedWindowOperation::Merge {
229231
windows: vec![window_to_close, old_active, new_merged],
@@ -232,10 +234,10 @@ impl SessionWindowManager {
232234
}),
233235
None => {
234236
// Move window to closed_windows
235-
self.closed_windows
236-
.write()
237-
.expect("Poisoned lock")
238-
.insert(window_to_close.clone());
237+
{
238+
let mut closed_windows = self.closed_windows.write().expect("Poisoned lock");
239+
closed_windows.insert(window_to_close.clone());
240+
}
239241

240242
Some(UnalignedWindowMessage {
241243
operation: UnalignedWindowOperation::Close {
@@ -353,20 +355,20 @@ impl SessionWindowManager {
353355
// Get the oldest window from closed_windows first, if closed_windows is empty, get the oldest
354356
// from active_windows
355357
// NOTE: closed windows will always have a lower end time than active_windows
356-
self.closed_windows
357-
.read()
358-
.expect("Poisoned lock")
359-
.iter()
360-
.next()
358+
359+
{
360+
let closed_windows = self.closed_windows.read().expect("Poisoned lock");
361+
if let Some(window) = closed_windows.iter().next() {
362+
return Some(window.end_time);
363+
}
364+
}
365+
366+
let active_windows = self.active_windows.read().expect("Poisoned lock");
367+
active_windows
368+
.values()
369+
.filter_map(|windows| windows.iter().next())
361370
.map(|window| window.end_time)
362-
.or_else(|| {
363-
self.active_windows
364-
.read()
365-
.expect("Poisoned lock")
366-
.values()
367-
.flat_map(|window_set| window_set.iter().map(|window| window.end_time))
368-
.min()
369-
})
371+
.min()
370372
}
371373
}
372374

0 commit comments

Comments
 (0)