Skip to content

Commit 5972efb

Browse files
committed
ISSUE-680 # Add database CSV loader
1 parent 1d2edb3 commit 5972efb

File tree

12 files changed

+520
-14
lines changed

12 files changed

+520
-14
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package org.jsmart.zerocode.core.db;
2+
3+
import java.sql.Connection;
4+
import java.sql.SQLException;
5+
import java.util.ArrayList;
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.IntStream;
10+
11+
import org.apache.commons.dbutils.QueryRunner;
12+
import org.apache.commons.lang3.StringUtils;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import com.univocity.parsers.csv.CsvParser;
17+
18+
class DbCsvLoader {
19+
private static final Logger LOGGER = LoggerFactory.getLogger(DbCsvLoader.class);
20+
private Connection conn;
21+
private CsvParser csvParser;
22+
23+
public DbCsvLoader(Connection conn, CsvParser csvParser) {
24+
this.conn = conn;
25+
this.csvParser = csvParser;
26+
}
27+
28+
/**
29+
* Loads rows in csv format (csvLines) into a table in the database
30+
* and returns the total number of rows
31+
*/
32+
int loadCsv(String table, List<String> csvLines, boolean withHeaders, String nullString) throws SQLException {
33+
if (csvLines == null || csvLines.isEmpty())
34+
return 0;
35+
36+
List<String[]> lines = parseLines(table, csvLines);
37+
38+
String[] headers = buildHeaders(lines.get(0), withHeaders);
39+
List<Object[]> paramset = buildParameters(table, headers, lines, withHeaders, nullString);
40+
if (paramset.isEmpty()) // can have headers, but no rows
41+
return 0;
42+
43+
String sql = buildSql(table, headers, paramset.get(0).length);
44+
LOGGER.info("Loading CSV using this sql: {}", sql);
45+
46+
QueryRunner runner = new QueryRunner();
47+
int insertCount = 0;
48+
for (int i = 0 ; i < paramset.size(); i++) {
49+
insertRow(runner, i, sql, paramset.get(i));
50+
insertCount++;
51+
}
52+
LOGGER.info("Total of rows inserted: {}", insertCount);
53+
return insertCount;
54+
}
55+
56+
private List<String[]> parseLines(String table, List<String> lines) {
57+
int numCol = 0; // will check that every row has same columns than the first
58+
List<String[]> parsedLines = new ArrayList<>();
59+
for (int i = 0; i<lines.size(); i++) {
60+
String[] parsedLine = csvParser.parseLine(lines.get(i));
61+
parsedLines.add(parsedLine);
62+
if (i == 0) {
63+
numCol=parsedLine.length;
64+
} else if (numCol != parsedLine.length) {
65+
String message = String.format("Error parsing CSV content to load into table %s: "
66+
+ "Row %d has %d columns and should have %d", table, i + 1, parsedLine.length, numCol);
67+
LOGGER.error(message);
68+
throw new RuntimeException(message);
69+
}
70+
}
71+
return parsedLines;
72+
}
73+
74+
private String[] buildHeaders(String[] line, boolean withHeaders) {
75+
return withHeaders ? line : new String[] {};
76+
}
77+
78+
private List<Object[]> buildParameters(String table, String[] headers, List<String[]> lines, boolean withHeaders, String nullString) {
79+
DbValueConverter converter = new DbValueConverter(conn, table);
80+
List<Object[]> paramset = new ArrayList<>();
81+
for (int i = withHeaders ? 1 : 0; i < lines.size(); i++) {
82+
String[] parsedLine = lines.get(i);
83+
parsedLine = processNulls(parsedLine, nullString);
84+
Object[] params;
85+
try {
86+
params = converter.convertColumnValues(headers, parsedLine);
87+
LOGGER.info(" row [{}] params: {}", i + 1, Arrays.asList(params).toString());
88+
} catch (Exception e) { // Not only SQLException as converter also does parsing
89+
String message = String.format("Error matching data type of parameters and table columns at CSV row %d", i + 1);
90+
LOGGER.error(message); // do not log the exception because it will be logged by the parent executor (DbCsvLoader)
91+
LOGGER.error("Exception message: {}", e.getMessage());
92+
throw new RuntimeException(message, e);
93+
}
94+
paramset.add(params);
95+
}
96+
return paramset;
97+
}
98+
99+
private String[] processNulls(String[] line, String nullString) {
100+
for (int i = 0; i < line.length; i++) {
101+
if (StringUtils.isBlank(nullString) && StringUtils.isBlank(line[i])) {
102+
line[i] = null;
103+
} else if (!StringUtils.isBlank(nullString)) {
104+
if (StringUtils.isBlank(line[i])) // null must be empty string
105+
line[i] = "";
106+
else if (nullString.trim().equalsIgnoreCase(line[i].trim()))
107+
line[i] = null;
108+
}
109+
}
110+
return line;
111+
}
112+
113+
private String buildSql(String table, String[] headers, int columnCount) {
114+
String placeholders = IntStream.range(0, columnCount)
115+
.mapToObj(i -> "?").collect(Collectors.joining(","));
116+
return "INSERT INTO " + table
117+
+ (headers.length > 0 ? " (" + String.join(",", headers) + ")" : "")
118+
+ " VALUES (" + placeholders + ");";
119+
}
120+
121+
private void insertRow(QueryRunner runner, int rowId, String sql, Object[] params) {
122+
try {
123+
runner.update(conn, sql, params);
124+
} catch (SQLException e) {
125+
String message = String.format("Error inserting data at CSV row %d", rowId + 1);
126+
LOGGER.error(message); // do not log the exception because it will be logged by the parent executor (DbCsvLoader)
127+
LOGGER.error("Exception message: {}", e.getMessage());
128+
throw new RuntimeException(message, e);
129+
}
130+
}
131+
132+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package org.jsmart.zerocode.core.db;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import com.fasterxml.jackson.databind.JsonNode;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import com.fasterxml.jackson.databind.ObjectReader;
8+
import org.apache.commons.lang3.StringUtils;
9+
10+
import java.io.IOException;
11+
import java.nio.file.Files;
12+
import java.nio.file.Path;
13+
import java.nio.file.Paths;
14+
import java.util.Collections;
15+
import java.util.List;
16+
import java.util.Optional;
17+
import java.util.stream.Collectors;
18+
19+
public class DbCsvRequest {
20+
private final String tableName;
21+
private final List<String> csvSource;
22+
private final Boolean withHeaders;
23+
private final String nullString;
24+
25+
public DbCsvRequest(
26+
@JsonProperty(value="tableName", required=true) String tableName,
27+
@JsonProperty("csvSource") JsonNode csvSourceJsonNode,
28+
@JsonProperty("withHeaders") Boolean withHeaders,
29+
@JsonProperty("nullString") String nullString) {
30+
this.tableName = tableName;
31+
this.withHeaders = Optional.ofNullable(withHeaders).orElse(false);
32+
this.nullString = Optional.ofNullable(nullString).orElse("");
33+
this.csvSource = Optional.ofNullable(csvSourceJsonNode).map(this::getCsvSourceFrom).orElse(Collections.emptyList());
34+
}
35+
36+
public String getTableName() {
37+
return tableName;
38+
}
39+
40+
public List<String> getCsvSource() {
41+
return csvSource;
42+
}
43+
44+
public boolean getWithHeaders() {
45+
return withHeaders;
46+
}
47+
48+
public String getNullString() {
49+
return nullString;
50+
}
51+
52+
// Code below is duplicated from org.jsmart.zerocode.core.domain.Parametrized.java and not included in tests.
53+
// TODO Consider some refactoring later (to SmartUtils?) and review error message when file not found
54+
55+
private List<String> getCsvSourceFrom(JsonNode csvSourceJsonNode) {
56+
try {
57+
if (csvSourceJsonNode.isArray()) {
58+
return readCsvSourceFromJson(csvSourceJsonNode);
59+
60+
} else {
61+
return readCsvSourceFromExternalCsvFile(csvSourceJsonNode);
62+
}
63+
} catch (IOException e) {
64+
throw new RuntimeException("Error deserializing csvSource", e);
65+
}
66+
}
67+
68+
private List<String> readCsvSourceFromJson(JsonNode csvSourceJsonNode) throws IOException {
69+
ObjectMapper mapper = new ObjectMapper();
70+
ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {
71+
});
72+
return reader.readValue(csvSourceJsonNode);
73+
}
74+
75+
private List<String> readCsvSourceFromExternalCsvFile(JsonNode csvSourceJsonNode) throws IOException {
76+
String csvSourceFilePath = csvSourceJsonNode.textValue();
77+
if (StringUtils.isNotBlank(csvSourceFilePath)) {
78+
Path path = Paths.get("./src/test/resources/",csvSourceFilePath);
79+
List<String> csvSourceFileLines = Files.lines(path)
80+
.filter(StringUtils::isNotBlank)
81+
.collect(Collectors.toList());
82+
//if (this.ignoreHeader) {
83+
// return csvSourceFileLines.stream()
84+
// .skip(1)
85+
// .collect(Collectors.toList());
86+
//}
87+
return csvSourceFileLines;
88+
}
89+
return Collections.emptyList();
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return "Parameterized{" +
95+
"tableName=" + tableName +
96+
", csvSource=" + csvSource +
97+
", withHeaders=" + withHeaders +
98+
", nullString=" + nullString +
99+
'}';
100+
}
101+
}

core/src/main/java/org/jsmart/zerocode/core/db/DbSqlExecutor.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import com.google.inject.Inject;
55
import com.google.inject.name.Named;
6+
import com.univocity.parsers.csv.CsvParser;
67

78
import org.apache.commons.dbutils.DbUtils;
89
import org.slf4j.Logger;
@@ -22,6 +23,7 @@
2223
public class DbSqlExecutor {
2324
private static final Logger LOGGER = LoggerFactory.getLogger(DbSqlExecutor.class);
2425
public static final String SQL_RESULTS_KEY = "rows";
26+
public static final String CSV_RESULTS_KEY = "size";
2527

2628
@Inject
2729
@Named("db.driver.url") private String url;
@@ -32,6 +34,35 @@ public class DbSqlExecutor {
3234
@Inject(optional = true)
3335
@Named("db.driver.password") private String password;
3436

37+
@Inject
38+
private CsvParser csvParser;
39+
40+
/**
41+
* The LOADCSV operation inserts the content of a CSV file into a table,
42+
* and returns the number of records inserted under the key "size"
43+
*/
44+
public Map<String, Object> LOADCSV(DbCsvRequest request) { // uppercase for consistency with http api operations
45+
return loadcsv(request);
46+
}
47+
48+
public Map<String, Object> loadcsv(DbCsvRequest request) {
49+
Connection conn = createAndGetConnection();
50+
try {
51+
LOGGER.info("Load CSV, request -> {} ", request);
52+
DbCsvLoader runner = new DbCsvLoader(conn, csvParser);
53+
long result = runner.loadCsv(request.getTableName(), request.getCsvSource(),
54+
request.getWithHeaders(), request.getNullString());
55+
Map<String, Object> response = new HashMap<>();
56+
response.put(CSV_RESULTS_KEY, result);
57+
return response;
58+
} catch (Exception e) {
59+
LOGGER.error("Failed to load CSV", e);
60+
throw new RuntimeException(e);
61+
} finally {
62+
closeConnection(conn);
63+
}
64+
}
65+
3566
/**
3667
* The EXECUTE operation returns the records retrieved by the SQL specified in the request
3768
* under the key "rows" (select) or an empty object (insert, update)

0 commit comments

Comments
 (0)