|
| 1 | +package com.clickhouse.examples.jdbc; |
| 2 | + |
| 3 | +import java.io.IOException; |
| 4 | +import java.util.UUID; |
| 5 | +import java.util.concurrent.CompletableFuture; |
| 6 | +import java.util.concurrent.ExecutionException; |
| 7 | + |
| 8 | +import com.clickhouse.client.ClickHouseClient; |
| 9 | +import com.clickhouse.client.ClickHouseConfig; |
| 10 | +import com.clickhouse.client.ClickHouseCredentials; |
| 11 | +import com.clickhouse.client.ClickHouseException; |
| 12 | +import com.clickhouse.client.ClickHouseFormat; |
| 13 | +import com.clickhouse.client.ClickHouseNode; |
| 14 | +import com.clickhouse.client.ClickHouseProtocol; |
| 15 | +import com.clickhouse.client.ClickHouseRecord; |
| 16 | +import com.clickhouse.client.ClickHouseRequest; |
| 17 | +import com.clickhouse.client.ClickHouseResponse; |
| 18 | +import com.clickhouse.client.ClickHouseResponseSummary; |
| 19 | +import com.clickhouse.client.data.BinaryStreamUtils; |
| 20 | +import com.clickhouse.client.data.ClickHousePipedStream; |
| 21 | + |
| 22 | +public class Main { |
| 23 | + static void dropAndCreateTable(ClickHouseNode server, String table) throws ClickHouseException { |
| 24 | + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { |
| 25 | + ClickHouseRequest<?> request = client.connect(server); |
| 26 | + // or use future chaining |
| 27 | + request.query("drop table if exists " + table).execute().get(); |
| 28 | + request.query("create table " + table + "(a String, b Nullable(String)) engine=MergeTree() order by a") |
| 29 | + .execute().get(); |
| 30 | + } catch (InterruptedException e) { |
| 31 | + Thread.currentThread().interrupt(); |
| 32 | + throw ClickHouseException.forCancellation(e, server); |
| 33 | + } catch (ExecutionException e) { |
| 34 | + throw ClickHouseException.of(e, server); |
| 35 | + } |
| 36 | + } |
| 37 | + |
| 38 | + static long insert(ClickHouseNode server, String table) throws ClickHouseException { |
| 39 | + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { |
| 40 | + ClickHouseRequest.Mutation request = client.connect(server).write().table(table) |
| 41 | + .format(ClickHouseFormat.RowBinary); |
| 42 | + ClickHouseConfig config = request.getConfig(); |
| 43 | + CompletableFuture<ClickHouseResponse> future; |
| 44 | + // back-pressuring is not supported, you can adjust the first two arguments |
| 45 | + try (ClickHousePipedStream stream = new ClickHousePipedStream(config.getMaxBufferSize(), |
| 46 | + config.getMaxQueuedBuffers(), config.getSocketTimeout())) { |
| 47 | + // in async mode, which is default, execution happens in a worker thread |
| 48 | + future = request.data(stream.getInput()).execute(); |
| 49 | + |
| 50 | + // writing happens in main thread |
| 51 | + for (int i = 0; i < 1000000; i++) { |
| 52 | + BinaryStreamUtils.writeString(stream, String.valueOf(i % 16)); |
| 53 | + BinaryStreamUtils.writeNonNull(stream); |
| 54 | + BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString()); |
| 55 | + } |
| 56 | + } |
| 57 | + |
| 58 | + // response should be always closed |
| 59 | + try (ClickHouseResponse response = future.get()) { |
| 60 | + ClickHouseResponseSummary summary = response.getSummary(); |
| 61 | + return summary.getWrittenRows(); |
| 62 | + } |
| 63 | + } catch (InterruptedException e) { |
| 64 | + Thread.currentThread().interrupt(); |
| 65 | + throw ClickHouseException.forCancellation(e, server); |
| 66 | + } catch (ExecutionException | IOException e) { |
| 67 | + throw ClickHouseException.of(e, server); |
| 68 | + } |
| 69 | + } |
| 70 | + |
| 71 | + static int query(ClickHouseNode server, String table) throws ClickHouseException { |
| 72 | + try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); |
| 73 | + ClickHouseResponse response = client.connect(server).query("select * from " + table).execute().get()) { |
| 74 | + int count = 0; |
| 75 | + // or use stream API via response.stream() |
| 76 | + for (ClickHouseRecord rec : response.records()) { |
| 77 | + count++; |
| 78 | + } |
| 79 | + return count; |
| 80 | + } catch (InterruptedException e) { |
| 81 | + Thread.currentThread().interrupt(); |
| 82 | + throw ClickHouseException.forCancellation(e, server); |
| 83 | + } catch (ExecutionException e) { |
| 84 | + throw ClickHouseException.of(e, server); |
| 85 | + } |
| 86 | + } |
| 87 | + |
| 88 | + public static void main(String[] args) { |
| 89 | + ClickHouseNode server = ClickHouseNode.builder() |
| 90 | + .host(System.getProperty("chHost", "192.168.3.16")) |
| 91 | + .port(ClickHouseProtocol.GRPC, Integer.parseInt(System.getProperty("chPort", "9100"))) |
| 92 | + .database("system").credentials(ClickHouseCredentials.fromUserAndPassword( |
| 93 | + System.getProperty("chUser", "default"), System.getProperty("chPassword", ""))) |
| 94 | + .build(); |
| 95 | + |
| 96 | + String table = "grpc_example_table"; |
| 97 | + |
| 98 | + try { |
| 99 | + dropAndCreateTable(server, table); |
| 100 | + |
| 101 | + insert(server, table); |
| 102 | + |
| 103 | + query(server, table); |
| 104 | + } catch (ClickHouseException e) { |
| 105 | + e.printStackTrace(); |
| 106 | + } |
| 107 | + } |
| 108 | +} |
0 commit comments