Skip to content

Commit a34b610

Browse files
authored
Merge pull request #157 from PublicisSapient/develop
Develop
2 parents 1f5c837 + c92902e commit a34b610

File tree

56 files changed

+5307
-388
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+5307
-388
lines changed

ai-data-processor/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@
144144
<artifactId>junit-jupiter-api</artifactId>
145145
<scope>test</scope>
146146
</dependency>
147+
<dependency>
148+
<groupId>com.knowhow.retro</groupId>
149+
<artifactId>ai-gateway-client</artifactId>
150+
<version>1.0.0</version>
151+
</dependency>
147152

148153
<!--Logging dependencies-->
149154
<dependency>

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/AiDataProcessorApplication.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
import org.springframework.scheduling.annotation.EnableScheduling;
1010

1111
@SpringBootApplication
12-
@ComponentScan(basePackages = {"com.publicissapient", "com.knowhow.retro.notifications"})
13-
@EnableMongoRepositories(basePackages = {"com.publicissapient.**.repository"})
12+
@ComponentScan(basePackages = { "com.publicissapient", "com.knowhow.retro.notifications",
13+
"com.knowhow.retro.aigatewayclient" })
14+
@EnableMongoRepositories(basePackages = { "com.publicissapient.**.repository" })
1415
@EnableBatchProcessing
1516
@EnableAsync
1617
@EnableScheduling

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/client/customapi/KnowHOWClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,22 @@ public List<KpiElement> getKpiIntegrationValues(List<KpiRequest> kpiRequests) {
7474
}).flatMapIterable(list -> list).collectList().block();
7575
}
7676

77+
public List<KpiElement> getKpiIntegrationValuesKanban(List<KpiRequest> kpiRequests) {
78+
return Flux.fromIterable(kpiRequests).publishOn(Schedulers.boundedElastic()).flatMap(kpiRequest -> {
79+
try {
80+
semaphore.acquire();
81+
return this.knowHOWWebClient.post()
82+
.uri(this.knowHOWApiClientConfig.getKpiIntegrationValuesKanbanEndpointConfig().getPath())
83+
.bodyValue(kpiRequest).retrieve().bodyToFlux(KpiElement.class).retryWhen(retrySpec())
84+
.collectList().doFinally(signalType -> semaphore.release());
85+
} catch (InterruptedException e) {
86+
log.error("Could not get kpi integration values kanban for kpiRequest {}", kpiRequest);
87+
Thread.currentThread().interrupt();
88+
return Flux.error(e);
89+
}
90+
}).flatMapIterable(list -> list).collectList().block();
91+
}
92+
7793
private RetryBackoffSpec retrySpec() {
7894
return Retry
7995
.backoff(knowHOWApiClientConfig.getRetryPolicy().getMaxAttempts(),

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/client/customapi/config/KnowHOWApiClientConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,8 @@ public static class EndpointConfig {
5050
public EndpointConfig getKpiIntegrationValuesEndpointConfig() {
5151
return this.endpoints.get("kpi-integration-values");
5252
}
53+
54+
public EndpointConfig getKpiIntegrationValuesKanbanEndpointConfig() {
55+
return this.endpoints.get("kpi-integration-values-kanban");
56+
}
5357
}

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/client/shareddataservice/SharedDataServiceClient.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@
1717
package com.publicissapient.kpidashboard.client.shareddataservice;
1818

1919
import com.publicissapient.kpidashboard.client.shareddataservice.config.SharedDataServiceConfig;
20-
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
20+
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
21+
import lombok.extern.slf4j.Slf4j;
2122
import org.springframework.http.HttpHeaders;
2223
import org.springframework.http.MediaType;
2324
import org.springframework.stereotype.Component;
2425
import org.springframework.web.reactive.function.client.WebClient;
26+
import org.springframework.web.reactive.function.client.WebClientResponseException;
2527
import reactor.util.retry.Retry;
28+
import reactor.util.retry.RetryBackoffSpec;
2629

30+
import java.net.ConnectException;
2731
import java.time.Duration;
2832

33+
@Slf4j
2934
@Component
3035
public class SharedDataServiceClient {
3136
private final WebClient webClient;
@@ -47,9 +52,15 @@ public SharedDataServiceClient(SharedDataServiceConfig sharedDataServiceConfig)
4752
.build();
4853
}
4954

50-
public PagedAIUsagePerOrgLevel getAIUsageStatsAsync(String levelName) {
51-
int maxAttempts = sharedDataServiceConfig.getRetryPolicy().getMaxAttempts();
52-
int minBackoffDuration = sharedDataServiceConfig.getRetryPolicy().getMinBackoffDuration();
55+
public AIUsagePerOrgLevel getAIUsageStatsAsync(String levelName) {
56+
RetryBackoffSpec retrySpec = Retry.backoff(
57+
sharedDataServiceConfig.getRetryPolicy().getMaxAttempts(),
58+
Duration.of(sharedDataServiceConfig.getRetryPolicy().getMinBackoffDuration(),
59+
sharedDataServiceConfig.getRetryPolicy().getMinBackoffTimeUnit().toChronoUnit()))
60+
.filter(SharedDataServiceClient::shouldRetry)
61+
.doBeforeRetry(retrySignal ->
62+
log.info("Retry #{} due to {}", retrySignal.totalRetries(), retrySignal.failure().toString()));
63+
5364
String path = sharedDataServiceConfig.getAiUsageStatisticsEndpoint().getPath();
5465

5566
return webClient.get()
@@ -58,8 +69,15 @@ public PagedAIUsagePerOrgLevel getAIUsageStatsAsync(String levelName) {
5869
.queryParam(LEVEL_NAME_PARAM, levelName)
5970
.build())
6071
.retrieve()
61-
.bodyToMono(PagedAIUsagePerOrgLevel.class)
62-
.retryWhen(Retry.backoff(maxAttempts, Duration.ofSeconds(minBackoffDuration)).jitter(0.5))
72+
.bodyToMono(AIUsagePerOrgLevel.class)
73+
.retryWhen(retrySpec)
6374
.block();
6475
}
76+
77+
private static boolean shouldRetry(Throwable throwable) {
78+
if (throwable instanceof WebClientResponseException ex) {
79+
return ex.getStatusCode().is5xxServerError();
80+
}
81+
return throwable instanceof ConnectException;
82+
}
6583
}

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/job/aiusagestatisticscollector/dto/PagedAIUsagePerOrgLevel.java renamed to ai-data-processor/src/main/java/com/publicissapient/kpidashboard/job/aiusagestatisticscollector/dto/AIUsagePerOrgLevel.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
import java.time.Instant;
2020
import java.util.List;
2121

22-
public record PagedAIUsagePerOrgLevel(String levelType,
23-
String levelName,
24-
Instant statsDate,
25-
AIUsageSummary usageSummary,
26-
List<AIUsagePerUser> users,
27-
int currentPage,
28-
int totalPages,
29-
long totalElements,
30-
int pageSize) {
22+
public record AIUsagePerOrgLevel(String levelType,
23+
String levelName,
24+
Instant statsDate,
25+
AIUsageSummary usageSummary,
26+
List<AIUsagePerUser> users,
27+
int currentPage,
28+
int totalPages,
29+
long totalElements,
30+
int pageSize) {
3131
}

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/job/aiusagestatisticscollector/dto/mapper/AIUsageStatisticsMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.mapper;
1818

19-
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
19+
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
2020
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.model.AIUsageStatistics;
2121
import org.mapstruct.Mapper;
2222
import org.mapstruct.Mapping;
@@ -28,5 +28,5 @@ public interface AIUsageStatisticsMapper {
2828

2929
@Mapping(target = "users", ignore = true)
3030
@Mapping(target = "ingestTimestamp", expression = "java(java.time.Instant.now())")
31-
AIUsageStatistics toEntity(PagedAIUsagePerOrgLevel pagedAIUsagePerOrgLevel);
31+
AIUsageStatistics toEntity(AIUsagePerOrgLevel aIUsagePerOrgLevel);
3232
}

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/job/aiusagestatisticscollector/listener/AIUsageStatisticsJobCompletionListener.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package com.publicissapient.kpidashboard.job.aiusagestatisticscollector.listener;
1818

19-
import com.publicissapient.kpidashboard.common.model.ProcessorExecutionTraceLog;
19+
import com.publicissapient.kpidashboard.common.model.tracelog.JobExecutionTraceLog;
2020
import com.publicissapient.kpidashboard.common.model.application.ErrorDetail;
21-
import com.publicissapient.kpidashboard.common.service.ProcessorExecutionTraceLogServiceImpl;
21+
import com.publicissapient.kpidashboard.common.service.JobExecutionTraceLogService;
2222
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.service.AccountBatchService;
2323
import lombok.AllArgsConstructor;
2424
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +37,7 @@
3737
@AllArgsConstructor
3838
public class AIUsageStatisticsJobCompletionListener implements JobExecutionListener {
3939
private final AccountBatchService accountBatchService;
40-
private final ProcessorExecutionTraceLogServiceImpl processorExecutionTraceLogServiceImpl;
40+
private final JobExecutionTraceLogService jobExecutionTraceLogService;
4141

4242
@Override
4343
public void afterJob(@NonNull JobExecution jobExecution) {
@@ -50,20 +50,20 @@ private void storeJobExecutionStatus(JobExecution jobExecution) {
5050
String jobName = jobParameters.getString("jobName");
5151
ObjectId executionId = (ObjectId) Objects.requireNonNull(jobParameters.getParameter("executionId")).getValue();
5252

53-
Optional<ProcessorExecutionTraceLog> processorExecutionTraceLogOptional = this.processorExecutionTraceLogServiceImpl
53+
Optional<JobExecutionTraceLog> executionTraceLogOptional = this.jobExecutionTraceLogService
5454
.findById(executionId);
55-
if (processorExecutionTraceLogOptional.isPresent()) {
56-
ProcessorExecutionTraceLog executionTraceLog = processorExecutionTraceLogOptional.get();
55+
if (executionTraceLogOptional.isPresent()) {
56+
JobExecutionTraceLog executionTraceLog = executionTraceLogOptional.get();
5757
executionTraceLog.setExecutionOngoing(false);
58-
executionTraceLog.setExecutionEndedAt(Instant.now().toEpochMilli());
58+
executionTraceLog.setExecutionEndedAt(Instant.now());
5959
executionTraceLog.setExecutionSuccess(jobExecution.getStatus() == BatchStatus.COMPLETED);
6060
executionTraceLog
6161
.setErrorDetailList(jobExecution.getAllFailureExceptions().stream().map(failureException -> {
6262
ErrorDetail errorDetail = new ErrorDetail();
6363
errorDetail.setError(failureException.getMessage());
6464
return errorDetail;
6565
}).toList());
66-
this.processorExecutionTraceLogServiceImpl.saveAiDataProcessorExecutions(executionTraceLog);
66+
this.jobExecutionTraceLogService.updateJobExecution(executionTraceLog);
6767
} else {
6868
log.error("Could not store job execution ending status for job with name {} and execution id {}. Job "
6969
+ "execution could not be found", jobName, executionId);

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/job/aiusagestatisticscollector/processor/AccountItemProcessor.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,28 @@
1818

1919
import org.springframework.batch.item.ItemProcessor;
2020

21-
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
21+
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
2222
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.model.AIUsageStatistics;
2323
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.service.AIUsageStatisticsService;
24+
import com.publicissapient.kpidashboard.job.constant.JobConstants;
2425

2526
import jakarta.annotation.Nonnull;
2627
import lombok.AllArgsConstructor;
2728
import lombok.extern.slf4j.Slf4j;
2829

2930
@Slf4j
3031
@AllArgsConstructor
31-
public class AccountItemProcessor implements ItemProcessor<PagedAIUsagePerOrgLevel, AIUsageStatistics> {
32+
public class AccountItemProcessor implements ItemProcessor<AIUsagePerOrgLevel, AIUsageStatistics> {
3233
private final AIUsageStatisticsService aiUsageStatisticsService;
3334

3435
@Override
35-
public AIUsageStatistics process(@Nonnull PagedAIUsagePerOrgLevel item) {
36-
log.debug("[ai-usage-statistics-collector job] Fetching AI usage statistics for level name: {}", item.levelName());
37-
return aiUsageStatisticsService.fetchAIUsageStatistics(item.levelName());
36+
public AIUsageStatistics process(@Nonnull AIUsagePerOrgLevel item) {
37+
log.debug("{} Fetching AI usage statistics for level name: {}", JobConstants.LOG_PREFIX_AI_USAGE_STATISTICS, item.levelName());
38+
try {
39+
return aiUsageStatisticsService.fetchAIUsageStatistics(item.levelName());
40+
} catch (Exception ex) {
41+
log.error("Failed fetching AI stats for {} – skipping", item.levelName());
42+
throw ex;
43+
}
3844
}
3945
}

ai-data-processor/src/main/java/com/publicissapient/kpidashboard/job/aiusagestatisticscollector/reader/AccountItemReader.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,29 @@
1616

1717
package com.publicissapient.kpidashboard.job.aiusagestatisticscollector.reader;
1818

19+
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.AIUsagePerOrgLevel;
20+
import com.publicissapient.kpidashboard.job.constant.JobConstants;
1921
import org.springframework.batch.item.ItemReader;
2022

21-
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.dto.PagedAIUsagePerOrgLevel;
2223
import com.publicissapient.kpidashboard.job.aiusagestatisticscollector.service.AccountBatchService;
2324

2425
import lombok.RequiredArgsConstructor;
2526
import lombok.extern.slf4j.Slf4j;
2627

2728
@Slf4j
2829
@RequiredArgsConstructor
29-
public class AccountItemReader implements ItemReader<PagedAIUsagePerOrgLevel> {
30+
public class AccountItemReader implements ItemReader<AIUsagePerOrgLevel> {
3031

3132
private final AccountBatchService accountBatchService;
3233

3334
@Override
34-
public PagedAIUsagePerOrgLevel read() {
35-
PagedAIUsagePerOrgLevel aiUsageStatistics = accountBatchService.getNextAccountPage();
36-
log.info("[ai-usage-statistics-collector job] Reader fetched level name: {}", aiUsageStatistics.levelName());
35+
public AIUsagePerOrgLevel read() {
36+
AIUsagePerOrgLevel aiUsageStatistics = accountBatchService.getNextAccount();
37+
if (aiUsageStatistics == null) {
38+
log.info("No more accounts.");
39+
return null;
40+
}
41+
log.info("{} Reader fetched level name: {}", JobConstants.LOG_PREFIX_AI_USAGE_STATISTICS, aiUsageStatistics.levelName());
3742
return aiUsageStatistics;
3843
}
3944
}

0 commit comments

Comments
 (0)