Skip to content

Commit a418c90

Browse files
fix: use right execution_dates to fetch airflow job runs for varying schedule intervals (#396)
* fix: getjobruns on varying schedules * fix: fix lint issues * refactor: remove unused code
1 parent a976283 commit a418c90

File tree

5 files changed

+202
-44
lines changed

5 files changed

+202
-44
lines changed

core/cron/cron.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,30 @@ func ParseCronSchedule(interval string) (*ScheduleSpec, error) {
3636
}, nil
3737
}
3838

39-
// Interval accepts the time and returns
40-
func (s *ScheduleSpec) Interval(t time.Time) time.Duration {
41-
start := s.Next(t)
42-
next := s.Next(start)
43-
return next.Sub(start)
39+
func (s *ScheduleSpec) Prev(currTime time.Time) time.Time {
40+
startTime := s.getEarliestTimeToStartCron(currTime)
41+
return s.getPreviousSchedule(currTime, startTime)
42+
}
43+
44+
func (s *ScheduleSpec) getPreviousSchedule(currTime time.Time, startTime time.Time) time.Time {
45+
previousSchedule := startTime
46+
for {
47+
nextSchedule := s.Next(previousSchedule)
48+
if nextSchedule.After(currTime) || nextSchedule.Equal(currTime) {
49+
return previousSchedule
50+
}
51+
previousSchedule = nextSchedule
52+
}
53+
}
54+
55+
func (s *ScheduleSpec) getEarliestTimeToStartCron(currTime time.Time) time.Time {
56+
initialDelay := -time.Hour * 24 * 7 //nolint:gomnd
57+
startTime := currTime
58+
for {
59+
startTime = startTime.Add(initialDelay)
60+
if s.Next(startTime).Before(currTime) {
61+
break
62+
}
63+
}
64+
return startTime
4465
}

core/cron/cron_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package cron_test
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
9+
"github.com/odpf/optimus/core/cron"
10+
)
11+
12+
func TestScheduleSpec(t *testing.T) {
13+
t.Run("Prev", func(t *testing.T) {
14+
t.Run("with constant interval", func(t *testing.T) {
15+
scheduleSpec, err := cron.ParseCronSchedule("@midnight")
16+
assert.Nil(t, err)
17+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
18+
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
19+
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
20+
assert.Equal(t, prevScheduleTime, expectedTime)
21+
})
22+
t.Run("with varying interval", func(t *testing.T) {
23+
// at 2 AM every month on 2,11,19,26
24+
scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *")
25+
assert.Nil(t, err)
26+
27+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T01:59:59+00:00")
28+
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
29+
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00")
30+
assert.Equal(t, prevScheduleTime, expectedTime)
31+
})
32+
t.Run("with time falling on schedule time", func(t *testing.T) {
33+
scheduleSpec, err := cron.ParseCronSchedule("@monthly")
34+
assert.Nil(t, err)
35+
36+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00")
37+
prevScheduleTime := scheduleSpec.Prev(scheduleStartTime)
38+
expectedTime, _ := time.Parse(time.RFC3339, "2022-02-01T00:00:00+00:00")
39+
assert.Equal(t, prevScheduleTime, expectedTime)
40+
})
41+
})
42+
t.Run("Next", func(t *testing.T) {
43+
t.Run("with constant interval", func(t *testing.T) {
44+
scheduleSpec, err := cron.ParseCronSchedule("@midnight")
45+
assert.Nil(t, err)
46+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
47+
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
48+
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T00:00:00+00:00")
49+
assert.Equal(t, prevScheduleTime, expectedTime)
50+
})
51+
t.Run("with varying interval", func(t *testing.T) {
52+
// at 2 AM every month on 2,11,19,26
53+
scheduleSpec, err := cron.ParseCronSchedule("0 2 2,11,19,26 * *")
54+
assert.Nil(t, err)
55+
56+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-19T02:01:59+00:00")
57+
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
58+
expectedTime, _ := time.Parse(time.RFC3339, "2022-03-26T02:00:00+00:00")
59+
assert.Equal(t, prevScheduleTime, expectedTime)
60+
})
61+
t.Run("with current time falling on schedule time", func(t *testing.T) {
62+
scheduleSpec, err := cron.ParseCronSchedule("@monthly")
63+
assert.Nil(t, err)
64+
65+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-01T00:00:00+00:00")
66+
prevScheduleTime := scheduleSpec.Next(scheduleStartTime)
67+
expectedTime, _ := time.Parse(time.RFC3339, "2022-04-01T00:00:00+00:00")
68+
assert.Equal(t, prevScheduleTime, expectedTime)
69+
})
70+
})
71+
}

ext/scheduler/airflow2/airflow.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,7 @@ func (s *scheduler) GetJobRunStatus(ctx context.Context, projectSpec models.Proj
329329
func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSpec, jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) ([]models.JobRun, error) {
330330
var jobRuns []models.JobRun
331331
var dagRunList DagRunListResponse
332-
var dagRunRequest DagRunRequest
333-
if jobQuery.OnlyLastRun {
334-
dagRunRequest = getDagRunRequest(jobQuery)
335-
} else {
336-
jobQueryWithExecDate := covertToExecDate(jobQuery, jobCron)
337-
dagRunRequest = getDagRunRequest(jobQueryWithExecDate)
338-
}
332+
dagRunRequest := s.GetDagRunRequest(jobQuery, jobCron)
339333
reqBody, err := json.Marshal(dagRunRequest)
340334
if err != nil {
341335
return jobRuns, err
@@ -355,22 +349,38 @@ func (s *scheduler) GetJobRuns(ctx context.Context, projectSpec models.ProjectSp
355349
return getJobRuns(dagRunList, jobCron)
356350
}
357351

358-
func covertToExecDate(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) *models.JobQuery {
359-
givenStartDate := jobQuery.StartDate
360-
givenEndDate := jobQuery.EndDate
352+
func (s *scheduler) GetDagRunRequest(jobQuery *models.JobQuery, jobCron *cron.ScheduleSpec) DagRunRequest {
353+
if jobQuery.OnlyLastRun {
354+
return DagRunRequest{
355+
OrderBy: "-execution_date",
356+
PageOffset: 0,
357+
PageLimit: 1,
358+
DagIds: []string{jobQuery.Name},
359+
}
360+
}
361+
startDate := s.getExecutionStartDate(jobQuery.StartDate, jobCron)
362+
endDate := s.getExecutionEndDate(jobQuery.EndDate, jobCron)
363+
return DagRunRequest{
364+
OrderBy: "execution_date",
365+
PageOffset: 0,
366+
PageLimit: pageLimit,
367+
DagIds: []string{jobQuery.Name},
368+
ExecutionDateGte: startDate.Format(airflowDateFormat),
369+
ExecutionDateLte: endDate.Format(airflowDateFormat),
370+
}
371+
}
361372

362-
duration := jobCron.Interval(givenStartDate)
363-
jobQuery.StartDate = givenStartDate.Add(-duration)
364-
jobQuery.EndDate = givenEndDate.Add(-duration)
373+
func (*scheduler) getExecutionStartDate(scheduleStartTime time.Time, jobCron *cron.ScheduleSpec) time.Time {
374+
return jobCron.Prev(scheduleStartTime)
375+
}
365376

366-
modifiedJobQuery := &models.JobQuery{
367-
Name: jobQuery.Name,
368-
StartDate: jobQuery.StartDate,
369-
EndDate: jobQuery.EndDate,
370-
Filter: jobQuery.Filter,
371-
OnlyLastRun: false,
377+
func (*scheduler) getExecutionEndDate(scheduleEndTime time.Time, jobCron *cron.ScheduleSpec) time.Time {
378+
// when the current time matches one of the schedule times execution time means previous schedule.
379+
if jobCron.Next(scheduleEndTime.Add(-time.Second * 1)).Equal(scheduleEndTime) {
380+
return jobCron.Prev(scheduleEndTime)
372381
}
373-
return modifiedJobQuery
382+
// else it is previous to previous schedule.
383+
return jobCron.Prev(jobCron.Prev(scheduleEndTime))
374384
}
375385

376386
func (*scheduler) notifyProgress(po progress.Observer, event progress.Event) {

ext/scheduler/airflow2/airflow_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,3 +926,78 @@ func TestAirflow2(t *testing.T) {
926926
})
927927
})
928928
}
929+
930+
func TestAirflow_GetDagRunsRequest(t *testing.T) {
931+
t.Run("only last runs", func(t *testing.T) {
932+
inputQuery := models.JobQuery{OnlyLastRun: true, Name: "dag1"}
933+
scheduler := airflow2.NewScheduler(nil, nil, nil)
934+
935+
scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
936+
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
937+
expectedDagRunRequest := airflow2.DagRunRequest{OrderBy: "-execution_date",
938+
PageOffset: 0,
939+
PageLimit: 1,
940+
DagIds: []string{"dag1"}}
941+
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
942+
})
943+
t.Run("when input times doesn't fall exactly on schedule times", func(t *testing.T) {
944+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
945+
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T00:00:00+00:00")
946+
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-24T00:00:00+00:00")
947+
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00")
948+
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
949+
scheduler := airflow2.NewScheduler(nil, nil, nil)
950+
951+
scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
952+
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
953+
expectedDagRunRequest := airflow2.DagRunRequest{
954+
OrderBy: "execution_date",
955+
PageOffset: 0,
956+
PageLimit: 99999,
957+
DagIds: []string{"dag1"},
958+
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
959+
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
960+
}
961+
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
962+
})
963+
t.Run("when input times fall exactly on schedule times", func(t *testing.T) {
964+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
965+
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-29T02:00:00+00:00")
966+
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-25T00:00:00+00:00")
967+
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-28T00:00:00+00:00")
968+
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
969+
scheduler := airflow2.NewScheduler(nil, nil, nil)
970+
971+
scheduleSpec, _ := cron.ParseCronSchedule("@midnight")
972+
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
973+
expectedDagRunRequest := airflow2.DagRunRequest{
974+
OrderBy: "execution_date",
975+
PageOffset: 0,
976+
PageLimit: 99999,
977+
DagIds: []string{"dag1"},
978+
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
979+
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
980+
}
981+
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
982+
})
983+
t.Run("with varying schedule intervals", func(t *testing.T) {
984+
scheduleStartTime, _ := time.Parse(time.RFC3339, "2022-03-17T00:00:00+00:00")
985+
scheduleEndTime, _ := time.Parse(time.RFC3339, "2022-03-27T00:00:00+00:00")
986+
expectedExecutionStartDate, _ := time.Parse(time.RFC3339, "2022-03-11T02:00:00+00:00")
987+
expectedExecutionEndDate, _ := time.Parse(time.RFC3339, "2022-03-25T02:00:00+00:00")
988+
inputQuery := models.JobQuery{Name: "dag1", StartDate: scheduleStartTime, EndDate: scheduleEndTime}
989+
scheduler := airflow2.NewScheduler(nil, nil, nil)
990+
991+
scheduleSpec, _ := cron.ParseCronSchedule("0 2 2,11,17,19,25,26,27 * *")
992+
dagRunRequest := scheduler.GetDagRunRequest(&inputQuery, scheduleSpec)
993+
expectedDagRunRequest := airflow2.DagRunRequest{
994+
OrderBy: "execution_date",
995+
PageOffset: 0,
996+
PageLimit: 99999,
997+
DagIds: []string{"dag1"},
998+
ExecutionDateLte: expectedExecutionEndDate.Format("2006-01-02T15:04:05+00:00"),
999+
ExecutionDateGte: expectedExecutionStartDate.Format("2006-01-02T15:04:05+00:00"),
1000+
}
1001+
assert.Equal(t, dagRunRequest, expectedDagRunRequest)
1002+
})
1003+
}

ext/scheduler/airflow2/client.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -135,25 +135,6 @@ func toJobStatus(list DagRunListResponse) ([]models.JobStatus, error) {
135135
return jobStatus, nil
136136
}
137137

138-
func getDagRunRequest(param *models.JobQuery) DagRunRequest {
139-
if param.OnlyLastRun {
140-
return DagRunRequest{
141-
OrderBy: "-execution_date",
142-
PageOffset: 0,
143-
PageLimit: 1,
144-
DagIds: []string{param.Name},
145-
}
146-
}
147-
return DagRunRequest{
148-
OrderBy: "execution_date",
149-
PageOffset: 0,
150-
PageLimit: pageLimit,
151-
DagIds: []string{param.Name},
152-
ExecutionDateGte: param.StartDate.Format(airflowDateFormat),
153-
ExecutionDateLte: param.EndDate.Format(airflowDateFormat),
154-
}
155-
}
156-
157138
func getJobRuns(res DagRunListResponse, spec *cron.ScheduleSpec) ([]models.JobRun, error) {
158139
var jobRunList []models.JobRun
159140
if res.TotalEntries > pageLimit {

0 commit comments

Comments
 (0)