@@ -30,6 +30,54 @@ CronQueue::CronQueue(const Logger &parent_logger,
3030
3131CronQueue::~CronQueue () noexcept = default ;
3232
33+ inline void
34+ CronQueue::Prepare ()
35+ {
36+ db.Prepare (" release_stale" , R"SQL(
37+ UPDATE cronjobs
38+ SET node_name=NULL, node_timeout=NULL, next_run=NULL
39+ WHERE node_name=$1
40+ )SQL" ,
41+ 1 );
42+
43+ db.Prepare (" claim_job" , R"SQL(
44+ UPDATE cronjobs
45+ SET node_name=$2, node_timeout=now()+$3::INTERVAL
46+ WHERE id=$1 AND enabled AND node_name IS NULL
47+ )SQL" ,
48+ 3 );
49+
50+ db.Prepare (" finish_job" , R"SQL(
51+ UPDATE cronjobs
52+ SET node_name=NULL, node_timeout=NULL, last_run=now(), next_run=NULL
53+ WHERE id=$1 AND node_name=$2
54+ )SQL" ,
55+ 2 );
56+
57+ db.Prepare (" insert_result" , R"SQL(
58+ INSERT INTO cronresults(cronjob_id, node_name, start_time, exit_status, log)
59+ VALUES($1, $2, $3, $4, $5)
60+ )SQL" ,
61+ 5 );
62+
63+ db.Prepare (" find_earliest_pending" , R"SQL(
64+ SELECT EXTRACT(EPOCH FROM (MIN(next_run) - now())) FROM cronjobs
65+ WHERE enabled AND next_run IS NOT NULL AND next_run != 'infinity' AND node_name IS NULL
66+ )SQL" ,
67+ 0 );
68+
69+ db.Prepare (" check_pending" , R"SQL(
70+ SELECT id, account_id, command, translate_param, notification
71+ FROM cronjobs WHERE enabled AND next_run<=now()
72+ AND node_name IS NULL
73+ ORDER BY next_run
74+ LIMIT 1
75+ )SQL" ,
76+ 0 );
77+
78+ InitCalculateNextRun (db);
79+ }
80+
3381void
3482CronQueue::CheckEnabled () noexcept
3583{
@@ -81,10 +129,7 @@ CronQueue::EnableFull() noexcept
81129void
82130CronQueue::ReleaseStale ()
83131{
84- const auto result = db.ExecuteParams (" UPDATE cronjobs "
85- " SET node_name=NULL, node_timeout=NULL, next_run=NULL "
86- " WHERE node_name=$1" ,
87- node_name.c_str ());
132+ const auto result = db.ExecutePrepared (" release_stale" , node_name.c_str ());
88133
89134 unsigned n = result.GetAffectedRows ();
90135 if (n > 0 )
@@ -124,9 +169,7 @@ CronQueue::ScheduleScheduler(bool immediately) noexcept
124169static std::chrono::seconds
125170FindEarliestPending (Pg::Connection &db)
126171{
127- const auto result =
128- db.Execute (" SELECT EXTRACT(EPOCH FROM (MIN(next_run) - now())) FROM cronjobs "
129- " WHERE enabled AND next_run IS NOT NULL AND next_run != 'infinity' AND node_name IS NULL" );
172+ const auto result = db.ExecutePrepared (" find_earliest_pending" );
130173
131174 if (result.IsEmpty () || result.IsValueNull (0 , 0 ))
132175 /* no matching cronjob; disable the timer, and wait for the
@@ -188,13 +231,8 @@ CronQueue::Claim(const CronJob &job) noexcept
188231try {
189232 const char *timeout = " 5 minutes" ;
190233
191- const auto r =
192- db.ExecuteParams (" UPDATE cronjobs "
193- " SET node_name=$2, node_timeout=now()+$3::INTERVAL "
194- " WHERE id=$1 AND enabled AND node_name IS NULL" ,
195- job.id .c_str (),
196- node_name.c_str (),
197- timeout);
234+ const auto r = db.ExecutePrepared (" claim_job" , job.id .c_str (),
235+ node_name.c_str (), timeout);
198236 if (r.GetAffectedRows () == 0 ) {
199237 logger (3 , " Lost race to run job '" , job.id , " '" );
200238 return false ;
@@ -211,12 +249,7 @@ CronQueue::Finish(const CronJob &job) noexcept
211249try {
212250 ScheduleCheckNotify ();
213251
214- const auto r =
215- db.ExecuteParams (" UPDATE cronjobs "
216- " SET node_name=NULL, node_timeout=NULL, last_run=now(), next_run=NULL "
217- " WHERE id=$1 AND node_name=$2" ,
218- job.id .c_str (),
219- node_name.c_str ());
252+ const auto r = db.ExecutePrepared (" finish_job" , job.id .c_str (), node_name.c_str ());
220253 if (r.GetAffectedRows () == 0 ) {
221254 logger (3 , " Lost race to finish job '" , job.id , " '" );
222255 return ;
@@ -231,13 +264,12 @@ CronQueue::InsertResult(const CronJob &job, const char *start_time,
231264try {
232265 ScheduleCheckNotify ();
233266
234- db.ExecuteParams (" INSERT INTO cronresults(cronjob_id, node_name, start_time, exit_status, log) "
235- " VALUES($1, $2, $3, $4, $5)" ,
236- job.id .c_str (),
237- node_name.c_str (),
238- start_time,
239- result.exit_status ,
240- result.log .c_str ());
267+ db.ExecutePrepared (" insert_result" ,
268+ job.id .c_str (),
269+ node_name.c_str (),
270+ start_time,
271+ result.exit_status ,
272+ result.log .c_str ());
241273} catch (...) {
242274 db.CheckError (std::current_exception ());
243275}
@@ -248,12 +280,7 @@ CronQueue::CheckPending()
248280 if (!IsEnabled ())
249281 return false ;
250282
251- const auto result =
252- db.Execute (" SELECT id, account_id, command, translate_param, notification "
253- " FROM cronjobs WHERE enabled AND next_run<=now() "
254- " AND node_name IS NULL "
255- " ORDER BY next_run "
256- " LIMIT 1" );
283+ const auto result = db.ExecutePrepared (" check_pending" );
257284 if (result.IsEmpty ())
258285 return false ;
259286
@@ -282,6 +309,8 @@ CronQueue::OnConnect()
282309 throw FmtRuntimeError (" PostgreSQL version {:?} is too old, need at least 9.6" ,
283310 db.GetParameterStatus (" server_version" ));
284311
312+ Prepare ();
313+
285314 db.Execute (" LISTEN cronjobs_modified" );
286315 db.Execute (" LISTEN cronjobs_scheduled" );
287316
0 commit comments