Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package fr.insee.kraftwerk.api.configuration;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig {

@Bean(name = "kraftwerkExecutor")
@ConditionalOnMissingBean(name = "kraftwerkExecutor")
public Executor kraftwerkExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("kw-");
executor.initialize();
return executor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import fr.insee.kraftwerk.api.process.MainProcessing;
import fr.insee.kraftwerk.api.process.MainProcessingGenesisLegacy;
import fr.insee.kraftwerk.api.process.MainProcessingGenesisNew;
import fr.insee.kraftwerk.api.services.async.MainAsyncService;
import fr.insee.kraftwerk.core.data.model.Mode;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.utils.KraftwerkExecutionContext;
Expand All @@ -33,21 +34,24 @@

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.UUID;

@RestController
@Slf4j
@Tag(name = "${tag.main}")
public class MainService extends KraftwerkService {

MainAsyncService mainAsyncService;
ConfigProperties configProperties;
MinioClient minioClient;
VaultConfig vaultConfig;
boolean useMinio;


@Autowired
public MainService(ConfigProperties configProperties, MinioConfig minioConfig, VaultConfig vaultConfig, Environment env) {
public MainService(MainAsyncService mainAsyncService, ConfigProperties configProperties, MinioConfig minioConfig, VaultConfig vaultConfig, Environment env) {
super(configProperties, minioConfig);
this.mainAsyncService = mainAsyncService;
this.configProperties = configProperties;
this.minioConfig = minioConfig;
this.vaultConfig = vaultConfig;
Expand All @@ -71,7 +75,11 @@ public ResponseEntity<String> mainService(
) {
boolean fileByFile = false;
boolean withDDI = true;
return runWithoutGenesis(inDirectoryParam, archiveAtEnd, fileByFile, withDDI, withEncryption);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
MainProcessing mp = getMainProcessing(inDirectoryParam, fileByFile, withDDI, withEncryption, fileUtilsInterface);
String jobId = UUID.randomUUID().toString();
mainAsyncService.runWithoutGenesis(jobId, fileUtilsInterface, mp, inDirectoryParam, archiveAtEnd, fileByFile, withDDI, withEncryption);
return ResponseEntity.accepted().body(jobId);
}

@PutMapping(value = "/main/file-by-file")
Expand All @@ -83,7 +91,11 @@ public ResponseEntity<String> mainFileByFile(
) {
boolean fileByFile = true;
boolean withDDI = true;
return runWithoutGenesis(inDirectoryParam, archiveAtEnd, fileByFile, withDDI, withEncryption);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
MainProcessing mp = getMainProcessing(inDirectoryParam, fileByFile, withDDI, withEncryption, fileUtilsInterface);
String jobId = UUID.randomUUID().toString();
mainAsyncService.runWithoutGenesis(jobId, fileUtilsInterface, mp, inDirectoryParam, archiveAtEnd, fileByFile, withDDI, withEncryption);
return ResponseEntity.accepted().body(jobId);
}

@PutMapping(value = "/main/lunatic-only")
Expand All @@ -95,7 +107,11 @@ public ResponseEntity<String> mainLunaticOnly(
) {
boolean withDDI = false;
boolean fileByFile = false;
return runWithoutGenesis(inDirectoryParam, archiveAtEnd, fileByFile, withDDI, withEncryption);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
MainProcessing mp = getMainProcessing(inDirectoryParam, fileByFile, withDDI, withEncryption, fileUtilsInterface);
String jobId = UUID.randomUUID().toString();
mainAsyncService.runWithoutGenesis(jobId, fileUtilsInterface, mp, inDirectoryParam, archiveAtEnd, fileByFile, withDDI, withEncryption);
return ResponseEntity.accepted().body(jobId);
}

/**
Expand All @@ -112,7 +128,11 @@ public ResponseEntity<String> mainGenesis(
@Parameter(description = "${param.batchSize}") @RequestParam(value = "batchSize", defaultValue = "1000") int batchSize,
@Parameter(description = "${param.withEncryption}") @RequestParam(value = "withEncryption", defaultValue = "false") boolean withEncryption) {
boolean withDDI = true;
return runWithGenesis(campaignId, withDDI, withEncryption, batchSize);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
MainProcessingGenesisLegacy mpGenesis = getMainProcessingGenesis(withDDI, withEncryption, fileUtilsInterface);
String jobId = UUID.randomUUID().toString();
mainAsyncService.runWithGenesis(jobId,fileUtilsInterface, mpGenesis, campaignId, withDDI, withEncryption, batchSize);
return ResponseEntity.accepted().body(jobId);
}

@PutMapping(value = "/main/genesis/by-questionnaire")
Expand All @@ -123,7 +143,11 @@ public ResponseEntity<String> mainGenesisByQuestionnaireId(
@Parameter(description = "${param.batchSize}") @RequestParam(value = "batchSize", defaultValue = "1000") int batchSize,
@Parameter(description = "${param.withEncryption}") @RequestParam(value = "withEncryption", defaultValue = "false") boolean withEncryption) {
boolean withDDI = true;
return runWithGenesisByQuestionnaire(questionnaireModelId, withDDI, withEncryption, batchSize, dataMode);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
String jobId = UUID.randomUUID().toString();
MainProcessingGenesisNew mpGenesis = getMainProcessingGenesisByQuestionnaire(withDDI, withEncryption, fileUtilsInterface);
mainAsyncService.runWithGenesisByQuestionnaire(jobId,fileUtilsInterface, mpGenesis, questionnaireModelId, withDDI, withEncryption, batchSize, dataMode);
return ResponseEntity.accepted().body(jobId);
}

/**
Expand All @@ -141,7 +165,11 @@ public ResponseEntity<String> mainGenesisLunaticOnly(
@Parameter(description = "${param.withEncryption}") @RequestParam(value = "withEncryption", defaultValue = "false") boolean withEncryption
) {
boolean withDDI = false;
return runWithGenesis(campaignId, withDDI, withEncryption, batchSize);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
MainProcessingGenesisLegacy mpGenesis = getMainProcessingGenesis(withDDI, withEncryption, fileUtilsInterface);
String jobId = UUID.randomUUID().toString();
mainAsyncService.runWithGenesis(jobId,fileUtilsInterface, mpGenesis, campaignId, withDDI, withEncryption, batchSize);
return ResponseEntity.accepted().body(jobId);
}

@PutMapping(value = "/main/genesis/by-questionnaire/lunatic-only")
Expand All @@ -153,7 +181,12 @@ public ResponseEntity<String> mainGenesisLunaticOnlyByQuestionnaire(
@Parameter(description = "${param.withEncryption}") @RequestParam(value = "withEncryption", defaultValue = "false") boolean withEncryption
) {
boolean withDDI = false;
return runWithGenesisByQuestionnaire(questionnaireModelId, withDDI, withEncryption, batchSize, dataMode);
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();
MainProcessingGenesisNew mpGenesis = getMainProcessingGenesisByQuestionnaire(withDDI, withEncryption, fileUtilsInterface);
String jobId = UUID.randomUUID().toString();
mainAsyncService.runWithGenesisByQuestionnaire(jobId,fileUtilsInterface, mpGenesis, questionnaireModelId, withDDI, withEncryption, batchSize, dataMode);
return ResponseEntity.accepted().body(jobId);

}

@GetMapping(value ="/json")
Expand All @@ -180,57 +213,6 @@ public ResponseEntity<Object> jsonExtraction(
return ResponseEntity.ok(String.format("Data extracted for questionnaireModelId %s",questionnaireModelId));
}

@NotNull
private ResponseEntity<String> runWithoutGenesis(String inDirectoryParam, boolean archiveAtEnd, boolean fileByFile, boolean withDDI, boolean withEncryption) {
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();

MainProcessing mp = getMainProcessing(inDirectoryParam, fileByFile, withDDI, withEncryption, fileUtilsInterface);
try {
mp.runMain();
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
/* Step 4.3- 4.4 : Archive */
if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam, fileUtilsInterface);

return ResponseEntity.ok(inDirectoryParam);
}


@NotNull
private ResponseEntity<String> runWithGenesis(String campaignId, boolean withDDI, boolean withEncryption, int batchSize) {
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();

MainProcessingGenesisLegacy mpGenesis = getMainProcessingGenesis(withDDI, withEncryption, fileUtilsInterface);

try {
mpGenesis.runMain(campaignId, batchSize);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
} catch (IOException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
}
return ResponseEntity.ok(campaignId);
}

@NotNull
private ResponseEntity<String> runWithGenesisByQuestionnaire(String questionnaireModelId, boolean withDDI, boolean withEncryption, int batchSize, Mode dataMode) {
FileUtilsInterface fileUtilsInterface = getFileUtilsInterface();

MainProcessingGenesisNew mpGenesis = getMainProcessingGenesisByQuestionnaire(withDDI, withEncryption, fileUtilsInterface);

try {
mpGenesis.runMain(questionnaireModelId, batchSize, dataMode);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
} catch (IOException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
}
return ResponseEntity.ok(questionnaireModelId);
}




@NotNull
MainProcessingGenesisLegacy getMainProcessingGenesis(boolean withDDI, boolean withEncryption, FileUtilsInterface fileUtilsInterface) {
Expand Down Expand Up @@ -270,7 +252,6 @@ MainProcessingGenesisNew getMainProcessingGenesisByQuestionnaire(boolean withDDI
);
}


@NotNull MainProcessing getMainProcessing(String inDirectoryParam, boolean fileByFile, boolean withDDI, boolean withEncryption, FileUtilsInterface fileUtilsInterface) {
KraftwerkExecutionContext kraftwerkExecutionContext = new KraftwerkExecutionContext(
inDirectoryParam,
Expand All @@ -285,13 +266,14 @@ MainProcessingGenesisNew getMainProcessingGenesisByQuestionnaire(boolean withDDI

@NotNull FileUtilsInterface getFileUtilsInterface() {
FileUtilsInterface fileUtilsInterface;
if(Boolean.TRUE.equals(useMinio)){
if(useMinio){
fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
}else{
fileUtilsInterface = new FileSystemImpl(configProperties.getDefaultDirectory());
}
return fileUtilsInterface;
}

public record JobAcceptedResponse(String jobId) {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package fr.insee.kraftwerk.api.services.async;

import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Component
public class InMemoryJobStore {

private final ConcurrentMap<String, JobExecution> jobs = new ConcurrentHashMap<>();

public void start(String jobId) {
jobs.put(jobId, new JobExecution(
jobId,
JobStatus.RUNNING,
null,
Instant.now(),
null
));
}

public void success(String jobId) {
jobs.computeIfPresent(jobId, (id, job) ->
new JobExecution(
id,
JobStatus.SUCCESS,
null,
job.startedAt(),
Instant.now()
)
);
}

public void fail(String jobId, Exception e) {
jobs.computeIfPresent(jobId, (id, job) ->
new JobExecution(
id,
JobStatus.FAILED,
e.getMessage(),
job.startedAt(),
Instant.now()
)
);
}

public Optional<JobExecution> get(String jobId) {
return Optional.ofNullable(jobs.get(jobId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package fr.insee.kraftwerk.api.services.async;

import java.time.Instant;

public record JobExecution(
String jobId,
JobStatus status,
String errorMessage,
Instant startedAt,
Instant endedAt
) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package fr.insee.kraftwerk.api.services.async;

public enum JobStatus {
RUNNING,
SUCCESS,
FAILED
}
Loading
Loading