22
33
44import com .clickhouse .client .api .Client ;
5+ import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
56import com .clickhouse .client .api .metrics .ClientMetrics ;
6- import com .clickhouse .client .api .query .GenericRecord ;
7- import com .clickhouse .client .api .query .Records ;
7+ import com .clickhouse .client .api .query .QueryResponse ;
8+ import com .clickhouse .client .api .query .QuerySettings ;
9+ import com .clickhouse .data .ClickHouseFormat ;
810import com .clickhouse .demo_service .data .VirtualDatasetRecord ;
11+ import com .fasterxml .jackson .core .JsonGenerator ;
12+ import com .fasterxml .jackson .databind .MappingIterator ;
13+ import com .fasterxml .jackson .databind .json .JsonMapper ;
14+ import com .fasterxml .jackson .databind .node .ObjectNode ;
15+ import jakarta .servlet .http .HttpServletResponse ;
916import lombok .extern .java .Log ;
10- import org .springframework .beans .factory .annotation .Autowired ;
1117import org .springframework .web .bind .annotation .GetMapping ;
1218import org .springframework .web .bind .annotation .RequestMapping ;
1319import org .springframework .web .bind .annotation .RequestParam ;
20+ import org .springframework .web .bind .annotation .ResponseBody ;
1421import org .springframework .web .bind .annotation .RestController ;
1522
1623import java .util .ArrayList ;
@@ -34,6 +41,10 @@ public DatasetController(Client chDirectClient) {
3441 this .chDirectClient = chDirectClient ;
3542 }
3643
44+ /**
45+ * Makes query to a {@code system.numbers} that can be used to generate a virtual dataset.
46+ * Size of the dataset is limited by the {@code limit} parameter.
47+ */
3748 private static final String DATASET_QUERY =
3849 "SELECT generateUUIDv4() as id, " +
3950 "toUInt32(number) as p1, " +
@@ -42,31 +53,89 @@ public DatasetController(Client chDirectClient) {
4253 "toFloat64(number/100000) as p3" +
4354 " FROM system.numbers" ;
4455
45-
56+ /**
57+ * Common approach to fetch data from ClickHouse using client v2.
58+ *
59+ * @param limit
60+ * @return
61+ */
4662 @ GetMapping ("/direct/dataset/0" )
4763 public List <VirtualDatasetRecord > directDatasetFetch (@ RequestParam (name = "limit" , required = false ) Integer limit ) {
4864 limit = limit == null ? 100 : limit ;
4965
5066 final String query = DATASET_QUERY + " LIMIT " + limit ;
51- try (Records records = chDirectClient .queryRecords (query ).get (3000 , TimeUnit .MILLISECONDS )) {
67+ try (QueryResponse response = chDirectClient .query (query ).get (3000 , TimeUnit .MILLISECONDS )) {
5268 ArrayList <VirtualDatasetRecord > result = new ArrayList <>();
5369
70+ // iterable approach is more efficient for large datasets because it doesn't load all records into memory
71+ ClickHouseBinaryFormatReader reader = Client .newBinaryFormatReader (response );
72+
5473 long start = System .nanoTime ();
55- for ( GenericRecord record : records ) {
74+ while ( reader . next () != null ) {
5675 result .add (new VirtualDatasetRecord (
57- record .getUUID ("id" ),
58- record .getLong ("p1" ),
59- record .getBigInteger ("number" ),
60- record .getFloat ("p2" ),
61- record .getDouble ("p3" )
76+ reader .getUUID ("id" ),
77+ reader .getLong ("p1" ),
78+ reader .getBigInteger ("number" ),
79+ reader .getFloat ("p2" ),
80+ reader .getDouble ("p3" )
6281 ));
6382 }
6483 long duration = System .nanoTime () - start ;
65- log .info ("Read " + result .size () + " records in " + TimeUnit .NANOSECONDS .toMillis (duration ) + "ms. Client time " + records .getMetrics ()
66- .getMetric (ClientMetrics .OP_DURATION ).getLong () + " ms" + " server time " + (TimeUnit .NANOSECONDS .toMillis (records .getServerTime ())) + " ms" );
84+
85+ // report metrics (only for demonstration purposes)
86+ log .info (String .format ("records: %d, read time: %d ms, client time: %d ms, server time: %d ms" ,
87+ result .size (), TimeUnit .NANOSECONDS .toMillis (duration ),
88+ response .getMetrics ().getMetric (ClientMetrics .OP_DURATION ).getLong (),
89+ TimeUnit .NANOSECONDS .toMillis (response .getServerTime ())));
90+
6791 return result ;
6892 } catch (Exception e ) {
6993 throw new RuntimeException ("Failed to fetch dataset" , e );
7094 }
7195 }
96+
97+ private JsonMapper jsonMapper = new JsonMapper ();
98+
99+ /**
100+ * Current approach is to demonstrate how to 'stream' data from ClickHouse using JSONEachRow format.
101+ * This approach is faster than common one because it bypasses Spring internals and writes directly to http output stream.
102+ * @param httpResp
103+ * @param limit
104+ */
105+ @ GetMapping ("/direct/dataset/1" )
106+ @ ResponseBody
107+ public void directDataFetchJSONEachRow (HttpServletResponse httpResp , @ RequestParam (name = "limit" , required = false ) Integer limit ) {
108+ limit = limit == null ? 100 : limit ;
109+
110+ final String query = DATASET_QUERY + " LIMIT " + limit ;
111+ QuerySettings settings = new QuerySettings ().setFormat (ClickHouseFormat .JSONEachRow );
112+ try (QueryResponse response = chDirectClient .query (query , settings ).get (3000 , TimeUnit .MILLISECONDS );
113+ // JSONEachRow format is a stream of JSON objects, so we need to parse them one by one
114+ MappingIterator <ObjectNode > jsonIter = jsonMapper .readerFor (ObjectNode .class )
115+ .readValues (response .getInputStream ())) {
116+ httpResp .setContentType ("application/json" );
117+ JsonGenerator jsonGen = jsonMapper .getFactory ().createGenerator (httpResp .getOutputStream ());
118+
119+ jsonGen .writeStartArray ();
120+ long start = System .nanoTime ();
121+ int counter =0 ;
122+ while (jsonIter .hasNext ()) {
123+ ObjectNode node = jsonIter .next ();
124+ // here may be some processing logic
125+ node .put ("ordNum" , counter ++);
126+
127+ }
128+ jsonGen .writeEndArray ();
129+ jsonGen .close ();
130+ long duration = System .nanoTime () - start ;
131+
132+ // report metrics (only for demonstration purposes)
133+ log .info (String .format ("records: %d, read time: %d ms, client time: %d ms, server time: %d ms" ,
134+ counter , TimeUnit .NANOSECONDS .toMillis (duration ),
135+ response .getMetrics ().getMetric (ClientMetrics .OP_DURATION ).getLong (),
136+ TimeUnit .NANOSECONDS .toMillis (response .getServerTime ())));
137+ } catch (Exception e ) {
138+ throw new RuntimeException ("Failed to fetch dataset" , e );
139+ }
140+ }
72141}
0 commit comments