|
21 | 21 | */ |
22 | 22 | package com.influxdb.v3.client; |
23 | 23 |
|
| 24 | +import java.io.IOException; |
24 | 25 | import java.math.BigInteger; |
25 | 26 | import java.time.Instant; |
26 | 27 | import java.time.temporal.ChronoUnit; |
| 28 | +import java.util.HashMap; |
27 | 29 | import java.util.List; |
28 | 30 | import java.util.Map; |
29 | 31 | import java.util.stream.Collectors; |
30 | 32 | import java.util.stream.Stream; |
31 | 33 |
|
| 34 | +import org.apache.arrow.flight.CallStatus; |
| 35 | +import org.apache.arrow.flight.FlightRuntimeException; |
32 | 36 | import org.apache.arrow.vector.VectorSchemaRoot; |
33 | 37 | import org.assertj.core.api.Assertions; |
34 | 38 | import org.jetbrains.annotations.NotNull; |
@@ -222,6 +226,91 @@ void pointValues() { |
222 | 226 | } |
223 | 227 | } |
224 | 228 |
|
| 229 | + /* |
| 230 | + Motivated by EAR 5718, useful exception INVALID_ARGUMENT was being masked by |
| 231 | + INTERNAL: http2 exception - Header size exceeded max allowed size (10240). |
| 232 | + */ |
| 233 | + public String makeLengthyTag(final int length, final int maxPartLength, final byte separator) { |
| 234 | + final String legalVals = "0123456789abcdefghijklmnopqrstuvwxyz"; |
| 235 | + byte[] bytes = new byte[length]; |
| 236 | + int nextPartAddress = 0; |
| 237 | + for (int i = 0; i < length; i++) { |
| 238 | + if (i == nextPartAddress) { |
| 239 | + bytes[i] = separator; |
| 240 | + nextPartAddress = i + (int) (Math.random() * (maxPartLength - 3)); |
| 241 | + } else { |
| 242 | + bytes[i] = legalVals.getBytes()[(int) (Math.random() * legalVals.length())]; |
| 243 | + } |
| 244 | + } |
| 245 | + return new String(bytes); |
| 246 | + } |
| 247 | + |
| 248 | + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") |
| 249 | + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") |
| 250 | + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") |
| 251 | + @Test |
| 252 | + public void handleFlightRuntimeException() throws IOException { |
| 253 | + Instant now = Instant.now(); |
| 254 | + String measurement = String.format( |
| 255 | + "/%d/test/com/influxdb/v3/client/ITQueryWrite/handleFlightRuntimeException", now.toEpochMilli() |
| 256 | + ); |
| 257 | + |
| 258 | + client = getInstance(); |
| 259 | + |
| 260 | + int extraTagLength = 512; |
| 261 | + Map<String, String> extraTags = new HashMap<String, String>(); |
| 262 | + for (int i = 0; i < 22; i++) { |
| 263 | + extraTags.put(makeLengthyTag(extraTagLength, 64, (byte) '/'), "extra-tag-" + i); |
| 264 | + } |
| 265 | + |
| 266 | + Point p = Point.measurement(measurement) |
| 267 | + .setTag("id", "thx1138") |
| 268 | + .setTag("model", "xc11") |
| 269 | + .setTags(extraTags) |
| 270 | + .setFloatField("speed", 3.14) |
| 271 | + .setFloatField("bearing", 3.14 * 0.5) |
| 272 | + .setIntegerField("ticks", 42) |
| 273 | + .setStringField("location", "/earth/4/12/9/15/1") |
| 274 | + .setTimestamp(now); |
| 275 | + |
| 276 | + try { |
| 277 | + client.writePoint(p); |
| 278 | + } catch (InfluxDBApiException idbae) { |
| 279 | + Assertions.fail(idbae); |
| 280 | + } |
| 281 | + |
| 282 | + String faultyQuery = String.format("SELECT * FROM \"%s\" WHERE idx = 'thx1138'", measurement); |
| 283 | + |
| 284 | + try (Stream<Object[]> stream = client.query(faultyQuery)) { |
| 285 | + stream.forEach(row -> { |
| 286 | + for (Object o : row) { |
| 287 | + System.out.print(o + " "); |
| 288 | + } |
| 289 | + System.out.print("\n"); |
| 290 | + }); |
| 291 | + } catch (FlightRuntimeException fre) { |
| 292 | + Assertions.assertThat(fre.getMessage()).doesNotContain("http2 exception"); |
| 293 | + Assertions.assertThat(fre.status().code()).isNotEqualTo(CallStatus.INTERNAL.code()); |
| 294 | + Assertions.assertThat(fre.status().code()). |
| 295 | + as(String.format("Flight runtime exception was UNAVAILABLE. " |
| 296 | + + "Target test case was not fully tested. " |
| 297 | + + "Check limits of test account and target database %s.", |
| 298 | + System.getenv("TESTING_INFLUXDB_DATABASE"))) |
| 299 | + .isNotEqualTo(CallStatus.UNAVAILABLE.code()); |
| 300 | + Assertions.assertThat(fre.status().code()). |
| 301 | + as("Flight runtime exception was UNAUTHENTICATED. " |
| 302 | + + "Target test case was not fully tested. Check test account token.") |
| 303 | + .isNotEqualTo(CallStatus.UNAUTHENTICATED.code()); |
| 304 | + return; |
| 305 | + } catch (Exception e) { |
| 306 | + Assertions.fail(String.format("FlightRuntimeException should have been thrown. " |
| 307 | + + "Instead received %s.", e)); |
| 308 | + } |
| 309 | + |
| 310 | + Assertions.fail("FlightRuntimeException should have been thrown. Instead final query passed."); |
| 311 | + |
| 312 | + } |
| 313 | + |
225 | 314 | @NotNull |
226 | 315 | private static InfluxDBClient getInstance() { |
227 | 316 | return InfluxDBClient.getInstance( |
|
0 commit comments