2525import org .springframework .batch .core .job .builder .JobBuilder ;
2626import org .springframework .batch .core .repository .JobRepository ;
2727import org .springframework .batch .core .step .builder .StepBuilder ;
28+ import org .springframework .batch .integration .async .AsyncItemProcessor ;
29+ import org .springframework .batch .integration .async .AsyncItemWriter ;
30+ import org .springframework .core .task .TaskExecutor ;
2831import org .springframework .stereotype .Component ;
2932import org .springframework .transaction .PlatformTransactionManager ;
3033
3134import com .publicissapient .kpidashboard .common .model .recommendation .batch .RecommendationsActionPlan ;
35+ import com .publicissapient .kpidashboard .common .repository .recommendation .RecommendationRepository ;
36+ import com .publicissapient .kpidashboard .common .service .JobExecutionTraceLogService ;
37+ import com .publicissapient .kpidashboard .common .service .ProcessorExecutionTraceLogService ;
3238import com .publicissapient .kpidashboard .job .config .base .SchedulingConfig ;
33- import com .publicissapient .kpidashboard .job .recommendationcalculation .config .RecommendationCalculationBatchConfig ;
3439import com .publicissapient .kpidashboard .job .recommendationcalculation .config .RecommendationCalculationConfig ;
3540import com .publicissapient .kpidashboard .job .recommendationcalculation .listener .RecommendationCalculationJobExecutionListener ;
41+ import com .publicissapient .kpidashboard .job .recommendationcalculation .processor .ProjectItemProcessor ;
42+ import com .publicissapient .kpidashboard .job .recommendationcalculation .reader .ProjectItemReader ;
43+ import com .publicissapient .kpidashboard .job .recommendationcalculation .service .RecommendationCalculationService ;
44+ import com .publicissapient .kpidashboard .job .recommendationcalculation .service .RecommendationProjectBatchService ;
45+ import com .publicissapient .kpidashboard .job .recommendationcalculation .writer .ProjectItemWriter ;
3646import com .publicissapient .kpidashboard .job .shared .dto .ProjectInputDTO ;
3747import com .publicissapient .kpidashboard .job .strategy .JobStrategy ;
3848
4656@ RequiredArgsConstructor
4757public class RecommendationCalculationJobStrategy implements JobStrategy {
4858
49- private final RecommendationCalculationConfig recommendationCalculationConfig ;
50- private final RecommendationCalculationBatchConfig batchConfig ;
51- private final PlatformTransactionManager platformTransactionManager ;
5259 private final JobRepository jobRepository ;
53- private final RecommendationCalculationJobExecutionListener jobExecutionListener ;
60+ private final TaskExecutor taskExecutor ;
61+ private final PlatformTransactionManager platformTransactionManager ;
62+ private final RecommendationCalculationConfig recommendationCalculationConfig ;
63+ private final RecommendationProjectBatchService projectBatchService ;
64+ private final RecommendationCalculationService recommendationCalculationService ;
65+ private final JobExecutionTraceLogService jobExecutionTraceLogService ;
66+ private final ProcessorExecutionTraceLogService processorExecutionTraceLogService ;
67+ private final RecommendationRepository recommendationRepository ;
5468
5569 @ Override
5670 public String getJobName () {
@@ -65,18 +79,33 @@ public Optional<SchedulingConfig> getSchedulingConfig() {
6579 @ Override
6680 public Job getJob () {
6781 return new JobBuilder (recommendationCalculationConfig .getName (), jobRepository ).start (chunkProcessProjects ())
68- .listener (jobExecutionListener ).build ();
82+ .listener (new RecommendationCalculationJobExecutionListener (this .projectBatchService ,
83+ this .jobExecutionTraceLogService ))
84+ .build ();
6985 }
7086
7187 private Step chunkProcessProjects () {
7288 return new StepBuilder (String .format ("%s-chunk-process" , recommendationCalculationConfig .getName ()),
7389 jobRepository )
7490 .<ProjectInputDTO , Future <RecommendationsActionPlan >>chunk (
7591 recommendationCalculationConfig .getBatching ().getChunkSize (), platformTransactionManager )
92+ .reader (new ProjectItemReader (this .projectBatchService )).processor (asyncProjectProcessor ())
93+ .writer (asyncItemWriter ()).build ();
94+ }
95+
96+ private AsyncItemProcessor <ProjectInputDTO , RecommendationsActionPlan > asyncProjectProcessor () {
97+ AsyncItemProcessor <ProjectInputDTO , RecommendationsActionPlan > asyncItemProcessor = new AsyncItemProcessor <>();
98+ asyncItemProcessor .setDelegate (new ProjectItemProcessor (this .recommendationCalculationService ,
99+ this .processorExecutionTraceLogService ));
100+ asyncItemProcessor .setTaskExecutor (taskExecutor );
101+ return asyncItemProcessor ;
102+ }
76103
77- .reader (batchConfig .recommendationProjectItemReader ())
78- .processor (batchConfig .recommendationAsyncProjectProcessor ())
79- .writer (batchConfig .recommendationAsyncItemWriter ()).build ();
104+ private AsyncItemWriter <RecommendationsActionPlan > asyncItemWriter () {
105+ AsyncItemWriter <RecommendationsActionPlan > writer = new AsyncItemWriter <>();
106+ writer .setDelegate (
107+ new ProjectItemWriter (this .recommendationRepository , this .processorExecutionTraceLogService ));
108+ return writer ;
80109 }
81110
82111}
0 commit comments