@@ -1,7 +1,6 @@
package fr.insee.kraftwerk.core.outputs.csv;

import fr.insee.bpm.metadata.model.MetadataModel;
import fr.insee.bpm.metadata.model.VariableType;
import fr.insee.kraftwerk.core.Constants;
import fr.insee.kraftwerk.core.encryption.EncryptionUtils;
import fr.insee.kraftwerk.core.exceptions.KraftwerkException;
@@ -15,7 +14,6 @@
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -28,13 +26,16 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* Class to manage the writing of CSV output tables.
*/
@Slf4j
public class CsvOutputFiles extends OutputFiles {

public static final String NULL_MARKER = "¤__NULL__¤";

/**
* When an instance is created, the output folder is created.
*
@@ -57,43 +58,31 @@ public void writeOutputTables() throws KraftwerkException {
Path tmpOutputFile = Files.createTempFile(Path.of(System.getProperty("java.io.tmpdir")),
outputFileName(datasetName, kraftwerkExecutionContext), null);

//Get column names
List<String> columnNames = SqlUtils.getColumnNames(getDatabase(), datasetName);
//Get columns
Map<String,String> columns = SqlUtils.getColumnTypes(getDatabase(), datasetName);

if(columnNames.isEmpty()){
if(columns.isEmpty()){
Contributor comment:
Is a return really what we want if columns is empty? A continue might be better, so that writing the next datasets is not interrupted.
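
A minimal sketch of that suggestion, assuming writeOutputTables iterates over the dataset names in a loop (the loop variable and the datasetNames collection are assumptions for illustration, not code from the PR):

// assumed: iteration over every dataset to export
for (String datasetName : datasetNames) {
    Map<String, String> columns = SqlUtils.getColumnTypes(getDatabase(), datasetName);
    if (columns.isEmpty()) {
        log.warn("dataset {} is empty !", datasetName);
        continue; // skip this dataset but keep writing the remaining ones
    }
    // ... build header, export data and merge as in the PR ...
}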

log.warn("dataset {} is empty !", datasetName);
return;
}

//Get boolean columns names
List<String> boolColumnNames = SqlUtils.getColumnNames(getDatabase(), datasetName, VariableType.BOOLEAN);
//Get indexes of boolean columns
List<Integer> boolColumnIndexes = new ArrayList<>();

//Create file with double quotes header
Files.write(tmpOutputFile, buildHeader(columnNames, boolColumnNames, boolColumnIndexes).getBytes());
Files.write(tmpOutputFile, buildHeader(columns.keySet().stream().toList()).getBytes());

//Data export into temp file
StringBuilder exportCsvQuery = getExportCsvQuery(datasetName, tmpOutputFile.toFile(), columnNames);
StringBuilder exportCsvQuery = getExportCsvQuery(datasetName, tmpOutputFile.toFile(), columns);
this.getDatabase().execute(exportCsvQuery.toString());

//Apply csv format transformations

//Merge data file with header file
//Read line by line to avoid memory waste
try(BufferedReader bufferedReader = Files.newBufferedReader(Path.of(tmpOutputFile.toAbsolutePath() + "data"))){
String line = bufferedReader.readLine();
while(line != null){
//Apply transformations to elements
line = applyNullTransformation(line);
line = applyBooleanTransformations(line, boolColumnIndexes);

Files.write(tmpOutputFile,(line + "\n").getBytes(),StandardOpenOption.APPEND);
line = bufferedReader.readLine();
}
//Merge header and data
try (var output = Files.newOutputStream(tmpOutputFile, StandardOpenOption.APPEND);
var input = Files.newInputStream(Path.of(tmpOutputFile.toAbsolutePath() + "data"))) {
Contributor comment:
Prefer using Path.resolve() for path construction over string concatenation
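
For example (a sketch, not part of the PR): since the intent here is to suffix the temp file's name with "data", resolveSibling() is probably the closest fit, as resolve() would add a child path element instead:

// instead of Path.of(tmpOutputFile.toAbsolutePath() + "data")
Path dataFile = tmpOutputFile.resolveSibling(tmpOutputFile.getFileName() + "data");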

input.transferTo(output);
}
Files.deleteIfExists(Path.of(tmpOutputFile + "data"));
String content = Files.readString(tmpOutputFile);
Contributor comment:
I am afraid we could have performance issues here. If I am not mistaken, Files.readString() loads the whole content of the file into RAM, which can be a problem for big files.

Suggestion: move this logic into a method that reads the file line by line with a stream, something like:

Path tempFile = Files.createTempFile("csv_clean_", ".tmp");

try (BufferedReader reader = Files.newBufferedReader(file);
     BufferedWriter writer = Files.newBufferedWriter(tempFile, StandardOpenOption.WRITE)) {

    String line;
    while ((line = reader.readLine()) != null) {
        writer.write(line.replace(NULL_MARKER, "\"\""));
        writer.newLine();
    }
}

Files.move(tempFile, file, java.nio.file.StandardCopyOption.REPLACE_EXISTING);

content = content.replace(NULL_MARKER, "\"\"");
Files.writeString(tmpOutputFile, content);

Files.deleteIfExists(Path.of(tmpOutputFile.toAbsolutePath() + "data"));

String outputFile = getOutputFolder().resolve(outputFileName(datasetName, kraftwerkExecutionContext)).toString();
if (kraftwerkExecutionContext != null) {
@@ -121,62 +110,64 @@ public void writeOutputTables() throws KraftwerkException {
}
}

private static @NotNull StringBuilder getExportCsvQuery(String datasetName, File outputFile, List<String> columnNames) {
StringBuilder exportCsvQuery = new StringBuilder(String.format("COPY %s TO '%s' (FORMAT CSV, HEADER false, DELIMITER '%s', OVERWRITE_OR_IGNORE true", datasetName, outputFile.getAbsolutePath() +"data", Constants.CSV_OUTPUTS_SEPARATOR));
//Double quote values parameter
exportCsvQuery.append(", FORCE_QUOTE(");
for (String stringColumnName : columnNames) {
exportCsvQuery.append(String.format("'%s',", stringColumnName));
}
//Remove last ","
exportCsvQuery.deleteCharAt(exportCsvQuery.length() - 1);
exportCsvQuery.append("))");
return exportCsvQuery;
private static @NotNull StringBuilder getExportCsvQuery(String datasetName, File outputFile, Map<String, String> columnTypes) {
StringBuilder query = new StringBuilder("COPY (SELECT ");
int nbColOk = 0;
for (Entry<String,String> column : columnTypes.entrySet()) {
String col = column.getKey();
String type = column.getValue().toUpperCase(); // e.g. 'INTEGER', 'VARCHAR', etc.

String expression;
// replaces false/true by 0/1
if (type.equals("BOOLEAN")) {
expression = String.format(
"CASE WHEN \"%1$s\" IS NULL THEN NULL ELSE CASE WHEN \"%1$s\" THEN 1 ELSE 0 END END AS \"%1$s\"",
col
);
} else if (type.contains("CHAR") || type.equals("TEXT")) {
expression = String.format("COALESCE(\"%1$s\", '') AS \"%1$s\"", col);
} else {
expression = String.format("\"%1$s\"", col); // keep null
}

query.append(expression);
if (nbColOk < columnTypes.size() - 1) {
query.append(", ");
}
nbColOk++;
}

query.append(String.format(" FROM \"%s\") TO '%s' (FORMAT CSV, HEADER false, DELIMITER '%s', NULLSTR '%s'",
datasetName,
outputFile.getAbsolutePath() + "data",
Constants.CSV_OUTPUTS_SEPARATOR,
NULL_MARKER));

if (!columnTypes.isEmpty()) {
//Double quote values parameter
query.append(", FORCE_QUOTE(");
for (String col : columnTypes.keySet()) {
query.append("\"").append(col).append("\",");
}
query.deleteCharAt(query.length() - 1); // remove trailing comma
query.append(")");
}

query.append(")");
log.debug("csv query : \n {}",query);
return query;
}

private static String buildHeader(List<String> columnNames, List<String> boolColumnNames, List<Integer> boolColumnIndexes) {
private static String buildHeader(List<String> columnNames) {
StringBuilder headerBuilder = new StringBuilder();
for (String columnName : columnNames) {
headerBuilder.append(String.format("\"%s\"", columnName)).append(Constants.CSV_OUTPUTS_SEPARATOR);
if(boolColumnNames.contains(columnName)){
boolColumnIndexes.add(columnNames.indexOf(columnName));
}
}
headerBuilder.deleteCharAt(headerBuilder.length()-1);
headerBuilder.append("\n");
return headerBuilder.toString();
}

/**
* replaces false/true by 0/1 in a line
* @param csvLine line to transform
* @param boolColumnIndexes indexes of booleans values to change
* @return the transformed line
*/
private String applyBooleanTransformations(String csvLine, List<Integer> boolColumnIndexes) {
String[] lineElements = csvLine.split(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR), -1);
//change "true" or "false" by "1" or "0"
for (int elementIndex : boolColumnIndexes) {
lineElements[elementIndex] = lineElements[elementIndex].replace("false", "0").replace("true", "1");
}
//Rebuild csv line
return String.join(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR),lineElements);
}

/**
* Changes null values to "" in a line
* @param csvLine line to transform
* @return the transformed line
*/
private String applyNullTransformation(String csvLine) {
String[] lineElements = csvLine.split(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR), -1);
for (int i = 0; i < lineElements.length; i++) {
if (lineElements[i].isEmpty()) {
lineElements[i] = "\"\"";
}
}
return String.join(String.valueOf(Constants.CSV_OUTPUTS_SEPARATOR),lineElements);
}

@Override
public void writeImportScripts(Map<String, MetadataModel> metadataModels, KraftwerkExecutionContext kraftwerkExecutionContext) {
@@ -304,6 +304,20 @@ public static List<String> getColumnNames(Statement statement, String tableName,
return columnNames;
}

public static Map<String, String> getColumnTypes(Statement statement, String tableName) throws SQLException {
Map<String, String> columnTypes = new LinkedHashMap<>(); // preserves column order

try (ResultSet resultSet = statement.executeQuery(String.format("DESCRIBE \"%s\"", tableName))) {
while (resultSet.next()) {
String columnName = resultSet.getString("column_name");
String columnType = resultSet.getString("column_type").toUpperCase(); // e.g. BOOLEAN, VARCHAR, INTEGER
columnTypes.put(columnName, columnType);
}
}

return columnTypes;
}

/**
* Connect to DuckDB and retrieve column names of a table
*