Skip to content

Commit 9b047ae

Browse files
feat: add queryRows
1 parent 09646f6 commit 9b047ae

File tree

2 files changed

+112
-175
lines changed

2 files changed

+112
-175
lines changed

src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java

Lines changed: 0 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,9 @@
2121
*/
2222
package com.influxdb.v3.client;
2323

24-
import java.math.BigInteger;
25-
import java.time.Instant;
26-
import java.util.ArrayList;
27-
import java.util.List;
2824
import java.util.Map;
2925
import java.util.Properties;
30-
import java.util.UUID;
31-
import java.util.stream.Stream;
3226

33-
import org.apache.arrow.flight.FlightRuntimeException;
3427
import org.assertj.core.api.Assertions;
3528
import org.junit.jupiter.api.Test;
3629

@@ -72,14 +65,6 @@ void requiredHostConnectionString() {
7265
.hasMessageContaining("no protocol");
7366
}
7467

75-
@Test
76-
void requiredHostEnvOrProperties() {
77-
78-
Assertions.assertThatThrownBy(InfluxDBClient::getInstance)
79-
.isInstanceOf(IllegalArgumentException.class)
80-
.hasMessage("The URL of the InfluxDB server has to be defined.");
81-
}
82-
8368
@Test
8469
void fromParameters() throws Exception {
8570

@@ -143,164 +128,4 @@ public void unsupportedQueryParams() throws Exception {
143128
+ "class com.influxdb.v3.client.internal.InfluxDBClientImpl");
144129
}
145130
}
146-
147-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
148-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
149-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
150-
@Test
151-
public void testQuery() throws Exception {
152-
try (InfluxDBClient client = InfluxDBClient.getInstance(
153-
System.getenv("TESTING_INFLUXDB_URL"),
154-
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
155-
System.getenv("TESTING_INFLUXDB_DATABASE"),
156-
null)) {
157-
String uuid = UUID.randomUUID().toString();
158-
long timestamp = Instant.now().getEpochSecond();
159-
String record = String.format(
160-
"host12,tag=empty "
161-
+ "name=\"intel\","
162-
+ "mem_total=2048,"
163-
+ "disk_free=100i,"
164-
+ "temperature=100.86,"
165-
+ "isActive=true,"
166-
+ "testId=\"%s\" %d",
167-
uuid,
168-
timestamp
169-
);
170-
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));
171-
172-
Map<String, Object> parameters = Map.of("testId", uuid);
173-
String sql = "Select * from host12 where \"testId\"=$testId";
174-
try (Stream<Object[]> stream = client.query(sql, parameters)) {
175-
stream.findFirst()
176-
.ifPresent(objects -> {
177-
Assertions.assertThat(objects[0].getClass()).isEqualTo(Long.class);
178-
Assertions.assertThat(objects[0]).isEqualTo(100L);
179-
180-
Assertions.assertThat(objects[1].getClass()).isEqualTo(Boolean.class);
181-
Assertions.assertThat(objects[1]).isEqualTo(true);
182-
183-
Assertions.assertThat(objects[2].getClass()).isEqualTo(Double.class);
184-
Assertions.assertThat(objects[2]).isEqualTo(2048.0);
185-
186-
Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class);
187-
Assertions.assertThat(objects[3]).isEqualTo("intel");
188-
189-
Assertions.assertThat(objects[4].getClass()).isEqualTo(String.class);
190-
Assertions.assertThat(objects[4]).isEqualTo("empty");
191-
192-
Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class);
193-
Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
194-
});
195-
}
196-
}
197-
}
198-
199-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
200-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
201-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
202-
@Test
203-
public void testQueryRows() throws Exception {
204-
try (InfluxDBClient client = InfluxDBClient.getInstance(
205-
System.getenv("TESTING_INFLUXDB_URL"),
206-
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
207-
System.getenv("TESTING_INFLUXDB_DATABASE"),
208-
null)) {
209-
String uuid = UUID.randomUUID().toString();
210-
String measurement = "host21";
211-
List<Map<String, Object>> testDatas = new ArrayList<>();
212-
for (int i = 0; i <= 9; i++) {
213-
long timestamp = System.currentTimeMillis();
214-
Map<String, Object> map = Map.of(
215-
"measurement", measurement,
216-
"tag", "tagValue",
217-
"name", "intel",
218-
"mem_total", 2048.0,
219-
"disk_free", 100L,
220-
"temperature", 100.86,
221-
"isActive", true,
222-
"time", timestamp,
223-
"testId", uuid
224-
);
225-
String record = String.format(
226-
"%s,tag=tagValue "
227-
+ "name=\"%s\","
228-
+ "mem_total=%f,"
229-
+ "disk_free=%di,"
230-
+ "temperature=%f,"
231-
+ "isActive=%b,"
232-
+ "testId=\"%s\" %d",
233-
measurement,
234-
map.get("name"),
235-
(Double) map.get("mem_total"),
236-
(Long) map.get("disk_free"),
237-
(Double) map.get("temperature"),
238-
map.get("isActive"),
239-
uuid,
240-
timestamp
241-
);
242-
client.writeRecord(record, new WriteOptions(null, WritePrecision.MS, null));
243-
testDatas.add(map);
244-
}
245-
246-
Map<String, Object> parameters = Map.of("testId", uuid);
247-
// Result set much be ordered by time
248-
String sql = String.format("Select * from %s where \"testId\"=$testId order by time", measurement);
249-
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
250-
List<Map<String, Object>> results = stream.collect(Collectors.toList());
251-
for (int i = 0; i <= 9; i++) {
252-
Map<String, Object> row = results.get(i);
253-
Map<String, Object> testData = testDatas.get(i);
254-
Assertions.assertThat(row.get("tag").getClass()).isEqualTo(String.class);
255-
Assertions.assertThat(row.get("tag")).isEqualTo(testData.get("tag"));
256-
257-
Assertions.assertThat(row.get("name").getClass()).isEqualTo(String.class);
258-
Assertions.assertThat(row.get("name")).isEqualTo(testData.get("name"));
259-
260-
Assertions.assertThat(row.get("mem_total").getClass()).isEqualTo(Double.class);
261-
Assertions.assertThat(row.get("mem_total")).isEqualTo(testData.get("mem_total"));
262-
263-
Assertions.assertThat(row.get("disk_free").getClass()).isEqualTo(Long.class);
264-
Assertions.assertThat(row.get("disk_free")).isEqualTo(testData.get("disk_free"));
265-
266-
Assertions.assertThat(row.get("isActive").getClass()).isEqualTo(Boolean.class);
267-
Assertions.assertThat(row.get("isActive")).isEqualTo(testData.get("isActive"));
268-
269-
Assertions.assertThat(row.get("time").getClass()).isEqualTo(BigInteger.class);
270-
Assertions.assertThat(row.get("time"))
271-
.isEqualTo(BigInteger.valueOf((Long) testData.get("time") * 1_000_000));
272-
}
273-
}
274-
}
275-
}
276-
277-
278-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
279-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
280-
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
281-
@Test
282-
public void testQueryRowsExceptionCases() throws Exception {
283-
try (InfluxDBClient client = InfluxDBClient.getInstance(
284-
System.getenv("TESTING_INFLUXDB_URL"),
285-
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
286-
System.getenv("TESTING_INFLUXDB_DATABASE"),
287-
null)) {
288-
289-
// Empty result case
290-
Map<String, Object> parameters = Map.of("testId", "NotExist");
291-
String sql = "Select * from host21 where \"testId\"=$testId";
292-
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
293-
Assertions.assertThat((int) stream.count()).isEqualTo(0);
294-
}
295-
296-
// Malformed query case
297-
Assertions.assertThatThrownBy(() -> {
298-
String query = "Select * from host21 whereabs testId=2";
299-
try (Stream<Map<String, Object>> stream = client.queryRows(query)) {
300-
stream.findFirst();
301-
}
302-
})
303-
.isInstanceOf(FlightRuntimeException.class);
304-
}
305-
}
306131
}

src/test/java/com/influxdb/v3/client/integration/E2ETest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@
2626
import java.net.URL;
2727
import java.net.URLConnection;
2828
import java.time.Instant;
29+
import java.util.ArrayList;
30+
import java.util.List;
2931
import java.util.Map;
3032
import java.util.UUID;
3133
import java.util.logging.Logger;
34+
import java.util.stream.Collectors;
3235
import java.util.stream.Stream;
3336
import javax.annotation.Nonnull;
3437

38+
import org.apache.arrow.flight.FlightRuntimeException;
3539
import org.assertj.core.api.Assertions;
3640
import org.junit.jupiter.api.Test;
3741
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
@@ -195,6 +199,114 @@ public void testQuery() throws Exception {
195199
}
196200
}
197201

202+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
203+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
204+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
205+
@Test
206+
public void testQueryRows() throws Exception {
207+
try (InfluxDBClient client = InfluxDBClient.getInstance(
208+
System.getenv("TESTING_INFLUXDB_URL"),
209+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
210+
System.getenv("TESTING_INFLUXDB_DATABASE"),
211+
null)) {
212+
String uuid = UUID.randomUUID().toString();
213+
String measurement = "host21";
214+
List<Map<String, Object>> testDatas = new ArrayList<>();
215+
for (int i = 0; i <= 9; i++) {
216+
long timestamp = System.currentTimeMillis();
217+
Map<String, Object> map = Map.of(
218+
"measurement", measurement,
219+
"tag", "tagValue",
220+
"name", "intel",
221+
"mem_total", 2048.0,
222+
"disk_free", 100L,
223+
"temperature", 100.86,
224+
"isActive", true,
225+
"time", timestamp,
226+
"testId", uuid
227+
);
228+
String record = String.format(
229+
"%s,tag=tagValue "
230+
+ "name=\"%s\","
231+
+ "mem_total=%f,"
232+
+ "disk_free=%di,"
233+
+ "temperature=%f,"
234+
+ "isActive=%b,"
235+
+ "testId=\"%s\" %d",
236+
measurement,
237+
map.get("name"),
238+
(Double) map.get("mem_total"),
239+
(Long) map.get("disk_free"),
240+
(Double) map.get("temperature"),
241+
map.get("isActive"),
242+
uuid,
243+
timestamp
244+
);
245+
client.writeRecord(record, new WriteOptions(null, WritePrecision.MS, null));
246+
testDatas.add(map);
247+
}
248+
249+
Map<String, Object> parameters = Map.of("testId", uuid);
250+
// Result set much be ordered by time
251+
String sql = String.format("Select * from %s where \"testId\"=$testId order by time", measurement);
252+
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
253+
List<Map<String, Object>> results = stream.collect(Collectors.toList());
254+
for (int i = 0; i <= 9; i++) {
255+
Map<String, Object> row = results.get(i);
256+
Map<String, Object> testData = testDatas.get(i);
257+
Assertions.assertThat(row.get("tag").getClass()).isEqualTo(String.class);
258+
Assertions.assertThat(row.get("tag")).isEqualTo(testData.get("tag"));
259+
260+
Assertions.assertThat(row.get("name").getClass()).isEqualTo(String.class);
261+
Assertions.assertThat(row.get("name")).isEqualTo(testData.get("name"));
262+
263+
Assertions.assertThat(row.get("mem_total").getClass()).isEqualTo(Double.class);
264+
Assertions.assertThat(row.get("mem_total")).isEqualTo(testData.get("mem_total"));
265+
266+
Assertions.assertThat(row.get("disk_free").getClass()).isEqualTo(Long.class);
267+
Assertions.assertThat(row.get("disk_free")).isEqualTo(testData.get("disk_free"));
268+
269+
Assertions.assertThat(row.get("isActive").getClass()).isEqualTo(Boolean.class);
270+
Assertions.assertThat(row.get("isActive")).isEqualTo(testData.get("isActive"));
271+
272+
Assertions.assertThat(row.get("time").getClass()).isEqualTo(BigInteger.class);
273+
Assertions.assertThat(row.get("time"))
274+
.isEqualTo(BigInteger.valueOf((Long) testData.get("time") * 1_000_000));
275+
}
276+
}
277+
}
278+
}
279+
280+
281+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
282+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
283+
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
284+
@Test
285+
public void testQueryRowsExceptionCases() throws Exception {
286+
try (InfluxDBClient client = InfluxDBClient.getInstance(
287+
System.getenv("TESTING_INFLUXDB_URL"),
288+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
289+
System.getenv("TESTING_INFLUXDB_DATABASE"),
290+
null)) {
291+
292+
// Empty result case
293+
Map<String, Object> parameters = Map.of("testId", "NotExist");
294+
String sql = "Select * from host21 where \"testId\"=$testId";
295+
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
296+
Assertions.assertThat((int) stream.count()).isEqualTo(0);
297+
}
298+
299+
// Malformed query case
300+
Assertions.assertThatThrownBy(() -> {
301+
String query = "Select * from host21 whereabs testId=2";
302+
try (Stream<Map<String, Object>> stream = client.queryRows(query)) {
303+
stream.findFirst();
304+
}
305+
})
306+
.isInstanceOf(FlightRuntimeException.class);
307+
}
308+
}
309+
198310
private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) {
199311
influxDBClient.writePoint(
200312
Point.measurement("test1")

0 commit comments

Comments
 (0)