Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class AbstractMainProcessingGenesis {
protected UserInputsGenesis userInputs;
protected final FileUtilsInterface fileUtilsInterface;
protected Statement database;
@Getter
protected final KraftwerkExecutionContext kraftwerkExecutionContext;
/* SPECIFIC VARIABLES */
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class MainProcessing {
List<UserInputsFile> userInputsFileList; // for file by file process
@Getter
private VtlBindings vtlBindings = new VtlBindings();
@Getter
private final KraftwerkExecutionContext kraftwerkExecutionContext;
private final FileUtilsInterface fileUtilsInterface;

Expand Down Expand Up @@ -164,7 +165,7 @@ private void writeErrors() {

private static List<UserInputsFile> getUserInputsFile(UserInputsFile source, boolean fileByFile) throws KraftwerkException {
List<UserInputsFile> userInputsFileList = new ArrayList<>();
if(Boolean.TRUE.equals(fileByFile)){
if(fileByFile){
for (String dataMode : source.getModeInputsMap().keySet()) {
List<Path> dataFiles = getFilesToProcess(source, dataMode);
for (Path dataFile : dataFiles) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
package fr.insee.kraftwerk.api.process;

import fr.insee.bpm.metadata.model.Group;
import fr.insee.bpm.metadata.model.MetadataModel;
import fr.insee.kraftwerk.api.client.GenesisClient;
import fr.insee.kraftwerk.api.configuration.ConfigProperties;
import fr.insee.kraftwerk.core.data.model.InterrogationId;
import fr.insee.kraftwerk.core.data.model.Mode;
import fr.insee.kraftwerk.core.data.model.SurveyUnitUpdateLatest;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.utils.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.utils.SqlUtils;
import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import fr.insee.kraftwerk.core.vtl.VtlBindings;
import lombok.extern.log4j.Log4j2;

import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Log4j2
public class MainProcessingGenesisLegacy extends AbstractMainProcessingGenesis{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
Expand Down Expand Up @@ -218,12 +217,7 @@ private void moveTempFile(String filename, Path tmpOutputFile) throws KraftwerkE
log.error("Permission refused to create folder: {} : {}", outDirectory.getParent(), e);
}
Path outputPath = outDirectory.resolve(filename);
//Encrypt file if requested
if(kraftwerkExecutionContext.isWithEncryption()) {
InputStream encryptedStream = encryptionUtils.encryptOutputFile(tmpOutputFile, kraftwerkExecutionContext);
fileUtilsInterface.writeFile(filename, encryptedStream, true);
log.info("File: {} successfully written and encrypted", filename);
}
kraftwerkExecutionContext.setOutDirectory(outDirectory);
fileUtilsInterface.moveFile(tmpOutputFile,outputPath.toString());
log.info("File: {} successfully written", outputPath);
}
Expand All @@ -241,9 +235,7 @@ public String outputFileName(String questionnaireModelId) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
String timestamp = LocalDateTime.now().format(formatter);
String fileName = String.format("export_%s_%s", questionnaireModelId, timestamp);
return kraftwerkExecutionContext.isWithEncryption() ?
fileName + JSON_EXTENSION + ".enc"
: fileName + JSON_EXTENSION;
return fileName + JSON_EXTENSION;
}

protected void tmpJsonFileWriter(List<InterrogationId> listIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ public class MainService extends KraftwerkService {
MinioClient minioClient;
VaultConfig vaultConfig;
boolean useMinio;
OutputZipService outputZipService;


@Autowired
public MainService(ConfigProperties configProperties, MinioConfig minioConfig, VaultConfig vaultConfig, Environment env) {
@Autowired
public MainService(ConfigProperties configProperties, MinioConfig minioConfig, VaultConfig vaultConfig, Environment env, OutputZipService outputZipService) {
super(configProperties, minioConfig);
this.configProperties = configProperties;
this.minioConfig = minioConfig;
this.outputZipService = outputZipService;
this.minioConfig = minioConfig;
this.vaultConfig = vaultConfig;

useMinio = false;
Expand Down Expand Up @@ -171,6 +172,7 @@ public ResponseEntity<Object> jsonExtraction(
MainProcessingGenesisNew mpGenesis = getMainProcessingGenesisByQuestionnaire(withDDI, withEncryption, fileUtilsInterface);
try {
mpGenesis.runMainJson(questionnaireModelId, batchSize, dataMode, since);
outputZipService.encryptAndArchiveOutputs(mpGenesis.getKraftwerkExecutionContext(),fileUtilsInterface);
log.info("Data extracted");
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
Expand All @@ -187,11 +189,12 @@ private ResponseEntity<String> runWithoutGenesis(String inDirectoryParam, boolea
MainProcessing mp = getMainProcessing(inDirectoryParam, fileByFile, withDDI, withEncryption, fileUtilsInterface);
try {
mp.runMain();
outputZipService.encryptAndArchiveOutputs(mp.getKraftwerkExecutionContext(),fileUtilsInterface);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
/* Step 4.3- 4.4 : Archive */
if (Boolean.TRUE.equals(archiveAtEnd)) archive(inDirectoryParam, fileUtilsInterface);
if (archiveAtEnd) archive(inDirectoryParam, fileUtilsInterface);

return ResponseEntity.ok(inDirectoryParam);
}
Expand All @@ -205,6 +208,7 @@ private ResponseEntity<String> runWithGenesis(String campaignId, boolean withDDI

try {
mpGenesis.runMain(campaignId, batchSize);
outputZipService.encryptAndArchiveOutputs(mpGenesis.getKraftwerkExecutionContext(),fileUtilsInterface);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
} catch (IOException e) {
Expand All @@ -221,6 +225,7 @@ private ResponseEntity<String> runWithGenesisByQuestionnaire(String questionnair

try {
mpGenesis.runMain(questionnaireModelId, batchSize, dataMode);
outputZipService.encryptAndArchiveOutputs(mpGenesis.getKraftwerkExecutionContext(),fileUtilsInterface);
} catch (KraftwerkException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
} catch (IOException e) {
Expand All @@ -230,8 +235,6 @@ private ResponseEntity<String> runWithGenesisByQuestionnaire(String questionnair
}




@NotNull
MainProcessingGenesisLegacy getMainProcessingGenesis(boolean withDDI, boolean withEncryption, FileUtilsInterface fileUtilsInterface) {

Expand Down Expand Up @@ -285,7 +288,7 @@ MainProcessingGenesisNew getMainProcessingGenesisByQuestionnaire(boolean withDDI

@NotNull FileUtilsInterface getFileUtilsInterface() {
FileUtilsInterface fileUtilsInterface;
if(Boolean.TRUE.equals(useMinio)){
if(useMinio){
fileUtilsInterface = new MinioImpl(minioClient, minioConfig.getBucketName());
}else{
fileUtilsInterface = new FileSystemImpl(configProperties.getDefaultDirectory());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package fr.insee.kraftwerk.api.services;


import fr.insee.kraftwerk.core.encryption.EncryptionUtils;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
import fr.insee.kraftwerk.core.utils.KraftwerkExecutionContext;
import fr.insee.kraftwerk.core.utils.files.FileUtilsInterface;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

@Service
@RequiredArgsConstructor
@Slf4j
public class OutputZipService {

private static final String ZIP_EXTENSION = ".zip";
private static final String TMPDIR = "java.io.tmpdir";

private final EncryptionUtils encryptionUtils;

/**
* Create a zip of kraftwerkExecutionContext.outDirectory, then encrypt the zip if kraftwerkExecutionContext.withEncryption=true.
*/
public void encryptAndArchiveOutputs(KraftwerkExecutionContext kraftwerkExecutionContext,
FileUtilsInterface fileUtils) throws KraftwerkException {

if (!kraftwerkExecutionContext.isWithEncryption()) {
return;
}
Path outDirectory = kraftwerkExecutionContext.getOutDirectory();
if (outDirectory == null) throw new KraftwerkException(500, "outDirectory is null in context");

Path tempZipFile = null;
Path tempEncFile = null;

try {
Path tempDirectory = Path.of(System.getProperty(TMPDIR));
Files.createDirectories(tempDirectory);

String baseName = outDirectory.getFileName().toString();
tempZipFile = Files.createTempFile(tempDirectory, baseName + "_", ZIP_EXTENSION);

buildZip(outDirectory, tempZipFile, fileUtils);

String encExt = encryptionUtils.getEncryptedFileExtension();
if (!encExt.startsWith(".")) encExt = "." + encExt;

tempEncFile = Files.createTempFile(tempDirectory, baseName + "_", ZIP_EXTENSION + encExt);

try (InputStream encrypted = encryptionUtils.encryptOutputFile(tempZipFile, kraftwerkExecutionContext);
OutputStream out = Files.newOutputStream(tempEncFile,
StandardOpenOption.TRUNCATE_EXISTING)) {
encrypted.transferTo(out);
}

Path parent = outDirectory.getParent();
if (parent == null) throw new KraftwerkException(500, "Cannot resolve parent of outDirectory: " + outDirectory);

String targetEncPath = parent.resolve(baseName + ZIP_EXTENSION + encExt).toString();

// store encrypted file:
fileUtils.moveFile(tempEncFile, targetEncPath);
tempEncFile = null;

deleteWithRetry(tempZipFile);
tempZipFile = null;

fileUtils.deleteDirectory(outDirectory);

log.info("Encrypted archive created at {} and outDirectory deleted ({})", targetEncPath, outDirectory);

} catch (UncheckedIOException | IOException e) {
cleanupTemps(tempZipFile, tempEncFile);
Throwable cause = e instanceof UncheckedIOException ? e.getCause() : e;
throw new KraftwerkException(500, "IO error during output archive: " + cause.getMessage());
}

}

private void cleanupTemps(Path zip, Path enc) {
if (enc != null) deleteWithRetry(enc);
if (zip != null) deleteWithRetry(zip);
}


/**
* Builds the zip at zipFile from outDir, supporting both:
* - filesystem outDir (exists locally)
* - Minio outDir (prefix)
*/
private void buildZip(Path outDirectory, Path zipFile, FileUtilsInterface fileUtils) throws KraftwerkException, IOException {
Files.deleteIfExists(zipFile);

try (ZipOutputStream zipOutputStream = new ZipOutputStream(
Files.newOutputStream(zipFile, StandardOpenOption.CREATE_NEW))) {

if (Files.exists(outDirectory) && Files.isDirectory(outDirectory)) {
try (var paths = Files.walk(outDirectory)) {
paths.filter(Files::isRegularFile).forEach(path -> {
try {
addFileToZip(outDirectory, path, zipOutputStream);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

}
} else {
String prefix = normalizePrefix(outDirectory.toString());
List<String> objects = fileUtils.listFileNames(prefix);
for (String objectPath : objects) {
if (objectPath.endsWith("/")) continue;
addMinioObjectToZip(prefix, objectPath, zipOutputStream, fileUtils);
}
}
}
}

private void addFileToZip(Path rootDir, Path file, ZipOutputStream zipOutputStream) throws IOException {
String entryName = rootDir.relativize(file).toString().replace("\\", "/");
zipOutputStream.putNextEntry(new ZipEntry(entryName));
Files.copy(file, zipOutputStream);
zipOutputStream.closeEntry();
}


private void addMinioObjectToZip(String prefix,
String objectPath,
ZipOutputStream zipOutputStream,
FileUtilsInterface fileUtils) throws KraftwerkException {

String entryName = relativize(prefix, objectPath);
if (entryName.isBlank()) return;

try (InputStream inputStream = fileUtils.readFile(objectPath)) {
if (inputStream == null) {
log.warn("Cannot read object {}, skipping", objectPath);
return;
}
zipOutputStream.putNextEntry(new ZipEntry(entryName));
inputStream.transferTo(zipOutputStream);
zipOutputStream.closeEntry();
} catch (IOException e) {
throw new KraftwerkException(500, "IO error zipping object " + objectPath + ": " + e.getMessage());
}
}

private String normalizePrefix(String p) {
String s = p.replace("\\", "/");
return s.endsWith("/") ? s : s + "/";
}

private String relativize(String basePath, String fullPath) {
String normalizedFullPath = fullPath.replace("\\", "/");
String normalizedBasePath = basePath.replace("\\", "/");
if (normalizedFullPath.startsWith(normalizedBasePath)) return normalizedFullPath.substring(normalizedBasePath.length());

String basePathWithoutTrailingSlash = normalizedBasePath.endsWith("/") ? normalizedBasePath.substring(0, normalizedBasePath.length() - 1) : normalizedBasePath;
if (normalizedFullPath.startsWith(basePathWithoutTrailingSlash)) {
String relativePath = normalizedFullPath.substring(basePathWithoutTrailingSlash.length());
if (relativePath.startsWith("/")) relativePath = relativePath.substring(1);
return relativePath;
}
return normalizedFullPath;
}

private void deleteWithRetry(Path path) {
for (int i = 0; i < 8; i++) {
try {
Files.deleteIfExists(path);
return;
} catch (IOException e) {
try { Thread.sleep(80L); } catch (InterruptedException ignored) {}
}
}
path.toFile().deleteOnExit();
log.warn("Could not delete temp file (locked), will deleteOnExit: {}", path);
}

}
Loading