Skip to content

Commit 909524b

Browse files
mmahroussfda-odoo
authored andcommitted
[IMP] server: request restart, give shutdown priority
1 parent 1689870 commit 909524b

File tree

2 files changed

+173
-97
lines changed

2 files changed

+173
-97
lines changed

server/src/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,6 @@ impl Server {
387387
drop(receiver_clone);
388388
let hook = panic::take_hook(); //drop sender stored in panic
389389
drop(hook);
390-
let _ = self.sender_to_delayed_process.send(DelayedProcessingMessage::EXIT);
391390
let _ = stop_sender.send(());
392391
self.connection = None; //drop connection before joining threads
393392
if let Some(pid_join_handle) = pid_thread {
@@ -396,6 +395,7 @@ impl Server {
396395
for thread in self.threads {
397396
thread.join().unwrap();
398397
}
398+
let _ = self.sender_to_delayed_process.send(DelayedProcessingMessage::EXIT);
399399
self.delayed_process_thread.join().unwrap();
400400
exit_no_error_code
401401
}

server/src/threads.rs

Lines changed: 172 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{path::PathBuf, sync::{atomic::Ordering, Arc, Mutex}, time::Instant};
1+
use std::{collections::VecDeque, path::PathBuf, sync::{atomic::Ordering, Arc, Mutex}, time::Instant};
22

33
use crossbeam_channel::{Receiver, Sender, TryRecvError};
44
use lsp_server::{Message, RequestId, Response, ResponseError};
@@ -188,6 +188,7 @@ pub enum DelayedProcessingMessage {
188188

189189
pub fn delayed_changes_process_thread(sender_session: Sender<Message>, receiver_session: Receiver<Message>, receiver: Receiver<DelayedProcessingMessage>, sync_odoo: Arc<Mutex<SyncOdoo>>, delayed_process_sender: Sender<DelayedProcessingMessage>) {
190190
const MAX_DELAY: u64 = 15000;
191+
let mut restart_requested = false;
191192
let mut normal_delay = std::time::Duration::from_millis(std::cmp::min(sync_odoo.lock().unwrap().config.auto_refresh_delay, MAX_DELAY));
192193
let check_reset = |msg: Option<&DelayedProcessingMessage>| {
193194
let length = sync_odoo.lock().unwrap().watched_file_updates.load(Ordering::SeqCst);
@@ -201,22 +202,20 @@ pub fn delayed_changes_process_thread(sender_session: Sender<Message>, receiver_
201202
let message = "Too many requests, possible change of branch, restarting Odoo LS";
202203
info!(message);
203204
{
204-
let mut session = SessionInfo{
205+
let session = SessionInfo{
205206
sender: sender_session.clone(),
206207
receiver: receiver_session.clone(),
207208
sync_odoo: &mut sync_odoo.lock().unwrap(),
208209
delayed_process_sender: Some(delayed_process_sender.clone()),
209210
noqas_stack: vec![],
210211
current_noqa: NoqaInfo::None,
211212
};
212-
let config = session.sync_odoo.config.clone();
213213
session.send_notification(ShowMessage::METHOD, ShowMessageParams{
214214
typ: MessageType::INFO,
215215
message: message.to_string()
216216
});
217217
// Drain channel before resetting
218-
let _: Vec<DelayedProcessingMessage> = receiver.try_iter().collect();
219-
SyncOdoo::reset(&mut session, config);
218+
session.send_notification("$Odoo/restartNeeded", ());
220219
}
221220
return true;
222221

@@ -231,7 +230,15 @@ pub fn delayed_changes_process_thread(sender_session: Sender<Message>, receiver_
231230
let mut update_file_index = None;
232231
let mut delay = normal_delay;
233232
let msg: Result<DelayedProcessingMessage, crossbeam_channel::RecvError> = receiver.recv();
233+
if restart_requested {
234+
// We just wait for the exit message, otherwise we just ignore the message
235+
match msg {
236+
Ok(DelayedProcessingMessage::EXIT) => return,
237+
_ => continue,
238+
}
239+
}
234240
if check_reset(msg.as_ref().ok()) {
241+
restart_requested = true;
235242
continue;
236243
}
237244
match msg {
@@ -255,6 +262,7 @@ pub fn delayed_changes_process_thread(sender_session: Sender<Message>, receiver_
255262
loop {
256263
let new_msg: Result<DelayedProcessingMessage, TryRecvError> = receiver.try_recv();
257264
if check_reset(new_msg.as_ref().ok()) {
265+
restart_requested = true;
258266
continue 'main_loop;
259267
}
260268
match new_msg {
@@ -319,109 +327,177 @@ pub fn delayed_changes_process_thread(sender_session: Sender<Message>, receiver_
319327
}
320328

321329
pub fn message_processor_thread_main(sync_odoo: Arc<Mutex<SyncOdoo>>, generic_receiver: Receiver<Message>, sender: Sender<Message>, receiver: Receiver<Message>, delayed_process_sender: Sender<DelayedProcessingMessage>) {
330+
let mut buffer = VecDeque::new();
322331
loop {
323-
let msg = generic_receiver.recv();
324-
if msg.is_err() {
325-
error!("Got an RecvError, exiting thread");
326-
break;
332+
// Drain all available messages into buffer
333+
loop {
334+
let maybe_msg = generic_receiver.try_recv();
335+
match maybe_msg {
336+
Ok(msg) => {
337+
// Check for shutdown
338+
if matches!(&msg, Message::Notification(n) if n.method.as_str() == Shutdown::METHOD) {
339+
warn!("Main thread - got shutdown.");
340+
return;
341+
}
342+
buffer.push_back(msg);
343+
},
344+
Err(TryRecvError::Empty) => break,
345+
Err(TryRecvError::Disconnected) => {
346+
error!("Generic channel disconnected, exiting thread");
347+
return;
348+
}
349+
}
327350
}
328-
let msg = msg.unwrap();
329-
let mut session = SessionInfo{
330-
sender: sender.clone(),
331-
receiver: receiver.clone(),
332-
sync_odoo: &mut sync_odoo.lock().unwrap(),
333-
delayed_process_sender: Some(delayed_process_sender.clone()),
334-
noqas_stack: vec![],
335-
current_noqa: NoqaInfo::None,
336-
};
337-
match msg {
338-
Message::Request(r) => {
339-
let (value, error) = match r.method.as_str() {
340-
Completion::METHOD => {
341-
to_value::<CompletionResponse>(Odoo::handle_autocomplete(&mut session, serde_json::from_value(r.params).unwrap()))
342-
},
343-
_ => {error!("Request not handled by main thread: {}", r.method); (None, Some(ResponseError{
344-
code: 1,
345-
message: S!("Request not handled by the server"),
346-
data: None
347-
}))}
348-
};
349-
sender.send(Message::Response(Response { id: r.id, result: value, error: error })).unwrap();
350-
},
351-
Message::Notification(n) => {
352-
match n.method.as_str() {
353-
DidOpenTextDocument::METHOD => { Odoo::handle_did_open(&mut session, serde_json::from_value(n.params).unwrap()); }
354-
DidChangeConfiguration::METHOD => { Odoo::handle_did_change_configuration(&mut session, serde_json::from_value(n.params).unwrap()) }
355-
DidChangeWorkspaceFolders::METHOD => { Odoo::handle_did_change_workspace_folders(&mut session, serde_json::from_value(n.params).unwrap()) }
356-
DidChangeTextDocument::METHOD => { Odoo::handle_did_change(&mut session, serde_json::from_value(n.params).unwrap()); }
357-
DidCloseTextDocument::METHOD => { Odoo::handle_did_close(&mut session, serde_json::from_value(n.params).unwrap()); }
358-
DidSaveTextDocument::METHOD => { Odoo::handle_did_save(&mut session, serde_json::from_value(n.params).unwrap()); }
359-
DidRenameFiles::METHOD => { Odoo::handle_did_rename(&mut session, serde_json::from_value(n.params).unwrap()); }
360-
DidCreateFiles::METHOD => { Odoo::handle_did_create(&mut session, serde_json::from_value(n.params).unwrap()); }
361-
DidDeleteFiles::METHOD => { Odoo::handle_did_delete(&mut session, serde_json::from_value(n.params).unwrap()); }
362-
DidChangeWatchedFiles::METHOD => { Odoo::handle_did_change_watched_files(&mut session, serde_json::from_value(n.params).unwrap())}
363-
"custom/server/register_capabilities" => { Odoo::register_capabilities(&mut session); }
364-
"custom/server/init" => { Odoo::init(&mut session); }
365-
Shutdown::METHOD => { warn!("Main thread - got shutdown."); break;}
366-
_ => {error!("Notification not handled by main thread: {}", n.method)}
351+
// If buffer is empty, block for next message so we do not busy wait
352+
if buffer.is_empty() {
353+
match generic_receiver.recv() {
354+
Ok(msg) => {
355+
// Check for shutdown
356+
if matches!(&msg, Message::Notification(n) if n.method.as_str() == Shutdown::METHOD) {
357+
warn!("Main thread - got shutdown.");
358+
return;
359+
}
360+
buffer.push_back(msg);
361+
},
362+
Err(_) => {
363+
error!("Got an RecvError, exiting thread");
364+
break;
365+
}
366+
}
367+
}
368+
// Process buffered messages
369+
if let Some(msg) = buffer.pop_front() {
370+
let mut session = SessionInfo{
371+
sender: sender.clone(),
372+
receiver: receiver.clone(),
373+
sync_odoo: &mut sync_odoo.lock().unwrap(),
374+
delayed_process_sender: Some(delayed_process_sender.clone()),
375+
noqas_stack: vec![],
376+
current_noqa: NoqaInfo::None,
377+
};
378+
match msg {
379+
Message::Request(r) => {
380+
let (value, error) = match r.method.as_str() {
381+
Completion::METHOD => {
382+
to_value::<CompletionResponse>(Odoo::handle_autocomplete(&mut session, serde_json::from_value(r.params).unwrap()))
383+
},
384+
_ => {error!("Request not handled by main thread: {}", r.method); (None, Some(ResponseError{
385+
code: 1,
386+
message: S!("Request not handled by the server"),
387+
data: None
388+
}))}
389+
};
390+
sender.send(Message::Response(Response { id: r.id, result: value, error: error })).unwrap();
391+
},
392+
Message::Notification(n) => {
393+
match n.method.as_str() {
394+
DidOpenTextDocument::METHOD => { Odoo::handle_did_open(&mut session, serde_json::from_value(n.params).unwrap()); }
395+
DidChangeConfiguration::METHOD => { Odoo::handle_did_change_configuration(&mut session, serde_json::from_value(n.params).unwrap()) }
396+
DidChangeWorkspaceFolders::METHOD => { Odoo::handle_did_change_workspace_folders(&mut session, serde_json::from_value(n.params).unwrap()) }
397+
DidChangeTextDocument::METHOD => { Odoo::handle_did_change(&mut session, serde_json::from_value(n.params).unwrap()); }
398+
DidCloseTextDocument::METHOD => { Odoo::handle_did_close(&mut session, serde_json::from_value(n.params).unwrap()); }
399+
DidSaveTextDocument::METHOD => { Odoo::handle_did_save(&mut session, serde_json::from_value(n.params).unwrap()); }
400+
DidRenameFiles::METHOD => { Odoo::handle_did_rename(&mut session, serde_json::from_value(n.params).unwrap()); }
401+
DidCreateFiles::METHOD => { Odoo::handle_did_create(&mut session, serde_json::from_value(n.params).unwrap()); }
402+
DidDeleteFiles::METHOD => { Odoo::handle_did_delete(&mut session, serde_json::from_value(n.params).unwrap()); }
403+
DidChangeWatchedFiles::METHOD => { Odoo::handle_did_change_watched_files(&mut session, serde_json::from_value(n.params).unwrap())}
404+
"custom/server/register_capabilities" => { Odoo::register_capabilities(&mut session); }
405+
"custom/server/init" => { Odoo::init(&mut session); }
406+
Shutdown::METHOD => { warn!("Main thread - got shutdown."); return;} // should be already caught
407+
_ => {error!("Notification not handled by main thread: {}", n.method)}
408+
}
409+
},
410+
Message::Response(_) => {
411+
error!("Error: Responses should not arrives in generic channel. Exiting thread");
412+
return;
367413
}
368-
},
369-
Message::Response(_) => {
370-
error!("Error: Responses should not arrives in generic channel. Exiting thread");
371-
break;
372414
}
373415
}
374416
}
375417
}
376418

377419
pub fn message_processor_thread_read(sync_odoo: Arc<Mutex<SyncOdoo>>, generic_receiver: Receiver<Message>, sender: Sender<Message>, receiver: Receiver<Message>, delayed_process_sender: Sender<DelayedProcessingMessage>) {
420+
let mut buffer = VecDeque::new();
378421
loop {
379-
let msg = generic_receiver.recv();
380-
if msg.is_err() {
381-
error!("Got an RecvError, exiting thread");
382-
break;
422+
// Drain all available messages into buffer
423+
loop {
424+
let maybe_msg = generic_receiver.try_recv();
425+
match maybe_msg {
426+
Ok(msg) => {
427+
// Check for shutdown
428+
if matches!(&msg, Message::Notification(n) if n.method.as_str() == Shutdown::METHOD) {
429+
warn!("Read thread - got shutdown.");
430+
return;
431+
}
432+
buffer.push_back(msg);
433+
},
434+
Err(TryRecvError::Empty) => break,
435+
Err(TryRecvError::Disconnected) => {
436+
error!("Generic channel disconnected, exiting thread");
437+
return;
438+
}
439+
}
383440
}
384-
let msg = msg.unwrap();
385-
let mut session = SessionInfo{
386-
sender: sender.clone(),
387-
receiver: receiver.clone(),
388-
sync_odoo: &mut sync_odoo.lock().unwrap(), //TODO work on read access
389-
delayed_process_sender: Some(delayed_process_sender.clone()),
390-
noqas_stack: vec![],
391-
current_noqa: NoqaInfo::None,
392-
};
393-
match msg {
394-
Message::Request(r) => {
395-
let (value, error) = match r.method.as_str() {
396-
HoverRequest::METHOD => {
397-
to_value::<Hover>(Odoo::handle_hover(&mut session, serde_json::from_value(r.params).unwrap()))
398-
},
399-
GotoDefinition::METHOD => {
400-
to_value::<GotoTypeDefinitionResponse>(Odoo::handle_goto_definition(&mut session, serde_json::from_value(r.params).unwrap()))
401-
},
402-
References::METHOD => {
403-
to_value::<Vec<Location>>(Odoo::handle_references(&mut session, serde_json::from_value(r.params).unwrap()))
404-
},
405-
DocumentSymbolRequest::METHOD => {
406-
to_value::<DocumentSymbolResponse>(Odoo::handle_document_symbols(&mut session, serde_json::from_value(r.params).unwrap()))
407-
},
408-
_ => {error!("Request not handled by read thread: {}", r.method); (None, Some(ResponseError{
409-
code: 1,
410-
message: S!("Request not handled by the server"),
411-
data: None
412-
}))}
413-
};
414-
sender.send(Message::Response(Response { id: r.id, result: value, error: error })).unwrap();
415-
},
416-
Message::Notification(r) => {
417-
match r.method.as_str() {
418-
Shutdown::METHOD => { warn!("Read thread - got shutdown."); break;}
419-
_ => {error!("Notification not handled by read thread: {}", r.method)}
441+
// If buffer is empty, block for next message so we do not busy wait
442+
if buffer.is_empty() {
443+
match generic_receiver.recv() {
444+
Ok(msg) => {
445+
// Check for shutdown
446+
if matches!(&msg, Message::Notification(n) if n.method.as_str() == Shutdown::METHOD) {
447+
warn!("Read thread - got shutdown.");
448+
return;
449+
}
450+
buffer.push_back(msg);
451+
},
452+
Err(_) => {
453+
error!("Got an RecvError, exiting thread");
454+
break;
455+
}
456+
}
457+
}
458+
// Process buffered messages
459+
if let Some(msg) = buffer.pop_front() {
460+
let mut session = SessionInfo{
461+
sender: sender.clone(),
462+
receiver: receiver.clone(),
463+
sync_odoo: &mut sync_odoo.lock().unwrap(),
464+
delayed_process_sender: Some(delayed_process_sender.clone()),
465+
noqas_stack: vec![],
466+
current_noqa: NoqaInfo::None,
467+
};
468+
match msg {
469+
Message::Request(r) => {
470+
let (value, error) = match r.method.as_str() {
471+
HoverRequest::METHOD => {
472+
to_value::<Hover>(Odoo::handle_hover(&mut session, serde_json::from_value(r.params).unwrap()))
473+
},
474+
GotoDefinition::METHOD => {
475+
to_value::<GotoTypeDefinitionResponse>(Odoo::handle_goto_definition(&mut session, serde_json::from_value(r.params).unwrap()))
476+
},
477+
References::METHOD => {
478+
to_value::<Vec<Location>>(Odoo::handle_references(&mut session, serde_json::from_value(r.params).unwrap()))
479+
},
480+
DocumentSymbolRequest::METHOD => {
481+
to_value::<DocumentSymbolResponse>(Odoo::handle_document_symbols(&mut session, serde_json::from_value(r.params).unwrap()))
482+
},
483+
_ => {error!("Request not handled by read thread: {}", r.method); (None, Some(ResponseError{
484+
code: 1,
485+
message: S!("Request not handled by the server"),
486+
data: None
487+
}))}
488+
};
489+
sender.send(Message::Response(Response { id: r.id, result: value, error: error })).unwrap();
490+
},
491+
Message::Notification(r) => {
492+
match r.method.as_str() {
493+
Shutdown::METHOD => { warn!("Read thread - got shutdown."); return;}
494+
_ => {error!("Notification not handled by read thread: {}", r.method)}
495+
}
496+
},
497+
Message::Response(_) => {
498+
error!("Error: Responses should not arrives in generic channel. Exiting thread");
499+
return;
420500
}
421-
},
422-
Message::Response(_) => {
423-
error!("Error: Responses should not arrives in generic channel. Exiting thread");
424-
break;
425501
}
426502
}
427503
}

0 commit comments

Comments
 (0)