Skip to content

Commit 069e805

Browse files
feat: add queryRows function (#209)
1 parent 27267a9 commit 069e805

File tree

8 files changed

+346
-11
lines changed

8 files changed

+346
-11
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 1.2.0 [unreleased]
22

3+
### Features
4+
5+
1. [#209](https://github.com/InfluxCommunity/influxdb3-java/pull/209) Add query function returning row as map.
6+
37
## 1.1.0 [2025-05-22]
48

59
### Features

src/main/java/com/influxdb/v3/client/InfluxDBClient.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,87 @@ Stream<Object[]> query(@Nonnull final String query,
202202
@Nonnull final Map<String, Object> parameters,
203203
@Nonnull final QueryOptions options);
204204

205+
/**
206+
* Query data from InfluxDB IOx using FlightSQL.
207+
* <p>
208+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
209+
* <pre>
210+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel")) {
211+
* rows.forEach(row -&gt; {
212+
* // process row
213+
* });
214+
* };
215+
* </pre>
216+
*
217+
* @param query the query string to execute, cannot be null
218+
* @return Batches of rows returned by the query
219+
*/
220+
@Nonnull
221+
Stream<Map<String, Object>> queryRows(@Nonnull final String query);
222+
223+
/**
224+
* Query data from InfluxDB IOx using FlightSQL.
225+
* <p>
226+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
227+
* <pre>
228+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
229+
* Map.of("host", "server-a"))) {
230+
* rows.forEach(row -&gt; {
231+
* // process row
232+
* })
233+
* };
234+
* </pre>
235+
*
236+
* @param query the query string to execute, cannot be null
237+
* @param parameters query named parameters
238+
* @return Batches of rows returned by the query
239+
*/
240+
@Nonnull
241+
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);
242+
243+
/**
244+
* Query data from InfluxDB IOx using FlightSQL.
245+
* <p>
246+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
247+
* <pre>
248+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel",
249+
* options)) {
250+
* rows.forEach(row -&gt; {
251+
* // process row
252+
* })
253+
* };
254+
* </pre>
255+
*
256+
* @param query the query string to execute, cannot be null
257+
* @param options the options for querying data from InfluxDB
258+
* @return Batches of rows returned by the query
259+
*/
260+
@Nonnull
261+
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options);
262+
263+
/**
264+
* Query data from InfluxDB IOx using FlightSQL.
265+
* <p>
266+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
267+
* <pre>
268+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
269+
* Map.of("host", "server-a"), options)) {
270+
* rows.forEach(row -&gt; {
271+
* // process row
272+
* })
273+
* };
274+
* </pre>
275+
*
276+
* @param query the query string to execute, cannot be null
277+
* @param parameters query named parameters
278+
* @param options the options for querying data from InfluxDB
279+
* @return Batches of rows returned by the query
280+
*/
281+
@Nonnull
282+
Stream<Map<String, Object>> queryRows(@Nonnull final String query,
283+
@Nonnull final Map<String, Object> parameters,
284+
@Nonnull final QueryOptions options);
285+
205286
/**
206287
* Query data from InfluxDB IOx into Point structure using FlightSQL.
207288
* <p>

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,41 @@ public Stream<Object[]> query(@Nonnull final String query,
190190
)));
191191
}
192192

193+
@Nonnull
194+
@Override
195+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query) {
196+
return queryRows(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
197+
}
198+
199+
@Nonnull
200+
@Override
201+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
202+
@Nonnull final Map<String, Object> parameters
203+
) {
204+
return queryRows(query, parameters, QueryOptions.DEFAULTS);
205+
}
206+
207+
@Nonnull
208+
@Override
209+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options) {
210+
return queryRows(query, NO_PARAMETERS, options);
211+
}
212+
213+
@Nonnull
214+
@Override
215+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
216+
@Nonnull final Map<String, Object> parameters,
217+
@Nonnull final QueryOptions options) {
218+
return queryData(query, parameters, options)
219+
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
220+
.mapToObj(rowNumber ->
221+
VectorSchemaRootConverter.INSTANCE
222+
.getMapFromVectorSchemaRoot(
223+
vector,
224+
rowNumber
225+
)));
226+
}
227+
193228
@Nonnull
194229
@Override
195230
public Stream<PointValues> queryPoints(@Nonnull final String query) {

src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import java.math.BigInteger;
2525
import java.time.LocalDateTime;
26+
import java.util.LinkedHashMap;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Objects;
2830
import java.util.logging.Logger;
2931
import javax.annotation.Nonnull;
@@ -195,4 +197,25 @@ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRo
195197

196198
return row;
197199
}
200+
201+
/**
202+
* Get a Map from VectorSchemaRoot.
203+
*
204+
* @param vector The data return from InfluxDB.
205+
* @param rowNumber The row number of data
206+
* @return A Map represents a row of data
207+
*/
208+
public Map<String, Object> getMapFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) {
209+
Map<String, Object> row = new LinkedHashMap<>();
210+
for (FieldVector fieldVector : vector.getFieldVectors()) {
211+
Object mappedValue = getMappedValue(
212+
fieldVector.getField(),
213+
fieldVector.getObject(rowNumber)
214+
);
215+
row.put(fieldVector.getName(), mappedValue);
216+
217+
}
218+
219+
return row;
220+
}
198221
}

src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,6 @@ void requiredHostConnectionString() {
6565
.hasMessageContaining("no protocol");
6666
}
6767

68-
@Test
69-
void requiredHostEnvOrProperties() {
70-
71-
Assertions.assertThatThrownBy(InfluxDBClient::getInstance)
72-
.isInstanceOf(IllegalArgumentException.class)
73-
.hasMessage("The URL of the InfluxDB server has to be defined.");
74-
}
75-
7668
@Test
7769
void fromParameters() throws Exception {
7870

@@ -136,5 +128,4 @@ public void unsupportedQueryParams() throws Exception {
136128
+ "class com.influxdb.v3.client.internal.InfluxDBClientImpl");
137129
}
138130
}
139-
140131
}

0 commit comments

Comments
 (0)