Skip to content

Commit 371c17e

Browse files
feat: add queryRows function
1 parent 8d6d3e8 commit 371c17e

File tree

4 files changed

+195
-2
lines changed

4 files changed

+195
-2
lines changed

src/main/java/com/influxdb/v3/client/InfluxDBClient.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,88 @@ Stream<Object[]> query(@Nonnull final String query,
182182
@Nonnull final Map<String, Object> parameters,
183183
@Nonnull final QueryOptions options);
184184

185+
/**
186+
* Query data from InfluxDB IOx using FlightSQL.
187+
* <p>
188+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
189+
* <pre>
190+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
191+
* Map.of("host", "server-a"), options)) {
192+
* rows.forEach(row -&gt; {
193+
* // process row
194+
* }
195+
* });
196+
* </pre>
197+
*
198+
* @param query the query string to execute, cannot be null
199+
* @return Batches of rows returned by the query
200+
*/
201+
@Nonnull
202+
Stream<Map<String, Object>> queryRows(@Nonnull final String query);
203+
204+
/**
205+
* Query data from InfluxDB IOx using FlightSQL.
206+
* <p>
207+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
208+
* <pre>
209+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
210+
* Map.of("host", "server-a"), options)) {
211+
* rows.forEach(row -&gt; {
212+
* // process row
213+
* }
214+
* });
215+
* </pre>
216+
*
217+
* @param query the query string to execute, cannot be null
218+
* @param parameters query named parameters
219+
* @return Batches of rows returned by the query
220+
*/
221+
@Nonnull
222+
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);
223+
224+
/**
225+
* Query data from InfluxDB IOx using FlightSQL.
226+
* <p>
227+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
228+
* <pre>
229+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
230+
* Map.of("host", "server-a"), options)) {
231+
* rows.forEach(row -&gt; {
232+
* // process row
233+
* }
234+
* });
235+
* </pre>
236+
*
237+
* @param query the query string to execute, cannot be null
238+
* @param options the options for querying data from InfluxDB
239+
* @return Batches of rows returned by the query
240+
*/
241+
@Nonnull
242+
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options);
243+
244+
/**
245+
* Query data from InfluxDB IOx using FlightSQL.
246+
* <p>
247+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
248+
* <pre>
249+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
250+
* Map.of("host", "server-a"), options)) {
251+
* rows.forEach(row -&gt; {
252+
* // process row
253+
* }
254+
* });
255+
* </pre>
256+
*
257+
* @param query the query string to execute, cannot be null
258+
* @param parameters query named parameters
259+
* @param options the options for querying data from InfluxDB
260+
* @return Batches of rows returned by the query
261+
*/
262+
@Nonnull
263+
Stream<Map<String, Object>> queryRows(@Nonnull final String query,
264+
@Nonnull final Map<String, Object> parameters,
265+
@Nonnull final QueryOptions options);
266+
185267
/**
186268
* Query data from InfluxDB IOx into Point structure using FlightSQL.
187269
* <p>

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,39 @@ public Stream<Object[]> query(@Nonnull final String query,
189189
)));
190190
}
191191

192+
@Nonnull
193+
@Override
194+
public Stream<Map<String, Object>> queryRows(@Nonnull String query) {
195+
return queryRows(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
196+
}
197+
198+
@Nonnull
199+
@Override
200+
public Stream<Map<String, Object>> queryRows(@Nonnull String query, @Nonnull Map<String, Object> parameters) {
201+
return queryRows(query, parameters, QueryOptions.DEFAULTS);
202+
}
203+
204+
@Nonnull
205+
@Override
206+
public Stream<Map<String, Object>> queryRows(@Nonnull String query, @Nonnull QueryOptions options) {
207+
return queryRows(query, NO_PARAMETERS, options);
208+
}
209+
210+
@Nonnull
211+
@Override
212+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
213+
@Nonnull final Map<String, Object> parameters,
214+
@Nonnull final QueryOptions options) {
215+
return queryData(query, parameters, options)
216+
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
217+
.mapToObj(rowNumber ->
218+
VectorSchemaRootConverter.INSTANCE
219+
.getMapFromVectorSchemaRoot(
220+
vector,
221+
rowNumber
222+
)));
223+
}
224+
192225
@Nonnull
193226
@Override
194227
public Stream<PointValues> queryPoints(@Nonnull final String query) {

src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import java.math.BigInteger;
2525
import java.time.LocalDateTime;
26+
import java.util.HashMap;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Objects;
2830
import java.util.logging.Logger;
2931
import javax.annotation.Nonnull;
@@ -195,4 +197,25 @@ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRo
195197

196198
return row;
197199
}
200+
201+
/**
202+
* Get a Map from VectorSchemaRoot.
203+
*
204+
* @param vector The data return from InfluxDB.
205+
* @param rowNumber The row number of data
206+
* @return A Map represents a row of data
207+
*/
208+
public Map<String, Object> getMapFromVectorSchemaRoot(VectorSchemaRoot vector, int rowNumber) {
209+
Map<String, Object> row = new HashMap<>();
210+
for (FieldVector fieldVector : vector.getFieldVectors()) {
211+
Object mappedValue = getMappedValue(
212+
fieldVector.getField(),
213+
fieldVector.getObject(rowNumber)
214+
);
215+
row.put(fieldVector.getName(), mappedValue);
216+
217+
}
218+
219+
return row;
220+
}
198221
}

src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void testQuery() throws Exception {
138138
String uuid = UUID.randomUUID().toString();
139139
long timestamp = Instant.now().getEpochSecond();
140140
String record = String.format(
141-
"host10,tag=empty "
141+
"host12,tag=empty "
142142
+ "name=\"intel\","
143143
+ "mem_total=2048,"
144144
+ "disk_free=100i,"
@@ -151,7 +151,7 @@ public void testQuery() throws Exception {
151151
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));
152152

153153
Map<String, Object> parameters = Map.of("testId", uuid);
154-
String sql = "Select * from host10 where \"testId\"=$testId";
154+
String sql = "Select * from host12 where \"testId\"=$testId";
155155
try (Stream<Object[]> stream = client.query(sql, parameters)) {
156156
stream.findFirst()
157157
.ifPresent(objects -> {
@@ -167,10 +167,65 @@ public void testQuery() throws Exception {
167167
Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class);
168168
Assertions.assertThat(objects[3]).isEqualTo("intel");
169169

170+
Assertions.assertThat(objects[4].getClass()).isEqualTo(String.class);
171+
Assertions.assertThat(objects[4]).isEqualTo("empty");
172+
170173
Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class);
171174
Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
172175
});
173176
}
174177
}
175178
}
179+
180+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
181+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
182+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
183+
@Test
184+
public void testQueryRows() throws Exception {
185+
try (InfluxDBClient client = InfluxDBClient.getInstance(
186+
System.getenv("TESTING_INFLUXDB_URL"),
187+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
188+
System.getenv("TESTING_INFLUXDB_DATABASE"),
189+
null)) {
190+
String uuid = UUID.randomUUID().toString();
191+
long timestamp = Instant.now().getEpochSecond();
192+
String record = String.format(
193+
"host12,tag=tagValue "
194+
+ "name=\"intel\","
195+
+ "mem_total=2048,"
196+
+ "disk_free=100i,"
197+
+ "temperature=100.86,"
198+
+ "isActive=true,"
199+
+ "testId=\"%s\" %d",
200+
uuid,
201+
timestamp
202+
);
203+
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));
204+
205+
Map<String, Object> parameters = Map.of("testId", uuid);
206+
String sql = "Select * from host12 where \"testId\"=$testId";
207+
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
208+
stream.findFirst()
209+
.ifPresent(map -> {
210+
Assertions.assertThat(map.get("tag").getClass()).isEqualTo(String.class);
211+
Assertions.assertThat(map.get("tag")).isEqualTo("tagValue");
212+
213+
Assertions.assertThat(map.get("name").getClass()).isEqualTo(String.class);
214+
Assertions.assertThat(map.get("name")).isEqualTo("intel");
215+
216+
Assertions.assertThat(map.get("mem_total").getClass()).isEqualTo(Double.class);
217+
Assertions.assertThat(map.get("mem_total")).isEqualTo(2048.0);
218+
219+
Assertions.assertThat(map.get("disk_free").getClass()).isEqualTo(Long.class);
220+
Assertions.assertThat(map.get("disk_free")).isEqualTo(100L);
221+
222+
Assertions.assertThat(map.get("isActive").getClass()).isEqualTo(Boolean.class);
223+
Assertions.assertThat(map.get("isActive")).isEqualTo(true);
224+
225+
Assertions.assertThat(map.get("time").getClass()).isEqualTo(BigInteger.class);
226+
Assertions.assertThat(map.get("time")).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
227+
});
228+
}
229+
}
230+
}
176231
}

0 commit comments

Comments
 (0)