@@ -50,6 +50,14 @@
 from airflow.utils.types import DagRunType
 
 
+def _pop_running_ti(ti_status, reduced_key):
+    try:
+        ti_status.running.pop(reduced_key)
+    except KeyError:
+        # the task is not in the running tracking dict
+        pass
+
+
 class BackfillJob(BaseJob):
     """
     A backfill job consists of a dag or subdag for a specific time range. It
@@ -203,33 +211,39 @@ def _update_counters(self, ti_status, session=None):
         if filter_for_tis is not None:
             refreshed_tis = session.query(TI).filter(filter_for_tis).all()
 
+        self.log.debug("ti_status: %s", ti_status)
         for ti in refreshed_tis:
             # Here we remake the key by subtracting 1 to match in memory information
             reduced_key = ti.key.reduced
             if ti.state == State.SUCCESS:
                 ti_status.succeeded.add(reduced_key)
                 self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 continue
             if ti.state == State.SKIPPED:
                 ti_status.skipped.add(reduced_key)
                 self.log.debug("Task instance %s skipped. Don't rerun.", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 continue
             if ti.state == State.FAILED:
                 self.log.error("Task instance %s failed", ti)
+                self.log.debug("reduced_key: %s", reduced_key)
                 ti_status.failed.add(reduced_key)
-                ti_status.running.pop(reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 continue
             # special case: if the task needs to run again put it back
             if ti.state == State.UP_FOR_RETRY:
                 self.log.warning("Task instance %s is up for retry", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 ti_status.to_run[ti.key] = ti
             # special case: if the task needs to be rescheduled put it back
             elif ti.state == State.UP_FOR_RESCHEDULE:
                 self.log.warning("Task instance %s is up for reschedule", ti)
-                ti_status.running.pop(reduced_key)
+                self.log.debug("reduced_key: %s", reduced_key)
+                _pop_running_ti(ti_status, reduced_key)
                 ti_status.to_run[ti.key] = ti
             # special case: The state of the task can be set to NONE by the task itself
             # when it reaches concurrency limits. It could also happen when the state
@@ -242,12 +256,9 @@ def _update_counters(self, ti_status, session=None):
                     "reaching concurrency limits. Re-adding task to queue.",
                     ti,
                 )
+                self.log.debug("reduced_key: %s", reduced_key)
                 tis_to_be_scheduled.append(ti)
-                try:
-                    ti_status.running.pop(reduced_key)
-                except KeyError:
-                    # the task is not running
-                    pass
+                _pop_running_ti(ti_status, reduced_key)
                 ti_status.to_run[ti.key] = ti
 
         # Batch schedule of task instances
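For reference, here is a minimal standalone sketch of what the new helper does, outside the patch: it drops `reduced_key` from the `running` dict and silently ignores the case where the key is absent, which is the duplicated `try/except KeyError` pattern this change removes from each state branch. The `_TiStatus` class and the tuple keys below are hypothetical stand-ins for illustration, not Airflow's real `ti_status` object or task-instance keys.

```python
# Sketch of _pop_running_ti semantics; _TiStatus is a hypothetical stand-in
# for the backfill bookkeeping object, not Airflow's real class.


class _TiStatus:
    def __init__(self):
        self.running = {}  # maps reduced task-instance keys to task instances


def _pop_running_ti(ti_status, reduced_key):
    try:
        ti_status.running.pop(reduced_key)
    except KeyError:
        # the task is not in the running tracking dict
        pass


ti_status = _TiStatus()
ti_status.running[("dag_id", "task_id", "2021-01-01")] = object()

_pop_running_ti(ti_status, ("dag_id", "task_id", "2021-01-01"))  # removes the entry
_pop_running_ti(ti_status, ("missing", "task", "2021-01-01"))    # absent key: no-op, no KeyError

# dict.pop with a default argument has the same effect in one call:
ti_status.running.pop(("missing", "task", "2021-01-01"), None)
```

As the last line shows, `dict.pop(reduced_key, None)` would achieve the same ignore-if-absent behavior without a helper; the helper's advantage here is that it names the intent once instead of repeating the pattern at every call site.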