Skip to content

Commit 80271e6

Browse files
authored
Merge pull request #640 from rhajek/master
Fixed exception propagation in query streaming #639
2 parents 5cdc9e3 + f33dc3f commit 80271e6

File tree

2 files changed

+76
-24
lines changed

2 files changed

+76
-24
lines changed

src/main/java/org/influxdb/impl/InfluxDBImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,13 @@ public boolean isCanceled() {
676676
if (onFailure != null) {
677677
onFailure.accept(e);
678678
}
679+
} catch (Exception e) {
680+
call.cancel();
681+
if (onFailure != null) {
682+
onFailure.accept(e);
683+
}
679684
}
685+
680686
}
681687

682688
@Override

src/test/java/org/influxdb/InfluxDBTest.java

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,31 +1023,77 @@ public void accept(QueryResult result) {
10231023
Assertions.assertEquals("DONE", result.getError());
10241024
}
10251025

1026-
/**
1027-
* Test chunking edge case.
1028-
* @throws InterruptedException
1029-
*/
1030-
@Test
1031-
public void testChunkingFail() throws InterruptedException {
1032-
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
1033-
// do not test version 0.13 and 1.0
1034-
return;
1035-
}
1036-
String dbName = "write_unittest_" + System.currentTimeMillis();
1037-
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
1038-
final CountDownLatch countDownLatch = new CountDownLatch(1);
1039-
Query query = new Query("UNKNOWN_QUERY", dbName);
1040-
this.influxDB.query(query, 10, new Consumer<QueryResult>() {
1041-
@Override
1042-
public void accept(QueryResult result) {
1043-
countDownLatch.countDown();
1044-
}
1045-
});
1046-
this.influxDB.query(new Query("DROP DATABASE " + dbName));
1047-
Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS));
1048-
}
1026+
/**
1027+
* Test chunking edge case.
1028+
*
1029+
* @throws InterruptedException
1030+
*/
1031+
@Test
1032+
public void testChunkingFail() throws InterruptedException {
1033+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
1034+
// do not test version 0.13 and 1.0
1035+
return;
1036+
}
1037+
String dbName = "write_unittest_" + System.currentTimeMillis();
1038+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
1039+
final CountDownLatch countDownLatch = new CountDownLatch(1);
1040+
final CountDownLatch countDownLatchFailure = new CountDownLatch(1);
1041+
Query query = new Query("UNKNOWN_QUERY", dbName);
1042+
this.influxDB.query(query, 10,
1043+
(cancellable, queryResult) -> {
1044+
countDownLatch.countDown();
1045+
}, () -> {
1046+
},
1047+
throwable -> {
1048+
countDownLatchFailure.countDown();
1049+
});
1050+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
1051+
Assertions.assertTrue(countDownLatchFailure.await(10, TimeUnit.SECONDS));
1052+
Assertions.assertFalse(countDownLatch.await(10, TimeUnit.SECONDS));
1053+
}
10491054

1050-
/**
1055+
@Test
1056+
public void testChunkingFailInConsumer() throws InterruptedException {
1057+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
1058+
// do not test version 0.13 and 1.0
1059+
return;
1060+
}
1061+
String dbName = "write_unittest_" + System.currentTimeMillis();
1062+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
1063+
1064+
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
1065+
BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
1066+
Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build();
1067+
Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build();
1068+
Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build();
1069+
batchPoints.point(point1);
1070+
batchPoints.point(point2);
1071+
batchPoints.point(point3);
1072+
this.influxDB.write(batchPoints);
1073+
1074+
final CountDownLatch countDownLatch = new CountDownLatch(1);
1075+
final CountDownLatch countDownLatchFailure = new CountDownLatch(1);
1076+
final CountDownLatch countDownLatchComplete = new CountDownLatch(1);
1077+
Query query = new Query("SELECT * FROM disk", dbName);
1078+
this.influxDB.query(query, 2,
1079+
(cancellable, queryResult) -> {
1080+
countDownLatch.countDown();
1081+
throw new RuntimeException("my error");
1082+
}, () -> {
1083+
countDownLatchComplete.countDown();
1084+
System.out.println("onComplete()");
1085+
},
1086+
throwable -> {
1087+
Assertions.assertEquals(throwable.getMessage(), "my error");
1088+
countDownLatchFailure.countDown();
1089+
});
1090+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
1091+
Assertions.assertTrue(countDownLatchFailure.await(10, TimeUnit.SECONDS));
1092+
Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
1093+
Assertions.assertFalse(countDownLatchComplete.await(10, TimeUnit.SECONDS));
1094+
}
1095+
1096+
/**
10511097
* Test chunking on 0.13 and 1.0.
10521098
* @throws InterruptedException
10531099
*/

0 commit comments

Comments
 (0)