Skip to content

Commit b2fe6bb

Browse files
committed
Fixed some parsing issues. Implemented RowBinaryWriter for simple inserts under beta flag
1 parent b16f9c0 commit b2fe6bb

File tree

12 files changed

+516
-242
lines changed

12 files changed

+516
-242
lines changed

client-v2/src/main/java/com/clickhouse/client/api/data_formats/ClickHouseBinaryFormatWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
import com.clickhouse.data.ClickHouseFormat;
44

55
import java.io.IOException;
6+
import java.io.InputStream;
67
import java.io.OutputStream;
8+
import java.io.Reader;
79
import java.math.BigDecimal;
810
import java.math.BigInteger;
11+
import java.sql.Clob;
12+
import java.sql.NClob;
913
import java.time.LocalDate;
1014
import java.time.LocalDateTime;
1115
import java.time.ZonedDateTime;
@@ -33,6 +37,11 @@ public interface ClickHouseBinaryFormatWriter {
3337

3438
void setValue(int colIndex, Object value);
3539

40+
/**
41+
* Writer current row or block to the output stream.
42+
* Action is idempotent: if there are no new values set - this method has no effect.
43+
* @throws IOException if writing to an output stream causes an error
44+
*/
3645
void commitRow() throws IOException;
3746

3847
void setByte(String column, byte value);
@@ -91,4 +100,11 @@ public interface ClickHouseBinaryFormatWriter {
91100

92101
void setList(int colIndex, List<?> value);
93102

103+
void setInputStream(int colIndex, InputStream in, long len);
104+
105+
void setInputStream(String column, InputStream in, long len);
106+
107+
void setReader(int colIndex, Reader reader, long len);
108+
109+
void setReader(String column, Reader reader, long len);
94110
}

client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryFormatWriter.java

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import com.clickhouse.data.ClickHouseFormat;
77

88
import java.io.IOException;
9+
import java.io.InputStream;
910
import java.io.OutputStream;
11+
import java.io.Reader;
1012
import java.math.BigDecimal;
1113
import java.math.BigInteger;
1214
import java.time.LocalDate;
@@ -36,6 +38,8 @@ public class RowBinaryFormatWriter implements ClickHouseBinaryFormatWriter {
3638

3739
private int rowCount = 0;
3840

41+
private boolean rowStarted = false; // indicates if at least one value was written to a row
42+
3943
public RowBinaryFormatWriter(OutputStream out, TableSchema tableSchema, ClickHouseFormat format) {
4044
if (format != ClickHouseFormat.RowBinary && format != ClickHouseFormat.RowBinaryWithDefaults) {
4145
throw new IllegalArgumentException("Only RowBinary and RowBinaryWithDefaults are supported");
@@ -65,6 +69,7 @@ public ClickHouseFormat getFormat() {
6569
@Override
6670
public void clearRow() {
6771
Arrays.fill(row, null);
72+
rowStarted = false;
6873
}
6974

7075
@Override
@@ -75,21 +80,27 @@ public void setValue(String column, Object value) {
7580
@Override
7681
public void setValue(int colIndex, Object value) {
7782
row[colIndex - 1] = value;
83+
if (!rowStarted) {
84+
rowStarted = true;
85+
}
7886
}
7987

8088
@Override
8189
public void commitRow() throws IOException {
82-
List<ClickHouseColumn> columnList = tableSchema.getColumns();
83-
for (int i = 0; i < row.length; i++) {
84-
ClickHouseColumn column = columnList.get(i);
85-
// here we skip if we have a default value that is MATERIALIZED or ALIAS or ...
86-
if (column.hasDefault() && column.getDefaultValue() != ClickHouseColumn.DefaultValue.DEFAULT)
87-
continue;
88-
if (RowBinaryFormatSerializer.writeValuePreamble(out, defaultSupport, column, row[i])) {
89-
SerializerUtils.serializeData(out, row[i], column);
90+
if (rowStarted) {
91+
List<ClickHouseColumn> columnList = tableSchema.getColumns();
92+
for (int i = 0; i < row.length; i++) {
93+
ClickHouseColumn column = columnList.get(i);
94+
// here we skip if we have a default value that is MATERIALIZED or ALIAS or ...
95+
if (column.hasDefault() && column.getDefaultValue() != ClickHouseColumn.DefaultValue.DEFAULT)
96+
continue;
97+
if (RowBinaryFormatSerializer.writeValuePreamble(out, defaultSupport, column, row[i])) {
98+
SerializerUtils.serializeData(out, row[i], column);
99+
}
90100
}
101+
clearRow();
102+
rowCount++;
91103
}
92-
rowCount++;
93104
}
94105

95106
@Override
@@ -231,4 +242,42 @@ public void setList(String column, List<?> value) {
231242
public void setList(int colIndex, List<?> value) {
232243
setValue(colIndex, value);
233244
}
245+
246+
@Override
247+
public void setInputStream(int colIndex, InputStream in, long len) {
248+
setValue(colIndex, new InputStreamHolder(in, len));
249+
}
250+
251+
@Override
252+
public void setInputStream(String column, InputStream in, long len) {
253+
setValue(column, new InputStreamHolder(in, len));
254+
}
255+
256+
@Override
257+
public void setReader(int colIndex, Reader reader, long len) {
258+
setValue(colIndex, new ReaderHolder(reader, len));
259+
}
260+
261+
@Override
262+
public void setReader(String column, Reader reader, long len) {
263+
setValue(column, new ReaderHolder(reader, len));
264+
}
265+
266+
private static class InputStreamHolder {
267+
final InputStream stream;
268+
final long length;
269+
InputStreamHolder(InputStream stream, long length) {
270+
this.stream = stream;
271+
this.length = length;
272+
}
273+
}
274+
275+
private static class ReaderHolder {
276+
final Reader read;
277+
final long length;
278+
ReaderHolder(Reader reader, long length) {
279+
this.read = reader;
280+
this.length = length;
281+
}
282+
}
234283
}

jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
import com.clickhouse.client.api.Client;
44
import com.clickhouse.client.api.ClientConfigProperties;
55
import com.clickhouse.client.api.internal.ServerSettings;
6+
import com.clickhouse.client.api.metadata.TableSchema;
67
import com.clickhouse.client.api.query.GenericRecord;
78
import com.clickhouse.client.api.query.QuerySettings;
89
import com.clickhouse.data.ClickHouseDataType;
910
import com.clickhouse.jdbc.internal.ClientInfoProperties;
11+
import com.clickhouse.jdbc.internal.DriverProperties;
1012
import com.clickhouse.jdbc.internal.JdbcConfiguration;
1113
import com.clickhouse.jdbc.internal.ExceptionUtils;
1214
import com.clickhouse.jdbc.internal.JdbcUtils;
15+
import com.clickhouse.jdbc.internal.StatementParser;
1316
import com.clickhouse.jdbc.metadata.DatabaseMetaDataImpl;
17+
import com.google.common.collect.Table;
1418
import org.slf4j.Logger;
1519
import org.slf4j.LoggerFactory;
1620

@@ -361,13 +365,19 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in
361365
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
362366
checkOpen();
363367

364-
StatementImpl.StatementType statementType = StatementImpl.parseStatementType(sql);
365-
if (statementType == StatementImpl.StatementType.INSERT) {
366-
if (!PreparedStatementImpl.FUNC_DETECT_REGEXP.matcher(sql).find()) {
367-
return new WriterStatementImpl(this, sql, statementType);
368+
StatementParser.ParsedStatement parsedStatement = StatementParser.parsePreparedStatement(sql);
369+
370+
if (config.isBetaFeatureEnabled(DriverProperties.BETA_ROW_BINARY_WRITER)) {
371+
if (parsedStatement.getType() == StatementParser.StatementType.INSERT) {
372+
if (!parsedStatement.hasColumnList() && !PreparedStatementImpl.FUNC_DETECT_REGEXP.matcher(sql).find()) {
373+
TableSchema tableSchema = client.getTableSchema(parsedStatement.getTableName(), schema);
374+
if (tableSchema.getColumns().size() == parsedStatement.getArgumentCount()) {
375+
return new WriterStatementImpl(this, sql, tableSchema, parsedStatement);
376+
}
377+
}
368378
}
369379
}
370-
return new PreparedStatementImpl(this, sql, statementType);
380+
return new PreparedStatementImpl(this, sql, parsedStatement);
371381
}
372382

373383
@Override

jdbc-v2/src/main/java/com/clickhouse/jdbc/PreparedStatementImpl.java

Lines changed: 9 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.clickhouse.data.Tuple;
55
import com.clickhouse.jdbc.internal.ExceptionUtils;
66
import com.clickhouse.jdbc.internal.JdbcUtils;
7+
import com.clickhouse.jdbc.internal.StatementParser;
78
import com.clickhouse.jdbc.metadata.ParameterMetaDataImpl;
89
import com.clickhouse.jdbc.metadata.ResultSetMetaDataImpl;
910
import org.slf4j.Logger;
@@ -67,7 +68,7 @@ public class PreparedStatementImpl extends StatementImpl implements PreparedStat
6768
private final String originalSql;
6869
private final String [] sqlSegments;
6970
private final Object [] parameters;
70-
private final StatementType statementType;
71+
private final StatementParser.StatementType statementType;
7172

7273
// insert
7374
private String [] valueSegments;
@@ -84,15 +85,15 @@ public class PreparedStatementImpl extends StatementImpl implements PreparedStat
8485

8586
private static final Pattern VALUES_PARAMETER_SPLIT = Pattern.compile("\\?(?=(?:[^']*'[^']*')*[^']*$)");
8687

87-
public PreparedStatementImpl(ConnectionImpl connection, String sql, StatementType statementType) throws SQLException {
88+
public PreparedStatementImpl(ConnectionImpl connection, String sql, StatementParser.ParsedStatement parsedStatement) throws SQLException {
8889
super(connection);
8990
this.isPoolable = true; // PreparedStatement is poolable by default
9091
this.originalSql = sql.trim();
9192
//Split the sql string into an array of strings around question mark tokens
92-
this.sqlSegments = splitStatement(originalSql);
93-
this.statementType = statementType;
93+
this.sqlSegments = parsedStatement.getSqlSegments();
94+
this.statementType = parsedStatement.getType();
9495

95-
if (this.statementType == StatementType.INSERT) {
96+
if (this.statementType == StatementParser.StatementType.INSERT) {
9697
insertIntoSQL = originalSql.substring(0, originalSql.indexOf("VALUES") + 6);
9798
valueSegments = originalSql.substring(originalSql.indexOf("VALUES") + 6).split("\\?");
9899
}
@@ -257,7 +258,7 @@ public boolean execute() throws SQLException {
257258
@Override
258259
public void addBatch() throws SQLException {
259260
checkClosed();
260-
if (statementType == StatementType.INSERT) {
261+
if (statementType == StatementParser.StatementType.INSERT) {
261262
// adding values to the end of big INSERT statement.
262263
addBatch(compileSql(valueSegments));
263264
} else {
@@ -268,7 +269,7 @@ public void addBatch() throws SQLException {
268269
@Override
269270
public int[] executeBatch() throws SQLException {
270271
checkClosed();
271-
if (statementType == StatementType.INSERT && !batch.isEmpty()) {
272+
if (statementType == StatementParser.StatementType.INSERT && !batch.isEmpty()) {
272273
// write insert into as batch to avoid multiple requests
273274
StringBuilder sb = new StringBuilder();
274275
sb.append(insertIntoSQL).append(" ");
@@ -343,7 +344,7 @@ public ResultSetMetaData getMetaData() throws SQLException {
343344

344345
if (resultSetMetaData == null && currentResultSet == null) {
345346
// before execution
346-
if (statementType == StatementType.SELECT) {
347+
if (statementType == StatementParser.StatementType.SELECT) {
347348
try {
348349
// Replace '?' with NULL to make SQL valid for DESCRIBE
349350
String sql = JdbcUtils.replaceQuestionMarks(originalSql, JdbcUtils.NULL);
@@ -689,74 +690,7 @@ private static String encodeObject(Object x) throws SQLException {
689690
}
690691
}
691692

692-
693693
private static String escapeString(String x) {
694694
return x.replace("\\", "\\\\").replace("'", "\\'");//Escape single quotes
695695
}
696-
697-
private static String [] splitStatement(String sql) {
698-
List<String> segments = new ArrayList<>();
699-
char[] chars = sql.toCharArray();
700-
int segmentStart = 0;
701-
for (int i = 0; i < chars.length; i++) {
702-
char c = chars[i];
703-
if (c == '\'' || c == '"' || c == '`') {
704-
// string literal or identifier
705-
i = skip(chars, i + 1, c, true);
706-
} else if (c == '/' && lookahead(chars, i) == '*') {
707-
// block comment
708-
int end = sql.indexOf("*/", i);
709-
if (end == -1) {
710-
// missing comment end
711-
break;
712-
}
713-
i = end + 1;
714-
} else if (c == '#' || (c == '-' && lookahead(chars, i) == '-')) {
715-
// line comment
716-
i = skip(chars, i + 1, '\n', false);
717-
} else if (c == '?') {
718-
// question mark
719-
segments.add(sql.substring(segmentStart, i));
720-
segmentStart = i + 1;
721-
}
722-
}
723-
if (segmentStart < chars.length) {
724-
segments.add(sql.substring(segmentStart));
725-
} else {
726-
// add empty segment in case question mark was last char of sql
727-
segments.add("");
728-
}
729-
return segments.toArray(new String[0]);
730-
}
731-
732-
private static int skip(char[] chars, int from, char until, boolean escape) {
733-
for (int i = from; i < chars.length; i++) {
734-
char curr = chars[i];
735-
if (escape) {
736-
char next = lookahead(chars, i);
737-
if ((curr == '\\' && (next == '\\' || next == until)) || (curr == until && next == until)) {
738-
// should skip:
739-
// 1) double \\ (backslash escaped with backslash)
740-
// 2) \[until] ([until] char, escaped with backslash)
741-
// 3) [until][until] ([until] char, escaped with [until])
742-
i++;
743-
continue;
744-
}
745-
}
746-
747-
if (curr == until) {
748-
return i;
749-
}
750-
}
751-
return chars.length;
752-
}
753-
754-
private static char lookahead(char[] chars, int pos) {
755-
pos = pos + 1;
756-
if (pos >= chars.length) {
757-
return '\0';
758-
}
759-
return chars[pos];
760-
}
761-
762696
}

0 commit comments

Comments
 (0)