Skip to content

Commit f725b6a

Browse files
quite4work and tyranron
committed
Add -replacing flag
Co-authored-by: tyranron <tyranron@gmail.com>
1 parent bd3db51 commit f725b6a

File tree

5 files changed

+106
-19
lines changed

5 files changed

+106
-19
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ You can optionally disable this behavior and allow overlapping instances of
212212
your jobs by passing the `-overlapping` flag to Supercronic. Supercronic will
213213
still warn about jobs falling behind, but will run duplicate instances of them.
214214

215+
If you pass the `-replacing` flag and it's time for a new job iteration to run,
216+
Supercronic will kill the previous job process if it hasn't finished yet.
215217

216218
## Reload crontab
217219

cron/cron.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func startReaderDrain(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io.
6363
}()
6464
}
6565

66-
func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool) error {
66+
func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool, nextRun time.Time, replacing bool) error {
6767
jobLogger.Info("starting")
6868

6969
cmd := exec.Command(cronCtx.Shell, "-c", command)
@@ -101,6 +101,22 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p
101101
return err
102102
}
103103

104+
if replacing {
105+
ctx, cancel := context.WithDeadline(context.Background(), nextRun)
106+
defer cancel()
107+
go func(pid int) {
108+
// Kill command and its sub-processes once the deadline is exceeded.
109+
<-ctx.Done()
110+
if ctx.Err() == context.DeadlineExceeded {
111+
// A negative PID tells kill(2) to signal the whole process group.
112+
// By convention the PGID of a process group equals the PID of the
113+
// group leader, so the command process is the first member of
114+
// the process group and is the group leader.
115+
syscall.Kill(-pid, syscall.SIGKILL)
116+
}
117+
}(cmd.Process.Pid)
118+
}
119+
104120
var wg sync.WaitGroup
105121

106122
if stdout != nil {
@@ -122,7 +138,7 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p
122138
return nil
123139
}
124140

125-
func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
141+
func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, replacing bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
126142
t := t0
127143

128144
for {
@@ -134,6 +150,9 @@ func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *
134150
if overlapping {
135151
m = "overlapping jobs"
136152
}
153+
if replacing {
154+
m = "replacing job"
155+
}
137156

138157
jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0))
139158

@@ -149,9 +168,10 @@ func startFunc(
149168
exitCtx context.Context,
150169
logger *logrus.Entry,
151170
overlapping bool,
171+
replacing bool,
152172
expression crontab.Expression,
153173
timezone *time.Location,
154-
fn func(time.Time, *logrus.Entry),
174+
fn func(time.Time, *logrus.Entry, bool),
155175
) {
156176
wg.Add(1)
157177

@@ -200,7 +220,7 @@ func startFunc(
200220
"iteration": cronIteration,
201221
})
202222

203-
fn(nextRun, jobLogger)
223+
fn(nextRun, jobLogger, replacing)
204224
}
205225

206226
if overlapping {
@@ -221,10 +241,11 @@ func StartJob(
221241
exitCtx context.Context,
222242
cronLogger *logrus.Entry,
223243
overlapping bool,
244+
replacing bool,
224245
passthroughLogs bool,
225246
promMetrics *prometheus_metrics.PrometheusMetrics,
226247
) {
227-
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) {
248+
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
228249
promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Inc()
229250

230251
defer func() {
@@ -234,15 +255,16 @@ func StartJob(
234255
monitorCtx, cancelMonitor := context.WithCancel(context.Background())
235256
defer cancelMonitor()
236257

237-
go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, promMetrics)
258+
go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, replacing, promMetrics)
238259

239260
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
240261
promMetrics.CronsExecutionTimeHistogram.With(jobPromLabels(job)).Observe(v)
241262
}))
242263

243264
defer timer.ObserveDuration()
244265

245-
err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs)
266+
nextRun := job.Expression.Next(t0)
267+
err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs, nextRun, replacing)
246268

247269
promMetrics.CronsExecCounter.With(jobPromLabels(job)).Inc()
248270

@@ -262,6 +284,7 @@ func StartJob(
262284
exitCtx,
263285
cronLogger,
264286
overlapping,
287+
replacing,
265288
job.Expression,
266289
cronCtx.Timezone,
267290
runThisJob,

cron/cron_test.go

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func TestRunJob(t *testing.T) {
150150
label := fmt.Sprintf("RunJob(%q)", tt.command)
151151
logger, channel := newTestLogger()
152152

153-
err := runJob(tt.context, tt.command, logger, false)
153+
err := runJob(tt.context, tt.command, logger, false, time.Now(), false)
154154
if tt.success {
155155
assert.Nil(t, err, label)
156156
} else {
@@ -198,7 +198,7 @@ func TestStartJobExitsOnRequest(t *testing.T) {
198198
ctx, cancel := context.WithCancel(context.Background())
199199
cancel()
200200

201-
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS)
201+
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS)
202202

203203
wg.Wait()
204204
}
@@ -218,7 +218,7 @@ func TestStartJobRunsJob(t *testing.T) {
218218

219219
logger, channel := newTestLogger()
220220

221-
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS)
221+
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS)
222222

223223
select {
224224
case entry := <-channel:
@@ -266,6 +266,62 @@ func TestStartJobRunsJob(t *testing.T) {
266266
wg.Wait()
267267
}
268268

269+
func TestStartJobReplacesPreviousJobs(t *testing.T) {
270+
job := crontab.Job{
271+
CrontabLine: crontab.CrontabLine{
272+
Expression: &testExpression{2 * time.Second},
273+
Schedule: "always!",
274+
Command: "sleep 100",
275+
},
276+
Position: 1,
277+
}
278+
279+
var wg sync.WaitGroup
280+
ctx, cancel := context.WithCancel(context.Background())
281+
282+
logger, channel := newTestLogger()
283+
284+
StartJob(&wg, &basicContext, &job, ctx, logger, false, true, false, &PROM_METRICS)
285+
286+
select {
287+
case entry := <-channel:
288+
assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message)
289+
case <-time.After(time.Second):
290+
t.Fatalf("timed out waiting for schedule")
291+
}
292+
293+
select {
294+
case entry := <-channel:
295+
assert.Regexp(t, regexp.MustCompile("starting"), entry.Message)
296+
case <-time.After(3 * time.Second):
297+
t.Fatalf("timed out waiting for start")
298+
}
299+
300+
select {
301+
case entry := <-channel:
302+
assert.Regexp(t, regexp.MustCompile("replacing job"), entry.Message)
303+
case <-time.After(3 * time.Second):
304+
t.Fatalf("timed out waiting for job replace warning")
305+
}
306+
307+
select {
308+
case entry := <-channel:
309+
assert.Regexp(t, regexp.MustCompile("killed"), entry.Message)
310+
case <-time.After(time.Second):
311+
t.Fatalf("timed out waiting for job kill")
312+
}
313+
314+
select {
315+
case entry := <-channel:
316+
assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message)
317+
case <-time.After(time.Second):
318+
t.Fatalf("timed out waiting for schedule of the second job iteration")
319+
}
320+
321+
cancel()
322+
wg.Wait()
323+
}
324+
269325
func TestStartFuncWaitsForCompletion(t *testing.T) {
270326
// We use startFunc to start a function, wait for it to start, then
271327
// tell the whole thing to exit, and verify that it waits for the
@@ -281,12 +337,12 @@ func TestStartFuncWaitsForCompletion(t *testing.T) {
281337
ctxStep1, step1Done := context.WithCancel(context.Background())
282338
ctxStep2, step2Done := context.WithCancel(context.Background())
283339

284-
testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
340+
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
285341
step1Done()
286342
<-ctxStep2.Done()
287343
}
288344

289-
startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn)
345+
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn)
290346
go func() {
291347
wg.Wait()
292348
allDone()
@@ -329,12 +385,12 @@ func TestStartFuncDoesNotRunOverlappingJobs(t *testing.T) {
329385
ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
330386
ctxAllDone, allDone := context.WithCancel(context.Background())
331387

332-
testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
388+
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
333389
testChan <- nil
334390
<-ctxAllDone.Done()
335391
}
336392

337-
startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn)
393+
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn)
338394

339395
select {
340396
case <-testChan:
@@ -368,12 +424,12 @@ func TestStartFuncRunsOverlappingJobs(t *testing.T) {
368424
ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
369425
ctxAllDone, allDone := context.WithCancel(context.Background())
370426

371-
testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
427+
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
372428
testChan <- nil
373429
<-ctxAllDone.Done()
374430
}
375431

376-
startFunc(&wg, ctxStartFunc, logger, true, expr, time.Local, testFn)
432+
startFunc(&wg, ctxStartFunc, logger, true, false, expr, time.Local, testFn)
377433

378434
for i := 0; i < 5; i++ {
379435
select {
@@ -406,7 +462,7 @@ func TestStartFuncUsesTz(t *testing.T) {
406462

407463
it := 0
408464

409-
testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
465+
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
410466
testChan <- t0.Location()
411467
it += 1
412468

@@ -422,7 +478,7 @@ func TestStartFuncUsesTz(t *testing.T) {
422478
}
423479
}
424480

425-
startFunc(&wg, ctxStartFunc, logger, false, expr, loc, testFn)
481+
startFunc(&wg, ctxStartFunc, logger, false, false, expr, loc, testFn)
426482

427483
for i := 0; i < 5; i++ {
428484
select {

integration/test.bats

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ wait_for() {
6262
[[ "$n" -ge 4 ]]
6363
}
6464

65+
@test "it runs replacing jobs" {
66+
n="$(SUPERCRONIC_ARGS="-replacing" run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "killed" | wc -l)"
67+
[[ "$n" -ge 3 ]]
68+
}
69+
6570
@test "it supports debug logging " {
6671
SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug"
6772
}

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func main() {
4242
sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
4343
sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
4444
overlapping := flag.Bool("overlapping", false, "enable tasks overlapping")
45+
replacing := flag.Bool("replacing", false, "enable tasks replacing")
4546
flag.Parse()
4647

4748
var sentryDsn string
@@ -147,7 +148,7 @@ func main() {
147148
"job.position": job.Position,
148149
})
149150

150-
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *passthroughLogs, &promMetrics)
151+
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *replacing, *passthroughLogs, &promMetrics)
151152
}
152153

153154
termChan := make(chan os.Signal, 1)

0 commit comments

Comments
 (0)