Skip to content

Commit e3df3af

Browse files
committed
build: merge main into branch
2 parents 2764787 + 1f1ac3f commit e3df3af

File tree

9 files changed

+359
-21
lines changed

9 files changed

+359
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Features
44

5+
1. [#209](https://github.com/InfluxCommunity/influxdb3-java/pull/209) Add query function returning row as map.
56
1. [#238](https://github.com/InfluxCommunity/influxdb3-java/pull/238): Support fast writes without waiting for WAL
67
persistence:
78
- New write option (`WriteOptions.noSync`) added: `true` value means faster write but without the confirmation that

src/main/java/com/influxdb/v3/client/InfluxDBClient.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,87 @@ Stream<Object[]> query(@Nonnull final String query,
202202
@Nonnull final Map<String, Object> parameters,
203203
@Nonnull final QueryOptions options);
204204

205+
/**
206+
* Query data from InfluxDB IOx using FlightSQL.
207+
* <p>
208+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
209+
* <pre>
210+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel")) {
211+
* rows.forEach(row -&gt; {
212+
* // process row
213+
* });
214+
* };
215+
* </pre>
216+
*
217+
* @param query the query string to execute, cannot be null
218+
* @return Batches of rows returned by the query
219+
*/
220+
@Nonnull
221+
Stream<Map<String, Object>> queryRows(@Nonnull final String query);
222+
223+
/**
224+
* Query data from InfluxDB IOx using FlightSQL.
225+
* <p>
226+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
227+
* <pre>
228+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
229+
* Map.of("host", "server-a"))) {
230+
* rows.forEach(row -&gt; {
231+
* // process row
232+
* })
233+
* };
234+
* </pre>
235+
*
236+
* @param query the query string to execute, cannot be null
237+
* @param parameters query named parameters
238+
* @return Batches of rows returned by the query
239+
*/
240+
@Nonnull
241+
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);
242+
243+
/**
244+
* Query data from InfluxDB IOx using FlightSQL.
245+
* <p>
246+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
247+
* <pre>
248+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel",
249+
* options)) {
250+
* rows.forEach(row -&gt; {
251+
* // process row
252+
* })
253+
* };
254+
* </pre>
255+
*
256+
* @param query the query string to execute, cannot be null
257+
* @param options the options for querying data from InfluxDB
258+
* @return Batches of rows returned by the query
259+
*/
260+
@Nonnull
261+
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options);
262+
263+
/**
264+
* Query data from InfluxDB IOx using FlightSQL.
265+
* <p>
266+
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
267+
* <pre>
268+
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
269+
* Map.of("host", "server-a"), options)) {
270+
* rows.forEach(row -&gt; {
271+
* // process row
272+
* })
273+
* };
274+
* </pre>
275+
*
276+
* @param query the query string to execute, cannot be null
277+
* @param parameters query named parameters
278+
* @param options the options for querying data from InfluxDB
279+
* @return Batches of rows returned by the query
280+
*/
281+
@Nonnull
282+
Stream<Map<String, Object>> queryRows(@Nonnull final String query,
283+
@Nonnull final Map<String, Object> parameters,
284+
@Nonnull final QueryOptions options);
285+
205286
/**
206287
* Query data from InfluxDB IOx into Point structure using FlightSQL.
207288
* <p>

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,41 @@ public Stream<Object[]> query(@Nonnull final String query,
195195
)));
196196
}
197197

198+
@Nonnull
199+
@Override
200+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query) {
201+
return queryRows(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
202+
}
203+
204+
@Nonnull
205+
@Override
206+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
207+
@Nonnull final Map<String, Object> parameters
208+
) {
209+
return queryRows(query, parameters, QueryOptions.DEFAULTS);
210+
}
211+
212+
@Nonnull
213+
@Override
214+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options) {
215+
return queryRows(query, NO_PARAMETERS, options);
216+
}
217+
218+
@Nonnull
219+
@Override
220+
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
221+
@Nonnull final Map<String, Object> parameters,
222+
@Nonnull final QueryOptions options) {
223+
return queryData(query, parameters, options)
224+
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
225+
.mapToObj(rowNumber ->
226+
VectorSchemaRootConverter.INSTANCE
227+
.getMapFromVectorSchemaRoot(
228+
vector,
229+
rowNumber
230+
)));
231+
}
232+
198233
@Nonnull
199234
@Override
200235
public Stream<PointValues> queryPoints(@Nonnull final String query) {

src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import java.math.BigInteger;
2525
import java.time.LocalDateTime;
26+
import java.util.LinkedHashMap;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Objects;
2830
import java.util.logging.Logger;
2931
import javax.annotation.Nonnull;
@@ -195,4 +197,25 @@ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRo
195197

196198
return row;
197199
}
200+
201+
/**
202+
* Get a Map from VectorSchemaRoot.
203+
*
204+
* @param vector The data return from InfluxDB.
205+
* @param rowNumber The row number of data
206+
* @return A Map represents a row of data
207+
*/
208+
public Map<String, Object> getMapFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) {
209+
Map<String, Object> row = new LinkedHashMap<>();
210+
for (FieldVector fieldVector : vector.getFieldVectors()) {
211+
Object mappedValue = getMappedValue(
212+
fieldVector.getField(),
213+
fieldVector.getObject(rowNumber)
214+
);
215+
row.put(fieldVector.getName(), mappedValue);
216+
217+
}
218+
219+
return row;
220+
}
198221
}

src/test/java/com/influxdb/v3/client/ITQueryWrite.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Random;
3132
import java.util.stream.Collectors;
3233
import java.util.stream.Stream;
3334

3435
import org.apache.arrow.flight.CallStatus;
3536
import org.apache.arrow.flight.FlightRuntimeException;
37+
import org.apache.arrow.flight.FlightStatusCode;
3638
import org.apache.arrow.vector.VectorSchemaRoot;
3739
import org.assertj.core.api.Assertions;
3840
import org.jetbrains.annotations.NotNull;
@@ -236,16 +238,15 @@ void pointValues() {
236238
@Test
237239
public void handleFlightRuntimeException() throws IOException {
238240
Instant now = Instant.now();
239-
String measurement = String.format(
240-
"/%d/test/com/influxdb/v3/client/ITQueryWrite/handleFlightRuntimeException", now.toEpochMilli()
241-
);
241+
String measurement = "/influxdb3-java/test/ITQueryWrite/handleFlightRuntimeException";
242242

243243
client = getInstance();
244244

245245
int extraTagLength = 512;
246246
Map<String, String> extraTags = new HashMap<String, String>();
247+
Random seededRandom = new Random(1); // use seeded random to generate always the same tags
247248
for (int i = 0; i < 22; i++) {
248-
extraTags.put(makeLengthyTag(extraTagLength, 64, (byte) '/'), "extra-tag-" + i);
249+
extraTags.put(makeLengthyTag(extraTagLength, 64, (byte) '/', seededRandom), "extra-tag-" + i);
249250
}
250251

251252
Point p = Point.measurement(measurement)
@@ -275,17 +276,21 @@ public void handleFlightRuntimeException() throws IOException {
275276
});
276277
} catch (FlightRuntimeException fre) {
277278
Assertions.assertThat(fre.getMessage()).doesNotContain("http2 exception");
278-
Assertions.assertThat(fre.status().code()).isNotEqualTo(CallStatus.INTERNAL.code());
279-
Assertions.assertThat(fre.status().code()).
279+
FlightStatusCode statusCode = fre.status().code();
280+
Assertions.assertThat(statusCode).isNotEqualTo(CallStatus.INTERNAL.code());
281+
Assertions.assertThat(statusCode).
280282
as(String.format("Flight runtime exception was UNAVAILABLE. "
281283
+ "Target test case was not fully tested. "
282284
+ "Check limits of test account and target database %s.",
283285
System.getenv("TESTING_INFLUXDB_DATABASE")))
284286
.isNotEqualTo(CallStatus.UNAVAILABLE.code());
285-
Assertions.assertThat(fre.status().code()).
287+
Assertions.assertThat(statusCode).
286288
as("Flight runtime exception was UNAUTHENTICATED. "
287289
+ "Target test case was not fully tested. Check test account token.")
288290
.isNotEqualTo(CallStatus.UNAUTHENTICATED.code());
291+
Assertions.assertThat(statusCode).
292+
as("Flight runtime exception was not INVALID_ARGUMENT but: " + statusCode.toString())
293+
.isEqualTo(CallStatus.INVALID_ARGUMENT.code());
289294
return;
290295
} catch (Exception e) {
291296
Assertions.fail(String.format("FlightRuntimeException should have been thrown. "
@@ -304,16 +309,17 @@ private static InfluxDBClient getInstance() {
304309
System.getenv("TESTING_INFLUXDB_DATABASE"));
305310
}
306311

307-
private String makeLengthyTag(final int length, final int maxPartLength, final byte separator) {
312+
private String makeLengthyTag(final int length, final int maxPartLength, final byte separator,
313+
final Random random) {
308314
final String legalVals = "0123456789abcdefghijklmnopqrstuvwxyz";
309315
byte[] bytes = new byte[length];
310316
int nextPartAddress = 0;
311317
for (int i = 0; i < length; i++) {
312318
if (i == nextPartAddress) {
313319
bytes[i] = separator;
314-
nextPartAddress = i + (int) (Math.random() * (maxPartLength - 3));
320+
nextPartAddress = i + (int) (random.nextDouble() * (maxPartLength - 3));
315321
} else {
316-
bytes[i] = legalVals.getBytes()[(int) (Math.random() * legalVals.length())];
322+
bytes[i] = legalVals.getBytes()[(int) (random.nextDouble() * legalVals.length())];
317323
}
318324
}
319325
return new String(bytes);

src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,6 @@ void requiredHostConnectionString() {
6565
.hasMessageContaining("no protocol");
6666
}
6767

68-
@Test
69-
void requiredHostEnvOrProperties() {
70-
71-
Assertions.assertThatThrownBy(InfluxDBClient::getInstance)
72-
.isInstanceOf(IllegalArgumentException.class)
73-
.hasMessage("The URL of the InfluxDB server has to be defined.");
74-
}
75-
7668
@Test
7769
void fromParameters() throws Exception {
7870

@@ -136,5 +128,4 @@ public void unsupportedQueryParams() throws Exception {
136128
+ "class com.influxdb.v3.client.internal.InfluxDBClientImpl");
137129
}
138130
}
139-
140131
}

0 commit comments

Comments
 (0)