Skip to content

Commit 5997d9b

Browse files
simplify task queue API to directly accept futures (#137)
1 parent eb59038 commit 5997d9b

File tree

2 files changed

+76
-82
lines changed

2 files changed

+76
-82
lines changed

crates/djls-server/src/queue.rs

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -118,23 +118,21 @@ impl Queue {
118118

119119
/// Submits an asynchronous task to the queue.
120120
///
121-
/// The task is provided as a closure (`task_fn`) that returns a `Future`.
122-
/// This function wraps the provided closure and future for type erasure
123-
/// and sends it to the background worker task for sequential execution.
121+
/// This method accepts a `Future` directly and sends it to the background worker
122+
/// task for sequential execution. The future should resolve to `Result<()>`.
124123
///
125124
/// The `await` on this method only waits for the task to be *sent* to the
126125
/// queue's channel, not for the task to be *executed*. If the queue's
127126
/// channel is full, this method will wait until space becomes available.
128127
///
129128
/// # Usage
130129
///
131-
/// The `task_fn` must be a closure that takes no arguments and returns
132-
/// a `Future` which resolves to `Result<()>`. Typically, this is written
133-
/// using the `|| async move { ... }` syntax:
130+
/// The `future` must be a `Future` which resolves to `Result<()>`. Typically,
131+
/// this is provided using an `async move` block:
134132
///
135133
/// ```rust,ignore
136134
/// let data_to_capture = 42;
137-
/// queue.submit(move || async move {
135+
/// queue.submit(async move {
138136
/// // ... perform async work using data_to_capture ...
139137
/// println!("Processing data: {}", data_to_capture);
140138
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
@@ -144,20 +142,14 @@ impl Queue {
144142
///
145143
/// # Type Parameters
146144
///
147-
/// - `F`: The type of the closure (`FnOnce() -> Fut`). Must be `Send + 'static`.
148-
/// - `Fut`: The type of the `Future` returned by the closure. Must resolve
149-
/// to `Result<()>` and be `Send + 'static`.
150-
pub async fn submit<F, Fut>(&self, task_fn: F) -> Result<()>
145+
/// - `F`: The type of the `Future`. Must resolve to `Result<()>` and be `Send + 'static`.
146+
pub async fn submit<F>(&self, future: F) -> Result<()>
151147
where
152-
F: FnOnce() -> Fut + Send + 'static,
153-
Fut: Future<Output = Result<()>> + Send + 'static,
148+
F: Future<Output = Result<()>> + Send + 'static,
154149
{
155150
// Create the inner closure that matches `TaskClosure`'s signature.
156-
// This closure, when called by the worker:
157-
// 1. Calls the user-provided `task_fn()`.
158-
// 2. Gets the user's concrete `Future` (`Fut`).
159-
// 3. Pins it and Boxes it, implicitly converting it to `TaskFuture`.
160-
let boxed_task_closure: TaskClosure = Box::new(move || Box::pin(task_fn()));
151+
// This closure wraps the future in a way that TaskClosure expects.
152+
let boxed_task_closure: TaskClosure = Box::new(move || Box::pin(future));
161153

162154
// Send the boxed, type-erased closure to the worker task.
163155
// This will wait if the channel buffer is full.
@@ -209,7 +201,7 @@ mod tests {
209201
for i in 0..5 {
210202
let counter_clone = Arc::clone(&counter);
211203
queue
212-
.submit(move || async move {
204+
.submit(async move {
213205
sleep(Duration::from_millis(5)).await;
214206
println!("Processing task {}", i);
215207
counter_clone.fetch_add(1, Ordering::SeqCst);
@@ -221,7 +213,7 @@ mod tests {
221213

222214
// Submit a task that will fail
223215
queue
224-
.submit(|| async {
216+
.submit(async {
225217
println!("Submitting failing task");
226218
Err(anyhow!("Task failed intentionally"))
227219
})
@@ -234,7 +226,7 @@ mod tests {
234226
// Submit another task
235227
let counter_clone = Arc::clone(&counter);
236228
queue
237-
.submit(|| async move {
229+
.submit(async move {
238230
println!("Processing task after error");
239231
counter_clone.fetch_add(1, Ordering::SeqCst);
240232
Ok(())
@@ -257,7 +249,7 @@ mod tests {
257249
let counter_clone = Arc::clone(&counter);
258250
tasks.push(tokio::spawn(async move {
259251
queue_clone
260-
.submit(|| async move {
252+
.submit(async move {
261253
counter_clone.fetch_add(1, Ordering::Relaxed);
262254
sleep(Duration::from_millis(2)).await;
263255
Ok(())
@@ -273,7 +265,7 @@ mod tests {
273265
println!("Finished submitting initial 32 tasks");
274266

275267
let counter_clone = Arc::clone(&counter);
276-
let submit_task = queue.submit(|| async move {
268+
let submit_task = queue.submit(async move {
277269
println!("Processing the 33rd task");
278270
counter_clone.fetch_add(1, Ordering::Relaxed);
279271
Ok(())
@@ -298,7 +290,7 @@ mod tests {
298290

299291
let counter_clone1 = Arc::clone(&counter);
300292
queue
301-
.submit(|| async move {
293+
.submit(async move {
302294
sleep(Duration::from_millis(50)).await;
303295
counter_clone1.fetch_add(1, Ordering::SeqCst);
304296
Ok(())
@@ -308,7 +300,7 @@ mod tests {
308300

309301
let counter_clone2 = Arc::clone(&counter);
310302
queue
311-
.submit(|| async move {
303+
.submit(async move {
312304
sleep(Duration::from_millis(50)).await;
313305
counter_clone2.fetch_add(1, Ordering::SeqCst);
314306
Ok(())
@@ -331,13 +323,13 @@ mod tests {
331323
let counter = Arc::new(AtomicUsize::new(0));
332324

333325
let counter_clone1 = Arc::clone(&counter);
334-
let task1 = queue1.submit(|| async move {
326+
let task1 = queue1.submit(async move {
335327
counter_clone1.fetch_add(1, Ordering::SeqCst);
336328
Ok(())
337329
});
338330

339331
let counter_clone2 = Arc::clone(&counter);
340-
let task2 = queue2.submit(|| async move {
332+
let task2 = queue2.submit(async move {
341333
counter_clone2.fetch_add(1, Ordering::SeqCst);
342334
Ok(())
343335
});
@@ -354,21 +346,21 @@ mod tests {
354346

355347
let counter_clone1 = Arc::clone(&counter);
356348
queue
357-
.submit(|| async move {
349+
.submit(async move {
358350
counter_clone1.fetch_add(1, Ordering::SeqCst);
359351
Ok(())
360352
})
361353
.await
362354
.unwrap();
363355

364356
queue
365-
.submit(|| async { Err(anyhow!("Intentional failure")) })
357+
.submit(async { Err(anyhow!("Intentional failure")) })
366358
.await
367359
.unwrap();
368360

369361
let counter_clone2 = Arc::clone(&counter);
370362
queue
371-
.submit(|| async move {
363+
.submit(async move {
372364
counter_clone2.fetch_add(1, Ordering::SeqCst);
373365
Ok(())
374366
})
@@ -379,7 +371,7 @@ mod tests {
379371

380372
let counter_clone3 = Arc::clone(&counter);
381373
queue
382-
.submit(|| async move {
374+
.submit(async move {
383375
counter_clone3.fetch_add(1, Ordering::SeqCst);
384376
Ok(())
385377
})

crates/djls-server/src/server.rs

Lines changed: 53 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -158,73 +158,75 @@ impl LanguageServer for DjangoLanguageServer {
158158

159159
let project_arc = Arc::clone(&self.project);
160160
let client = self.client.clone();
161-
162161
let settings_arc = Arc::clone(&self.settings);
163-
let task = move || async move {
164-
let mut project_guard = project_arc.write().await;
165-
if let Some(project) = project_guard.as_mut() {
166-
let path_display = project.path().display().to_string();
167-
client
168-
.log_message(
169-
MessageType::INFO,
170-
&format!(
171-
"Task: Starting initialization for project at: {}",
172-
path_display
173-
),
174-
)
175-
.await;
176162

177-
let venv_path = {
178-
let settings = settings_arc.read().await;
179-
settings.venv_path().map(|s| s.to_string())
180-
};
181-
182-
if let Some(ref path) = venv_path {
163+
if let Err(e) = self
164+
.queue
165+
.submit(async move {
166+
let mut project_guard = project_arc.write().await;
167+
if let Some(project) = project_guard.as_mut() {
168+
let path_display = project.path().display().to_string();
183169
client
184170
.log_message(
185171
MessageType::INFO,
186-
&format!("Using virtual environment from config: {}", path),
172+
&format!(
173+
"Task: Starting initialization for project at: {}",
174+
path_display
175+
),
187176
)
188177
.await;
189-
}
190178

191-
match project.initialize(venv_path.as_deref()) {
192-
Ok(()) => {
179+
let venv_path = {
180+
let settings = settings_arc.read().await;
181+
settings.venv_path().map(|s| s.to_string())
182+
};
183+
184+
if let Some(ref path) = venv_path {
193185
client
194186
.log_message(
195187
MessageType::INFO,
196-
&format!(
197-
"Task: Successfully initialized project: {}",
198-
path_display
199-
),
188+
&format!("Using virtual environment from config: {}", path),
200189
)
201190
.await;
202191
}
203-
Err(e) => {
204-
client
205-
.log_message(
206-
MessageType::ERROR,
207-
&format!(
208-
"Task: Failed to initialize Django project at {}: {}",
209-
path_display, e
210-
),
211-
)
212-
.await;
213-
*project_guard = None;
192+
193+
match project.initialize(venv_path.as_deref()) {
194+
Ok(()) => {
195+
client
196+
.log_message(
197+
MessageType::INFO,
198+
&format!(
199+
"Task: Successfully initialized project: {}",
200+
path_display
201+
),
202+
)
203+
.await;
204+
}
205+
Err(e) => {
206+
client
207+
.log_message(
208+
MessageType::ERROR,
209+
&format!(
210+
"Task: Failed to initialize Django project at {}: {}",
211+
path_display, e
212+
),
213+
)
214+
.await;
215+
*project_guard = None;
216+
}
214217
}
218+
} else {
219+
client
220+
.log_message(
221+
MessageType::INFO,
222+
"Task: No project instance found to initialize.",
223+
)
224+
.await;
215225
}
216-
} else {
217-
client
218-
.log_message(
219-
MessageType::INFO,
220-
"Task: No project instance found to initialize.",
221-
)
222-
.await;
223-
}
224-
Ok(())
225-
};
226-
227-
if let Err(e) = self.queue.submit(task).await {
226+
Ok(())
227+
})
228+
.await
229+
{
228230
self.log_message(
229231
MessageType::ERROR,
230232
&format!("Failed to submit project initialization task: {}", e),

0 commit comments

Comments
 (0)