Skip to content

Commit cf0630e

Browse files
authored
feat: write reactive client uses direct flow (#251)
1 parent 1f42128 commit cf0630e

31 files changed

+1714
-717
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ This release uses the latest InfluxDB OSS API definitions - [oss.yml](https://ra
2323
1. [#272](https://github.com/influxdata/influxdb-client-java/pull/272): Add `PingService` to check status of OSS and Cloud instance
2424
1. [#278](https://github.com/influxdata/influxdb-client-java/pull/278): Add query method with all params for BucketsApi, OrganizationApi and TasksApi
2525
1. [#280](https://github.com/influxdata/influxdb-client-java/pull/280): Use async HTTP calls in the Batching writer
26+
1. [#251](https://github.com/influxdata/influxdb-client-java/pull/251): Client uses `Reactive Streams` in public API, `WriteReactiveApi` is cold `Publisher` [influxdb-client-reactive]
2627

2728
### Bug Fixes
2829
1. [#279](https://github.com/influxdata/influxdb-client-java/pull/279): Session authentication for InfluxDB `2.1`

client-core/src/main/java/com/influxdb/internal/AbstractRestClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import okhttp3.ResponseBody;
5353
import okhttp3.logging.HttpLoggingInterceptor;
5454
import retrofit2.Call;
55+
import retrofit2.HttpException;
5556
import retrofit2.Response;
5657

5758
/**
@@ -86,6 +87,20 @@ protected <T> T execute(@Nonnull final Call<T> call) throws InfluxException {
8687
}
8788
}
8889

90+
@Nonnull
91+
protected InfluxException toInfluxException(@Nonnull final Throwable throwable) {
92+
93+
if (throwable instanceof InfluxException) {
94+
return (InfluxException) throwable;
95+
}
96+
97+
if (throwable instanceof HttpException) {
98+
return responseToError(((HttpException) throwable).response());
99+
}
100+
101+
return new InfluxException(throwable);
102+
}
103+
89104
@Nonnull
90105
@SuppressWarnings("MagicNumber")
91106
protected InfluxException responseToError(@Nonnull final Response<?> response) {

client-reactive/README.md

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22

33
[![javadoc](https://img.shields.io/badge/javadoc-link-brightgreen.svg)](https://influxdata.github.io/influxdb-client-java/influxdb-client-reactive/apidocs/index.html)
44

5-
The reference Java client that allows query and write for the InfluxDB 2.0 by a reactive way.
5+
The reference reactive Java client for InfluxDB 2.0. The client provide supports for asynchronous stream processing with backpressure as is defined by the [Reactive Streams specification](http://www.reactive-streams.org/).
6+
7+
## Important
8+
9+
> :warning: The `Publishers` returned from [Query](src/main/java/com/influxdb/client/reactive/QueryReactiveApi.java) and [Write](src/main/java/com/influxdb/client/reactive/WriteReactiveApi.java) API are cold.
10+
That means no request to InfluxDB is trigger until register a subscription to `Publisher`.
611

712
## Documentation
813

@@ -35,6 +40,8 @@ import com.influxdb.client.reactive.InfluxDBClientReactive;
3540
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
3641
import com.influxdb.client.reactive.QueryReactiveApi;
3742

43+
import io.reactivex.Flowable;
44+
3845
public class InfluxDB2ReactiveExample {
3946

4047
private static char[] token = "my-token".toCharArray();
@@ -51,8 +58,7 @@ public class InfluxDB2ReactiveExample {
5158

5259
QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();
5360

54-
queryApi
55-
.query(flux)
61+
Flowable.fromPublisher(queryApi.query(flux))
5662
//
5763
// Filter records by measurement name
5864
//
@@ -82,6 +88,8 @@ import com.influxdb.client.reactive.InfluxDBClientReactive;
8288
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
8389
import com.influxdb.client.reactive.QueryReactiveApi;
8490

91+
import io.reactivex.Flowable;
92+
8593
public class InfluxDB2ReactiveExampleRaw {
8694

8795
private static char[] token = "my-token".toCharArray();
@@ -98,8 +106,7 @@ public class InfluxDB2ReactiveExampleRaw {
98106

99107
QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();
100108

101-
queryApi
102-
.queryRaw(flux)
109+
Flowable.fromPublisher(queryApi.queryRaw(flux))
103110
//
104111
// Take first 10 records
105112
//
@@ -116,7 +123,7 @@ public class InfluxDB2ReactiveExampleRaw {
116123
}
117124
```
118125

119-
The mapping result to POJO is also supported:
126+
The mapping result to POJO is also support:
120127

121128
```java
122129
package example;
@@ -129,6 +136,9 @@ import com.influxdb.client.reactive.InfluxDBClientReactive;
129136
import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
130137
import com.influxdb.client.reactive.QueryReactiveApi;
131138

139+
import io.reactivex.Flowable;
140+
import org.reactivestreams.Publisher;
141+
132142
public class InfluxDB2ReactiveExamplePojo {
133143

134144
private static char[] token = "my-token".toCharArray();
@@ -144,8 +154,8 @@ public class InfluxDB2ReactiveExamplePojo {
144154

145155
QueryReactiveApi queryApi = influxDBClient.getQueryReactiveApi();
146156

147-
queryApi
148-
.query(flux, Temperature.class)
157+
Publisher<Temperature> query = queryApi.query(flux, Temperature.class);
158+
Flowable.fromPublisher(query)
149159
//
150160
// Take first 10 records
151161
//
@@ -177,17 +187,23 @@ public class InfluxDB2ReactiveExamplePojo {
177187

178188
## Writes
179189

180-
For writing data we use [WriteReactiveApi](https://influxdata.github.io/influxdb-client-java/influxdb-client-reactive/apidocs/com/influxdb/client/reactive/WriteReactiveApi.html) that supports same configuration as [non reactive client](../client#writes):
190+
For writing data we use [`WriteReactiveApi`](src/main/java/com/influxdb/client/reactive/WriteReactiveApi.java)
191+
that supports writing data using Line Protocol, Data Point or POJO. The [GZIP compression](#gzip-support) is also supported.
192+
193+
The writes are configurable by [`WriteOptionsReactive`](src/main/java/com/influxdb/client/reactive/WriteOptionsReactive.java):
194+
195+
| Property | Description | Default Value |
196+
| --- | --- | --- |
197+
| **batchSize** | the number of data point to collect in batch. The `0` disable batching - whole upstream is written in one batch. | 1000 |
198+
| **flushInterval** | the number of milliseconds before the batch is written | 1000 |
199+
| **jitterInterval** | the number of milliseconds to increase the batch flush interval by a random amount | 0 |
200+
| **retryInterval** | the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header.| 5000 |
201+
| **maxRetries** | the number of max retries when write fails. The `0` disable retry strategy - the error is immediately propagate to upstream. | 5 |
202+
| **maxRetryDelay** | the maximum delay between each retry attempt in milliseconds | 125_000 |
203+
| **maxRetryTime** | maximum total retry timeout in milliseconds | 180_000 |
204+
| **exponentialBase** | the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retryInterval * exponentialBase^(attempts-1)`` and ``retryInterval * exponentialBase^(attempts)``. Example for ``retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]``
181205

182-
1. writing data using Line Protocol, Data Point, POJO
183-
2. use batching for writes
184-
3. use client backpressure strategy
185-
4. produces events that allow user to be notified and react to this events
186-
- `WriteSuccessEvent` - published when arrived the success response from Platform server
187-
- `BackpressureEvent` - published when is **client** backpressure applied
188-
- `WriteErrorEvent` - published when occurs a unhandled exception
189-
- `WriteRetriableErrorEvent` - published when occurs a retriable error
190-
5. use GZIP compression for data
206+
> Backpressure: is defined by the backpressure behavior of the upstream publisher.
191207
192208
### Writing data
193209

@@ -208,6 +224,8 @@ import com.influxdb.client.reactive.InfluxDBClientReactiveFactory;
208224
import com.influxdb.client.reactive.WriteReactiveApi;
209225

210226
import io.reactivex.Flowable;
227+
import io.reactivex.disposables.Disposable;
228+
import org.reactivestreams.Publisher;
211229

212230
public class InfluxDB2ReactiveExampleWriteEveryTenSeconds {
213231

@@ -217,7 +235,7 @@ public class InfluxDB2ReactiveExampleWriteEveryTenSeconds {
217235

218236
public static void main(final String[] args) throws InterruptedException {
219237

220-
InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:8086", token, org, bucket);
238+
InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory.create("http://localhost:9999", token, org, bucket);
221239

222240
//
223241
// Write data
@@ -234,9 +252,20 @@ public class InfluxDB2ReactiveExampleWriteEveryTenSeconds {
234252
return temperature;
235253
});
236254

237-
writeApi.writeMeasurements(WritePrecision.NS, measurements);
255+
//
256+
// ReactiveStreams publisher
257+
//
258+
Publisher<WriteReactiveApi.Success> publisher = writeApi.writeMeasurements(WritePrecision.NS, measurements);
259+
260+
//
261+
// Subscribe to Publisher
262+
//
263+
Disposable subscriber = Flowable.fromPublisher(publisher)
264+
.subscribe(success -> System.out.println("Successfully written temperature"));
265+
266+
Thread.sleep(35_000);
238267

239-
Thread.sleep(30_000);
268+
subscriber.dispose();
240269

241270
influxDBClient.close();
242271
}

client-reactive/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535

3636
<name>The RxJava InfluxDB 2.0 Client</name>
3737
<description>
38-
The reference Java client that allows query and write for the InfluxDB 2.0 by a reactive way.
38+
The reference reactive Java client for InfluxDB 2.0. The client provide supports for asynchronous stream
39+
processing with backpressure as is defined by the Reactive Streams.
3940
</description>
4041

4142
<url>https://github.com/influxdata/influxdb-client-java/tree/master/client-reactive</url>
@@ -97,6 +98,11 @@
9798
<artifactId>influxdb-client-java</artifactId>
9899
</dependency>
99100

101+
<dependency>
102+
<groupId>com.squareup.retrofit2</groupId>
103+
<artifactId>adapter-rxjava2</artifactId>
104+
</dependency>
105+
100106
<dependency>
101107
<groupId>com.influxdb</groupId>
102108
<artifactId>influxdb-client-test</artifactId>

client-reactive/src/main/java/com/influxdb/client/reactive/InfluxDBClientReactive.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import com.influxdb.LogLevel;
2727
import com.influxdb.client.InfluxDBClient;
28-
import com.influxdb.client.WriteOptions;
2928
import com.influxdb.client.domain.HealthCheck;
3029

3130
import io.reactivex.Single;
@@ -57,10 +56,11 @@ public interface InfluxDBClientReactive extends AutoCloseable {
5756
/**
5857
* Create a new Write client.
5958
*
59+
* @param writeOptions the writes configuration
6060
* @return the new client instance for the Write API
6161
*/
6262
@Nonnull
63-
WriteReactiveApi getWriteReactiveApi(@Nonnull final WriteOptions writeOptions);
63+
WriteReactiveApi getWriteReactiveApi(@Nonnull final WriteOptionsReactive writeOptions);
6464

6565
/**
6666
* Get the health of an instance.

0 commit comments

Comments
 (0)