Skip to content

Commit db43f25

Browse files
authored
Merge pull request #5 from faern/cleanly-exit-worker-threads
Exit worker threads immediately when their input channels disconnect
2 parents dc13de9 + f0adbf7 commit db43f25

File tree

2 files changed

+107
-109
lines changed

2 files changed

+107
-109
lines changed

src/camera.rs

Lines changed: 102 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -66,142 +66,141 @@ impl CameraThread {
6666
}
6767
}
6868

69-
pub fn run(&mut self) -> ! {
69+
pub fn run(&mut self) {
7070
let (exit_tx, exit_rx) = flume::bounded(0);
7171
let config: Arc<Mutex<Option<ImageConfig>>> = Arc::new(Mutex::new(None));
7272
#[allow(clippy::type_complexity)]
7373
let controls: Arc<Mutex<Option<Vec<(KnownCameraControl, ControlValueSetter)>>>> =
7474
Arc::new(Mutex::new(None));
7575
let mut join_handle = None;
76-
loop {
77-
if let Ok(event) = self.config_rx.recv() {
78-
match event {
79-
CameraEvent::StartStream { id, format } => {
80-
let config = Arc::clone(&config);
81-
let controls = Arc::clone(&controls);
76+
while let Ok(event) = self.config_rx.recv() {
77+
match event {
78+
CameraEvent::StartStream { id, format } => {
79+
let config = Arc::clone(&config);
80+
let controls = Arc::clone(&controls);
8281

83-
let frame_tx = self.frame_tx.clone();
84-
let window_tx = self.window_tx.clone();
85-
let result_tx = self.result_tx.clone();
86-
let exit_rx = exit_rx.clone();
87-
let hdl = std::thread::spawn(move || {
88-
let mut camera = match CallbackCamera::new(
89-
id,
90-
RequestedFormat::new::<RgbFormat>(
91-
nokhwa::utils::RequestedFormatType::Exact(format),
92-
),
93-
|_| {},
94-
) {
95-
Ok(camera) => camera,
96-
Err(e) => {
97-
log::error!("{:?}", e);
98-
result_tx
99-
.send(ThreadResult {
100-
id: ThreadId::Camera,
101-
result: Err("Could not initialize camera".into()),
102-
})
103-
.unwrap();
104-
return;
105-
}
106-
};
107-
108-
if let Err(e) = camera.open_stream() {
82+
let frame_tx = self.frame_tx.clone();
83+
let window_tx = self.window_tx.clone();
84+
let result_tx = self.result_tx.clone();
85+
let exit_rx = exit_rx.clone();
86+
let hdl = std::thread::spawn(move || {
87+
let mut camera = match CallbackCamera::new(
88+
id,
89+
RequestedFormat::new::<RgbFormat>(
90+
nokhwa::utils::RequestedFormatType::Exact(format),
91+
),
92+
|_| {},
93+
) {
94+
Ok(camera) => camera,
95+
Err(e) => {
10996
log::error!("{:?}", e);
11097
result_tx
11198
.send(ThreadResult {
11299
id: ThreadId::Camera,
113-
result: Err("Could not open stream".into()),
100+
result: Err("Could not initialize camera".into()),
114101
})
115102
.unwrap();
116103
return;
117-
};
104+
}
105+
};
118106

107+
if let Err(e) = camera.open_stream() {
108+
log::error!("{:?}", e);
119109
result_tx
120110
.send(ThreadResult {
121111
id: ThreadId::Camera,
122-
result: Ok(()),
112+
result: Err("Could not open stream".into()),
123113
})
124114
.unwrap();
115+
return;
116+
};
125117

126-
let mut inner_config = None;
118+
result_tx
119+
.send(ThreadResult {
120+
id: ThreadId::Camera,
121+
result: Ok(()),
122+
})
123+
.unwrap();
127124

128-
loop {
129-
// Check exit request
130-
if exit_rx.try_recv().is_ok() {
131-
return;
132-
}
133-
// Check for new config
134-
if let Some(cfg) = config.lock().unwrap().take() {
135-
inner_config = Some(cfg);
136-
}
137-
// Check for new controls
138-
if let Some(controls) = controls.lock().unwrap().take() {
139-
for (control, setter) in &controls {
140-
let control: &KnownCameraControl = control;
141-
if let Err(e) =
142-
camera.set_camera_control(*control, setter.clone())
143-
{
144-
log::error!("{:?}", e);
145-
}
146-
}
147-
}
148-
// Get frame
149-
let mut frame = match camera
150-
.poll_frame()
151-
.and_then(|frame| frame.decode_image::<RgbFormat>())
152-
{
153-
Ok(frame) => frame,
154-
Err(e) => {
125+
let mut inner_config = None;
126+
127+
loop {
128+
// Check exit request
129+
if exit_rx.try_recv().is_ok() {
130+
return;
131+
}
132+
// Check for new config
133+
if let Some(cfg) = config.lock().unwrap().take() {
134+
inner_config = Some(cfg);
135+
}
136+
// Check for new controls
137+
if let Some(controls) = controls.lock().unwrap().take() {
138+
for (control, setter) in &controls {
139+
let control: &KnownCameraControl = control;
140+
if let Err(e) =
141+
camera.set_camera_control(*control, setter.clone())
142+
{
155143
log::error!("{:?}", e);
156-
result_tx
157-
.send(ThreadResult {
158-
id: ThreadId::Camera,
159-
result: Err("Could not poll for frame".into()),
160-
})
161-
.unwrap();
162-
return;
163144
}
164-
};
145+
}
146+
}
147+
// Get frame
148+
let mut frame = match camera
149+
.poll_frame()
150+
.and_then(|frame| frame.decode_image::<RgbFormat>())
151+
{
152+
Ok(frame) => frame,
153+
Err(e) => {
154+
log::error!("{:?}", e);
155+
result_tx
156+
.send(ThreadResult {
157+
id: ThreadId::Camera,
158+
result: Err("Could not poll for frame".into()),
159+
})
160+
.unwrap();
161+
return;
162+
}
163+
};
165164

166-
if let Some(cfg) = &inner_config {
167-
// Flip
168-
if cfg.flip {
169-
frame = DynamicImage::ImageRgb8(frame).fliph().into_rgb8();
170-
}
171-
// Extract window
172-
let window = frame
173-
.view(
174-
cfg.window.offset.x as u32,
175-
cfg.window.offset.y as u32,
176-
cfg.window.size.x as u32,
177-
cfg.window.size.y as u32,
178-
)
179-
.to_image();
180-
if window_tx.send(window).is_err() {
181-
return;
182-
};
165+
if let Some(cfg) = &inner_config {
166+
// Flip
167+
if cfg.flip {
168+
frame = DynamicImage::ImageRgb8(frame).fliph().into_rgb8();
183169
}
184-
if frame_tx.send(frame).is_err() {
170+
// Extract window
171+
let window = frame
172+
.view(
173+
cfg.window.offset.x as u32,
174+
cfg.window.offset.y as u32,
175+
cfg.window.size.x as u32,
176+
cfg.window.size.y as u32,
177+
)
178+
.to_image();
179+
if window_tx.send(window).is_err() {
185180
return;
186181
};
187182
}
188-
});
189-
join_handle = Some(hdl);
190-
}
191-
CameraEvent::StopStream => {
192-
if let Some(hdl) = join_handle.take() {
193-
exit_tx.send(Exit {}).ok();
194-
hdl.join().ok();
183+
if frame_tx.send(frame).is_err() {
184+
return;
185+
};
195186
}
187+
});
188+
join_handle = Some(hdl);
189+
}
190+
CameraEvent::StopStream => {
191+
if let Some(hdl) = join_handle.take() {
192+
exit_tx.send(Exit {}).ok();
193+
hdl.join().ok();
196194
}
197-
CameraEvent::Config(cfg) => {
198-
*config.lock().unwrap() = Some(cfg);
199-
}
200-
CameraEvent::Controls(ctrls) => {
201-
*controls.lock().unwrap() = Some(ctrls);
202-
}
195+
}
196+
CameraEvent::Config(cfg) => {
197+
*config.lock().unwrap() = Some(cfg);
198+
}
199+
CameraEvent::Controls(ctrls) => {
200+
*controls.lock().unwrap() = Some(ctrls);
203201
}
204202
}
205203
}
204+
log::debug!("Camera thread exiting");
206205
}
207206
}

src/spectrum.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,13 @@ impl SpectrumCalculator {
3939
}
4040
}
4141

42-
pub fn run(&mut self) -> ! {
43-
loop {
44-
if let Ok(window) = self.window_rx.recv() {
45-
let spectrum = Self::process_window(&window);
42+
pub fn run(&mut self) {
43+
while let Ok(window) = self.window_rx.recv() {
44+
let spectrum = Self::process_window(&window);
4645

47-
self.spectrum_tx.send(spectrum).unwrap();
48-
}
46+
self.spectrum_tx.send(spectrum).unwrap();
4947
}
48+
log::debug!("SpectrumCalculator thread exiting");
5049
}
5150

5251
pub fn process_window(window: &ImageBuffer<Rgb<u8>, Vec<u8>>) -> SpectrumRgb {

0 commit comments

Comments
 (0)