 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;

 /**
  * Dataset API:
@@ -84,12 +86,14 @@ VirtualDatasetRecord create() {
             " FROM system.numbers";

     /**
-     * Common approach to fetch data from ClickHouse using client v2.
+     * Fetches data from the DB using the row binary reader. A VirtualDatasetRecord object is created on each
+     * iteration and filled with data from the reader. This gives full control over how data is processed and stored.
+     * If this method returns a lot of data it may slow the application down.
      *
      * @param limit
      * @return
      */
-    @GetMapping("/direct/dataset/0")
+    @GetMapping("/dataset/reader")
     public List<VirtualDatasetRecord> directDatasetFetch(@RequestParam(name = "limit", required = false) Integer limit) {
         limit = limit == null ? 100 : limit;

@@ -118,7 +122,7 @@ public List<VirtualDatasetRecord> directDatasetFetch(@RequestParam(name = "limit
                     response.getMetrics().getMetric(ClientMetrics.OP_DURATION).getLong(),
                     TimeUnit.NANOSECONDS.toMillis(response.getServerTime())));

-            return result;
+            return result.stream().findFirst().stream().collect(Collectors.toCollection(ArrayList::new));
         } catch (Exception e) {
             throw new RuntimeException("Failed to fetch dataset", e);
         }
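Note that the changed return statement above still reads every row but hands back a list containing at most the first record, which keeps the HTTP response small. For context on the "reader" approach this endpoint now advertises, here is a minimal sketch of a row-by-row read loop. It is based on my recollection of the clickhouse-java client v2 API (`Client`, `QueryResponse`, `ClickHouseBinaryFormatReader`), and the `p1` column name is an assumption taken from this example's DATASET_QUERY, so names may differ from the actual project.

```java
// Sketch only: client v2 reader API names are from memory; "p1" column is an assumption.
import java.util.concurrent.TimeUnit;

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.query.QueryResponse;

class RowBinaryReaderSketch {
    long sumP1(Client client, String query) throws Exception {
        long p1Sum = 0;
        // Run the query and wait for the response (the timeout here is arbitrary).
        try (QueryResponse response = client.query(query).get(3, TimeUnit.SECONDS)) {
            // The binary format reader walks rows one at a time instead of materializing everything.
            ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);
            while (reader.hasNext()) {
                reader.next();                  // advance to the next row
                p1Sum += reader.getLong("p1");  // read a single column; full per-row control
            }
        }
        return p1Sum;
    }
}
```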
@@ -127,12 +131,14 @@ public List<VirtualDatasetRecord> directDatasetFetch(@RequestParam(name = "limit
     private JsonMapper jsonMapper = new JsonMapper();

     /**
-     * Current approach is to demonstrate how to 'stream' data from ClickHouse using JSONEachRow format.
-     * This approach is faster than common one because it bypasses Spring internals and writes directly to http output stream.
+     * Reads data in JSONEachRow format, parses each row into a JSON library object (which can be used for
+     * further processing) and writes it back to the response.
+     * This reduces the effort of writing data to the response.
+     *
      * @param httpResp
      * @param limit
      */
-    @GetMapping("/direct/dataset/1")
+    @GetMapping("/dataset/json_each_row_in_and_out")
     @ResponseBody
     public void directDataFetchJSONEachRow(HttpServletResponse httpResp, @RequestParam(name = "limit", required = false) Integer limit) {
         limit = limit == null ? 100 : limit;
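The parse-and-forward loop this endpoint performs can be illustrated with the short sketch below. It assumes the ClickHouse response is available as a raw InputStream of JSONEachRow lines and that the row objects are handled with Jackson's JsonMapper (as in the field declared above); the exact plumbing in the example project may differ.

```java
// Sketch only: per-line JSONEachRow relay using Jackson; the stream source is an assumption.
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;

class JsonEachRowRelay {
    private final JsonMapper jsonMapper = new JsonMapper();

    void relay(InputStream clickhouseResponse, OutputStream httpOut) throws Exception {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(clickhouseResponse, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // Each line of JSONEachRow output is one self-contained JSON object.
            JsonNode row = jsonMapper.readTree(line);
            // ... the node could be inspected or transformed here ...
            // Write the (possibly modified) row straight to the servlet output stream.
            httpOut.write(jsonMapper.writeValueAsBytes(row));
            httpOut.write('\n');
        }
        httpOut.flush();
    }
}
```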
@@ -170,19 +176,25 @@ public void directDataFetchJSONEachRow(HttpServletResponse httpResp, @RequestPar
     }

     /**
-     * Using POJO deserialization to fetch data from ClickHouse. Also using a objects cache
-     * to avoid objects creation on each iteration.
+     * Uses POJO deserialization to fetch data from ClickHouse.
+     * If the cache is enabled, objects are reused from the pool; otherwise new objects are created on each iteration.
      *
      * @param limit
      * @return
      */
-    @GetMapping("/direct/dataset/cached_objects")
-    public CalculationResult directDatasetFetchCached(@RequestParam(name = "limit", required = false) Integer limit) {
+    @GetMapping("/dataset/read_to_pojo")
+    public CalculationResult directDatasetReadToPojo(@RequestParam(name = "limit", required = false) Integer limit,
+                                                     @RequestParam(name = "cache", required = false) Boolean cache) {
         limit = limit == null ? 100 : limit;
+        cache = cache != null && cache;
+        return readToPOJO(limit, cache);
+    }

+    private CalculationResult readToPOJO(int limit, boolean cache) {
         final String query = DATASET_QUERY + " LIMIT " + limit;
         List<VirtualDatasetRecord> result = null;
-        ObjectsPreparedCollection<VirtualDatasetRecord> objectsPool = this.pool.lease(); // take object from the pool
+        Supplier<VirtualDatasetRecord> objectsPool = cache ? this.pool.lease()
+                : VirtualDatasetRecord::new;
         try {
             long start = System.nanoTime();

@@ -193,13 +205,16 @@ public CalculationResult directDatasetFetchCached(@RequestParam(name = "limit",
             for (VirtualDatasetRecord record : result) {
                 p1Sum += record.getP1();
             }
-            log.info(result.toString());
-            objectsPool.reset(); // reset pool to for next use
+            if (cache) {
+                ((ObjectsPreparedCollection<VirtualDatasetRecord>) objectsPool).reset();
+            }
             return new CalculationResult(p1Sum);
         } catch (Exception e) {
             throw new RuntimeException("Failed to fetch dataset", e);
         } finally {
-            this.pool.release(objectsPool);
+            if (cache) {
+                this.pool.release((ObjectsPreparedCollection<VirtualDatasetRecord>) objectsPool);
+            }
         }
     }
 }
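The `Supplier<VirtualDatasetRecord>` indirection introduced above lets the same read path work with or without pooling: the leased `ObjectsPreparedCollection` acts as the supplier when caching is on, and a plain constructor reference when it is off. The pool type itself is not shown in this diff; the following is a hypothetical, simplified sketch of a supplier-backed pool with a similar shape (names and behavior are assumptions, not the project's actual ObjectsPreparedCollection).

```java
// Hypothetical sketch of a supplier-backed object pool; not the project's actual implementation.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class PreparedObjectsPool<T> implements Supplier<T> {
    private final List<T> objects = new ArrayList<>();
    private final Supplier<T> factory;
    private int next = 0;

    PreparedObjectsPool(Supplier<T> factory, int initialSize) {
        this.factory = factory;
        for (int i = 0; i < initialSize; i++) {
            objects.add(factory.get());
        }
    }

    /** Hands out a pre-created object, growing the pool only when it runs out. */
    @Override
    public T get() {
        if (next >= objects.size()) {
            objects.add(factory.get());
        }
        return objects.get(next++);
    }

    /** Makes all pooled objects available again for the next request. */
    void reset() {
        next = 0;
    }
}
```

With this shape, `cache ? pool.lease() : VirtualDatasetRecord::new` yields a `Supplier` in both branches, so the deserialization loop never needs to know whether instances are pooled.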