Skip to content

Commit e7ffbed

Browse files
authored
Merge pull request #286 from hivemq/develop
Release 1.1.0
2 parents fcf6e73 + 49cbabb commit e7ffbed

File tree

118 files changed

+4557
-935
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+4557
-935
lines changed

CODE_OF_CONDUCT.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# HiveMQ Code of Conduct
2+
3+
Please refer to our [HiveMQ Code of Conduct](https://github.com/hivemq/hivemq-community/blob/master/code-of-conduct.md).

CONTRIBUTING.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
# Contributing
2+
## Contributing to the HiveMQ Community Projects
23

3-
## External contributors
4+
Welcome to the HiveMQ Community! Glad to see your interest in contributing to HiveMQ MQTT Client. Please checkout our [Contribution Guide](https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc) to make sure your contribution will be accepted by the HiveMQ team.
5+
6+
For information on how the HiveMQ Community is organized and how contributions will be accepted please have a look at our [HiveMQ Community Repo](https://github.com/hivemq/hivemq-community).
7+
8+
##Contributing to HiveMQ MQTT Client
9+
### External contributors
410
If you would like to contribute code, do the following:
511
- Fork the repository on GitHub
612
- Open a pull request targeting the `develop` branch
713

8-
## License
14+
### License
915
By contributing your code, you agree to license your contribution under the terms of the
1016
[Apache License, Version 2.0](https://github.com/hivemq/hivemq-mqtt-client/blob/develop/LICENSE).
1117

1218
All files must contain the license header from the
1319
[HEADER](https://github.com/hivemq/hivemq-mqtt-client/blob/develop/HEADER) file.
1420

15-
## Branching model
21+
### Branching model
1622

1723
- `master`: release branch, protected
1824
- `develop` is merged into `master` by creating a merge commit if a new version is released
@@ -25,22 +31,22 @@ All files must contain the license header from the
2531
- Pull request targeting the `develop` branch
2632
- Mandatory code review of the pull request
2733

28-
## Branching guidelines
34+
### Branching guidelines
2935

3036
- Branch types: feature, bugfix, improvement, cleanup (same as the label of a corresponding GitHub Issue)
3137
- Branch names:
3238
- Starting with type: `feature/`, `bugfix/`, `improvement/`, `cleanup/`
3339
- \+ task: lower case, spaces replaced with `-`
3440

35-
## Commit guidelines
41+
### Commit guidelines
3642

3743
- Commits should be as atomic as possible.
3844
- Commit messages should describe the changes clearly.
3945
- Commit messages should start with a capital letter for consistency.
4046
- Commit messages should avoid exceeding the line length limit. Instead use multiple lines, each describing one specific
4147
change.
4248

43-
## Code style guidelines
49+
### Code style guidelines
4450

4551
- The project uses Nullability annotations to avoid NullPointerExceptions: `@NotNull`, `@Nullable`.
4652
Every non-primitive parameter/return type/field should be annotated with one of them.

README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ If you use Gradle, just include the following inside your `build.gradle` file.
4949

5050
```groovy
5151
dependencies {
52-
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.0'
52+
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.1'
5353
}
5454
```
5555

@@ -71,7 +71,7 @@ NOTE: You have to set the compiler version to `1.8` or higher.
7171
<dependency>
7272
<groupId>com.hivemq</groupId>
7373
<artifactId>hivemq-mqtt-client</artifactId>
74-
<version>1.0.0</version>
74+
<version>1.0.1</version>
7575
</dependency>
7676
</dependencies>
7777
...
@@ -88,7 +88,7 @@ To use the shaded version just append `-shaded` to the artifact name.
8888

8989
```groovy
9090
dependencies {
91-
compile group: 'com.hivemq', name: 'hivemq-mqtt-client-shaded', version: '1.0.0'
91+
compile group: 'com.hivemq', name: 'hivemq-mqtt-client-shaded', version: '1.0.1'
9292
}
9393
```
9494

@@ -101,7 +101,7 @@ dependencies {
101101
<dependency>
102102
<groupId>com.hivemq</groupId>
103103
<artifactId>hivemq-mqtt-client-shaded</artifactId>
104-
<version>1.0.0</version>
104+
<version>1.0.1</version>
105105
</dependency>
106106
</dependencies>
107107
...
@@ -125,7 +125,7 @@ repositories {
125125
}
126126
127127
dependencies {
128-
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.0-SNAPSHOT'
128+
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.0.1-SNAPSHOT'
129129
}
130130
```
131131

@@ -145,7 +145,7 @@ dependencies {
145145
<dependency>
146146
<groupId>com.hivemq</groupId>
147147
<artifactId>hivemq-mqtt-client</artifactId>
148-
<version>1.0.0-SNAPSHOT</version>
148+
<version>1.0.1-SNAPSHOT</version>
149149
</dependency>
150150
</dependencies>
151151
...
@@ -590,3 +590,7 @@ part of the public API.
590590
# Contributing
591591

592592
See [CONTRIBUTING.md](CONTRIBUTING.md)
593+
594+
# License
595+
596+
See [LICENSE](LICENSE)

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ plugins {
1313
}
1414

1515
group 'com.hivemq'
16-
version '1.0.1' + (Boolean.valueOf(System.getProperty("snapshot")) ? "-SNAPSHOT" : "")
16+
version '1.1.0' + (Boolean.valueOf(System.getProperty("snapshot")) ? "-SNAPSHOT" : "")
1717
description 'HiveMQ MQTT Client is a MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client ' +
1818
'library with different API flavours and backpressure support'
1919

@@ -28,6 +28,7 @@ ext {
2828
licenseReadableName = 'The Apache License, Version 2.0'
2929
licenseUrl = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
3030
shadedAppendix = 'shaded'
31+
prevVersion = '1.0.1'
3132
}
3233

3334
sourceCompatibility = 1.8

japicc.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ task japicc {
88
def workingDir = new File(project.buildDir, 'japicc')
99
def executable = new File(workingDir, 'japi-compliance-checker-' + japiccVersion + '/japi-compliance-checker.pl')
1010

11-
def lastSemVer = lastSemVer()
11+
def lastSemVer = project.prevVersion == null ? lastSemVer() : project.prevVersion
1212
def shadedName = project.name + '-' + shadedAppendix
1313
def lastJar = new File(workingDir, project.name + '-' + lastSemVer + '.jar')
1414
def lastShadedJar = new File(workingDir, shadedName + '-' + lastSemVer + '.jar')

src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717

1818
package com.hivemq.client.internal.mqtt;
1919

20+
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
21+
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
22+
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
23+
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
2024
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribeBuilder;
25+
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
26+
import com.hivemq.client.internal.mqtt.util.MqttChecks;
2127
import com.hivemq.client.internal.rx.RxFutureConverter;
2228
import com.hivemq.client.internal.util.Checks;
2329
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
@@ -31,7 +37,6 @@
3137
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
3238
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
3339
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
34-
import io.reactivex.Flowable;
3540
import io.reactivex.FlowableSubscriber;
3641
import io.reactivex.schedulers.Schedulers;
3742
import org.jetbrains.annotations.NotNull;
@@ -51,8 +56,6 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
5156
private static final @NotNull Function<Mqtt5SubAck, Mqtt5SubAck> SUBACK_HANDLER = MqttBlockingClient::handleSubAck;
5257
private static final @NotNull Function<Mqtt5UnsubAck, Mqtt5UnsubAck> UNSUBACK_HANDLER =
5358
MqttBlockingClient::handleUnsubAck;
54-
private static final @NotNull Function<Mqtt5PublishResult, Mqtt5PublishResult> PUBLISH_HANDLER =
55-
MqttBlockingClient::handlePublish;
5659

5760
private final @NotNull MqttRxClient delegate;
5861

@@ -62,21 +65,26 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
6265

6366
@Override
6467
public @NotNull CompletableFuture<@NotNull Mqtt5ConnAck> connect(final @Nullable Mqtt5Connect connect) {
65-
return RxFutureConverter.toFuture(delegate.connect(connect));
68+
final MqttConnect mqttConnect = MqttChecks.connect(connect);
69+
70+
return RxFutureConverter.toFuture(delegate.connect(mqttConnect));
6671
}
6772

6873
@Override
6974
public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(final @Nullable Mqtt5Subscribe subscribe) {
70-
return RxFutureConverter.toFuture(delegate.subscribe(subscribe));
75+
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
76+
77+
return RxFutureConverter.toFuture(delegate.subscribe(mqttSubscribe)).thenApply(SUBACK_HANDLER);
7178
}
7279

7380
@Override
7481
public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(
7582
final @Nullable Mqtt5Subscribe subscribe, final @Nullable Consumer<@NotNull Mqtt5Publish> callback) {
7683

84+
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
7785
Checks.notNull(callback, "Callback");
7886

79-
return delegate.subscribeStream(subscribe)
87+
return delegate.subscribeStream(mqttSubscribe)
8088
.subscribeSingleFuture(new CallbackSubscriber(callback))
8189
.thenApply(SUBACK_HANDLER);
8290
}
@@ -86,10 +94,11 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
8694
final @Nullable Mqtt5Subscribe subscribe, final @Nullable Consumer<@NotNull Mqtt5Publish> callback,
8795
final @Nullable Executor executor) {
8896

97+
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
8998
Checks.notNull(callback, "Callback");
9099
Checks.notNull(executor, "Executor");
91100

92-
return delegate.subscribeStreamUnsafe(subscribe)
101+
return delegate.subscribeStreamUnsafe(mqttSubscribe)
93102
.observeOnBoth(Schedulers.from(executor), true)
94103
.subscribeSingleFuture(new CallbackSubscriber(callback))
95104
.thenApply(SUBACK_HANDLER);
@@ -99,6 +108,7 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
99108
public void publishes(
100109
final @Nullable MqttGlobalPublishFilter filter, final @Nullable Consumer<@NotNull Mqtt5Publish> callback) {
101110

111+
Checks.notNull(filter, "Global publish filter");
102112
Checks.notNull(callback, "Callback");
103113

104114
delegate.publishes(filter).subscribe(new CallbackSubscriber(callback));
@@ -109,6 +119,7 @@ public void publishes(
109119
final @Nullable MqttGlobalPublishFilter filter, final @Nullable Consumer<@NotNull Mqtt5Publish> callback,
110120
final @Nullable Executor executor) {
111121

122+
Checks.notNull(filter, "Global publish filter");
112123
Checks.notNull(callback, "Callback");
113124
Checks.notNull(executor, "Executor");
114125

@@ -120,15 +131,17 @@ public void publishes(
120131
@Override
121132
public @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> unsubscribe(
122133
final @Nullable Mqtt5Unsubscribe unsubscribe) {
123-
return RxFutureConverter.toFuture(delegate.unsubscribe(unsubscribe)).thenApply(UNSUBACK_HANDLER);
134+
135+
final MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);
136+
137+
return RxFutureConverter.toFuture(delegate.unsubscribe(mqttUnsubscribe)).thenApply(UNSUBACK_HANDLER);
124138
}
125139

126140
@Override
127141
public @NotNull CompletableFuture<@NotNull Mqtt5PublishResult> publish(final @Nullable Mqtt5Publish publish) {
128-
Checks.notNull(publish, "Publish");
142+
final MqttPublish mqttPublish = MqttChecks.publish(publish);
129143

130-
return RxFutureConverter.toFuture(delegate.publishHalfSafe(Flowable.just(publish)).singleOrError())
131-
.thenApply(PUBLISH_HANDLER);
144+
return RxFutureConverter.toFuture(delegate.publish(mqttPublish));
132145
}
133146

134147
@Override
@@ -138,7 +151,9 @@ public void publishes(
138151

139152
@Override
140153
public @NotNull CompletableFuture<Void> disconnect(final @Nullable Mqtt5Disconnect disconnect) {
141-
return RxFutureConverter.toFuture(delegate.disconnect(disconnect));
154+
final MqttDisconnect mqttDisconnect = MqttChecks.disconnect(disconnect);
155+
156+
return RxFutureConverter.toFuture(delegate.disconnect(mqttDisconnect));
142157
}
143158

144159
@Override

src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
package com.hivemq.client.internal.mqtt;
1919

20+
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
21+
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
22+
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
23+
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
24+
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
25+
import com.hivemq.client.internal.mqtt.util.MqttChecks;
2026
import com.hivemq.client.internal.util.AsyncRuntimeException;
2127
import com.hivemq.client.internal.util.Checks;
2228
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
@@ -70,18 +76,6 @@ public class MqttBlockingClient implements Mqtt5BlockingClient {
7076
return unsubAck;
7177
}
7278

73-
static @NotNull Mqtt5PublishResult handlePublish(final @NotNull Mqtt5PublishResult publishResult) {
74-
final Optional<Throwable> error = publishResult.getError();
75-
if (error.isPresent()) {
76-
final Throwable throwable = error.get();
77-
if (throwable instanceof RuntimeException) {
78-
throw (RuntimeException) throwable;
79-
}
80-
throw new RuntimeException(throwable);
81-
}
82-
return publishResult;
83-
}
84-
8579
private final @NotNull MqttRxClient delegate;
8680

8781
MqttBlockingClient(final @NotNull MqttRxClient delegate) {
@@ -90,41 +84,46 @@ public class MqttBlockingClient implements Mqtt5BlockingClient {
9084

9185
@Override
9286
public @NotNull Mqtt5ConnAck connect(final @Nullable Mqtt5Connect connect) {
87+
final MqttConnect mqttConnect = MqttChecks.connect(connect);
9388
try {
94-
return delegate.connectUnsafe(connect).blockingGet();
89+
return delegate.connectUnsafe(mqttConnect).blockingGet();
9590
} catch (final RuntimeException e) {
9691
throw AsyncRuntimeException.fillInStackTrace(e);
9792
}
9893
}
9994

10095
@Override
10196
public @NotNull Mqtt5SubAck subscribe(final @Nullable Mqtt5Subscribe subscribe) {
97+
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
10298
try {
103-
return handleSubAck(delegate.subscribeUnsafe(subscribe).blockingGet());
99+
return handleSubAck(delegate.subscribeUnsafe(mqttSubscribe).blockingGet());
104100
} catch (final RuntimeException e) {
105101
throw AsyncRuntimeException.fillInStackTrace(e);
106102
}
107103
}
108104

109105
@Override
110106
public @NotNull Mqtt5Publishes publishes(final @Nullable MqttGlobalPublishFilter filter) {
107+
Checks.notNull(filter, "Global publish filter");
108+
111109
return new MqttPublishes(delegate.publishesUnsafe(filter));
112110
}
113111

114112
@Override
115113
public @NotNull Mqtt5UnsubAck unsubscribe(final @Nullable Mqtt5Unsubscribe unsubscribe) {
114+
final MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);
116115
try {
117-
return handleUnsubAck(delegate.unsubscribeUnsafe(unsubscribe).blockingGet());
116+
return handleUnsubAck(delegate.unsubscribeUnsafe(mqttUnsubscribe).blockingGet());
118117
} catch (final RuntimeException e) {
119118
throw AsyncRuntimeException.fillInStackTrace(e);
120119
}
121120
}
122121

123122
@Override
124123
public @NotNull Mqtt5PublishResult publish(final @Nullable Mqtt5Publish publish) {
125-
Checks.notNull(publish, "Publish");
124+
final MqttPublish mqttPublish = MqttChecks.publish(publish);
126125
try {
127-
return handlePublish(delegate.publishUnsafe(Flowable.just(publish)).singleOrError().blockingGet());
126+
return delegate.publishUnsafe(mqttPublish).blockingGet();
128127
} catch (final RuntimeException e) {
129128
throw AsyncRuntimeException.fillInStackTrace(e);
130129
}
@@ -141,8 +140,9 @@ public void reauth() {
141140

142141
@Override
143142
public void disconnect(final @NotNull Mqtt5Disconnect disconnect) {
143+
final MqttDisconnect mqttDisconnect = MqttChecks.disconnect(disconnect);
144144
try {
145-
delegate.disconnectUnsafe(disconnect).blockingAwait();
145+
delegate.disconnectUnsafe(mqttDisconnect).blockingAwait();
146146
} catch (final RuntimeException e) {
147147
throw AsyncRuntimeException.fillInStackTrace(e);
148148
}

0 commit comments

Comments
 (0)