Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
28c8705
DTS-50661: Refactor batch job common constant.
shunaray Dec 1, 2025
70c9ed3
DTS-50661:Refactor job execution trace logging: replace ProcessorExec…
shunaray Dec 3, 2025
3358c58
DTS-50661: Created AiProcessorConstant for centralised constants.
shunaray Dec 4, 2025
4d3ea6e
DTS-50661: Add recommendation calculation job and related components
shunaray Dec 4, 2025
5182f0e
Precompute kpi maturity for Kanban projects
Dec 5, 2025
0d4059c
Merge branch 'develop' of https://github.com/PublicisSapient/knowhow-…
shunaray Dec 7, 2025
22ee198
DTS-50661: Enhance recommendation processing: add job execution trace…
shunaray Dec 8, 2025
838f0cf
DTS-50661:Add RecommendationValidator for AI-generated recommendation…
shunaray Dec 8, 2025
5e29029
DTS-50661:Update scheduling cron expression and clean up code comments
shunaray Dec 8, 2025
6fe903c
DTS-50661:Refactor recommendation parsing and validation logic for im…
shunaray Dec 8, 2025
dfd802b
DTS-50661:Enhance logging in recommendation processing with consisten…
shunaray Dec 8, 2025
6404333
DTS-50661:Refactor project-related classes and update variable names …
shunaray Dec 9, 2025
1f13e4d
DTS-50661: Refactor ProjectItemWriter to projectWiseTraceLog future s…
shunaray Dec 10, 2025
b9789f6
DTS-50661: Refactor configuration and update project ID references in…
shunaray Dec 10, 2025
195ebe2
DTS-50661: Refactor JobOrchestratorTest to use JobExecutionTraceLog a…
shunaray Dec 10, 2025
8440440
DTS-50661: Refactor code and added unit test cases.
shunaray Dec 10, 2025
d7e87a7
DTS-50661: Update application-local.yml with new API and authenticati…
shunaray Dec 10, 2025
d2ba174
DTS-50661: Remove the throws in javadoc causing failure.
shunaray Dec 10, 2025
77a4776
DTS-50873 handle exception occurring while the step is running
AnaTMitran Dec 10, 2025
812c5b4
DTS-50661: Refactor job execution trace log to support processor job-…
shunaray Dec 11, 2025
7b5cb98
DTS-50661: Renamed AiDataProcessorConstants to JobConstants.
shunaray Dec 11, 2025
5b2f1dd
DTS-50661: Review comment fixes, removed redundant ex try,catch and r…
shunaray Dec 11, 2025
da8e011
DTS-50661: Review comment unit test fixes
shunaray Dec 11, 2025
6038b7f
Merge pull request #156 from PublicisSapient/DTS-50873
anamitra1-ps Dec 11, 2025
e3001ba
DTS-50661: Review comment added null validation
shunaray Dec 11, 2025
ca172bf
DTS-50661: Recommendation batch only for scrum projects and not on hold
shunaray Dec 11, 2025
451e242
Merge branch 'develop' of https://github.com/PublicisSapient/knowhow-…
shunaray Dec 11, 2025
bfd8f2d
Merge branch 'develop' of https://github.com/PublicisSapient/knowhow-…
Dec 11, 2025
bc7e926
solve the PR comments
Dec 11, 2025
bbdd2c8
Merge pull request #154 from PublicisSapient/feature/DTS-50880-precom…
vladinu Dec 11, 2025
0936187
Merge branch 'develop' of https://github.com/PublicisSapient/knowhow-…
shunaray Dec 12, 2025
b7d704f
DTS-50661: Added fast fail check in beforeJob for ai gateway availabi…
shunaray Dec 12, 2025
10d5eea
DTS-50661: Filtering on-hold project.
shunaray Dec 12, 2025
628cc8f
DTS-50661: Adding basicProjectConfig id in ProjectInputDTO
shunaray Dec 12, 2025
7a1f70b
DTS-50661: Added kpi data casting check.
shunaray Dec 12, 2025
47c79c8
DTS-50661: Test case fix.
shunaray Dec 12, 2025
8fd4020
DTS-50661: Test case fix.
shunaray Dec 12, 2025
c92902e
Merge pull request #155 from PublicisSapient/feature/DTS-50661-recomm…
shunaray Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ public List<KpiElement> getKpiIntegrationValues(List<KpiRequest> kpiRequests) {
}).flatMapIterable(list -> list).collectList().block();
}

/**
 * Fetches KPI integration values for Kanban boards by POSTing each request to the
 * KnowHOW kanban endpoint. Requests run on the bounded-elastic scheduler and are
 * throttled by {@code semaphore}; each permit is released in {@code doFinally} once
 * the per-request flux terminates. Blocks until every request has completed.
 *
 * @param kpiRequests the KPI requests to resolve, one remote call per request
 * @return the flattened list of {@link KpiElement}s from all requests
 */
public List<KpiElement> getKpiIntegrationValuesKanban(List<KpiRequest> kpiRequests) {
	return Flux.fromIterable(kpiRequests).publishOn(Schedulers.boundedElastic()).flatMap(kpiRequest -> {
		try {
			semaphore.acquire();
			return this.knowHOWWebClient.post()
					.uri(this.knowHOWApiClientConfig.getKpiIntegrationValuesKanbanEndpointConfig().getPath())
					.bodyValue(kpiRequest).retrieve().bodyToFlux(KpiElement.class).retryWhen(retrySpec())
					.collectList().doFinally(signalType -> semaphore.release());
		} catch (InterruptedException e) {
			// Pass the exception as the last SLF4J argument so the stack trace is logged,
			// then restore the interrupt flag before propagating the error downstream.
			log.error("Could not get kpi integration values kanban for kpiRequest {}", kpiRequest, e);
			Thread.currentThread().interrupt();
			return Flux.error(e);
		}
	}).flatMapIterable(list -> list).collectList().block();
}

private RetryBackoffSpec retrySpec() {
return Retry
.backoff(knowHOWApiClientConfig.getRetryPolicy().getMaxAttempts(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ public static class EndpointConfig {
public EndpointConfig getKpiIntegrationValuesEndpointConfig() {
return this.endpoints.get("kpi-integration-values");
}

/**
 * Resolves the endpoint configuration for the kanban KPI integration values API.
 *
 * @return the {@link EndpointConfig} registered under the
 *         {@code kpi-integration-values-kanban} key
 */
public EndpointConfig getKpiIntegrationValuesKanbanEndpointConfig() {
	final String endpointKey = "kpi-integration-values-kanban";
	return endpoints.get(endpointKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
package com.publicissapient.kpidashboard.client.shareddataservice;

import com.publicissapient.kpidashboard.client.shareddataservice.config.SharedDataServiceConfig;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

import java.net.ConnectException;
import java.time.Duration;

@Slf4j
@Component
public class SharedDataServiceClient {
private final WebClient webClient;
Expand All @@ -47,9 +52,15 @@ public SharedDataServiceClient(SharedDataServiceConfig sharedDataServiceConfig)
.build();
}

public PagedAIUsagePerOrgLevel getAIUsageStatsAsync(String levelName) {
int maxAttempts = sharedDataServiceConfig.getRetryPolicy().getMaxAttempts();
int minBackoffDuration = sharedDataServiceConfig.getRetryPolicy().getMinBackoffDuration();
public AIUsagePerOrgLevel getAIUsageStatsAsync(String levelName) {
RetryBackoffSpec retrySpec = Retry.backoff(
sharedDataServiceConfig.getRetryPolicy().getMaxAttempts(),
Duration.of(sharedDataServiceConfig.getRetryPolicy().getMinBackoffDuration(),
sharedDataServiceConfig.getRetryPolicy().getMinBackoffTimeUnit().toChronoUnit()))
.filter(SharedDataServiceClient::shouldRetry)
.doBeforeRetry(retrySignal ->
log.info("Retry #{} due to {}", retrySignal.totalRetries(), retrySignal.failure().toString()));

String path = sharedDataServiceConfig.getAiUsageStatisticsEndpoint().getPath();

return webClient.get()
Expand All @@ -58,8 +69,15 @@ public PagedAIUsagePerOrgLevel getAIUsageStatsAsync(String levelName) {
.queryParam(LEVEL_NAME_PARAM, levelName)
.build())
.retrieve()
.bodyToMono(PagedAIUsagePerOrgLevel.class)
.retryWhen(Retry.backoff(maxAttempts, Duration.ofSeconds(minBackoffDuration)).jitter(0.5))
.bodyToMono(AIUsagePerOrgLevel.class)
.retryWhen(retrySpec)
.block();
}

/**
 * Decides whether a failed request is worth retrying: HTTP 5xx responses and
 * TCP connection failures are treated as transient; everything else is final.
 */
private static boolean shouldRetry(Throwable throwable) {
	return throwable instanceof WebClientResponseException responseEx
			? responseEx.getStatusCode().is5xxServerError()
			: throwable instanceof ConnectException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import java.time.Instant;
import java.util.List;

public record PagedAIUsagePerOrgLevel(String levelType,
String levelName,
Instant statsDate,
AIUsageSummary usageSummary,
List<AIUsagePerUser> users,
int currentPage,
int totalPages,
long totalElements,
int pageSize) {
/**
 * AI usage statistics for a single organisation level (e.g. one account),
 * as returned by the shared data service.
 *
 * NOTE(review): the paging fields (currentPage/totalPages/totalElements/pageSize)
 * look vestigial after the rename from the paged variant — each response now
 * presumably carries exactly one account; confirm against the endpoint contract.
 */
public record AIUsagePerOrgLevel(String levelType,
                                 String levelName,
                                 Instant statsDate,
                                 AIUsageSummary usageSummary,
                                 List<AIUsagePerUser> users,
                                 int currentPage,
                                 int totalPages,
                                 long totalElements,
                                 int pageSize) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.mapper;

import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.model.AIUsageStatistics;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
Expand All @@ -28,5 +28,5 @@ public interface AIUsageStatisticsMapper {

@Mapping(target = "users", ignore = true)
@Mapping(target = "ingestTimestamp", expression = "java(java.time.Instant.now())")
AIUsageStatistics toEntity(PagedAIUsagePerOrgLevel pagedAIUsagePerOrgLevel);
AIUsageStatistics toEntity(AIUsagePerOrgLevel aIUsagePerOrgLevel);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.springframework.batch.item.ItemProcessor;

import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.model.AIUsageStatistics;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.service.AIUsageStatisticsService;

Expand All @@ -28,12 +28,17 @@

@Slf4j
@AllArgsConstructor
public class AccountItemProcessor implements ItemProcessor<PagedAIUsagePerOrgLevel, AIUsageStatistics> {
public class AccountItemProcessor implements ItemProcessor<AIUsagePerOrgLevel, AIUsageStatistics> {
private final AIUsageStatisticsService aiUsageStatisticsService;

@Override
public AIUsageStatistics process(@Nonnull PagedAIUsagePerOrgLevel item) {
public AIUsageStatistics process(@Nonnull AIUsagePerOrgLevel item) {
log.debug("[ai-usage-statistics-collector job] Fetching AI usage statistics for level name: {}", item.levelName());
return aiUsageStatisticsService.fetchAIUsageStatistics(item.levelName());
try {
return aiUsageStatisticsService.fetchAIUsageStatistics(item.levelName());
} catch (Exception ex) {
log.error("Failed fetching AI stats for {} – skipping", item.levelName());
throw ex;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,27 @@

package com.publicissapient.kpidashboard.job.aiusagestatisticscollector.reader;

import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import org.springframework.batch.item.ItemReader;

import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.service.AccountBatchService;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class AccountItemReader implements ItemReader<PagedAIUsagePerOrgLevel> {
public class AccountItemReader implements ItemReader<AIUsagePerOrgLevel> {

private final AccountBatchService accountBatchService;

@Override
public PagedAIUsagePerOrgLevel read() {
PagedAIUsagePerOrgLevel aiUsageStatistics = accountBatchService.getNextAccountPage();
public AIUsagePerOrgLevel read() {
AIUsagePerOrgLevel aiUsageStatistics = accountBatchService.getNextAccount();
if (aiUsageStatistics == null) {
log.info("No more accounts.");
return null;
}
log.info("[ai-usage-statistics-collector job] Reader fetched level name: {}", aiUsageStatistics.levelName());
return aiUsageStatistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.publicissapient.kpidashboard.client.shareddataservice.SharedDataServiceClient;
import com.publicissapient.kpidashboard.exception.InternalServerErrorException;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.model.AIUsageStatistics;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.repository.AIUsageStatisticsRepository;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.mapper.AIUsageStatisticsMapper;
Expand All @@ -40,7 +40,7 @@ public class AIUsageStatisticsService {

public AIUsageStatistics fetchAIUsageStatistics(String levelName) {
try {
PagedAIUsagePerOrgLevel aiUsageStatistics = sharedDataServiceClient.getAIUsageStatsAsync(levelName);
AIUsagePerOrgLevel aiUsageStatistics = sharedDataServiceClient.getAIUsageStatsAsync(levelName);
return aiUsageStatisticsMapper.toEntity(aiUsageStatistics);
} catch (Exception ex) {
log.error("Failed to fetch AI usage stats for {}: {}", levelName, ex.getMessage());
Expand All @@ -51,6 +51,6 @@ public AIUsageStatistics fetchAIUsageStatistics(String levelName) {
/**
 * Persists the given chunk of AI usage statistics in a single transaction.
 *
 * @param aiUsageStatisticsList the entities to save; an empty or null chunk is a no-op
 */
@Transactional
public void saveAll(List<AIUsageStatistics> aiUsageStatisticsList) {
	// Guard: an empty chunk would make get(0) below throw IndexOutOfBoundsException.
	if (aiUsageStatisticsList == null || aiUsageStatisticsList.isEmpty()) {
		log.info("No AI usage statistics to save");
		return;
	}
	aiUsageStatisticsRepository.saveAll(aiUsageStatisticsList);
	log.info("Successfully fetched and saved {} AI usage statistics for account: {}", aiUsageStatisticsList.size(),
			aiUsageStatisticsList.get(0).getLevelName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.publicissapient.kpidashboard.common.model.application.AccountHierarchy;
import com.publicissapient.kpidashboard.common.repository.application.AccountHierarchyRepository;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -62,10 +62,10 @@ public void initializeBatchProcessingParametersForTheNextProcess() {
}

/**
* Return the next account to process as an AIUsagePerOrgLevel.
* Return next account to process as a AIUsagePerOrgLevel.
* Each page contains exactly 1 account because the endpoint supports only 1 account per request.
*/
public PagedAIUsagePerOrgLevel getNextAccountPage() {
public AIUsagePerOrgLevel getNextAccount() {
if (!initialized) {
initializeBatchProcessingParametersForTheNextProcess();
}
Expand All @@ -76,7 +76,7 @@ public PagedAIUsagePerOrgLevel getNextAccountPage() {

AccountHierarchy account = allAccounts.get(currentIndex);

PagedAIUsagePerOrgLevel page = new PagedAIUsagePerOrgLevel(
AIUsagePerOrgLevel page = new AIUsagePerOrgLevel(
"account",
account.getNodeName(),
Instant.now(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.concurrent.Future;

import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
Expand All @@ -32,7 +33,6 @@

import com.publicissapient.kpidashboard.common.service.ProcessorExecutionTraceLogServiceImpl;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.config.AIUsageStatisticsCollectorJobConfig;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.listener.AIUsageStatisticsJobCompletionListener;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.model.AIUsageStatistics;
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.processor.AccountItemProcessor;
Expand Down Expand Up @@ -85,18 +85,21 @@ public Optional<SchedulingConfig> getSchedulingConfig() {
}

// Chunk-oriented step: reads one account per item, processes it asynchronously into
// AI usage statistics futures, and writes completed results chunk by chunk.
private Step chunkProcessAIUsageStatisticsForAccounts() {

	return new StepBuilder("process-ai-usage-statistics", jobRepository)
			.<AIUsagePerOrgLevel, Future<AIUsageStatistics>>chunk(
					aiUsageStatisticsCollectorJobConfig.getBatching().getChunkSize(), transactionManager)
			.faultTolerant()
			// A failing account is skipped (up to 1000) instead of failing the whole job;
			// no retries — presumably retrying is already handled at the HTTP-client level.
			.skip(Exception.class)
			.skipLimit(1000)
			.noRetry(Exception.class)
			.reader(new AccountItemReader(accountBatchService))
			.processor(asyncAccountProcessor())
			.writer(asyncItemWriter())
			.build();
}

private AsyncItemProcessor<PagedAIUsagePerOrgLevel, AIUsageStatistics> asyncAccountProcessor() {
AsyncItemProcessor<PagedAIUsagePerOrgLevel, AIUsageStatistics> asyncItemProcessor = new AsyncItemProcessor<>();
private AsyncItemProcessor<AIUsagePerOrgLevel, AIUsageStatistics> asyncAccountProcessor() {
AsyncItemProcessor<AIUsagePerOrgLevel, AIUsageStatistics> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(new AccountItemProcessor(this.aiUsageStatisticsService));
asyncItemProcessor.setTaskExecutor(taskExecutor);
return asyncItemProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class ProjectItemProcessor implements ItemProcessor<ProjectInputDTO, KpiM

/**
 * Computes the KPI maturity for a single project by delegating to the
 * calculation service; logs which project and methodology are being handled.
 */
@Override
public KpiMaturity process(@Nonnull ProjectInputDTO item) {
	log.info(
			"[kpi-maturity-calculation job] Starting kpi metrics calculation for project with nodeId: {} and deliveryMethodology: {}",
			item.nodeId(), item.deliveryMethodology());
	return kpiMaturityCalculationService.calculateKpiMaturityForProject(item);
}
Expand Down
Loading