Skip to content

Commit c2e27d4

Browse files
rename, relocate, and simplify the task queue (#106)
1 parent 3fb6fa9 commit c2e27d4

File tree

10 files changed

+229
-333
lines changed

10 files changed

+229
-333
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ and this project attempts to adhere to [Semantic Versioning](https://semver.org/
1818

1919
## [Unreleased]
2020

21+
### Changed
22+
23+
- **Internal**: Moved task queueing functionality to `djls-server` crate, renamed from `Worker` to `Queue`, and simplified API.
24+
2125
## [5.2.0a0]
2226

2327
### Added
@@ -79,4 +83,4 @@ and this project attempts to adhere to [Semantic Versioning](https://semver.org/
7983
[5.1.0a1]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.1.0a1
8084
[5.1.0a2]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.1.0a2
8185

82-
[5.2.0a0]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.2.0a0
86+
[5.2.0a0]: https://github.com/joshuadavidthomas/django-language-server/releases/tag/v5.2.0a0

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ djls = { path = "crates/djls" }
77
djls-project = { path = "crates/djls-project" }
88
djls-server = { path = "crates/djls-server" }
99
djls-templates = { path = "crates/djls-templates" }
10-
djls-worker = { path = "crates/djls-worker" }
1110

1211
anyhow = "1.0"
1312
async-trait = "0.1"

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ The project is written in Rust using PyO3 for Python integration. Here is a high
172172
- Django and Python project introspection ([`crates/djls-project/`](./crates/djls-project/))
173173
- LSP server implementation ([`crates/djls-server/`](./crates/djls-server/))
174174
- Template parsing ([`crates/djls-templates/`](./crates/djls-templates/))
175-
- Tokio-based background task management ([`crates/djls-worker/`](./crates/djls-worker/))
176175

177176
Code contributions are welcome from developers of all backgrounds. Rust expertise is valuable for the LSP server and core components, but Python and Django developers should not be deterred by the Rust codebase - Django expertise is just as valuable. Understanding Django's internals and common development patterns helps inform what features would be most valuable.
178177

crates/djls-server/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ edition = "2021"
66
[dependencies]
77
djls-project = { workspace = true }
88
djls-templates = { workspace = true }
9-
djls-worker = { workspace = true }
109

1110
anyhow = { workspace = true }
1211
pyo3 = { workspace = true }

crates/djls-server/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
mod documents;
2+
mod queue;
23
mod server;
3-
mod tasks;
44
mod workspace;
55

66
use crate::server::DjangoLanguageServer;

crates/djls-server/src/queue.rs

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
use anyhow::Result;
2+
use std::sync::Arc;
3+
use tokio::sync::{mpsc, oneshot};
4+
5+
pub trait Task: Send + 'static {
6+
type Output: Send + 'static;
7+
fn run(&self) -> Result<Self::Output>;
8+
}
9+
10+
trait TaskTrait: Send {
11+
fn run_boxed(self: Box<Self>);
12+
}
13+
14+
impl<T: Task> TaskTrait for T {
15+
fn run_boxed(self: Box<Self>) {
16+
match self.run() {
17+
Ok(_) => { /* Task succeeded, do nothing */ }
18+
Err(e) => {
19+
// Log the error if the task failed.
20+
// Consider adding a proper logging mechanism later.
21+
eprintln!("Task failed: {}", e);
22+
}
23+
}
24+
}
25+
}
26+
27+
#[derive(Clone)]
28+
pub struct Queue {
29+
inner: Arc<QueueInner>,
30+
}
31+
32+
struct QueueInner {
33+
sender: mpsc::Sender<Box<dyn TaskTrait>>,
34+
shutdown_sender: Option<oneshot::Sender<()>>,
35+
}
36+
37+
impl Queue {
38+
pub fn new() -> Self {
39+
let (sender, mut receiver) = mpsc::channel::<Box<dyn TaskTrait>>(32); // Channel for tasks
40+
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
41+
42+
tokio::spawn(async move {
43+
loop {
44+
tokio::select! {
45+
Some(task) = receiver.recv() => {
46+
task.run_boxed();
47+
}
48+
_ = &mut shutdown_rx => {
49+
// Drain the channel before shutting down? Optional.
50+
// For now, just break.
51+
break;
52+
}
53+
else => break,
54+
}
55+
}
56+
});
57+
58+
Self {
59+
inner: Arc::new(QueueInner {
60+
sender,
61+
shutdown_sender: Some(shutdown_tx),
62+
}),
63+
}
64+
}
65+
66+
/// Submits a task to the queue asynchronously, waiting if the channel is full.
67+
/// The task is executed in the background, and its result is ignored.
68+
pub async fn submit<T>(&self, task: T) -> Result<()>
69+
where
70+
T: Task + 'static,
71+
{
72+
self.inner
73+
.sender
74+
.send(Box::new(task))
75+
.await
76+
.map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))
77+
}
78+
}
79+
80+
impl Default for Queue {
81+
fn default() -> Self {
82+
Self::new()
83+
}
84+
}
85+
86+
impl Drop for QueueInner {
87+
fn drop(&mut self) {
88+
if let Some(sender) = self.shutdown_sender.take() {
89+
sender.send(()).ok();
90+
}
91+
}
92+
}
93+
94+
#[cfg(test)]
95+
mod tests {
96+
use super::*;
97+
use anyhow::anyhow;
98+
use std::time::Duration;
99+
use tokio::time::sleep;
100+
101+
struct TestTask(i32);
102+
impl Task for TestTask {
103+
type Output = i32;
104+
fn run(&self) -> Result<Self::Output> {
105+
std::thread::sleep(Duration::from_millis(10));
106+
Ok(self.0 * 2)
107+
}
108+
}
109+
110+
struct ErrorTask;
111+
impl Task for ErrorTask {
112+
type Output = ();
113+
fn run(&self) -> Result<Self::Output> {
114+
Err(anyhow!("Task failed intentionally"))
115+
}
116+
}
117+
118+
#[tokio::test]
119+
async fn test_submit_and_process() {
120+
let queue = Queue::new();
121+
// Submit a few tasks
122+
for i in 0..5 {
123+
queue.submit(TestTask(i)).await.unwrap();
124+
}
125+
// Submit a task that will fail
126+
queue.submit(ErrorTask).await.unwrap();
127+
128+
// Allow some time for tasks to be processed by the background worker.
129+
// In a real scenario, you might not wait like this, but for testing,
130+
// we need to ensure the background task has a chance to run.
131+
sleep(Duration::from_millis(100)).await;
132+
133+
// We can't directly assert results here, but we can check the queue still works.
134+
queue.submit(TestTask(10)).await.unwrap();
135+
sleep(Duration::from_millis(50)).await; // Allow time for the last task
136+
}
137+
138+
#[tokio::test]
139+
async fn test_channel_backpressure_submit() {
140+
let queue = Queue::new();
141+
142+
// Fill the channel (channel size is 32) using submit
143+
let mut tasks = Vec::new();
144+
for i in 0..32 {
145+
let queue_clone = queue.clone();
146+
// Spawn tasks to submit concurrently, as submit waits
147+
tasks.push(tokio::spawn(async move {
148+
queue_clone
149+
.submit(TestTask(i))
150+
.await
151+
.expect("Submit should succeed");
152+
}));
153+
}
154+
// Wait for all initial submissions to likely be sent (though not necessarily processed)
155+
for task in tasks {
156+
task.await.unwrap();
157+
}
158+
159+
// Try submitting one more task. This should wait until a slot is free.
160+
// We'll use a timeout to ensure it doesn't block forever if something is wrong.
161+
let submit_task = queue.submit(TestTask(33));
162+
match tokio::time::timeout(Duration::from_millis(200), submit_task).await {
163+
Ok(Ok(_)) => { /* Successfully submitted after waiting */ }
164+
Ok(Err(e)) => panic!("Submit failed unexpectedly: {}", e),
165+
Err(_) => panic!("Submit timed out, likely blocked due to backpressure not resolving"),
166+
}
167+
168+
// Allow time for processing
169+
sleep(Duration::from_millis(100)).await;
170+
}
171+
172+
#[tokio::test]
173+
async fn test_shutdown() {
174+
let queue = Queue::new();
175+
queue.submit(TestTask(1)).await.unwrap();
176+
queue.submit(TestTask(2)).await.unwrap();
177+
// Queue is dropped here, triggering shutdown
178+
drop(queue);
179+
180+
// Allow time for shutdown signal to be processed and potentially
181+
// for the background task to finish ongoing work (though not guaranteed here).
182+
sleep(Duration::from_millis(100)).await;
183+
// No direct assertion, just checking it doesn't panic/hang.
184+
}
185+
186+
#[tokio::test]
187+
async fn test_queue_cloning() {
188+
let queue1 = Queue::new();
189+
let queue2 = queue1.clone();
190+
191+
// Submit tasks via both clones
192+
let task1 = queue1.submit(TestTask(10));
193+
let task2 = queue2.submit(TestTask(20));
194+
195+
// Wait for submissions to complete
196+
tokio::try_join!(task1, task2).unwrap();
197+
198+
// Allow time for processing
199+
sleep(Duration::from_millis(100)).await;
200+
}
201+
202+
#[tokio::test]
203+
async fn test_error_task_does_not_stop_queue() {
204+
let queue = Queue::new();
205+
206+
queue.submit(TestTask(1)).await.unwrap();
207+
queue.submit(ErrorTask).await.unwrap(); // Submit the failing task
208+
queue.submit(TestTask(2)).await.unwrap();
209+
210+
// Allow time for tasks to process
211+
sleep(Duration::from_millis(100)).await;
212+
213+
// Submit another task to ensure the queue is still running after the error
214+
queue.submit(TestTask(3)).await.unwrap();
215+
sleep(Duration::from_millis(50)).await;
216+
// If we reach here without panic, the queue continued after the error.
217+
// We expect an error message "Task failed: Task failed intentionally"
218+
// to be printed to stderr during the test run.
219+
}
220+
}

crates/djls-server/src/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::documents::Store;
2+
use crate::queue::Queue;
23
use crate::workspace::get_project_path;
34
use anyhow::Result;
45
use djls_project::DjangoProject;
5-
use djls_worker::Worker;
66
use std::sync::Arc;
77
use tokio::sync::RwLock;
88
use tower_lsp_server::jsonrpc::Result as LspResult;
@@ -16,7 +16,7 @@ pub struct DjangoLanguageServer {
1616
client: Client,
1717
project: Arc<RwLock<Option<DjangoProject>>>,
1818
documents: Arc<RwLock<Store>>,
19-
worker: Worker,
19+
queue: Queue,
2020
}
2121

2222
impl DjangoLanguageServer {
@@ -25,7 +25,7 @@ impl DjangoLanguageServer {
2525
client,
2626
project: Arc::new(RwLock::new(None)),
2727
documents: Arc::new(RwLock::new(Store::new())),
28-
worker: Worker::new(),
28+
queue: Queue::new(),
2929
}
3030
}
3131

crates/djls-server/src/tasks.rs

Lines changed: 0 additions & 25 deletions
This file was deleted.

crates/djls-worker/Cargo.toml

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)