feat(queue): add queue introspection for console dashboard #136
Conversation
Add queue introspection methods to support the console queue dashboard. Extends `QueueKvStore` with `lrange`, `list_keys_with_prefix`, and `zcount`. Adds `list_queues`, `queue_stats`, `list_jobs`, and `get_job` to the `QueueAdapter` trait with default implementations. Implements these on `BuiltinQueueAdapter` and registers them as engine functions, including `redrive_dlq` and `dlq_count`.
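For context on the `lrange` semantics referenced here, a minimal sketch of an inclusive-range read over an in-memory list (a hypothetical, synchronous simplification in the spirit of Redis LRANGE, not the actual `QueueKvStore` code):

```rust
// Hypothetical, simplified sketch of an lrange-style read over an
// in-memory list, mirroring Redis LRANGE's inclusive start/stop
// semantics. Not the PR's actual QueueKvStore implementation.
fn lrange(list: &[String], start: usize, stop: usize) -> Vec<String> {
    if start >= list.len() || stop < start {
        return Vec::new();
    }
    // stop is inclusive, so slice up to stop + 1, clamped to the list length.
    let end = (stop + 1).min(list.len());
    list[start..end].to_vec()
}

fn main() {
    let jobs: Vec<String> = (1..=5).map(|i| format!("job-{i}")).collect();
    // First page of two items.
    assert_eq!(lrange(&jobs, 0, 1), vec!["job-1", "job-2"]);
    // A stop index past the end is clamped to the list length.
    assert_eq!(lrange(&jobs, 3, 99), vec!["job-4", "job-5"]);
    // An out-of-range start yields an empty page.
    assert!(lrange(&jobs, 9, 10).is_empty());
}
```

This pagination shape (start/stop rather than offset/limit) is what allows the adapter layer to fetch one page of job IDs at a time.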
Force-pushed from 645bece to 8fc340b.
Address review findings: validate queue names (alphanumeric plus `-_.:`, max 128 chars), job IDs (max 256 chars), and job states against allowed values. Clamp `limit` to a maximum of 500. Add a `tracing::warn` for destructive `redrive_dlq` calls.
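The validation rules above can be sketched as plain functions. This is a minimal illustration: the PR's real helpers presumably return an `ErrorBody`, and the allowed-state list here is inferred from the waiting/active/delayed/dlq counts mentioned elsewhere in this review.

```rust
// Sketch of the described validation rules; names and signatures are
// hypothetical, not the PR's actual helpers.
fn validate_queue_name(name: &str) -> Result<(), String> {
    if name.is_empty() || name.len() > 128 {
        return Err("queue name must be 1-128 characters".into());
    }
    if !name
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || "-_.:".contains(c))
    {
        return Err("queue name may only contain alphanumerics and -_.:".into());
    }
    Ok(())
}

fn validate_job_state(state: &str) -> Result<(), String> {
    // Assumed allowed states, based on the stats fields in this PR.
    const ALLOWED: [&str; 4] = ["waiting", "active", "delayed", "dlq"];
    if ALLOWED.contains(&state) {
        Ok(())
    } else {
        Err(format!("unknown job state: {state}"))
    }
}

fn main() {
    assert!(validate_queue_name("emails:high-priority").is_ok());
    assert!(validate_queue_name("bad name!").is_err());
    assert!(validate_queue_name(&"x".repeat(129)).is_err());
    assert!(validate_job_state("waiting").is_ok());
    assert!(validate_job_state("done").is_err());
}
```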
📝 Walkthrough

This PR extends the queue system with introspection and job retrieval capabilities across multiple architectural layers. It adds six new async methods to `BuiltinQueue` for listing queues and jobs by state with pagination and count operations. Supporting KV store methods for range queries, prefix-based key enumeration, and sorted set counting are introduced. A trait-level `QueueAdapter` abstraction defines four new optional methods with default "not supported" errors. These propagate through the adapter implementation and surface as six new public `QueueCoreModule` functions with input validation and error handling.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: 2 passed, 1 failed (warning)
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/modules/queue/adapters/builtin/adapter.rs (1)
197-219: Consider potential performance impact with many queues.

The `list_queues` implementation makes 4 async calls per queue (`waiting_count`, `active_count`, `delayed_count`, `dlq_count`). For dashboards with many queues, this could introduce latency. This is acceptable for console dashboard use cases, but worth noting if queue counts grow significantly. If performance becomes a concern in the future, consider adding a batch stats method to `BuiltinQueue` that returns all counts in a single operation.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/modules/queue/adapters/builtin/adapter.rs` around lines 197-219, `list_queues` currently awaits four separate async calls per queue (`waiting_count`, `active_count`, `delayed_count`, `dlq_count`) which will scale poorly; add a batch stats API on the `BuiltinQueue` (e.g., a method like `queue_stats` or `counts_for` that returns a struct with waiting/active/delayed/dlq totals) and update `list_queues` to call that single async method per queue (replace the four awaits with one await for `queue.queue_stats(&name).await` and map fields into the JSON), so counts are fetched in one operation; alternatively, if you cannot change `BuiltinQueue` now, run the four count futures concurrently using `futures::join` or `join_all` and then assemble the result in `list_queues`.

src/builtins/queue.rs (1)
695-700: Delayed job pagination is inefficient for large queues.

The current implementation fetches all delayed jobs with `zrangebyscore` and then performs in-memory pagination with `skip(offset).take(limit)`. For queues with many delayed jobs, this loads all job IDs into memory before pagination. Consider adding a `zrange` method with offset/count support to `QueueKvStore` for more efficient pagination, similar to how `lrange` works for lists. This would allow fetching only the needed slice directly from the sorted set.

💡 Suggested improvement for delayed job pagination
Add a `zrange` method to `QueueKvStore` that supports offset-based pagination:

```rust
// In queue_kv.rs
pub async fn zrange(&self, key: &str, start: usize, stop: usize) -> Vec<String> {
    let sorted_sets = self.sorted_sets.read().await;
    let Some(set) = sorted_sets.get(key) else {
        return vec![];
    };
    set.iter()
        .flat_map(|(_, members)| members.iter().cloned())
        .skip(start)
        .take(stop - start + 1)
        .collect()
}
```

Then use it in `list_jobs_in_state`:

```diff
 "delayed" => {
-    let all = self
-        .kv_store
-        .zrangebyscore(&self.delayed_key(queue), 0, i64::MAX)
-        .await;
-    all.into_iter().skip(offset).take(limit).collect()
+    self.kv_store
+        .zrange(&self.delayed_key(queue), offset, offset + limit - 1)
+        .await
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/builtins/queue.rs` around lines 695 - 700, The delayed-job branch in list_jobs_in_state currently calls self.kv_store.zrangebyscore(&self.delayed_key(queue), 0, i64::MAX) and then does in-memory pagination with skip/take, which loads all IDs; add a zrange(start, count) method to QueueKvStore (analogous to lrange) and replace the zrangebyscore call in list_jobs_in_state's "delayed" arm to call self.kv_store.zrange(&self.delayed_key(queue), offset, limit) so only the requested slice is fetched; ensure the new zrange honors ordering used by the sorted set and returns Vec<String> of job IDs to keep compatibility with the rest of the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/modules/queue/queue.rs`:
- Around line 197-224: The response currently returns the original user-provided
limit (input.limit) instead of the clamped value; in the jobs function update
the success payload to use the clamped limit variable (computed with
input.limit.min(MAX_LIMIT)) when setting the "limit" field in the JSON response
so API consumers see the actual effective limit returned by list_jobs.
---
Nitpick comments:
In `@src/builtins/queue.rs`:
- Around line 695-700: The delayed-job branch in list_jobs_in_state currently
calls self.kv_store.zrangebyscore(&self.delayed_key(queue), 0, i64::MAX) and
then does in-memory pagination with skip/take, which loads all IDs; add a
zrange(start, count) method to QueueKvStore (analogous to lrange) and replace
the zrangebyscore call in list_jobs_in_state's "delayed" arm to call
self.kv_store.zrange(&self.delayed_key(queue), offset, limit) so only the
requested slice is fetched; ensure the new zrange honors ordering used by the
sorted set and returns Vec<String> of job IDs to keep compatibility with the
rest of the code.
In `@src/modules/queue/adapters/builtin/adapter.rs`:
- Around line 197-219: list_queues currently awaits four separate async calls
per queue (waiting_count, active_count, delayed_count, dlq_count) which will
scale poorly; add a batch stats API on the BuiltinQueue (e.g., a method like
queue_stats or counts_for that returns a struct with waiting/active/delayed/dlq
totals) and update list_queues to call that single async method per queue
(replace the four awaits with one await for queue.queue_stats(&name).await and
map fields into the JSON), so counts are fetched in one operation;
alternatively, if you cannot change BuiltinQueue now, run the four count futures
concurrently using futures::join or join_all and then assemble the result in
list_queues.
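A minimal sketch of the batch-stats shape suggested in the prompt above. The struct and method names are illustrative only, and the real `BuiltinQueue` methods are async; this synchronous in-memory version just shows the "one call returns all four counts" idea.

```rust
// Hypothetical batch-stats shape: one call gathers every count,
// instead of four separate awaits per queue.
#[derive(Debug, PartialEq)]
struct QueueStats {
    waiting: usize,
    active: usize,
    delayed: usize,
    dlq: usize,
}

// Toy in-memory stand-in for BuiltinQueue's per-state storage.
struct InMemoryQueue {
    waiting: Vec<String>,
    active: Vec<String>,
    delayed: Vec<String>,
    dlq: Vec<String>,
}

impl InMemoryQueue {
    // Single method returning all counts in one operation.
    fn queue_stats(&self) -> QueueStats {
        QueueStats {
            waiting: self.waiting.len(),
            active: self.active.len(),
            delayed: self.delayed.len(),
            dlq: self.dlq.len(),
        }
    }
}

fn main() {
    let q = InMemoryQueue {
        waiting: vec!["a".into(), "b".into()],
        active: vec!["c".into()],
        delayed: vec![],
        dlq: vec!["d".into()],
    };
    let stats = q.queue_stats();
    assert_eq!(stats, QueueStats { waiting: 2, active: 1, delayed: 0, dlq: 1 });
}
```

With this shape, `list_queues` would make one call per queue and map the struct fields into its JSON output.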
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/builtins/queue.rs
- src/builtins/queue_kv.rs
- src/modules/queue/adapters/builtin/adapter.rs
- src/modules/queue/mod.rs
- src/modules/queue/queue.rs
| #[function(id = "jobs", description = "List jobs by state")] | ||
| pub async fn jobs(&self, input: QueueJobsInput) -> FunctionResult<Option<Value>, ErrorBody> { | ||
| if let Err(e) = validate_queue_name(&input.topic) { | ||
| return FunctionResult::Failure(e); | ||
| } | ||
| if let Err(e) = validate_job_state(&input.state) { | ||
| return FunctionResult::Failure(e); | ||
| } | ||
|
|
||
| let limit = input.limit.min(MAX_LIMIT); | ||
|
|
||
| match self | ||
| .adapter | ||
| .list_jobs(&input.topic, &input.state, input.offset, limit) | ||
| .await | ||
| { | ||
| Ok(jobs) => FunctionResult::Success(Some(serde_json::json!({ | ||
| "jobs": jobs, | ||
| "count": jobs.len(), | ||
| "offset": input.offset, | ||
| "limit": input.limit, | ||
| }))), | ||
| Err(e) => FunctionResult::Failure(ErrorBody { | ||
| code: "jobs_failed".into(), | ||
| message: format!("{:?}", e), | ||
| }), | ||
| } | ||
| } |
Minor: Response includes original limit instead of clamped value.
At line 217, the response includes `"limit": input.limit` (the original user-provided value) rather than the clamped `limit` variable from line 206. This could be confusing to API consumers who might not understand why they received fewer items than their requested limit.
🔧 Suggested fix
```diff
 Ok(jobs) => FunctionResult::Success(Some(serde_json::json!({
     "jobs": jobs,
     "count": jobs.len(),
     "offset": input.offset,
-    "limit": input.limit,
+    "limit": limit,
 }))),
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[function(id = "jobs", description = "List jobs by state")] | |
| pub async fn jobs(&self, input: QueueJobsInput) -> FunctionResult<Option<Value>, ErrorBody> { | |
| if let Err(e) = validate_queue_name(&input.topic) { | |
| return FunctionResult::Failure(e); | |
| } | |
| if let Err(e) = validate_job_state(&input.state) { | |
| return FunctionResult::Failure(e); | |
| } | |
| let limit = input.limit.min(MAX_LIMIT); | |
| match self | |
| .adapter | |
| .list_jobs(&input.topic, &input.state, input.offset, limit) | |
| .await | |
| { | |
| Ok(jobs) => FunctionResult::Success(Some(serde_json::json!({ | |
| "jobs": jobs, | |
| "count": jobs.len(), | |
| "offset": input.offset, | |
| "limit": input.limit, | |
| }))), | |
| Err(e) => FunctionResult::Failure(ErrorBody { | |
| code: "jobs_failed".into(), | |
| message: format!("{:?}", e), | |
| }), | |
| } | |
| } | |
| #[function(id = "jobs", description = "List jobs by state")] | |
| pub async fn jobs(&self, input: QueueJobsInput) -> FunctionResult<Option<Value>, ErrorBody> { | |
| if let Err(e) = validate_queue_name(&input.topic) { | |
| return FunctionResult::Failure(e); | |
| } | |
| if let Err(e) = validate_job_state(&input.state) { | |
| return FunctionResult::Failure(e); | |
| } | |
| let limit = input.limit.min(MAX_LIMIT); | |
| match self | |
| .adapter | |
| .list_jobs(&input.topic, &input.state, input.offset, limit) | |
| .await | |
| { | |
| Ok(jobs) => FunctionResult::Success(Some(serde_json::json!({ | |
| "jobs": jobs, | |
| "count": jobs.len(), | |
| "offset": input.offset, | |
| "limit": limit, | |
| }))), | |
| Err(e) => FunctionResult::Failure(ErrorBody { | |
| code: "jobs_failed".into(), | |
| message: format!("{:?}", e), | |
| }), | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/modules/queue/queue.rs` around lines 197 - 224, The response currently
returns the original user-provided limit (input.limit) instead of the clamped
value; in the jobs function update the success payload to use the clamped limit
variable (computed with input.limit.min(MAX_LIMIT)) when setting the "limit"
field in the JSON response so API consumers see the actual effective limit
returned by list_jobs.
Summary
- `lrange` and `list_keys_with_prefix` methods on `QueueKvStore` for paginated job listing and queue discovery
- `BuiltinQueue`: `list_queues`, `waiting_count`, `active_count`, `delayed_count`, `list_jobs_in_state`, `get_job_by_id`
- `QueueAdapter` trait extended with 4 new methods (`list_queues`, `queue_stats`, `list_jobs`, `get_job`) with default not-supported fallbacks
- `BuiltinQueueAdapter` implementations delegating to `BuiltinQueue`
- `QueueCoreModule`: `list_queues`, `stats`, `jobs`, `job` (callable as `queue.list_queues`, etc.)

Test plan
- `cargo build` compiles without errors
- `cargo test` passes existing tests
- `queue.list_queues` returns queue names with counts
- `queue.stats` returns correct waiting/active/delayed/dlq counts
- `queue.jobs` returns paginated job lists per state
- `queue.job` returns single job detail by ID
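The "default not-supported fallbacks" mentioned in the summary can be sketched with a synchronous trait. The real `QueueAdapter` is async and uses richer error types; the names and signatures below are simplified for illustration.

```rust
// Sketch of optional trait methods with a default "not supported" error,
// so existing adapters keep compiling without implementing introspection.
trait QueueAdapter {
    fn name(&self) -> &str;

    // Adapters that don't implement introspection inherit this default.
    fn list_queues(&self) -> Result<Vec<String>, String> {
        Err(format!("{}: list_queues not supported", self.name()))
    }
}

struct MinimalAdapter;

impl QueueAdapter for MinimalAdapter {
    fn name(&self) -> &str {
        "minimal"
    }
    // list_queues deliberately not overridden; the default error applies.
}

struct FullAdapter;

impl QueueAdapter for FullAdapter {
    fn name(&self) -> &str {
        "full"
    }
    fn list_queues(&self) -> Result<Vec<String>, String> {
        Ok(vec!["emails".into()])
    }
}

fn main() {
    assert!(MinimalAdapter.list_queues().is_err());
    assert_eq!(FullAdapter.list_queues().unwrap(), vec!["emails"]);
}
```

The same pattern would cover `queue_stats`, `list_jobs`, and `get_job`: only adapters that opt in override the defaults, and callers surface the error to the console otherwise.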