Skip to content

Commit 3169621

Browse files
authored
Merge pull request #600 from fredo994/fix-issue-599
Fix issue 599: Query with different time format does not work if database is not specified.
2 parents ecdbac8 + 9bd8c84 commit 3169621

File tree

3 files changed

+214
-12
lines changed

3 files changed

+214
-12
lines changed

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,48 @@ try (InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root
160160
}
161161
```
162162

163+
#### Default database to query
164+
165+
If you only use one database you can set client level database information. This database will be used for all subsequent HTTP queries,
166+
but if you still sometimes need to query some different database you can, you need to provide provide database information directly in the query,
167+
then database information in the query will take precedence and query will be pushed to that database.
168+
This is shown in the following example:
169+
170+
```java
171+
// Database names
172+
InfluxDB influxDB = InfluxDBFactory.connect("http://172.17.0.2:8086", "root", "root");
173+
174+
String db1 = "database1";
175+
influxDB.query(new Query("CREATE DATABASE " + db1));
176+
String defaultRpName = "aRetentionPolicy1";
177+
influxDB.query(new Query("CREATE RETENTION POLICY " + rp1 + " ON " + db1 + " DURATION 30h REPLICATION 2 DEFAULT"));
178+
Point point1 = Point.measurement("cpu")
179+
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
180+
.addField("idle", 90L)
181+
.addField("user", 9L)
182+
.addField("system", 1L)
183+
.build();
184+
influxDB.write(db1, rp1, point1); // Write to db1
185+
186+
String db2 = "database2";
187+
influxDB.query(new Query("CREATE DATABASE " + db2));
188+
String rp2 = "aRetentionPolicy1";
189+
influxDB.query(new Query("CREATE RETENTION POLICY " + rp2 + " ON " + db2 + " DURATION 30h REPLICATION 2 DEFAULT"));
190+
Point point2 = Point.measurement("cpu")
191+
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
192+
.addField("idle", 80L)
193+
.addField("user", 8L)
194+
.addField("system", 2L)
195+
.build();
196+
influxDB.write(db2, rp2, point2); // Write to db2
197+
198+
199+
influxDB.query(new Query("SELECT * FROM cpu")); // Returns Point1
200+
influxDB.query(new Query("SELECT * FROM cpu", db2)) // Returns Point2
201+
202+
```
203+
204+
163205
### Advanced Usage
164206

165207
#### Gzip's support (version 2.5+ required)

src/main/java/org/influxdb/impl/InfluxDBImpl.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -608,14 +608,13 @@ public void query(final Query query, final int chunkSize, final BiConsumer<Cance
608608
@Override
609609
public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
610610
final Runnable onComplete, final Consumer<Throwable> onFailure) {
611-
612611
Call<ResponseBody> call;
613612
if (query instanceof BoundParameterQuery) {
614613
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
615-
call = this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize,
614+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize,
616615
boundParameterQuery.getParameterJsonWithUrlEncoded());
617616
} else {
618-
call = this.influxDBService.query(query.getDatabase(), query.getCommandWithUrlEncoded(), chunkSize);
617+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize);
619618
}
620619

621620
call.enqueue(new Callback<ResponseBody>() {
@@ -681,11 +680,11 @@ public QueryResult query(final Query query, final TimeUnit timeUnit) {
681680
Call<QueryResult> call = null;
682681
if (query instanceof BoundParameterQuery) {
683682
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
684-
call = this.influxDBService.query(query.getDatabase(),
683+
call = this.influxDBService.query(getDatabase(query),
685684
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(),
686685
boundParameterQuery.getParameterJsonWithUrlEncoded());
687686
} else {
688-
call = this.influxDBService.query(query.getDatabase(),
687+
call = this.influxDBService.query(getDatabase(query),
689688
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
690689
}
691690
return executeQuery(call);
@@ -746,19 +745,15 @@ public boolean databaseExists(final String name) {
746745
*/
747746
private Call<QueryResult> callQuery(final Query query) {
748747
Call<QueryResult> call;
749-
String db = query.getDatabase();
750-
if (db == null) {
751-
db = this.database;
752-
}
753748
if (query instanceof BoundParameterQuery) {
754749
BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;
755-
call = this.influxDBService.postQuery(db, query.getCommandWithUrlEncoded(),
750+
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(),
756751
boundParameterQuery.getParameterJsonWithUrlEncoded());
757752
} else {
758753
if (query.requiresPost()) {
759-
call = this.influxDBService.postQuery(db, query.getCommandWithUrlEncoded());
754+
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded());
760755
} else {
761-
call = this.influxDBService.query(db, query.getCommandWithUrlEncoded());
756+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded());
762757
}
763758
}
764759
return call;
@@ -926,6 +921,14 @@ public void dropRetentionPolicy(final String rpName, final String database) {
926921
executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));
927922
}
928923

924+
private String getDatabase(final Query query) {
925+
String db = query.getDatabase();
926+
if (db == null) {
927+
return this.database;
928+
}
929+
return db;
930+
}
931+
929932
private interface ChunkProccesor {
930933
void process(ResponseBody chunkedBody, Cancellable cancellable,
931934
BiConsumer<Cancellable, QueryResult> consumer, Runnable onComplete) throws IOException;

src/test/java/org/influxdb/InfluxDBTest.java

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.influxdb;
22

3+
import java.util.Collections;
34
import org.influxdb.InfluxDB.LogLevel;
45
import org.influxdb.InfluxDB.ResponseFormat;
56
import org.influxdb.dto.BatchPoints;
@@ -285,6 +286,162 @@ public void testWriteNoDatabase() {
285286
this.influxDB.query(new Query("DROP DATABASE " + dbName));
286287
}
287288

289+
/**
290+
* Tests that database information is used from {@link InfluxDB} when database information
291+
* is not present in query.
292+
*/
293+
@Test
294+
public void testQueryWithNoDatabase() {
295+
String dbName = "write_unittest_" + System.currentTimeMillis();
296+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
297+
this.influxDB.setDatabase(dbName); // Set db here, then after write query should pass.
298+
this.influxDB.write(Point
299+
.measurement("cpu")
300+
.tag("atag", "test")
301+
.addField("idle", 90L)
302+
.addField("usertime", 9L)
303+
.addField("system", 1L)
304+
.build());
305+
306+
Query query = new Query("SELECT * FROM cpu GROUP BY *");
307+
QueryResult result = this.influxDB.query(query);
308+
Assertions.assertEquals(
309+
result.getResults().get(0).getSeries().get(0).getTags(),
310+
Collections.singletonMap("atag", "test")
311+
);
312+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
313+
}
314+
315+
/**
316+
* Tests that database information is used from {@link InfluxDB} when database information
317+
* is not present in query.
318+
*/
319+
@Test
320+
public void testQueryWithNoDatabaseWithConsumer() {
321+
String dbName = "write_unittest_" + System.currentTimeMillis();
322+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
323+
this.influxDB.setDatabase(dbName); // Set db here, then after write query should pass.
324+
this.influxDB.write(Point
325+
.measurement("cpu")
326+
.tag("atag", "test")
327+
.addField("idle", 90L)
328+
.addField("usertime", 9L)
329+
.addField("system", 1L)
330+
.build());
331+
332+
Query query = new Query("SELECT * FROM cpu GROUP BY *");
333+
this.influxDB.query(query,
334+
queryResult ->
335+
Assertions.assertEquals(
336+
queryResult.getResults().get(0).getSeries().get(0).getTags(),
337+
Collections.singletonMap("atag", "test")
338+
)
339+
,
340+
throwable -> Assertions.fail()
341+
);
342+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
343+
}
344+
345+
346+
/**
347+
* Tests that database information is used from {@link InfluxDB} when database information
348+
* is not present in query.
349+
*/
350+
// Note: this test is copied from InfluxDBTest#testChunking but changed so that database
351+
// information is present in client not query. Combined both tests test situations with
352+
// and without database information present in query, hence no need for additional test
353+
// for situation where database info is not set in the client.
354+
public void testQueryNoDatabaseWithChunking() throws Exception {
355+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
356+
// do not test version 0.13 and 1.0
357+
return;
358+
}
359+
String dbName = "write_unittest_" + System.currentTimeMillis();
360+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
361+
this.influxDB.setDatabase(dbName); // Set database -> no need to use it as query parameter.
362+
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
363+
BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
364+
Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build();
365+
Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build();
366+
Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build();
367+
batchPoints.point(point1);
368+
batchPoints.point(point2);
369+
batchPoints.point(point3);
370+
this.influxDB.write(batchPoints);
371+
372+
Thread.sleep(2000);
373+
final BlockingQueue<QueryResult> queue = new LinkedBlockingQueue<>();
374+
Query query = new Query("SELECT * FROM disk");
375+
this.influxDB.query(query, 2, queue::add);
376+
377+
Thread.sleep(2000);
378+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
379+
380+
QueryResult result = queue.poll(20, TimeUnit.SECONDS);
381+
Assertions.assertNotNull(result);
382+
System.out.println(result);
383+
Assertions.assertEquals(2, result.getResults().get(0).getSeries().get(0).getValues().size());
384+
385+
result = queue.poll(20, TimeUnit.SECONDS);
386+
Assertions.assertNotNull(result);
387+
System.out.println(result);
388+
Assertions.assertEquals(1, result.getResults().get(0).getSeries().get(0).getValues().size());
389+
390+
result = queue.poll(20, TimeUnit.SECONDS);
391+
Assertions.assertNotNull(result);
392+
System.out.println(result);
393+
Assertions.assertEquals("DONE", result.getError());
394+
}
395+
396+
/**
397+
* Tests that database information is used from {@link InfluxDB} when database information
398+
* is not present in query and when different time format is requested from db.
399+
*/
400+
@Test
401+
public void testQueryNoDatabaseWithTimeFormat() {
402+
String dbName = "write_unittest_" + System.currentTimeMillis();
403+
long time = 1559027876L;
404+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
405+
this.influxDB.setDatabase(dbName); // Set db here, then after write query should pass.
406+
this.influxDB.write(Point
407+
.measurement("cpu")
408+
.tag("atag", "test")
409+
.addField("idle", 90L)
410+
.addField("usertime", 9L)
411+
.addField("system", 1L)
412+
.time(time, TimeUnit.MILLISECONDS) // Set time.
413+
.build());
414+
415+
Query query = new Query("SELECT * FROM cpu GROUP BY *");
416+
417+
// Test milliseconds
418+
QueryResult result = this.influxDB.query(query, TimeUnit.MILLISECONDS);
419+
Series series = result.getResults().get(0).getSeries().get(0);
420+
Assertions.assertEquals(
421+
((Number)series.getValues().get(0).get(series.getColumns().indexOf("time"))).longValue() ,
422+
time
423+
);
424+
425+
// Test nanoseconds
426+
result = this.influxDB.query(query, TimeUnit.NANOSECONDS);
427+
series = result.getResults().get(0).getSeries().get(0);
428+
Assertions.assertEquals(
429+
((Number)series.getValues().get(0).get(series.getColumns().indexOf("time"))).longValue(),
430+
TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS)
431+
);
432+
433+
// Test seconds
434+
result = this.influxDB.query(query, TimeUnit.SECONDS);
435+
series = result.getResults().get(0).getSeries().get(0);
436+
Assertions.assertEquals(
437+
((Number)series.getValues().get(0).get(series.getColumns().indexOf("time"))).longValue(),
438+
TimeUnit.SECONDS.convert(time, TimeUnit.MILLISECONDS)
439+
440+
);
441+
442+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
443+
}
444+
288445
/**
289446
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
290447
*/

0 commit comments

Comments
 (0)