1
1
//! Jobs for database maintenance
2
2
use std:: collections:: HashMap ;
3
3
use std:: sync:: Arc ;
4
- use std:: time:: Duration ;
4
+ use std:: time:: { Duration , Instant } ;
5
5
6
6
use async_trait:: async_trait;
7
7
use diesel:: { prelude:: RunQueryDsl , sql_query, sql_types:: Double } ;
8
8
9
- use graph:: prelude:: { error, Logger , MetricsRegistry , StoreError } ;
9
+ use graph:: env:: env_var;
10
+ use graph:: prelude:: { chrono, error, lazy_static, Logger , MetricsRegistry , StoreError } ;
10
11
use graph:: prometheus:: Gauge ;
11
12
use graph:: util:: jobs:: { Job , Runner } ;
12
13
13
14
use crate :: connection_pool:: ConnectionPool ;
14
- use crate :: { Store , SubgraphStore } ;
15
+ use crate :: { unused, Store , SubgraphStore } ;
16
+
17
+ lazy_static ! {
18
+ static ref UNUSED_INTERVAL : chrono:: Duration = {
19
+ let interval: u32 = env_var( "GRAPH_REMOVE_UNUSED_INTERVAL" , 360 ) ;
20
+ chrono:: Duration :: minutes( interval as i64 )
21
+ } ;
22
+ }
15
23
16
24
pub fn register (
17
25
runner : & mut Runner ,
@@ -33,6 +41,12 @@ pub fn register(
33
41
Arc :: new ( MirrorPrimary :: new ( store. subgraph_store ( ) ) ) ,
34
42
Duration :: from_secs ( 15 * 60 ) ,
35
43
) ;
44
+
45
+ // Remove unused deployments every 2 hours
46
+ runner. register (
47
+ Arc :: new ( UnusedJob :: new ( store. subgraph_store ( ) ) ) ,
48
+ Duration :: from_secs ( 2 * 60 * 60 ) ,
49
+ )
36
50
}
37
51
38
52
/// A job that vacuums `subgraphs.subgraph_deployment`. With a large number
@@ -141,3 +155,62 @@ impl Job for MirrorPrimary {
141
155
self . store . mirror_primary_tables ( logger) . await ;
142
156
}
143
157
}
158
+
159
+ struct UnusedJob {
160
+ store : Arc < SubgraphStore > ,
161
+ }
162
+
163
+ impl UnusedJob {
164
+ fn new ( store : Arc < SubgraphStore > ) -> UnusedJob {
165
+ UnusedJob { store }
166
+ }
167
+ }
168
+
169
+ #[ async_trait]
170
+ impl Job for UnusedJob {
171
+ fn name ( & self ) -> & str {
172
+ "Record and remove unused deployments"
173
+ }
174
+
175
+ /// Record unused deployments and remove ones that were recorded at
176
+ /// least `UNUSED_INTERVAL` ago
177
+ async fn run ( & self , logger : & Logger ) {
178
+ // Work on removing about 5 minutes
179
+ const REMOVAL_DEADLINE : Duration = Duration :: from_secs ( 5 * 60 ) ;
180
+
181
+ let start = Instant :: now ( ) ;
182
+
183
+ if let Err ( e) = self . store . record_unused_deployments ( ) {
184
+ error ! ( logger, "failed to record unused deployments" ; "error" => e. to_string( ) ) ;
185
+ return ;
186
+ }
187
+
188
+ let remove = match self
189
+ . store
190
+ . list_unused_deployments ( unused:: Filter :: UnusedLongerThan ( * UNUSED_INTERVAL ) )
191
+ {
192
+ Ok ( remove) => remove,
193
+ Err ( e) => {
194
+ error ! ( logger, "failed to list removable deployments" ; "error" => e. to_string( ) ) ;
195
+ return ;
196
+ }
197
+ } ;
198
+
199
+ for deployment in remove {
200
+ match self . store . remove_deployment ( deployment. id ) {
201
+ Ok ( ( ) ) => { /* ignore */ }
202
+ Err ( e) => {
203
+ error ! ( logger, "failed to remove unused deployment" ;
204
+ "sgd" => deployment. id. to_string( ) ,
205
+ "deployment" => deployment. deployment,
206
+ "error" => e. to_string( ) ) ;
207
+ }
208
+ }
209
+ // Stop working on removing after a while to not block other
210
+ // jobs for too long
211
+ if start. elapsed ( ) > REMOVAL_DEADLINE {
212
+ return ;
213
+ }
214
+ }
215
+ }
216
+ }
0 commit comments