@@ -3358,39 +3358,39 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
33583358 taskGroupEntry -> computeEarliestTaskStartTime (taskGroupEntry .getValue ()).getMillis ()
33593359 )
33603360 )
3361- .forEach (entry -> {
3362- Integer groupId = entry .getKey ();
3363- TaskGroup group = entry .getValue ();
3364-
3365- final DateTime earliestTaskStart = computeEarliestTaskStartTime (group );
3366- final Duration runDuration = Duration .millis (DateTimes .nowUtc ().getMillis () - earliestTaskStart .getMillis ());
3367- if (stopTasksEarly || group .getHandoffEarly ()) {
3368- // If handoffEarly has been set, stop tasks irrespective of stopTaskCount
3369- log .info (
3370- "Stopping taskGroup[%d] early after running for duration[%s]." ,
3371- groupId , runDuration
3372- );
3373- futureGroupIds .add (groupId );
3374- futures .add (checkpointTaskGroup (group , true ));
3375- if (group .getHandoffEarly ()) {
3376- numStoppedTasks .getAndIncrement ();
3377- }
3378- } else if (earliestTaskStart .plus (ioConfig .getTaskDuration ()).isBeforeNow ()) {
3379- // Stop this task group if it has run longer than the configured duration
3380- // and the pending task groups are less than the configured stop task count.
3381- int numPendingCompletionTaskGroups = pendingCompletionTaskGroups .values ().stream ()
3382- .mapToInt (List ::size ).sum ();
3383- if (numPendingCompletionTaskGroups + numStoppedTasks .get () < ioConfig .getMaxAllowedStops ()) {
3361+ .forEach (entry -> {
3362+ Integer groupId = entry .getKey ();
3363+ TaskGroup group = entry .getValue ();
3364+
3365+ final DateTime earliestTaskStart = computeEarliestTaskStartTime (group );
3366+ final Duration runDuration = Duration .millis (DateTimes .nowUtc ().getMillis () - earliestTaskStart .getMillis ());
3367+ if (stopTasksEarly || group .getHandoffEarly ()) {
3368+ // If handoffEarly has been set, stop tasks irrespective of stopTaskCount
33843369 log .info (
3385- "Stopping taskGroup[%d] as it has already run for duration[%s], configured task duration[%s]." ,
3386- groupId , runDuration , ioConfig . getTaskDuration ()
3370+ "Stopping taskGroup[%d] early after running for duration[%s]." ,
3371+ groupId , runDuration
33873372 );
33883373 futureGroupIds .add (groupId );
33893374 futures .add (checkpointTaskGroup (group , true ));
3390- numStoppedTasks .getAndIncrement ();
3375+ if (group .getHandoffEarly ()) {
3376+ numStoppedTasks .getAndIncrement ();
3377+ }
3378+ } else if (earliestTaskStart .plus (ioConfig .getTaskDuration ()).isBeforeNow ()) {
3379+ // Stop this task group if it has run longer than the configured duration
3380+ // and the pending task groups are less than the configured stop task count.
3381+ int numPendingCompletionTaskGroups = pendingCompletionTaskGroups .values ().stream ()
3382+ .mapToInt (List ::size ).sum ();
3383+ if (numPendingCompletionTaskGroups + numStoppedTasks .get () < ioConfig .getMaxAllowedStops ()) {
3384+ log .info (
3385+ "Stopping taskGroup[%d] as it has already run for duration[%s], configured task duration[%s]." ,
3386+ groupId , runDuration , ioConfig .getTaskDuration ()
3387+ );
3388+ futureGroupIds .add (groupId );
3389+ futures .add (checkpointTaskGroup (group , true ));
3390+ numStoppedTasks .getAndIncrement ();
3391+ }
33913392 }
3392- }
3393- });
3393+ });
33943394 List <Either <Throwable , Map <PartitionIdType , SequenceOffsetType >>> results = coalesceAndAwait (futures );
33953395 for (int j = 0 ; j < results .size (); j ++) {
33963396 Integer groupId = futureGroupIds .get (j );
0 commit comments