Skip to content

Commit 1aa447f

Browse files
committed
Exceptions of async operations are not wrapped in CompletionEx anymore
1 parent 6aeee07 commit 1aa447f

File tree

2 files changed

+102
-60
lines changed

2 files changed

+102
-60
lines changed

src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,54 @@
4646
import java.util.concurrent.CompletableFuture;
4747
import java.util.concurrent.Executor;
4848
import java.util.function.Consumer;
49-
import java.util.function.Function;
5049

5150
/**
5251
* @author Silvio Giebl
5352
*/
5453
public class MqttAsyncClient implements Mqtt5AsyncClient {
5554

56-
private static final @NotNull Function<Mqtt5SubAck, Mqtt5SubAck> SUBACK_HANDLER = MqttBlockingClient::handleSubAck;
57-
private static final @NotNull Function<Mqtt5UnsubAck, Mqtt5UnsubAck> UNSUBACK_HANDLER =
58-
MqttBlockingClient::handleUnsubAck;
55+
private static @NotNull CompletableFuture<@NotNull Mqtt5SubAck> handleSubAck(
56+
final @NotNull CompletableFuture<@NotNull Mqtt5SubAck> future, final @NotNull MqttSubscribe subscribe) {
57+
58+
if (subscribe.getSubscriptions().size() == 1) {
59+
return future;
60+
}
61+
final CompletableFuture<Mqtt5SubAck> mappedFuture = new CompletableFuture<>();
62+
future.whenComplete((subAck, throwable) -> {
63+
if (throwable != null) {
64+
mappedFuture.completeExceptionally(throwable);
65+
} else {
66+
try {
67+
mappedFuture.complete(MqttBlockingClient.handleSubAck(subAck));
68+
} catch (final Throwable t) {
69+
mappedFuture.completeExceptionally(t);
70+
}
71+
}
72+
});
73+
return mappedFuture;
74+
}
75+
76+
private static @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> handleUnsubAck(
77+
final @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> future,
78+
final @NotNull MqttUnsubscribe unsubscribe) {
79+
80+
if (unsubscribe.getTopicFilters().size() == 1) {
81+
return future;
82+
}
83+
final CompletableFuture<Mqtt5UnsubAck> mappedFuture = new CompletableFuture<>();
84+
future.whenComplete((unsubAck, throwable) -> {
85+
if (throwable != null) {
86+
mappedFuture.completeExceptionally(throwable);
87+
} else {
88+
try {
89+
mappedFuture.complete(MqttBlockingClient.handleUnsubAck(unsubAck));
90+
} catch (final Throwable t) {
91+
mappedFuture.completeExceptionally(t);
92+
}
93+
}
94+
});
95+
return mappedFuture;
96+
}
5997

6098
private final @NotNull MqttRxClient delegate;
6199

@@ -74,7 +112,7 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
74112
public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(final @Nullable Mqtt5Subscribe subscribe) {
75113
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
76114

77-
return RxFutureConverter.toFuture(delegate.subscribe(mqttSubscribe)).thenApply(SUBACK_HANDLER);
115+
return handleSubAck(RxFutureConverter.toFuture(delegate.subscribe(mqttSubscribe)), mqttSubscribe);
78116
}
79117

80118
@Override
@@ -84,9 +122,9 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
84122
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
85123
Checks.notNull(callback, "Callback");
86124

87-
return delegate.subscribeStream(mqttSubscribe)
88-
.subscribeSingleFuture(new CallbackSubscriber(callback))
89-
.thenApply(SUBACK_HANDLER);
125+
return handleSubAck(
126+
delegate.subscribeStream(mqttSubscribe).subscribeSingleFuture(new CallbackSubscriber(callback)),
127+
mqttSubscribe);
90128
}
91129

92130
@Override
@@ -98,10 +136,10 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
98136
Checks.notNull(callback, "Callback");
99137
Checks.notNull(executor, "Executor");
100138

101-
return delegate.subscribeStreamUnsafe(mqttSubscribe)
102-
.observeOnBoth(Schedulers.from(executor), true)
103-
.subscribeSingleFuture(new CallbackSubscriber(callback))
104-
.thenApply(SUBACK_HANDLER);
139+
return handleSubAck(
140+
delegate.subscribeStreamUnsafe(mqttSubscribe)
141+
.observeOnBoth(Schedulers.from(executor), true).subscribeSingleFuture(new CallbackSubscriber(callback)),
142+
mqttSubscribe);
105143
}
106144

107145
@Override
@@ -134,7 +172,7 @@ public void publishes(
134172

135173
final MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);
136174

137-
return RxFutureConverter.toFuture(delegate.unsubscribe(mqttUnsubscribe)).thenApply(UNSUBACK_HANDLER);
175+
return handleUnsubAck(RxFutureConverter.toFuture(delegate.unsubscribe(mqttUnsubscribe)), mqttUnsubscribe);
138176
}
139177

140178
@Override

src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3AsyncClientView.java

Lines changed: 51 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -41,61 +41,33 @@
4141
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
4242
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAck;
4343
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
44-
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
4544
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
46-
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
4745
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
48-
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
4946
import org.jetbrains.annotations.NotNull;
5047
import org.jetbrains.annotations.Nullable;
5148

5249
import java.util.concurrent.CompletableFuture;
53-
import java.util.concurrent.CompletionException;
5450
import java.util.concurrent.Executor;
55-
import java.util.function.BiFunction;
5651
import java.util.function.Consumer;
57-
import java.util.function.Function;
5852

5953
/**
6054
* @author Silvio Giebl
6155
*/
6256
public class Mqtt3AsyncClientView implements Mqtt3AsyncClient {
6357

64-
private static final @NotNull BiFunction<Mqtt5ConnAck, Throwable, Mqtt3ConnAck> CONNACK_MAPPER =
65-
(connAck, throwable) -> {
66-
if (throwable != null) {
67-
throw new CompletionException(Mqtt3ExceptionFactory.map(throwable));
68-
}
69-
return Mqtt3ConnAckView.of(connAck);
70-
};
71-
72-
private static final @NotNull BiFunction<Mqtt5SubAck, Throwable, Mqtt3SubAck> SUBACK_MAPPER =
73-
(subAck, throwable) -> {
74-
if (throwable != null) {
75-
throw new CompletionException(Mqtt3ExceptionFactory.map(throwable));
76-
}
77-
return Mqtt3SubAckView.of(subAck);
78-
};
79-
80-
private static final @NotNull BiFunction<Mqtt5UnsubAck, Throwable, Void> UNSUBACK_MAPPER =
81-
(unsubAck, throwable) -> {
82-
if (throwable != null) {
83-
throw new CompletionException(Mqtt3ExceptionFactory.map(throwable));
84-
}
85-
return null;
86-
};
87-
88-
private static final @NotNull BiFunction<Mqtt5PublishResult, Throwable, Mqtt3Publish> PUBLISH_RESULT_MAPPER =
89-
(publishResult, throwable) -> {
90-
if (throwable != null) {
91-
throw new CompletionException(Mqtt3ExceptionFactory.map(throwable));
92-
}
93-
return Mqtt3PublishView.of(publishResult.getPublish());
94-
};
58+
private static @NotNull CompletableFuture<@NotNull Mqtt3SubAck> handleSubAck(
59+
final @NotNull CompletableFuture<@NotNull Mqtt5SubAck> future) {
9560

96-
private static final @NotNull Function<Throwable, Void> DISCONNECT_MAPPER = throwable -> {
97-
throw new CompletionException(Mqtt3ExceptionFactory.map(throwable));
98-
};
61+
final CompletableFuture<Mqtt3SubAck> mappedFuture = new CompletableFuture<>();
62+
future.whenComplete((subAck, throwable) -> {
63+
if (throwable != null) {
64+
mappedFuture.completeExceptionally(Mqtt3ExceptionFactory.map(throwable));
65+
} else {
66+
mappedFuture.complete(Mqtt3SubAckView.of(subAck));
67+
}
68+
});
69+
return mappedFuture;
70+
}
9971

10072
private static @NotNull Consumer<Mqtt5Publish> callbackView(final @NotNull Consumer<Mqtt3Publish> callback) {
10173
return publish -> callback.accept(Mqtt3PublishView.of(publish));
@@ -113,14 +85,22 @@ public class Mqtt3AsyncClientView implements Mqtt3AsyncClient {
11385
public @NotNull CompletableFuture<@NotNull Mqtt3ConnAck> connect(final @Nullable Mqtt3Connect connect) {
11486
final MqttConnect mqttConnect = MqttChecks.connect(connect);
11587

116-
return delegate.connect(mqttConnect).handle(CONNACK_MAPPER);
88+
final CompletableFuture<Mqtt3ConnAck> future = new CompletableFuture<>();
89+
delegate.connect(mqttConnect).whenComplete((connAck, throwable) -> {
90+
if (throwable != null) {
91+
future.completeExceptionally(Mqtt3ExceptionFactory.map(throwable));
92+
} else {
93+
future.complete(Mqtt3ConnAckView.of(connAck));
94+
}
95+
});
96+
return future;
11797
}
11898

11999
@Override
120100
public @NotNull CompletableFuture<@NotNull Mqtt3SubAck> subscribe(final @Nullable Mqtt3Subscribe subscribe) {
121101
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
122102

123-
return delegate.subscribe(mqttSubscribe).handle(SUBACK_MAPPER);
103+
return handleSubAck(delegate.subscribe(mqttSubscribe));
124104
}
125105

126106
@Override
@@ -130,7 +110,7 @@ public class Mqtt3AsyncClientView implements Mqtt3AsyncClient {
130110
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
131111
Checks.notNull(callback, "Callback");
132112

133-
return delegate.subscribe(mqttSubscribe, callbackView(callback)).handle(SUBACK_MAPPER);
113+
return handleSubAck(delegate.subscribe(mqttSubscribe, callbackView(callback)));
134114
}
135115

136116
@Override
@@ -142,7 +122,7 @@ public class Mqtt3AsyncClientView implements Mqtt3AsyncClient {
142122
Checks.notNull(callback, "Callback");
143123
Checks.notNull(executor, "Executor");
144124

145-
return delegate.subscribe(mqttSubscribe, callbackView(callback), executor).handle(SUBACK_MAPPER);
125+
return handleSubAck(delegate.subscribe(mqttSubscribe, callbackView(callback), executor));
146126
}
147127

148128
@Override
@@ -171,19 +151,43 @@ public void publishes(
171151
public @NotNull CompletableFuture<Void> unsubscribe(final @Nullable Mqtt3Unsubscribe unsubscribe) {
172152
final MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);
173153

174-
return delegate.unsubscribe(mqttUnsubscribe).handle(UNSUBACK_MAPPER);
154+
final CompletableFuture<Void> future = new CompletableFuture<>();
155+
delegate.unsubscribe(mqttUnsubscribe).whenComplete((unsubAck, throwable) -> {
156+
if (throwable != null) {
157+
future.completeExceptionally(Mqtt3ExceptionFactory.map(throwable));
158+
} else {
159+
future.complete(null);
160+
}
161+
});
162+
return future;
175163
}
176164

177165
@Override
178166
public @NotNull CompletableFuture<@NotNull Mqtt3Publish> publish(final @Nullable Mqtt3Publish publish) {
179167
final MqttPublish mqttPublish = MqttChecks.publish(publish);
180168

181-
return delegate.publish(mqttPublish).handle(PUBLISH_RESULT_MAPPER);
169+
final CompletableFuture<Mqtt3Publish> future = new CompletableFuture<>();
170+
delegate.publish(mqttPublish).whenComplete((publishResult, throwable) -> {
171+
if (throwable != null) {
172+
future.completeExceptionally(Mqtt3ExceptionFactory.map(throwable));
173+
} else {
174+
future.complete(Mqtt3PublishView.of(publishResult.getPublish()));
175+
}
176+
});
177+
return future;
182178
}
183179

184180
@Override
185181
public @NotNull CompletableFuture<Void> disconnect() {
186-
return delegate.disconnect(Mqtt3DisconnectView.DELEGATE).exceptionally(DISCONNECT_MAPPER);
182+
final CompletableFuture<Void> future = new CompletableFuture<>();
183+
delegate.disconnect(Mqtt3DisconnectView.DELEGATE).whenComplete((ignored, throwable) -> {
184+
if (throwable != null) {
185+
future.completeExceptionally(Mqtt3ExceptionFactory.map(throwable));
186+
} else {
187+
future.complete(null);
188+
}
189+
});
190+
return future;
187191
}
188192

189193
@Override

0 commit comments

Comments
 (0)