Skip to content

Commit 882bdd6

Browse files
authored
Fix scale issues when listing results with tens of thousands of jobs (#117)
* Add database indexes. The command `torc results list <id>` was timing out when there are tens of thousands of results.
* Add database scale test.
* Perform a batch unblock operation. The per-job unblock was far too slow in large workflows. It caused runners to time out and exit.
* Fix job pagination in large workflows.
* Increase batch job creation size.
* Optimize bulk job initialization. The previous behavior was causing timeouts when initializing a workflow with 100,000 jobs.
1 parent efd72bd commit 882bdd6

File tree

21 files changed

+887
-168
lines changed

21 files changed

+887
-168
lines changed

src/client/commands/compute_nodes.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ pub enum ComputeNodeCommands {
7171
/// List compute nodes for this workflow (optional - will prompt if not provided)
7272
#[arg()]
7373
workflow_id: Option<i64>,
74-
/// Maximum number of compute nodes to return
75-
#[arg(short, long, default_value = "10000")]
76-
limit: i64,
74+
/// Maximum number of compute nodes to return (default: all)
75+
#[arg(short, long)]
76+
limit: Option<i64>,
7777
/// Offset for pagination (0-based)
7878
#[arg(short, long, default_value = "0")]
7979
offset: i64,
@@ -147,9 +147,11 @@ pub fn handle_compute_node_commands(
147147
},
148148
};
149149

150-
let mut params = ComputeNodeListParams::new()
151-
.with_offset(*offset)
152-
.with_limit(*limit);
150+
let mut params = ComputeNodeListParams::new().with_offset(*offset);
151+
152+
if let Some(limit_val) = limit {
153+
params = params.with_limit(*limit_val);
154+
}
153155

154156
if let Some(sort) = sort_by {
155157
params = params.with_sort_by(sort.clone());

src/client/commands/events.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ EXAMPLES:
100100
/// Filter events by type or category
101101
#[arg(short = 't', long = "type", alias = "category")]
102102
event_type: Option<String>,
103-
/// Maximum number of events to return
104-
#[arg(short, long, default_value = "10000")]
105-
limit: i64,
103+
/// Maximum number of events to return (default: all)
104+
#[arg(short, long)]
105+
limit: Option<i64>,
106106
/// Offset for pagination (0-based)
107107
#[arg(short, long, default_value = "0")]
108108
offset: i64,
@@ -207,9 +207,11 @@ pub fn handle_event_commands(config: &Configuration, command: &EventCommands, fo
207207
None => select_workflow_interactively(config, &user_name).unwrap(),
208208
};
209209

210-
let mut params = EventListParams::new()
211-
.with_offset(*offset)
212-
.with_limit(*limit);
210+
let mut params = EventListParams::new().with_offset(*offset);
211+
212+
if let Some(limit_val) = limit {
213+
params = params.with_limit(*limit_val);
214+
}
213215

214216
if let Some(event_type_str) = event_type {
215217
params = params.with_category(event_type_str.clone());

src/client/commands/failure_handlers.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ pub enum FailureHandlerCommands {
2424
/// List failure handlers for this workflow (optional - will prompt if not provided)
2525
#[arg()]
2626
workflow_id: Option<i64>,
27-
/// Maximum number of failure handlers to return
28-
#[arg(short, long, default_value = "10000")]
29-
limit: i64,
27+
/// Maximum number of failure handlers to return (default: all)
28+
#[arg(short, long)]
29+
limit: Option<i64>,
3030
/// Offset for pagination (0-based)
3131
#[arg(long, default_value = "0")]
3232
offset: i64,
@@ -60,7 +60,7 @@ pub fn handle_failure_handler_commands(
6060
config,
6161
selected_workflow_id,
6262
Some(*offset),
63-
Some(*limit),
63+
*limit,
6464
) {
6565
Ok(response) => {
6666
let handlers = response.items.unwrap_or_default();

src/client/commands/files.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ EXAMPLES:
8585
/// Filter by job ID that produced the files
8686
#[arg(long)]
8787
produced_by_job_id: Option<i64>,
88-
/// Maximum number of files to return
89-
#[arg(short, long, default_value = "10000")]
90-
limit: i64,
88+
/// Maximum number of files to return (default: all)
89+
#[arg(short, long)]
90+
limit: Option<i64>,
9191
/// Offset for pagination (0-based)
9292
#[arg(long, default_value = "0")]
9393
offset: i64,
@@ -200,10 +200,13 @@ pub fn handle_file_commands(config: &Configuration, command: &FileCommands, form
200200

201201
let mut params = FileListParams::new()
202202
.with_offset(*offset)
203-
.with_limit(*limit)
204203
.with_sort_by(sort_by.clone().unwrap_or_default())
205204
.with_reverse_sort(*reverse_sort);
206205

206+
if let Some(limit_val) = limit {
207+
params = params.with_limit(*limit_val);
208+
}
209+
207210
if let Some(job_id) = produced_by_job_id {
208211
params = params.with_produced_by_job_id(*job_id);
209212
}

src/client/commands/job_dependencies.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ pub enum JobDependencyCommands {
1313
/// ID of the workflow (optional - will prompt if not provided)
1414
#[arg()]
1515
workflow_id: Option<i64>,
16-
/// Maximum number of dependencies to return
17-
#[arg(short, long, default_value = "10000")]
18-
limit: i64,
16+
/// Maximum number of dependencies to return (default: all)
17+
#[arg(short, long)]
18+
limit: Option<i64>,
1919
/// Offset for pagination (0-based)
2020
#[arg(long, default_value = "0")]
2121
offset: i64,
@@ -25,9 +25,9 @@ pub enum JobDependencyCommands {
2525
/// ID of the workflow (optional - will prompt if not provided)
2626
#[arg()]
2727
workflow_id: Option<i64>,
28-
/// Maximum number of relationships to return
29-
#[arg(short, long, default_value = "10000")]
30-
limit: i64,
28+
/// Maximum number of relationships to return (default: all)
29+
#[arg(short, long)]
30+
limit: Option<i64>,
3131
/// Offset for pagination (0-based)
3232
#[arg(long, default_value = "0")]
3333
offset: i64,
@@ -37,9 +37,9 @@ pub enum JobDependencyCommands {
3737
/// ID of the workflow (optional - will prompt if not provided)
3838
#[arg()]
3939
workflow_id: Option<i64>,
40-
/// Maximum number of relationships to return
41-
#[arg(short, long, default_value = "10000")]
42-
limit: i64,
40+
/// Maximum number of relationships to return (default: all)
41+
#[arg(short, long)]
42+
limit: Option<i64>,
4343
/// Offset for pagination (0-based)
4444
#[arg(long, default_value = "0")]
4545
offset: i64,
@@ -116,7 +116,7 @@ pub fn handle_job_dependency_commands(
116116
config,
117117
selected_workflow_id,
118118
Some(*offset),
119-
Some(*limit),
119+
*limit,
120120
) {
121121
Ok(response) => {
122122
if print_if_json(format, &response, "job dependencies") {
@@ -163,7 +163,7 @@ pub fn handle_job_dependency_commands(
163163
config,
164164
selected_workflow_id,
165165
Some(*offset),
166-
Some(*limit),
166+
*limit,
167167
) {
168168
Ok(response) => {
169169
if print_if_json(format, &response, "job-file relationships") {
@@ -224,7 +224,7 @@ pub fn handle_job_dependency_commands(
224224
config,
225225
selected_workflow_id,
226226
Some(*offset),
227-
Some(*limit),
227+
*limit,
228228
) {
229229
Ok(response) => {
230230
if print_if_json(format, &response, "job-user_data relationships") {

src/client/commands/jobs.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,9 @@ EXAMPLES:
209209
/// Filter by upstream job ID (jobs that depend on this job)
210210
#[arg(long)]
211211
upstream_job_id: Option<i64>,
212-
/// Maximum number of jobs to return
213-
#[arg(short, long, default_value = "10000")]
214-
limit: i64,
212+
/// Maximum number of jobs to return (default: all)
213+
#[arg(short, long)]
214+
limit: Option<i64>,
215215
/// Offset for pagination (0-based)
216216
#[arg(long, default_value = "0")]
217217
offset: i64,
@@ -467,11 +467,14 @@ pub fn handle_job_commands(config: &Configuration, command: &JobCommands, format
467467

468468
let mut params = JobListParams::new()
469469
.with_offset(*offset)
470-
.with_limit(*limit)
471470
.with_sort_by(sort_by.clone().unwrap_or_default())
472471
.with_reverse_sort(*reverse_sort)
473472
.with_include_relationships(*include_relationships);
474473

474+
if let Some(limit_val) = limit {
475+
params = params.with_limit(*limit_val);
476+
}
477+
475478
if let Some(job_status) = job_status {
476479
params = params.with_status(job_status);
477480
}
@@ -1174,7 +1177,7 @@ pub fn create_jobs_from_file(
11741177
}
11751178

11761179
// Create jobs in batches using bulk API
1177-
const BATCH_SIZE: usize = 1000;
1180+
const BATCH_SIZE: usize = 10000;
11781181
let mut total_created = 0;
11791182

11801183
for batch in jobs.chunks(BATCH_SIZE) {
@@ -1218,7 +1221,7 @@ pub fn get_existing_job_names(
12181221
) -> Result<HashSet<String>, Box<dyn std::error::Error>> {
12191222
let mut names = HashSet::new();
12201223
let mut offset = 0;
1221-
const PAGE_SIZE: i64 = 1000;
1224+
const PAGE_SIZE: i64 = 10_000;
12221225

12231226
loop {
12241227
let response = default_api::list_jobs(

src/client/commands/pagination/base.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl<T: Paginatable> PaginatedIterator<T> {
9191
/// # Arguments
9292
/// * `config` - API configuration
9393
/// * `params` - Resource-specific parameters
94-
/// * `initial_limit` - Page size for each API call (default: 1000)
94+
/// * `initial_limit` - Page size for each API call (default: 10,000)
9595
pub fn new(
9696
config: apis::configuration::Configuration,
9797
params: T::Params,
@@ -102,7 +102,7 @@ impl<T: Paginatable> PaginatedIterator<T> {
102102
config,
103103
params,
104104
remaining_limit,
105-
initial_limit: initial_limit.unwrap_or(1000),
105+
initial_limit: initial_limit.unwrap_or(10_000),
106106
current_page: Vec::new().into_iter(),
107107
finished: false,
108108
}
@@ -173,7 +173,7 @@ pub fn paginate<T: Paginatable>(
173173
config: &apis::configuration::Configuration,
174174
params: T::Params,
175175
) -> Result<Vec<T>, apis::Error<T::ListError>> {
176-
let initial_limit = params.limit().unwrap_or(1000);
176+
let initial_limit = params.limit().unwrap_or(10_000);
177177
let iter = PaginatedIterator::<T>::new(config.clone(), params, Some(initial_limit));
178178
iter.collect()
179179
}

src/client/commands/resource_requirements.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ pub enum ResourceRequirementsCommands {
5757
/// List resource requirements for this workflow (optional - will prompt if not provided)
5858
#[arg()]
5959
workflow_id: Option<i64>,
60-
/// Maximum number of resource requirements to return
61-
#[arg(short, long, default_value = "10000")]
62-
limit: i64,
60+
/// Maximum number of resource requirements to return (default: all)
61+
#[arg(short, long)]
62+
limit: Option<i64>,
6363
/// Offset for pagination (0-based)
6464
#[arg(long, default_value = "0")]
6565
offset: i64,
@@ -174,12 +174,15 @@ pub fn handle_resource_requirements_commands(
174174
};
175175

176176
// Use pagination utility to get all resource requirements
177-
let params = pagination::ResourceRequirementsListParams::new()
177+
let mut params = pagination::ResourceRequirementsListParams::new()
178178
.with_offset(*offset)
179-
.with_limit(*limit)
180179
.with_sort_by(sort_by.clone().unwrap_or_default())
181180
.with_reverse_sort(*reverse_sort);
182181

182+
if let Some(limit_val) = limit {
183+
params = params.with_limit(*limit_val);
184+
}
185+
183186
match pagination::paginate_resource_requirements(
184187
config,
185188
selected_workflow_id as i64,

src/client/commands/results.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ pub enum ResultCommands {
121121
/// Filter by job status (uninitialized, blocked, canceled, terminated, done, ready, scheduled, running, pending, disabled)
122122
#[arg(short, long)]
123123
status: Option<String>,
124-
/// Maximum number of results to return
125-
#[arg(short, long, default_value = "10000")]
126-
limit: i64,
124+
/// Maximum number of results to return (default: all)
125+
#[arg(short, long)]
126+
limit: Option<i64>,
127127
/// Offset for pagination (0-based)
128128
#[arg(long, default_value = "0")]
129129
offset: i64,
@@ -177,9 +177,11 @@ pub fn handle_result_commands(config: &Configuration, command: &ResultCommands,
177177
};
178178

179179
// Use pagination utility to get all results
180-
let mut params = pagination::ResultListParams::new()
181-
.with_offset(*offset)
182-
.with_limit(*limit);
180+
let mut params = pagination::ResultListParams::new().with_offset(*offset);
181+
182+
if let Some(limit_val) = limit {
183+
params = params.with_limit(*limit_val);
184+
}
183185

184186
if let Some(job_id_val) = job_id {
185187
params = params.with_job_id(*job_id_val);

src/client/commands/scheduled_compute_nodes.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ pub enum ScheduledComputeNodeCommands {
4747
/// List scheduled compute nodes for this workflow (optional - will prompt if not provided)
4848
#[arg()]
4949
workflow_id: Option<i64>,
50-
/// Maximum number of scheduled compute nodes to return
51-
#[arg(short, long, default_value = "10000")]
52-
limit: i64,
50+
/// Maximum number of scheduled compute nodes to return (default: all)
51+
#[arg(short, long)]
52+
limit: Option<i64>,
5353
/// Offset for pagination (0-based)
5454
#[arg(short, long, default_value = "0")]
5555
offset: i64,
@@ -129,7 +129,7 @@ pub fn handle_scheduled_compute_node_commands(
129129
config,
130130
selected_workflow_id,
131131
Some(*offset),
132-
Some(*limit),
132+
*limit,
133133
sort_by.as_deref(),
134134
Some(*reverse_sort),
135135
scheduler_id.as_deref(),

0 commit comments

Comments
 (0)