|
23 | 23 |
|
24 | 24 | import java.math.BigInteger; |
25 | 25 | import java.time.Instant; |
| 26 | +import java.util.ArrayList; |
| 27 | +import java.util.List; |
26 | 28 | import java.util.Map; |
27 | 29 | import java.util.Properties; |
28 | 30 | import java.util.UUID; |
| 31 | +import java.util.stream.Collectors; |
29 | 32 | import java.util.stream.Stream; |
30 | 33 |
|
| 34 | +import org.apache.arrow.flight.FlightRuntimeException; |
31 | 35 | import org.assertj.core.api.Assertions; |
32 | 36 | import org.junit.jupiter.api.Test; |
33 | 37 | import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; |
@@ -188,45 +192,99 @@ public void testQueryRows() throws Exception { |
188 | 192 | System.getenv("TESTING_INFLUXDB_DATABASE"), |
189 | 193 | null)) { |
190 | 194 | String uuid = UUID.randomUUID().toString(); |
191 | | - long timestamp = Instant.now().getEpochSecond(); |
192 | | - String record = String.format( |
193 | | - "host12,tag=tagValue " |
194 | | - + "name=\"intel\"," |
195 | | - + "mem_total=2048," |
196 | | - + "disk_free=100i," |
197 | | - + "temperature=100.86," |
198 | | - + "isActive=true," |
199 | | - + "testId=\"%s\" %d", |
200 | | - uuid, |
201 | | - timestamp |
202 | | - ); |
203 | | - client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null)); |
| 195 | + String measurement = "host21"; |
| 196 | + List<Map<String, Object>> testDatas = new ArrayList<>(); |
| 197 | + for (int i = 0; i <= 9; i++) { |
| 198 | + long timestamp = System.currentTimeMillis(); |
| 199 | + Map<String, Object> map = Map.of( |
| 200 | + "measurement", measurement, |
| 201 | + "tag", "tagValue", |
| 202 | + "name", "intel", |
| 203 | + "mem_total", 2048.0, |
| 204 | + "disk_free", 100L, |
| 205 | + "temperature", 100.86, |
| 206 | + "isActive", true, |
| 207 | + "time", timestamp, |
| 208 | + "testId", uuid |
| 209 | + ); |
| 210 | + String record = String.format( |
| 211 | + "%s,tag=tagValue " |
| 212 | + + "name=\"%s\"," |
| 213 | + + "mem_total=%f," |
| 214 | + + "disk_free=%di," |
| 215 | + + "temperature=%f," |
| 216 | + + "isActive=%b," |
| 217 | + + "testId=\"%s\" %d", |
| 218 | + measurement, |
| 219 | + map.get("name"), |
| 220 | + (Double) map.get("mem_total"), |
| 221 | + (Long) map.get("disk_free"), |
| 222 | + (Double) map.get("temperature"), |
| 223 | + map.get("isActive"), |
| 224 | + uuid, |
| 225 | + timestamp |
| 226 | + ); |
| 227 | + client.writeRecord(record, new WriteOptions(null, WritePrecision.MS, null)); |
| 228 | + testDatas.add(map); |
| 229 | + } |
204 | 230 |
|
205 | 231 | Map<String, Object> parameters = Map.of("testId", uuid); |
206 | | - String sql = "Select * from host12 where \"testId\"=$testId"; |
| 232 | + String sql = String.format("Select * from %s where \"testId\"=$testId order by time", measurement); // Result set much be ordered by time |
207 | 233 | try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) { |
208 | | - stream.findFirst() |
209 | | - .ifPresent(map -> { |
210 | | - Assertions.assertThat(map.get("tag").getClass()).isEqualTo(String.class); |
211 | | - Assertions.assertThat(map.get("tag")).isEqualTo("tagValue"); |
| 234 | + List<Map<String, Object>> results = stream.collect(Collectors.toList()); |
| 235 | + for (int i = 0; i <= 9; i++) { |
| 236 | + Map<String, Object> row = results.get(i); |
| 237 | + Map<String, Object> testData = testDatas.get(i); |
| 238 | + Assertions.assertThat(row.get("tag").getClass()).isEqualTo(String.class); |
| 239 | + Assertions.assertThat(row.get("tag")).isEqualTo(testData.get("tag")); |
212 | 240 |
|
213 | | - Assertions.assertThat(map.get("name").getClass()).isEqualTo(String.class); |
214 | | - Assertions.assertThat(map.get("name")).isEqualTo("intel"); |
| 241 | + Assertions.assertThat(row.get("name").getClass()).isEqualTo(String.class); |
| 242 | + Assertions.assertThat(row.get("name")).isEqualTo(testData.get("name")); |
215 | 243 |
|
216 | | - Assertions.assertThat(map.get("mem_total").getClass()).isEqualTo(Double.class); |
217 | | - Assertions.assertThat(map.get("mem_total")).isEqualTo(2048.0); |
| 244 | + Assertions.assertThat(row.get("mem_total").getClass()).isEqualTo(Double.class); |
| 245 | + Assertions.assertThat(row.get("mem_total")).isEqualTo(testData.get("mem_total")); |
218 | 246 |
|
219 | | - Assertions.assertThat(map.get("disk_free").getClass()).isEqualTo(Long.class); |
220 | | - Assertions.assertThat(map.get("disk_free")).isEqualTo(100L); |
| 247 | + Assertions.assertThat(row.get("disk_free").getClass()).isEqualTo(Long.class); |
| 248 | + Assertions.assertThat(row.get("disk_free")).isEqualTo(testData.get("disk_free")); |
221 | 249 |
|
222 | | - Assertions.assertThat(map.get("isActive").getClass()).isEqualTo(Boolean.class); |
223 | | - Assertions.assertThat(map.get("isActive")).isEqualTo(true); |
| 250 | + Assertions.assertThat(row.get("isActive").getClass()).isEqualTo(Boolean.class); |
| 251 | + Assertions.assertThat(row.get("isActive")).isEqualTo(testData.get("isActive")); |
224 | 252 |
|
225 | | - Assertions.assertThat(map.get("time").getClass()).isEqualTo(BigInteger.class); |
226 | | - Assertions.assertThat(map.get("time")) |
227 | | - .isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000)); |
228 | | - }); |
| 253 | + Assertions.assertThat(row.get("time").getClass()).isEqualTo(BigInteger.class); |
| 254 | + Assertions.assertThat(row.get("time")) |
| 255 | + .isEqualTo(BigInteger.valueOf((Long) testData.get("time") * 1_000_000)); |
| 256 | + } |
229 | 257 | } |
230 | 258 | } |
231 | 259 | } |
| 260 | + |
| 261 | + |
| 262 | + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") |
| 263 | + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") |
| 264 | + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") |
| 265 | + @Test |
| 266 | + public void testQueryRowsExceptionCases() throws Exception { |
| 267 | + try (InfluxDBClient client = InfluxDBClient.getInstance( |
| 268 | + System.getenv("TESTING_INFLUXDB_URL"), |
| 269 | + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), |
| 270 | + System.getenv("TESTING_INFLUXDB_DATABASE"), |
| 271 | + null)) { |
| 272 | + |
| 273 | + // Empty result case |
| 274 | + Map<String, Object> parameters = Map.of("testId", "NotExist"); |
| 275 | + String sql = "Select * from host21 where \"testId\"=$testId"; |
| 276 | + try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) { |
| 277 | + Assertions.assertThat((int) stream.count()).isEqualTo(0); |
| 278 | + } |
| 279 | + |
| 280 | + // Malformed query case |
| 281 | + Assertions.assertThatThrownBy(() -> { |
| 282 | + String query = "Select * from host21 whereabs testId=2"; |
| 283 | + try (Stream<Map<String, Object>> stream = client.queryRows(query)) { |
| 284 | + stream.findFirst(); |
| 285 | + } |
| 286 | + }) |
| 287 | + .isInstanceOf(FlightRuntimeException.class); |
| 288 | + } |
| 289 | + } |
232 | 290 | } |
0 commit comments