|
1 | 1 | /* |
2 | | - * Copyright 2021 Oxide Computer Company |
| 2 | + * Copyright 2025 Oxide Computer Company |
3 | 3 | */ |
4 | 4 |
|
5 | 5 | use chrono::prelude::*; |
@@ -87,40 +87,50 @@ async fn worker_cleanup_one(log: &Logger, c: &Central) -> Result<()> { |
87 | 87 | continue; |
88 | 88 | } |
89 | 89 |
|
| 90 | + if j.worker.is_none() { |
| 91 | + /* |
| 92 | + * This job has not yet been assigned to a worker. We do not |
| 93 | + * want to cancel jobs that have merely been queued for a long |
| 94 | + * time. |
| 95 | + */ |
| 96 | + continue; |
| 97 | + } |
| 98 | + |
90 | 99 | /* |
91 | | - * Determine when we assigned this job to a worker by looking at the |
92 | | - * timestamp on the first control event. |
| 100 | + * Determine when we assigned this job to a worker: |
93 | 101 | */ |
94 | | - let control = |
95 | | - c.db.job_events(j.id, 0, 10_000)? |
96 | | - .iter() |
97 | | - .find(|jev| jev.stream == "control") |
98 | | - .cloned(); |
99 | | - if let Some(control) = control { |
100 | | - if control.age().as_secs() > c.config.job.max_runtime { |
101 | | - warn!( |
102 | | - log, |
103 | | - "job {} duration {} exceeds {} seconds; \ |
| 102 | + let times = c.db.job_times(j.id)?; |
| 103 | + let Some(atime) = times.get("assigned") else { |
| 104 | + continue; |
| 105 | + }; |
| 106 | + let age = Utc::now() |
| 107 | + .signed_duration_since(atime) |
| 108 | + .to_std() |
| 109 | + .unwrap_or_else(|_| std::time::Duration::from_secs(0)); |
| 110 | + |
| 111 | + if age.as_secs() > c.config.job.max_runtime { |
| 112 | + warn!( |
| 113 | + log, |
| 114 | + "job {} duration {} exceeds {} seconds; \ |
104 | 115 | recycling worker {}", |
105 | | - j.id, |
106 | | - control.age().as_secs(), |
| 116 | + j.id, |
| 117 | + age.as_secs(), |
| 118 | + c.config.job.max_runtime, |
| 119 | + w.id, |
| 120 | + ); |
| 121 | + c.db.job_append_event( |
| 122 | + j.id, |
| 123 | + None, |
| 124 | + "control", |
| 125 | + Utc::now(), |
| 126 | + None, |
| 127 | + &format!( |
| 128 | + "job duration {} exceeds {} seconds; aborting", |
| 129 | + age.as_secs(), |
107 | 130 | c.config.job.max_runtime, |
108 | | - w.id, |
109 | | - ); |
110 | | - c.db.job_append_event( |
111 | | - j.id, |
112 | | - None, |
113 | | - "control", |
114 | | - Utc::now(), |
115 | | - None, |
116 | | - &format!( |
117 | | - "job duration {} exceeds {} seconds; aborting", |
118 | | - control.age().as_secs(), |
119 | | - c.config.job.max_runtime, |
120 | | - ), |
121 | | - )?; |
122 | | - c.db.worker_recycle(w.id)?; |
123 | | - } |
| 131 | + ), |
| 132 | + )?; |
| 133 | + c.db.worker_recycle(w.id)?; |
124 | 134 | } |
125 | 135 | } |
126 | 136 | } |
|
0 commit comments