diff --git a/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/outputs/csv/CsvOutputFiles.java b/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/outputs/csv/CsvOutputFiles.java index 0345fd95a..5dd972fd4 100644 --- a/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/outputs/csv/CsvOutputFiles.java +++ b/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/outputs/csv/CsvOutputFiles.java @@ -1,7 +1,6 @@ package fr.insee.kraftwerk.core.outputs.csv; import fr.insee.bpm.metadata.model.MetadataModel; -import fr.insee.bpm.metadata.model.VariableType; import fr.insee.kraftwerk.core.Constants; import fr.insee.kraftwerk.core.encryption.EncryptionUtils; import fr.insee.kraftwerk.core.exceptions.KraftwerkException; @@ -15,7 +14,6 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -28,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; /** * Class to manage the writing of CSV output tables. @@ -35,6 +34,8 @@ @Slf4j public class CsvOutputFiles extends OutputFiles { + public static final String NULL_MARKER = "¤__NULL__¤"; + /** * When an instance is created, the output folder is created. * @@ -57,43 +58,31 @@ public void writeOutputTables() throws KraftwerkException { Path tmpOutputFile = Files.createTempFile(Path.of(System.getProperty("java.io.tmpdir")), outputFileName(datasetName, kraftwerkExecutionContext), null); - //Get column names - List columnNames = SqlUtils.getColumnNames(getDatabase(), datasetName); + //Get columns + Map columns = SqlUtils.getColumnTypes(getDatabase(), datasetName); - if(columnNames.isEmpty()){ + if(columns.isEmpty()){ log.warn("dataset {} is empty !", datasetName); return; } - //Get boolean columns names - List boolColumnNames = SqlUtils.getColumnNames(getDatabase(), datasetName, VariableType.BOOLEAN); - //Get indexes of boolean columns - List boolColumnIndexes = new ArrayList<>(); - //Create file with double quotes header - Files.write(tmpOutputFile, buildHeader(columnNames, boolColumnNames, boolColumnIndexes).getBytes()); + Files.write(tmpOutputFile, buildHeader(columns.keySet().stream().toList()).getBytes()); //Data export into temp file - StringBuilder exportCsvQuery = getExportCsvQuery(datasetName, tmpOutputFile.toFile(), columnNames); + StringBuilder exportCsvQuery = getExportCsvQuery(datasetName, tmpOutputFile.toFile(), columns); this.getDatabase().execute(exportCsvQuery.toString()); - //Apply csv format transformations - - //Merge data file with header file - //Read line by line to avoid memory waste - try(BufferedReader bufferedReader = Files.newBufferedReader(Path.of(tmpOutputFile.toAbsolutePath() + "data"))){ - String line = bufferedReader.readLine(); - while(line != null){ - //Apply transformations to elements - line = applyNullTransformation(line); - line = applyBooleanTransformations(line, boolColumnIndexes); - - Files.write(tmpOutputFile,(line + "\n").getBytes(),StandardOpenOption.APPEND); - line = bufferedReader.readLine(); - } + //Merge header and data + try (var output = Files.newOutputStream(tmpOutputFile, StandardOpenOption.APPEND); + var input = Files.newInputStream(Path.of(tmpOutputFile.toAbsolutePath() + "data"))) { + input.transferTo(output); } - Files.deleteIfExists(Path.of(tmpOutputFile + "data")); + String content = Files.readString(tmpOutputFile); + content = content.replace(NULL_MARKER, "\"\""); + Files.writeString(tmpOutputFile, content); + Files.deleteIfExists(Path.of(tmpOutputFile.toAbsolutePath() + "data")); String outputFile = getOutputFolder().resolve(outputFileName(datasetName, kraftwerkExecutionContext)).toString(); if (kraftwerkExecutionContext != null) { @@ -121,62 +110,64 @@ public void writeOutputTables() throws KraftwerkException { } } - private static @NotNull StringBuilder getExportCsvQuery(String datasetName, File outputFile, List columnNames) { - StringBuilder exportCsvQuery = new StringBuilder(String.format("COPY %s TO '%s' (FORMAT CSV, HEADER false, DELIMITER '%s', OVERWRITE_OR_IGNORE true", datasetName, outputFile.getAbsolutePath() +"data", Constants.CSV_OUTPUTS_SEPARATOR)); - //Double quote values parameter - exportCsvQuery.append(", FORCE_QUOTE("); - for (String stringColumnName : columnNames) { - exportCsvQuery.append(String.format("'%s',", stringColumnName)); - } - //Remove last "," - exportCsvQuery.deleteCharAt(exportCsvQuery.length() - 1); - exportCsvQuery.append("))"); - return exportCsvQuery; + private static @NotNull StringBuilder getExportCsvQuery(String datasetName, File outputFile, Map columnTypes) { + StringBuilder query = new StringBuilder("COPY (SELECT "); + int nbColOk = 0; + for (Entry column : columnTypes.entrySet()) { + String col = column.getKey(); + String type = column.getValue().toUpperCase(); // e.g. 'INTEGER', 'VARCHAR', etc. + + String expression; + // replaces false/true by 0/1 + if (type.equals("BOOLEAN")) { + expression = String.format( + "CASE WHEN \"%1$s\" IS NULL THEN NULL ELSE CASE WHEN \"%1$s\" THEN 1 ELSE 0 END END AS \"%1$s\"", + col + ); + } else if (type.contains("CHAR") || type.equals("TEXT")) { + expression = String.format("COALESCE(\"%1$s\", '') AS \"%1$s\"", col); + } else { + expression = String.format("\"%1$s\"", col); // keep null + } + + query.append(expression); + if (nbColOk < columnTypes.size() - 1) { + query.append(", "); + } + nbColOk++; + } + + query.append(String.format(" FROM \"%s\") TO '%s' (FORMAT CSV, HEADER false, DELIMITER '%s', NULLSTR '%s'", + datasetName, + outputFile.getAbsolutePath() + "data", + Constants.CSV_OUTPUTS_SEPARATOR, + NULL_MARKER)); + + if (!columnTypes.isEmpty()) { + //Double quote values parameter + query.append(", FORCE_QUOTE("); + for (String col : columnTypes.keySet()) { + query.append("\"").append(col).append("\","); + } + query.deleteCharAt(query.length() - 1); // remove trailing comma + query.append(")"); + } + + query.append(")"); + log.debug("csv query : \n {}",query); + return query; } - private static String buildHeader(List columnNames, List boolColumnNames, List boolColumnIndexes) { + private static String buildHeader(List columnNames) { StringBuilder headerBuilder = new StringBuilder(); for (String columnName : columnNames) { headerBuilder.append(String.format("\"%s\"", columnName)).append(Constants.CSV_OUTPUTS_SEPARATOR); - if(boolColumnNames.contains(columnName)){ - boolColumnIndexes.add(columnNames.indexOf(columnName)); - } } headerBuilder.deleteCharAt(headerBuilder.length()-1); headerBuilder.append("\n"); return headerBuilder.toString(); } - /** - * replaces false/true by 0/1 in a line - * @param csvLine line to transform - * @param boolColumnIndexes indexes of booleans values to change - * @return the transformed line - */ - private String applyBooleanTransformations(String csvLine, List boolColumnIndexes) { - String[] lineElements = csvLine.split(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR), -1); - //change "true" or "false" by "1" or "0" - for (int elementIndex : boolColumnIndexes) { - lineElements[elementIndex] = lineElements[elementIndex].replace("false", "0").replace("true", "1"); - } - //Rebuild csv line - return String.join(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR),lineElements); - } - - /** - * Changes null values to "" in a line - * @param csvLine line to transform - * @return the transformed line - */ - private String applyNullTransformation(String csvLine) { - String[] lineElements = csvLine.split(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR), -1); - for (int i = 0; i < lineElements.length; i++) { - if (lineElements[i].isEmpty()) { - lineElements[i] = "\"\""; - } - } - return String.join(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR),lineElements); - } @Override public void writeImportScripts(Map metadataModels, KraftwerkExecutionContext kraftwerkExecutionContext) { diff --git a/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/utils/SqlUtils.java b/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/utils/SqlUtils.java index cd749432d..4cdf98e47 100644 --- a/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/utils/SqlUtils.java +++ b/kraftwerk-core/src/main/java/fr/insee/kraftwerk/core/utils/SqlUtils.java @@ -304,6 +304,20 @@ public static List getColumnNames(Statement statement, String tableName, return columnNames; } + public static Map getColumnTypes(Statement statement, String tableName) throws SQLException { + Map columnTypes = new LinkedHashMap<>(); // conserve l'ordre des colonnes + + try (ResultSet resultSet = statement.executeQuery(String.format("DESCRIBE \"%s\"", tableName))) { + while (resultSet.next()) { + String columnName = resultSet.getString("column_name"); + String columnType = resultSet.getString("column_type").toUpperCase(); // ex: BOOLEAN, VARCHAR, INTEGER + columnTypes.put(columnName, columnType); + } + } + + return columnTypes; + } + /** * Connect to DuckDB and retrieve column names of a table *