Skip to content

Commit e29067e

Browse files
committed
workshop: add plan option "notify_progress"
1 parent 7be2e12 commit e29067e

File tree

9 files changed

+52
-7
lines changed

9 files changed

+52
-7
lines changed

debian/changelog

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
cm4all-workshop (7.3) unstable; urgency=low
22

3-
*
3+
* workshop: add plan option "notify_progress"
44

55
--
66

doc/index.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ The following options are available:
315315

316316
The ``control_channel`` option is allowed, but not ``allow_spawn``.
317317

318+
* :samp:`notify_progress`: send a PostgreSQL notify
319+
``job_progress:PLAN_NAME`` after the ``progress`` column of a job
320+
was updated.
321+
318322
* :samp:`control_channel`: see `Control Channel`_.
319323

320324
.. _allow_spawn:

src/workshop/Job.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
#include "Queue.hxx"
77

88
bool
9-
WorkshopJob::SetProgress(unsigned progress, const char *timeout) noexcept
9+
WorkshopJob::SetProgress(unsigned progress, const char *timeout, bool notify) noexcept
1010
{
11-
return queue.SetJobProgress(*this, progress, timeout);
11+
return queue.SetJobProgress(*this, progress, timeout, notify);
1212
}
1313

1414
void

src/workshop/Job.hxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ struct WorkshopJob {
3939
* string that is understood by PostgreSQL)
4040
* @return true on success
4141
*/
42-
bool SetProgress(unsigned progress, const char *timeout) noexcept;
42+
bool SetProgress(unsigned progress, const char *timeout, bool notify) noexcept;
4343

4444
/**
4545
* Add more environment variables to the record in the "jobs"

src/workshop/Operator.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ WorkshopOperator::OnProgress(unsigned progress) noexcept
409409
LogBridge::Flush() call */
410410
return;
411411

412-
job.SetProgress(progress, plan->timeout.c_str());
412+
job.SetProgress(progress, plan->timeout.c_str(), plan->notify_progress);
413413

414414
/* refresh the timeout */
415415
ScheduleTimeout();

src/workshop/Plan.hxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ struct Plan {
6666

6767
bool translate = false;
6868

69+
bool notify_progress = false;
70+
6971
Plan() = default;
7072

7173
Plan(Plan &&) = default;

src/workshop/PlanLoader.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ PlanLoader::ParseLine(FileLineParser &line)
7171
seen_exec_option = true;
7272
} else if (StringIsEqual(key, "translate")) {
7373
plan.translate = true;
74+
} else if (StringIsEqual(key, "notify_progress")) {
75+
plan.notify_progress = true;
7476
} else if (StringIsEqual(key, "control_channel")) {
7577
/* previously, the "yes"/"no" parameter was mandatory, but
7678
that's deprecated since 2.0.36 */

src/workshop/Queue.cxx

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ WorkshopQueue::WorkshopQueue(const Logger &parent_logger,
3333
db(event_loop, std::move(_db_config), *this),
3434
check_notify_event(event_loop, BIND_THIS_METHOD(CheckNotify)),
3535
timer_event(event_loop, BIND_THIS_METHOD(OnTimer)),
36+
progress_notify_timer(event_loop, BIND_THIS_METHOD(OnProgressNotifyTimer)),
3637
handler(_handler)
3738
{
3839
}
@@ -48,6 +49,19 @@ WorkshopQueue::OnTimer() noexcept
4849
Run();
4950
}
5051

52+
void
53+
WorkshopQueue::OnProgressNotifyTimer() noexcept
54+
{
55+
try {
56+
for (std::string_view plan_name : progress_notify_plans)
57+
db.Execute(fmt::format("NOTIFY \"job_progress:{}\"", plan_name).c_str());
58+
} catch (...) {
59+
db.CheckError(std::current_exception());
60+
}
61+
62+
progress_notify_plans.clear();
63+
}
64+
5165
bool
5266
WorkshopQueue::GetNextScheduled(int *span_r)
5367
{
@@ -345,14 +359,19 @@ WorkshopQueue::CheckRateLimit(const char *plan_name,
345359

346360
bool
347361
WorkshopQueue::SetJobProgress(const WorkshopJob &job, unsigned progress,
348-
const char *timeout) noexcept
362+
const char *timeout, bool notify) noexcept
349363
{
350364
assert(&job.queue == this);
351365

352366
logger(5, "job ", job.id, " progress=", progress);
353367

354368
ScheduleCheckNotify();
355369

370+
if (notify)
371+
if (auto [it, inserted] = progress_notify_plans.emplace(job.plan_name);
372+
inserted && !progress_notify_timer.IsPending())
373+
progress_notify_timer.Schedule(std::chrono::milliseconds{250});
374+
356375
try {
357376
pg_set_job_progress(db, job.id.c_str(), progress, timeout);
358377
return true;
@@ -497,6 +516,7 @@ WorkshopQueue::OnDisconnect() noexcept
497516
{
498517
logger(4, "disconnected from database");
499518

519+
progress_notify_timer.Cancel();
500520
timer_event.Cancel();
501521
check_notify_event.Cancel();
502522
}

src/workshop/Queue.hxx

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "pg/AsyncConnection.hxx"
1010
#include "io/Logger.hxx"
1111

12+
#include <set>
1213
#include <string>
1314
#include <chrono>
1415

@@ -72,6 +73,19 @@ class WorkshopQueue final : private Pg::AsyncConnectionHandler {
7273
*/
7374
FineTimerEvent timer_event;
7475

76+
/**
77+
* This timer delays the submission of progress notifies by
78+
* some time to allow merging several notifies into one.
79+
*/
80+
FineTimerEvent progress_notify_timer;
81+
82+
/**
83+
* The list of plan names that have seen progress updates, to
84+
* be handled by #progress_notify_timer /
85+
* OnProgressNotifyTimer().
86+
*/
87+
std::set<std::string, std::less<>> progress_notify_plans;
88+
7589
std::string plans_include, plans_exclude, plans_lowprio;
7690
std::chrono::steady_clock::time_point next_expire_check =
7791
std::chrono::steady_clock::time_point::min();
@@ -147,10 +161,11 @@ public:
147161
unsigned max_count) noexcept;
148162

149163
/**
164+
* @param notify send a PostgreSQL NOTIFY?
150165
* @return true on success
151166
*/
152167
bool SetJobProgress(const WorkshopJob &job, unsigned progress,
153-
const char *timeout) noexcept;
168+
const char *timeout, bool notify) noexcept;
154169

155170
void SetJobEnv(const WorkshopJob &job, const char *more_env);
156171

@@ -197,6 +212,8 @@ private:
197212
timer_event.Schedule(d);
198213
}
199214

215+
void OnProgressNotifyTimer() noexcept;
216+
200217
/**
201218
* Schedule a queue run. It will occur "very soon" (in a few
202219
* milliseconds).

0 commit comments

Comments
 (0)