Skip to content

Commit 8d1f297

Browse files
committed
Polishing.
Refine exception propagation by collecting exceptions as list instead of using an atomic reference. Also, introduce exception aggregator for easier exception surfacing. [resolves #678][#677]
1 parent cb5bea2 commit 8d1f297

File tree

5 files changed

+55
-32
lines changed

5 files changed

+55
-32
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,9 @@ Logging facilities:
607607

608608
* Driver Logging (`io.r2dbc.postgresql`)
609609
* Query Logging (`io.r2dbc.postgresql.QUERY` on `DEBUG` level)
610+
* Connection Context (`io.r2dbc.postgresql.client.ConnectionContext`)
611+
* `DEBUG` level enables connection and process identifiers in log messages and exceptions (`[cid: 0x1][pid: 109]`)
612+
* `TRACE` level enables socket information (remote and local addresses) to the connection context (`[cid: 0x1][pid: 109][id: 0x79dfc4d4, L:/127.0.0.1:49391 - R:localhost/127.0.0.1:49366]`)
610613
* Parameters' values Logging (`io.r2dbc.postgresql.PARAM` on `DEBUG` level)
611614
* Transport Logging (`io.r2dbc.postgresql.client`)
612615
* `DEBUG` enables `Message` exchange logging

src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.concurrent.ConcurrentHashMap;
38-
import java.util.concurrent.atomic.AtomicReference;
38+
import java.util.concurrent.CopyOnWriteArrayList;
39+
import java.util.function.Consumer;
3940
import java.util.function.Predicate;
4041

4142
import static io.r2dbc.postgresql.MultiHostConnectionStrategy.TargetServerType.ANY;
@@ -73,7 +74,7 @@ public final class MultiHostConnectionStrategy implements ConnectionStrategy {
7374

7475
@Override
7576
public Mono<Client> connect() {
76-
return Mono.defer(() -> connect(this.multiHostConfiguration.getTargetServerType()));
77+
return connect(this.multiHostConfiguration.getTargetServerType());
7778
}
7879

7980
@Override
@@ -83,40 +84,37 @@ public String toString() {
8384
}
8485

8586
public Mono<Client> connect(TargetServerType targetServerType) {
86-
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
87+
List<Throwable> errors = new CopyOnWriteArrayList<>();
8788

88-
return Mono.defer(() -> attemptConnection(targetServerType))
89+
return attemptConnection(targetServerType, errors::add)
8990
.onErrorResume(e -> {
90-
if (!exceptionRef.compareAndSet(null, e)) {
91-
exceptionRef.get().addSuppressed(e);
92-
}
91+
errors.add(e);
9392
return Mono.empty();
9493
})
95-
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY ? attemptConnection(PRIMARY) : Mono.empty()))
94+
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY ? attemptConnection(PRIMARY, errors::add) : Mono.empty()))
9695
.switchIfEmpty(Mono.error(() -> {
97-
Throwable error = exceptionRef.get();
98-
if (error == null) {
99-
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type '%s'", targetServerType), null);
96+
if (errors.isEmpty()) {
97+
return new ExceptionAggregator(String.format("No server matches target type '%s'", targetServerType), null);
10098
} else {
101-
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("Cannot connect to a host of %s", this.addresses), error);
99+
RuntimeException exception = new ExceptionAggregator(null, errors.size() == 1 ?
100+
errors.get(0) : null);
101+
102+
if (errors.size() > 1) {
103+
errors.forEach(exception::addSuppressed);
104+
}
105+
return exception;
102106
}
103107
}));
104108
}
105109

106-
private Mono<Client> attemptConnection(TargetServerType targetServerType) {
107-
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
110+
private Mono<Client> attemptConnection(TargetServerType targetServerType, Consumer<Throwable> errorHandler) {
108111
return getCandidates(targetServerType).concatMap(candidate -> this.attemptConnection(targetServerType, candidate)
109112
.onErrorResume(e -> {
110-
if (!exceptionRef.compareAndSet(null, e)) {
111-
exceptionRef.get().addSuppressed(e);
112-
}
113+
errorHandler.accept(e);
113114
this.statusMap.put(candidate, HostConnectOutcome.fail(candidate));
114115
return Mono.empty();
115116
}))
116-
.next()
117-
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
118-
? Mono.error(exceptionRef.get())
119-
: Mono.empty()));
117+
.next();
120118
}
121119

122120
private Mono<Client> attemptConnection(TargetServerType targetServerType, SocketAddress candidate) {
@@ -229,6 +227,13 @@ public interface HostSelector {
229227

230228
}
231229

230+
static class ExceptionAggregator extends RuntimeException {
231+
232+
public ExceptionAggregator(@Nullable String message, @Nullable Throwable cause) {
233+
super(message, cause);
234+
}
235+
}
236+
232237
private static class HostConnectOutcome {
233238

234239
static final Clock DEFAULT_CLOCK = Clock.systemDefaultZone();

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,16 @@ private Mono<PostgresqlConnection> closeWithError(Client client, Throwable throw
181181

182182
private Throwable cannotConnect(Throwable throwable, ConnectionStrategy strategy) {
183183

184+
// Rewrite ExceptionAggregator but preserve suppressed exceptions to enrich the exception context
185+
if (throwable instanceof MultiHostConnectionStrategy.ExceptionAggregator) {
186+
String message = throwable.getMessage() != null ? String.format("Cannot connect to %s: %s", strategy, throwable.getMessage()) : String.format("Cannot connect to %s", strategy);
187+
PostgresConnectionException exception = new PostgresConnectionException(message, throwable.getCause());
188+
for (Throwable t : throwable.getSuppressed()) {
189+
exception.addSuppressed(t);
190+
}
191+
return exception;
192+
}
193+
184194
if (throwable instanceof R2dbcException) {
185195
return throwable;
186196
}
@@ -226,6 +236,7 @@ static class PostgresConnectionException extends R2dbcNonTransientResourceExcept
226236

227237
private final ErrorDetails errorDetails;
228238

239+
229240
public PostgresConnectionException(String reason, @Nullable Throwable cause) {
230241
super(reason, CONNECTION_DOES_NOT_EXIST, 0, null, cause);
231242
this.errorDetails = ErrorDetails.fromCodeAndMessage(CONNECTION_DOES_NOT_EXIST, reason);

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@ public final class ReactorNettyClient implements Client {
9999

100100
private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
101101

102-
private static final Supplier<PostgresConnectionClosedException> UNEXPECTED = () -> new PostgresConnectionClosedException("Connection unexpectedly closed");
103-
104-
private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> new PostgresConnectionClosedException("Connection closed");
105-
106102
private final ByteBufAllocator byteBufAllocator;
107103

108104
private final ConnectionSettings settings;
@@ -111,6 +107,10 @@ public final class ReactorNettyClient implements Client {
111107

112108
private final Scheduler scheduler;
113109

110+
private final Supplier<PostgresConnectionClosedException> unexpected;
111+
112+
private final Supplier<PostgresConnectionClosedException> expected;
113+
114114
private ConnectionContext context;
115115

116116
private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
@@ -148,11 +148,14 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
148148
Assert.requireNonNull(connection, "Connection must not be null");
149149
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null");
150150

151+
151152
connection.addHandlerLast(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
152153
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
153154
this.connection = connection;
154155
this.byteBufAllocator = connection.outbound().alloc();
155156
this.context = new ConnectionContext().withChannelId(connection.channel().toString());
157+
this.unexpected = () -> new PostgresConnectionClosedException(this.context.getMessage("Connection unexpectedly closed"));
158+
this.expected = () -> new PostgresConnectionClosedException(this.context.getMessage("Connection closed"));
156159

157160
EventLoop eventLoop = connection.channel().eventLoop();
158161
this.scheduler = Schedulers.fromExecutorService(eventLoop, eventLoop.toString());
@@ -194,7 +197,7 @@ public Mono<Void> close() {
194197

195198
this.notificationProcessor.tryEmitComplete();
196199

197-
drainError(EXPECTED);
200+
drainError(expected);
198201

199202
boolean connected = isConnected();
200203
if (this.isClosed.compareAndSet(false, true)) {
@@ -519,9 +522,9 @@ private static String toString(List<Field> fields) {
519522

520523
private void handleClose() {
521524
if (this.isClosed.compareAndSet(false, true)) {
522-
drainError(UNEXPECTED);
525+
drainError(unexpected);
523526
} else {
524-
drainError(EXPECTED);
527+
drainError(expected);
525528
}
526529
}
527530

@@ -531,7 +534,7 @@ private void handleConnectionError(Throwable error) {
531534
drainError(() -> this.messageSubscriber.createClientClosedException(error));
532535
}
533536

534-
drainError(() -> new PostgresConnectionException(error));
537+
drainError(() -> new PostgresConnectionException(this.context, error));
535538
}
536539

537540
private void drainError(Supplier<? extends Throwable> supplier) {
@@ -589,8 +592,8 @@ static class PostgresConnectionException extends R2dbcNonTransientResourceExcept
589592

590593
private final static ErrorDetails ERROR_DETAILS = ErrorDetails.fromCodeAndMessage(CONNECTION_FAILURE, "An I/O error occurred while sending to the backend or receiving from the backend");
591594

592-
public PostgresConnectionException(Throwable cause) {
593-
super(ERROR_DETAILS.getMessage(), ERROR_DETAILS.getCode(), 0, null, cause);
595+
public PostgresConnectionException(ConnectionContext context, Throwable cause) {
596+
super(context.getMessage(ERROR_DETAILS.getMessage()), ERROR_DETAILS.getCode(), 0, null, cause);
594597
}
595598

596599
@Override
@@ -772,7 +775,7 @@ PostgresConnectionClosedException createClientClosedException() {
772775
}
773776

774777
PostgresConnectionClosedException createClientClosedException(@Nullable Throwable cause) {
775-
return new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed", cause);
778+
return new PostgresConnectionClosedException(ReactorNettyClient.this.context.getMessage("Cannot exchange messages because the connection is closed"), cause);
776779
}
777780

778781
/**

src/test/resources/logback-test.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
</appender>
2525

2626
<logger name="io.r2dbc.postgresql" level="INFO"/>
27+
<logger name="io.r2dbc.postgresql.client.ConnectionContext" level="INFO"/>
2728
<logger name="org.testcontainers" level="INFO"/>
2829
<logger name="reactor.netty" level="WARN"/>
2930
<logger name="stream" level="INFO"/>

0 commit comments

Comments
 (0)