2121 */
2222package com .influxdb .internal ;
2323
24+ import java .io .Closeable ;
2425import java .io .IOException ;
26+ import java .io .InputStreamReader ;
27+ import java .io .Reader ;
28+ import java .nio .charset .StandardCharsets ;
2529import java .util .HashMap ;
30+ import java .util .Iterator ;
2631import java .util .Map ;
2732import java .util .function .BiConsumer ;
2833import java .util .function .Consumer ;
3439import com .influxdb .Arguments ;
3540import com .influxdb .Cancellable ;
3641import com .influxdb .exceptions .InfluxException ;
42+ import com .influxdb .query .FluxRecord ;
3743import com .influxdb .query .internal .FluxCsvParser ;
3844import com .influxdb .query .internal .FluxResultMapper ;
3945
4450import okhttp3 .RequestBody ;
4551import okhttp3 .ResponseBody ;
4652import okio .BufferedSource ;
53+ import org .apache .commons .csv .CSVFormat ;
54+ import org .apache .commons .csv .CSVParser ;
55+ import org .apache .commons .csv .CSVRecord ;
4756import retrofit2 .Call ;
4857import retrofit2 .Callback ;
4958import retrofit2 .Response ;
@@ -72,7 +81,7 @@ public abstract class AbstractQueryApi extends AbstractRestClient {
7281 DEFAULT_DIALECT = new GsonBuilder ().create ().toJson (dialect );
7382 }
7483
75- protected static final Consumer <Throwable > ERROR_CONSUMER = throwable -> {
84+ protected static final Consumer <Throwable > ERROR_CONSUMER = throwable -> {
7685 if (throwable instanceof InfluxException ) {
7786 throw (InfluxException ) throwable ;
7887 } else {
@@ -113,6 +122,10 @@ protected void query(@Nonnull final Call<ResponseBody> queryCall,
113122 query (queryCall , consumer , onError , onComplete , asynchronously );
114123 }
115124
125+ protected FluxRecordIterator queryIterator (@ Nonnull final Call <ResponseBody > queryCall ) {
126+ return new FluxRecordIterator (queryCall , ERROR_CONSUMER );
127+ }
128+
116129 protected void queryRaw (@ Nonnull final Call <ResponseBody > queryCall ,
117130 @ Nonnull final BiConsumer <Cancellable , String > onResponse ,
118131 @ Nonnull final Consumer <? super Throwable > onError ,
@@ -131,11 +144,15 @@ protected void queryRaw(@Nonnull final Call<ResponseBody> queryCall,
131144 query (queryCall , consumer , onError , onComplete , asynchronously );
132145 }
133146
134- protected void query (@ Nonnull final Call <ResponseBody > query ,
135- @ Nonnull final BiConsumer <Cancellable , BufferedSource > consumer ,
136- @ Nonnull final Consumer <? super Throwable > onError ,
137- @ Nonnull final Runnable onComplete ,
138- @ Nonnull final Boolean asynchronously ) {
147+ protected RawIterator queryRawIterator (@ Nonnull final Call <ResponseBody > queryCall ) {
148+ return new RawIterator (queryCall , ERROR_CONSUMER );
149+ }
150+
151+ private void query (@ Nonnull final Call <ResponseBody > query ,
152+ @ Nonnull final BiConsumer <Cancellable , BufferedSource > consumer ,
153+ @ Nonnull final Consumer <? super Throwable > onError ,
154+ @ Nonnull final Runnable onComplete ,
155+ @ Nonnull final Boolean asynchronously ) {
139156
140157 Arguments .checkNotNull (query , "query" );
141158 Arguments .checkNotNull (consumer , "consumer" );
@@ -145,6 +162,46 @@ protected void query(@Nonnull final Call<ResponseBody> query,
145162
146163 DefaultCancellable cancellable = new DefaultCancellable ();
147164
165+ Consumer <ResponseBody > bodyConsumer = body -> {
166+ try {
167+ BufferedSource source = body .source ();
168+
169+ //
170+ // Source has data => parse
171+ //
172+ while (source .isOpen () && !source .exhausted () && !cancellable .wasCancelled ) {
173+
174+ consumer .accept (cancellable , source );
175+ }
176+
177+ if (!cancellable .wasCancelled ) {
178+ onComplete .run ();
179+ }
180+
181+ } catch (Exception e ) {
182+ catchOrPropagateException (e , onError );
183+
184+ } finally {
185+
186+ body .close ();
187+ }
188+ };
189+
190+ query (query , bodyConsumer , onError , onComplete , asynchronously );
191+ }
192+
193+ private void query (@ Nonnull final Call <ResponseBody > query ,
194+ @ Nonnull final Consumer <ResponseBody > consumer ,
195+ @ Nonnull final Consumer <? super Throwable > onError ,
196+ @ Nonnull final Runnable onComplete ,
197+ @ Nonnull final Boolean asynchronously ) {
198+
199+ Arguments .checkNotNull (query , "query" );
200+ Arguments .checkNotNull (consumer , "consumer" );
201+ Arguments .checkNotNull (onError , "onError" );
202+ Arguments .checkNotNull (onComplete , "onComplete" );
203+ Arguments .checkNotNull (asynchronously , "asynchronously" );
204+
148205 Callback <ResponseBody > callback = new Callback <ResponseBody >() {
149206 @ Override
150207 public void onResponse (@ Nonnull final Call <ResponseBody > call ,
@@ -160,28 +217,7 @@ public void onResponse(@Nonnull final Call<ResponseBody> call,
160217 return ;
161218 }
162219
163- try {
164- BufferedSource source = body .source ();
165-
166- //
167- // Source has data => parse
168- //
169- while (source .isOpen () && !source .exhausted () && !cancellable .wasCancelled ) {
170-
171- consumer .accept (cancellable , source );
172- }
173-
174- if (!cancellable .wasCancelled ) {
175- onComplete .run ();
176- }
177-
178- } catch (Exception e ) {
179- catchOrPropagateException (e , onError );
180-
181- } finally {
182-
183- body .close ();
184- }
220+ consumer .accept (body );
185221 }
186222
187223 @ Override
@@ -233,4 +269,121 @@ public boolean isCancelled() {
233269 }
234270 }
235271
272+ protected final class RawIterator implements Iterator <String >, Closeable , Consumer <ResponseBody > {
273+
274+ private String line = null ;
275+ private boolean closed = false ;
276+ private ResponseBody body ;
277+ private BufferedSource source ;
278+ private final Consumer <? super Throwable > onError ;
279+
280+ private RawIterator (@ Nonnull final Call <ResponseBody > call ,
281+ @ Nonnull final Consumer <? super Throwable > onError ) {
282+ this .onError = onError ;
283+ query (call , this , onError , EMPTY_ACTION , false );
284+ }
285+
286+ @ Override
287+ public boolean hasNext () {
288+ return !closed && readNext ();
289+ }
290+
291+ @ Override
292+ public String next () {
293+ return line ;
294+ }
295+
296+ @ Override
297+ public void accept (final ResponseBody body ) {
298+ this .body = body ;
299+ this .source = body .source ();
300+ }
301+
302+ @ Override
303+ public void close () throws IOException {
304+ closed = true ;
305+ if (body != null ) {
306+ body .close ();
307+ }
308+ }
309+
310+ private boolean readNext () {
311+ line = null ;
312+ try {
313+ if (!closed && source .isOpen () && !source .exhausted ()) {
314+ line = source .readUtf8Line ();
315+ }
316+ } catch (IOException e ) {
317+ catchOrPropagateException (e , onError );
318+ }
319+
320+ return line != null ;
321+ }
322+ }
323+
324+ protected final class FluxRecordIterator implements Iterator <FluxRecord >, Closeable , Consumer <ResponseBody > {
325+
326+ private FluxRecord record = null ;
327+ private boolean closed = false ;
328+ private ResponseBody body ;
329+ private CSVParser parser ;
330+ private Iterator <CSVRecord > iterator ;
331+
332+ private final FluxCsvParser .FluxCsvState state = new FluxCsvParser .FluxCsvState ();
333+ private final Consumer <? super Throwable > onError ;
334+
335+ public FluxRecordIterator (@ Nonnull final Call <ResponseBody > call ,
336+ @ Nonnull final Consumer <? super Throwable > onError ) {
337+ this .onError = onError ;
338+ query (call , this , onError , EMPTY_ACTION , false );
339+ }
340+
341+ @ Override
342+ public boolean hasNext () {
343+ return !closed && readNext ();
344+ }
345+
346+ @ Override
347+ public FluxRecord next () {
348+ return record ;
349+ }
350+
351+ @ Override
352+ public void accept (final ResponseBody body ) {
353+ this .body = body ;
354+
355+ Reader reader = new InputStreamReader (body .source ().inputStream (), StandardCharsets .UTF_8 );
356+ try {
357+ parser = new CSVParser (reader , CSVFormat .DEFAULT );
358+ } catch (IOException e ) {
359+ catchOrPropagateException (e , onError );
360+ }
361+ iterator = parser .iterator ();
362+ }
363+
364+ @ Override
365+ public void close () throws IOException {
366+ closed = true ;
367+ if (parser != null ) {
368+ parser .close ();
369+ }
370+ if (body != null ) {
371+ body .close ();
372+ }
373+ }
374+
375+ private boolean readNext () {
376+
377+ record = null ;
378+ while (record == null && iterator .hasNext ()) {
379+ state .csvRecord = iterator .next ();
380+ FluxCsvParser .FluxRecordOrTable fluxRecordOrTable = fluxCsvParser .parseNextResponse (state );
381+ if (fluxRecordOrTable .record != null ) {
382+ record = fluxRecordOrTable .record ;
383+ }
384+ }
385+
386+ return record != null ;
387+ }
388+ }
236389}
0 commit comments