|
20 | 20 | import com.clickhouse.client.api.query.QuerySettings; |
21 | 21 | import com.clickhouse.data.ClickHouseFormat; |
22 | 22 | import com.clickhouse.data.ClickHouseVersion; |
| 23 | +import com.clickhouse.data.stream.ByteArrayQueueInputStream; |
23 | 24 | import org.apache.commons.lang3.StringEscapeUtils; |
24 | 25 | import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; |
25 | 26 | import org.testng.Assert; |
|
37 | 38 | import java.util.ArrayList; |
38 | 39 | import java.util.Arrays; |
39 | 40 | import java.util.Collections; |
| 41 | +import java.util.LinkedList; |
40 | 42 | import java.util.List; |
| 43 | +import java.util.Queue; |
41 | 44 | import java.util.UUID; |
42 | 45 | import java.util.concurrent.TimeUnit; |
43 | 46 |
|
@@ -66,13 +69,17 @@ public InsertTests(boolean useClientCompression, boolean useHttpCompression) { |
66 | 69 | @BeforeMethod(groups = { "integration" }) |
67 | 70 | public void setUp() throws IOException { |
68 | 71 | ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); |
| 72 | + int bufferSize = (7 * 65500); |
| 73 | + |
69 | 74 | client = new Client.Builder() |
70 | 75 | .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), false) |
71 | 76 | .setUsername("default") |
72 | 77 | .setPassword("") |
73 | | - .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "true").equals("true")) |
74 | 78 | .compressClientRequest(useClientCompression) |
75 | 79 | .useHttpCompression(useHttpCompression) |
| 80 | + .setSocketSndbuf(bufferSize) |
| 81 | + .setSocketRcvbuf(bufferSize) |
| 82 | + .setClientNetworkBufferSize(bufferSize) |
76 | 83 | .build(); |
77 | 84 | settings = new InsertSettings() |
78 | 85 | .setDeduplicationToken(RandomStringUtils.randomAlphabetic(36)) |
@@ -227,6 +234,37 @@ public void insertRawData() throws Exception { |
227 | 234 | } |
228 | 235 |
|
229 | 236 |
|
| 237 | + @Test(groups = { "integration" }, enabled = true) |
| 238 | + public void insertRawDataQueued() throws Exception { |
| 239 | + final String tableName = "raw_data_table"; |
| 240 | + final String createSQL = "CREATE TABLE " + tableName + |
| 241 | + " (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()"; |
| 242 | + |
| 243 | + initTable(tableName, createSQL); |
| 244 | + settings.setInputStreamCopyBufferSize(8198 * 2); |
| 245 | + settings.compressClientRequest(true); |
| 246 | + Queue<byte[]> queue = new LinkedList<>(); |
| 247 | + ByteArrayQueueInputStream qIn = new ByteArrayQueueInputStream(queue, null); |
| 248 | + for (int i = 0; i < 10; i++) { |
| 249 | + if (i > 2 && i < 5) { |
| 250 | + queue.add(new byte[0]); |
| 251 | + } else { |
| 252 | + queue.add(String.format("{ \"Id\": %d, \"events_ts\": \"%s\", \"name\": \"%s\", \"p1\": \"%d\", \"p2\": \"%s\"}\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2").getBytes()); |
| 253 | + } |
| 254 | + } |
| 255 | + InsertResponse response = client.insert(tableName, qIn, |
| 256 | + ClickHouseFormat.JSONEachRow, settings).get(30, TimeUnit.SECONDS); |
| 257 | + |
| 258 | + assertEquals((int)response.getWrittenRows(), 10 ); |
| 259 | + |
| 260 | + List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName); |
| 261 | + assertEquals(records.size(), 10); |
| 262 | + for (GenericRecord record : records) { |
| 263 | + System.out.println(record.getString(1) + " " +record.getString(2) + " " +record.getString(3) + " " +record.getString(4) + " " +record.getString(5) + " "); |
| 264 | + } |
| 265 | + } |
| 266 | + |
| 267 | + |
230 | 268 | @Test(groups = { "integration" }, enabled = true) |
231 | 269 | public void insertRawDataSimple() throws Exception { |
232 | 270 | insertRawDataSimple(1000); |
@@ -379,7 +417,7 @@ public static Object[] logCommentDataProvider() { |
379 | 417 | }; |
380 | 418 | } |
381 | 419 |
|
382 | | - private void initTable(String tableName, String createTableSQL) throws Exception { |
| 420 | + protected void initTable(String tableName, String createTableSQL) throws Exception { |
383 | 421 | client.execute("DROP TABLE IF EXISTS " + tableName).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS); |
384 | 422 | client.execute(createTableSQL).get(EXECUTE_CMD_TIMEOUT, TimeUnit.SECONDS); |
385 | 423 | } |
|
0 commit comments