3030import java .util .Map ;
3131import java .util .Objects ;
3232import java .util .concurrent .TimeUnit ;
33+ import java .util .function .BiFunction ;
3334import java .util .logging .Logger ;
3435import java .util .stream .Collectors ;
3536import java .util .stream .IntStream ;
4243import io .netty .handler .codec .http .HttpMethod ;
4344import io .netty .handler .codec .http .HttpResponseStatus ;
4445import org .apache .arrow .flight .CallOption ;
45- import org .apache .arrow .vector .FieldVector ;
4646import org .apache .arrow .vector .VectorSchemaRoot ;
4747
4848import com .influxdb .v3 .client .InfluxDBApiException ;
@@ -187,14 +187,8 @@ public Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<St
187187 public Stream <Object []> query (@ Nonnull final String query ,
188188 @ Nonnull final Map <String , Object > parameters ,
189189 @ Nonnull final QueryOptions options ) {
190- return queryData (query , parameters , options )
191- .flatMap (vector -> IntStream .range (0 , vector .getRowCount ())
192- .mapToObj (rowNumber ->
193- VectorSchemaRootConverter .INSTANCE
194- .getArrayObjectFromVectorSchemaRoot (
195- vector ,
196- rowNumber
197- )));
190+ return queryDataAndProcess (query , parameters , options ,
191+ VectorSchemaRootConverter .INSTANCE ::getArrayObjectFromVectorSchemaRoot );
198192 }
199193
200194 @ Nonnull
@@ -222,14 +216,7 @@ public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnu
222216 public Stream <Map <String , Object >> queryRows (@ Nonnull final String query ,
223217 @ Nonnull final Map <String , Object > parameters ,
224218 @ Nonnull final QueryOptions options ) {
225- return queryData (query , parameters , options )
226- .flatMap (vector -> IntStream .range (0 , vector .getRowCount ())
227- .mapToObj (rowNumber ->
228- VectorSchemaRootConverter .INSTANCE
229- .getMapFromVectorSchemaRoot (
230- vector ,
231- rowNumber
232- )));
219+ return queryDataAndProcess (query , parameters , options , VectorSchemaRootConverter .INSTANCE ::getMapFromVectorSchemaRoot );
233220 }
234221
235222 @ Nonnull
@@ -255,14 +242,8 @@ public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull fin
255242 public Stream <PointValues > queryPoints (@ Nonnull final String query ,
256243 @ Nonnull final Map <String , Object > parameters ,
257244 @ Nonnull final QueryOptions options ) {
258- return queryData (query , parameters , options )
259- .flatMap (vector -> {
260- List <FieldVector > fieldVectors = vector .getFieldVectors ();
261- return IntStream
262- .range (0 , vector .getRowCount ())
263- .mapToObj (row ->
264- VectorSchemaRootConverter .INSTANCE .toPointValues (row , fieldVectors ));
265- });
245+ return queryDataAndProcess (query , parameters , options ,
246+ (vector , rowNumber ) -> VectorSchemaRootConverter .INSTANCE .toPointValues (rowNumber , vector .getFieldVectors ()));
266247 }
267248
268249 @ Nonnull
@@ -389,6 +370,19 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
389370 }
390371 }
391372
373+ @ Nonnull
374+ private <T > Stream <T > queryDataAndProcess (@ Nonnull final String query ,
375+ @ Nonnull final Map <String , Object > parameters ,
376+ @ Nonnull final QueryOptions options ,
377+ @ Nonnull final BiFunction <VectorSchemaRoot , Integer , T > processor ) {
378+ return queryData (query , parameters , options )
379+ .flatMap (vector ->
380+ IntStream .range (0 , vector .getRowCount ())
381+ .mapToObj (rowNumber -> processor .apply (vector , rowNumber ))
382+ .onClose (vector ::close )
383+ );
384+ }
385+
392386 @ Nonnull
393387 private Stream <VectorSchemaRoot > queryData (@ Nonnull final String query ,
394388 @ Nonnull final Map <String , Object > parameters ,
0 commit comments