
Commit 09ab375

Merge pull request #562 from den-crane/feature/send-compressed-stream
ability to send compressed files/streams
2 parents: df180d9 + 1107226

5 files changed: +182 -19 lines


README.md

Lines changed: 2 additions & 1 deletion
@@ -35,7 +35,7 @@ sth
     .write()                                 // Write API entrypoint
     .table("default.my_table")               // where to write data
     .option("format_csv_delimiter", ";")     // specific param
-    .data(new File("/path/to/file.csv"), ClickHouseFormat.CSV) // specify input
+    .data(new File("/path/to/file.csv.gz"), ClickHouseFormat.CSV, ClickHouseCompression.gzip) // specify input
     .send();
 ```
 #### Configurable send
@@ -46,6 +46,7 @@ sth
     .write()
     .sql("INSERT INTO default.my_table (a,b,c)")
     .data(new MyCustomInputStream(), ClickHouseFormat.JSONEachRow)
+    .dataCompression(ClickHouseCompression.brotli)
     .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, 2)
     .send();
 ```
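Below is a minimal usage sketch (not part of the commit) of the pattern the new API expects: the caller compresses the payload itself and then tags it with the matching ClickHouseCompression value, so the driver only has to add the Content-Encoding header. The connection setup and the table name `default.my_table` are illustrative; the write/data/send calls mirror the README snippet above and the integration tests below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;

public class CompressedStreamSendSketch {

    // Gzip a small CSV payload in memory and send it through the Writer API;
    // the driver does not compress anything itself, it only tags the request.
    static void sendGzippedCsv(ClickHouseConnection connection) throws Exception {
        byte[] csv = "5,6\n1,6\n".getBytes(StandardCharsets.UTF_8);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(csv);
        }
        InputStream gzipped = new ByteArrayInputStream(buffer.toByteArray());

        connection.createStatement()
                .write()
                .table("default.my_table")   // illustrative target table
                .data(gzipped, ClickHouseFormat.CSV, ClickHouseCompression.gzip)
                .send();
    }
}
```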

src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java

Lines changed: 3 additions & 0 deletions
@@ -892,6 +892,9 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException {
 
         HttpPost httpPost = new HttpPost(uri);
 
+        if (writer.getCompression() != null) {
+            httpPost.addHeader("Content-Encoding", writer.getCompression().name());
+        }
         httpPost.setEntity(content);
         HttpResponse response = client.execute(httpPost);
         entity = response.getEntity();
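The header value is the enum constant's name() taken verbatim, so the payload handed to the Writer must already be encoded with that exact codec. A tiny illustrative sketch (not part of the commit) of the values that end up on the wire:

```java
import ru.yandex.clickhouse.domain.ClickHouseCompression;

public class ContentEncodingValues {
    public static void main(String[] args) {
        // Prints the literal Content-Encoding value produced for each constant:
        // none, gzip, brotli, deflate, zstd
        for (ClickHouseCompression compression : ClickHouseCompression.values()) {
            System.out.println("Content-Encoding: " + compression.name());
        }
    }
}
```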

src/main/java/ru/yandex/clickhouse/Writer.java

Lines changed: 22 additions & 1 deletion
@@ -2,6 +2,7 @@
 
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.InputStreamEntity;
+import ru.yandex.clickhouse.domain.ClickHouseCompression;
 import ru.yandex.clickhouse.domain.ClickHouseFormat;
 import ru.yandex.clickhouse.util.ClickHouseStreamCallback;
 import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity;
@@ -17,7 +18,7 @@
 public class Writer extends ConfigurableApi<Writer> {
 
     private ClickHouseFormat format = TabSeparated;
-
+    private ClickHouseCompression compression = null;
     private String table = null;
     private String sql = null;
     private InputStreamProvider streamProvider = null;
@@ -81,6 +82,22 @@ public Writer data(File input) {
         return this;
     }
 
+    public Writer data(InputStream stream, ClickHouseFormat format, ClickHouseCompression compression) {
+        return dataCompression(compression).format(format).data(stream);
+    }
+
+    public Writer data(File input, ClickHouseFormat format, ClickHouseCompression compression) {
+        return dataCompression(compression).format(format).data(input);
+    }
+
+    public Writer dataCompression(ClickHouseCompression compression) {
+        if (null == compression) {
+            throw new NullPointerException("Compression can not be null");
+        }
+        this.compression = compression;
+        return this;
+    }
+
     public Writer data(File input, ClickHouseFormat format) {
         return format(format).data(input);
     }
@@ -184,4 +201,8 @@ public InputStream get() throws IOException {
             return stream;
         }
     }
+
+    public ClickHouseCompression getCompression() {
+        return compression;
+    }
 }
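For completeness, a hedged sketch (not part of the commit) of the new File overload: dataCompression() only records the codec and rejects null, so the file itself must already be gzip-compressed. The connection and table name are assumed for illustration.

```java
import java.io.File;
import java.sql.SQLException;

import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;

public class CompressedFileSendSketch {

    // Send an already gzip-compressed CSV file; Writer stores the compression
    // so ClickHouseStatementImpl can add "Content-Encoding: gzip" to the POST.
    static void sendGzippedFile(ClickHouseConnection connection, File gzippedCsv) throws SQLException {
        connection.createStatement()
                .write()
                .table("default.my_table")   // illustrative target table
                .data(gzippedCsv, ClickHouseFormat.CSV, ClickHouseCompression.gzip)
                .send();
    }
}
```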
src/main/java/ru/yandex/clickhouse/domain/ClickHouseCompression.java

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+package ru.yandex.clickhouse.domain;
+
+public enum ClickHouseCompression {
+    none,
+    gzip,
+    brotli,
+    deflate,
+    zstd;
+}

src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java

Lines changed: 146 additions & 17 deletions
@@ -6,11 +6,14 @@
 import ru.yandex.clickhouse.ClickHouseConnection;
 import ru.yandex.clickhouse.ClickHouseContainerForTest;
 import ru.yandex.clickhouse.ClickHouseDataSource;
-
+import ru.yandex.clickhouse.domain.ClickHouseCompression;
+import ru.yandex.clickhouse.domain.ClickHouseFormat;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
 import java.io.*;
 import java.nio.charset.Charset;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.zip.GZIPOutputStream;
 
 public class StreamSQLTest {
     private ClickHouseDataSource dataSource;
@@ -33,7 +36,11 @@ public void simpleCSVInsert() throws SQLException {
         String string = "5,6\n1,6";
         InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8")));
 
-        connection.createStatement().sendStreamSQL(inputStream, "insert into test.csv_stream_sql format CSV");
+        connection.createStatement().
+                write()
+                .sql("insert into test.csv_stream_sql format CSV")
+                .data(inputStream)
+                .send();
 
         ResultSet rs = connection.createStatement().executeQuery(
                 "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.csv_stream_sql");
@@ -43,31 +50,21 @@
         Assert.assertEquals(rs.getLong("uniq"), 1);
     }
 
-    @Test
-    public void multiRowTSVInsert() throws SQLException {
-        connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_stream_sql");
-        connection.createStatement().execute(
-                "CREATE TABLE test.tsv_stream_sql (value Int32, string_value String) ENGINE = Log()"
-        );
-
-
-        final int rowsCount = 100000;
-
-        InputStream in = new InputStream() {
+    private InputStream getTSVStream(final int rowsCount) {
+        return new InputStream() {
             private int si = 0;
             private String s = "";
             private int i = 0;
-            private final int count = rowsCount;
 
             private boolean genNextString() {
-                if (i >= count) return false;
+                if (i >= rowsCount) return false;
                 si = 0;
                 s = String.format("%d\txxxx%d\n", 1, i);
                 i++;
                 return true;
             }
 
-            public int read() throws IOException {
+            public int read() {
                 if (si >= s.length()) {
                     if ( ! genNextString() ) {
                         return -1;
@@ -76,8 +73,22 @@ public int read() throws IOException {
                 return s.charAt( si++ );
             }
         };
+    }
+
+    @Test
+    public void multiRowTSVInsert() throws SQLException {
+        connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_stream_sql");
+        connection.createStatement().execute(
+                "CREATE TABLE test.tsv_stream_sql (value Int32, string_value String) ENGINE = Log()"
+        );
+
+        final int rowsCount = 100000;
 
-        connection.createStatement().sendStreamSQL(in, "insert into test.tsv_stream_sql format TSV");
+        connection.createStatement().
+                write()
+                .sql("insert into test.tsv_stream_sql format TSV")
+                .data(getTSVStream(rowsCount), ClickHouseFormat.TSV)
+                .send();
 
         ResultSet rs = connection.createStatement().executeQuery(
                 "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_stream_sql");
@@ -87,4 +98,122 @@ public int read() throws IOException {
         Assert.assertEquals(rs.getInt("uniq"), rowsCount);
     }
 
+    private InputStream gzStream( InputStream is ) throws IOException
+    {
+        final int bufferSize = 16384;
+        byte data[] = new byte[bufferSize];
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        GZIPOutputStream gzipOutputStream = new GZIPOutputStream(os);
+        BufferedInputStream es = new BufferedInputStream(is, bufferSize);
+        int count;
+        while ( ( count = es.read( data, 0, bufferSize) ) != -1 )
+            gzipOutputStream.write( data, 0, count );
+        es.close();
+        gzipOutputStream.close();
+
+        return new ByteArrayInputStream( os.toByteArray() );
+    }
+
+    @Test
+    public void multiRowTSVInsertCompressed() throws SQLException, IOException {
+        connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_compressed_stream_sql");
+        connection.createStatement().execute(
+                "CREATE TABLE test.tsv_compressed_stream_sql (value Int32, string_value String) ENGINE = Log()"
+        );
+
+        final int rowsCount = 100000;
+
+        InputStream gz = gzStream(getTSVStream(rowsCount));
+        connection.createStatement().
+                write()
+                .sql("insert into test.tsv_compressed_stream_sql format TSV")
+                .data(gz, ClickHouseFormat.TSV, ClickHouseCompression.gzip)
+                .send();
+
+        ResultSet rs = connection.createStatement().executeQuery(
+                "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_compressed_stream_sql");
+        Assert.assertTrue(rs.next());
+        Assert.assertEquals(rs.getInt("cnt"), rowsCount);
+        Assert.assertEquals(rs.getInt("sum"), rowsCount);
+        Assert.assertEquals(rs.getInt("uniq"), rowsCount);
+    }
+
+
+    @Test
+    public void JSONEachRowInsert() throws SQLException {
+        connection.createStatement().execute("DROP TABLE IF EXISTS test.json_stream_sql");
+        connection.createStatement().execute(
+                "CREATE TABLE test.json_stream_sql (value Int32, string_value String) ENGINE = Log()"
+        );
+
+        String string = "{\"value\":5,\"string_value\":\"6\"}\n{\"value\":1,\"string_value\":\"6\"}";
+        InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8")));
+
+        connection.createStatement().
+                write()
+                .sql("insert into test.json_stream_sql")
+                .data(inputStream, ClickHouseFormat.JSONEachRow)
+                .data(inputStream)
+                .send();
+
+        ResultSet rs = connection.createStatement().executeQuery(
+                "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.json_stream_sql");
+        Assert.assertTrue(rs.next());
+        Assert.assertEquals(rs.getInt("cnt"), 2);
+        Assert.assertEquals(rs.getLong("sum"), 6);
+        Assert.assertEquals(rs.getLong("uniq"), 1);
+    }
+
+    @Test
+    public void JSONEachRowCompressedInsert() throws SQLException, IOException {
+        connection.createStatement().execute("DROP TABLE IF EXISTS test.json_comressed_stream_sql");
+        connection.createStatement().execute(
+                "CREATE TABLE test.json_comressed_stream_sql (value Int32, string_value String) ENGINE = Log()"
+        );
+
+        String string = "{\"value\":5,\"string_value\":\"6\"}\n{\"value\":1,\"string_value\":\"6\"}";
+        InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8")));
+
+        connection.createStatement().
+                write()
+                .sql("insert into test.json_comressed_stream_sql")
+                .data(inputStream, ClickHouseFormat.JSONEachRow)
+                .data(gzStream(inputStream))
+                .dataCompression(ClickHouseCompression.gzip)
+                .send();
+
+        ResultSet rs = connection.createStatement().executeQuery(
+                "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.json_comressed_stream_sql");
+        Assert.assertTrue(rs.next());
+        Assert.assertEquals(rs.getInt("cnt"), 2);
+        Assert.assertEquals(rs.getLong("sum"), 6);
+        Assert.assertEquals(rs.getLong("uniq"), 1);
+    }
+
+    @Test
+    public void CSVInsertCompressedIntoTable() throws SQLException, IOException {
+        connection.createStatement().execute("DROP TABLE IF EXISTS test.csv_stream_compressed");
+        connection.createStatement().execute(
+                "CREATE TABLE test.csv_stream_compressed (value Int32, string_value String) ENGINE = Log()"
+        );
+
+        String string = "5,6\n1,6";
+        InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8")));
+
+        connection.createStatement().
+                write()
+                .table("test.csv_stream_compressed")
+                .format(ClickHouseFormat.CSV)
+                .dataCompression(ClickHouseCompression.gzip)
+                .data(gzStream(inputStream))
+                .send();
+
+        ResultSet rs = connection.createStatement().executeQuery(
+                "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.csv_stream_compressed");
+        Assert.assertTrue(rs.next());
+        Assert.assertEquals(rs.getInt("cnt"), 2);
+        Assert.assertEquals(rs.getLong("sum"), 6);
+        Assert.assertEquals(rs.getLong("uniq"), 1);
+    }
+
 }
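If you adapt the test's gzStream helper outside the test, a slightly more idiomatic sketch (not part of the commit) uses try-with-resources so both streams are closed even when a read or write fails; the behaviour is otherwise the same, with the whole gzipped payload buffered in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPOutputStream;

final class GzipStreams {

    // Gzip everything readable from `source` into an in-memory buffer and
    // return a stream over the compressed bytes (fine for test-sized payloads).
    static InputStream gzip(InputStream source) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (InputStream in = source;
             GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            byte[] chunk = new byte[16 * 1024];
            int read;
            while ((read = in.read(chunk)) != -1) {
                gzip.write(chunk, 0, read);
            }
        }
        return new ByteArrayInputStream(buffer.toByteArray());
    }
}
```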
