Skip to content

Commit ac17abc

Browse files
committed
#698 Fix the backfill logic for late jobs.
1 parent 2a4907a commit ac17abc

File tree

3 files changed

+28
-19
lines changed

3 files changed

+28
-19
lines changed

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
6161
val lastProcessedDate = bookkeeper.getLatestProcessedDate(outputTable, Option(infoDate))
6262
lastProcessedDate.foreach(d => log.info(s"Last processed info date: $d"))
6363

64-
val backfillDates = getBackFillDays(outputTable, runDate, backfillDays, trackDays, lastProcessedDate, schedule, infoDateExpression, bookkeeper)
65-
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
64+
val backfillDates = if (lateOnly) {
65+
val lateDaysToCheck = Math.max(backfillDays, trackDays)
66+
getBackFillDays(outputTable, runDate, lateDaysToCheck, 0, schedule, infoDateExpression, bookkeeper)
67+
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
68+
} else {
69+
getBackFillDays(outputTable, runDate, backfillDays, trackDays, schedule, infoDateExpression, bookkeeper)
70+
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
71+
}
6672

6773
val newDaysOrig = if (!lateOnly) {
6874
getNew(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression).toList
@@ -75,7 +81,7 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
7581
case _ => newDaysOrig
7682
}
7783

78-
val lateDaysOrig = if (!newOnly) {
84+
val lateDaysOrig = if (!newOnly && (backfillDays == -1 || lastProcessedDate.isEmpty)) {
7985
getLate(outputTable, runDate.minusDays(delayDays), schedule, infoDateExpression, initialSourcingDateExpr, lastProcessedDate)
8086
} else {
8187
Nil
@@ -113,19 +119,14 @@ class ScheduleStrategySourcing(hasInfoDateColumn: Boolean) extends ScheduleStrat
113119
runDate: LocalDate,
114120
backfillDays: Int,
115121
trackDays: Int,
116-
lastProcessedDate: Option[LocalDate],
117122
schedule: Schedule,
118123
initialSourcingDateExpr: String,
119124
bookkeeper: Bookkeeper): Seq[LocalDate] = {
120125
// If backfillDays <= 0, backfill is disabled
121126
// If trackDays > backfillDays, track days supersede backfill with checks for retrospective updates
122-
if (backfillDays == 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty
127+
if (backfillDays <= 0 || (backfillDays > 0 && trackDays > backfillDays)) return Seq.empty
123128

124-
val backfillStart = if (backfillDays < 0) {
125-
lastProcessedDate.getOrElse(runDate)
126-
} else {
127-
runDate.minusDays(backfillDays - 1)
128-
}
129+
val backfillStart = runDate.minusDays(backfillDays - 1)
129130

130131
if (backfillStart.isEqual(runDate)) return Seq.empty
131132

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
6161
}
6262

6363
"handle a successful multiple task job parallel execution" in {
64-
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1))
64+
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), backfillDays = 2)
6565

6666
runner.runJob(job)
6767

@@ -87,7 +87,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
8787
}
8888

8989
"handle a successful multiple task job sequential execution" in {
90-
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false)
90+
val (runner, bk, state, job) = getUseCase(runDate.plusDays(1), allowParallel = false, backfillDays = 2)
9191

9292
runner.runJob(job)
9393

@@ -176,7 +176,8 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
176176
runFunction: () => RunResult = () => RunResult(exampleDf),
177177
consumeThreads: Int = 1,
178178
allowParallel: Boolean = true,
179-
parallelTasks: Int = 1
179+
parallelTasks: Int = 1,
180+
backfillDays: Int = 1,
180181
): (ConcurrentJobRunnerImpl, Bookkeeper, PipelineStateSpy, Job) = {
181182
val conf = ConfigFactory.empty()
182183

@@ -193,7 +194,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
193194
val stats = MetaTableStats(Some(2), None, Some(100))
194195

195196
val operationDef = OperationDefFactory.getDummyOperationDef(consumeThreads = consumeThreads)
196-
val job = new JobSpy(runFunction = runFunction, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)
197+
val job = new JobSpy(runFunction = runFunction, jobBackfillDays = backfillDays, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)
197198

198199
val taskRunner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig, "app_123")
199200

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,9 @@ class ScheduleStrategySuite extends AnyWordSpec {
132132
val bk = mock(classOf[Bookkeeper])
133133

134134
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
135+
when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq.empty)
135136

136-
val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = false, lateOnly = true)
137+
val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = false, lateOnly = true)
137138

138139
val expected = Seq(runDate.minusDays(1))
139140
.map(d => pipeline.TaskPreDef(d, TaskRunReason.Late))
@@ -163,7 +164,8 @@ class ScheduleStrategySuite extends AnyWordSpec {
163164

164165
when(bk.getLatestProcessedDate(outputTable, Some(runDate))).thenReturn(Some(runDate.minusDays(2)))
165166

166-
val params = ScheduleParams.Normal(runDate, 0, 4, 0, newOnly = true, lateOnly = true)
167+
val params = ScheduleParams.Normal(runDate, 0, 2, 0, newOnly = true, lateOnly = true)
168+
when(bk.getDataAvailability(outputTable, runDate.minusDays(1), runDate.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1)))
167169

168170
val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate)
169171

@@ -335,10 +337,11 @@ class ScheduleStrategySuite extends AnyWordSpec {
335337
"default behavior with more than 1 day late" in {
336338
val minimumDate = LocalDate.parse("2022-07-01")
337339
val runDate = LocalDate.parse("2022-07-14")
338-
val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
340+
val params = ScheduleParams.Normal(runDate, 7, 0, 0, newOnly = false, lateOnly = false)
339341

340342
val bk = mock(classOf[Bookkeeper])
341343
when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-09")))).thenReturn(Some(LocalDate.parse("2022-07-05")))
344+
when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty)
342345

343346
val expected = Seq(
344347
pipeline.TaskPreDef(LocalDate.of(2022, 7, 9), TaskRunReason.Late)
@@ -354,7 +357,8 @@ class ScheduleStrategySuite extends AnyWordSpec {
354357

355358
when(bk.getLatestProcessedDate(outputTable, Some(runDate.minusDays(1)))).thenReturn(Some(runDate.minusDays(30)))
356359

357-
val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
360+
val params = ScheduleParams.Normal(runDate, 2, 7, 0, newOnly = false, lateOnly = false)
361+
when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-07"), LocalDate.parse("2022-07-20"))).thenReturn(Seq.empty)
358362

359363
val result = strategySnapshot.getDaysToRun(outputTable, dependencies, bk, "@runDate - 1", schedule, params, initialSourcingDateExpr, minimumDate)
360364

@@ -368,6 +372,7 @@ class ScheduleStrategySuite extends AnyWordSpec {
368372

369373
val bk = mock(classOf[Bookkeeper])
370374
when(bk.getLatestProcessedDate(outputTable, Some(runDate.plusDays(1)))).thenReturn(Some(runDate.minusDays(9)))
375+
when(bk.getDataAvailability(outputTable, nextSunday.minusDays(13), nextSunday.minusDays(1))).thenReturn(Seq(DataAvailability(runDate.minusDays(1), 1, 1)))
371376

372377
val result = strategyEvent.getDaysToRun(outputTable, dependencies, bk, infoDateExpression, schedule, params, initialSourcingDateExpr, minimumDate)
373378

@@ -475,12 +480,14 @@ class ScheduleStrategySuite extends AnyWordSpec {
475480
"default behavior with a monthly job" in {
476481
val minimumDate = LocalDate.parse("2022-05-30")
477482
val runDate = LocalDate.parse("2022-07-14")
478-
val params = ScheduleParams.Normal(runDate, 0, 0, 0, newOnly = false, lateOnly = false)
483+
val params = ScheduleParams.Normal(runDate, 0, 62, 0, newOnly = false, lateOnly = false)
479484

480485
val bk = mock(classOf[Bookkeeper])
481486
when(bk.getLatestProcessedDate(outputTable, Some(LocalDate.parse("2022-07-01"))))
482487
.thenReturn(Some(LocalDate.parse("2022-05-01")))
483488

489+
when(bk.getDataAvailability(outputTable, LocalDate.parse("2022-07-08"), LocalDate.parse("2022-07-13"))).thenReturn(Seq.empty)
490+
484491
val expected = Seq(
485492
pipeline.TaskPreDef(LocalDate.of(2022, 6, 1), TaskRunReason.Late),
486493
pipeline.TaskPreDef(LocalDate.of(2022, 7, 1), TaskRunReason.Late)

0 commit comments

Comments
 (0)