Skip to content

Commit 88e8209

Browse files
committed
The connection startup message should only be sent optionally, when cancelling a request it should not be sent.
1 parent 05f18af commit 88e8209

File tree

3 files changed

+33
-35
lines changed

3 files changed

+33
-35
lines changed

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -71,60 +71,61 @@ protected Future<Connection> doConnectInternal(PgConnectOptions options, Context
7171
return context.failedFuture(e);
7272
}
7373
SocketAddress server = options.getSocketAddress();
74-
return doConnect(server, context, options);
74+
return connect(server, context, true, options);
7575
}
7676

77-
public void cancelRequest(PgConnectOptions options, int processId, int secretKey, Handler<AsyncResult<Void>> handler) {
78-
doConnect(options.getSocketAddress(), vertx.createEventLoopContext(), options).onComplete(ar -> {
79-
if (ar.succeeded()) {
80-
PgSocketConnection conn = (PgSocketConnection) ar.result();
81-
conn.sendCancelRequestMessage(processId, secretKey, handler);
82-
} else {
83-
handler.handle(Future.failedFuture(ar.cause()));
84-
}
85-
});
77+
public Future<Void> cancelRequest(PgConnectOptions options, int processId, int secretKey) {
78+
return connect(options.getSocketAddress(), vertx.createEventLoopContext(), false, options)
79+
.compose(conn -> {
80+
PgSocketConnection socket = (PgSocketConnection) conn;
81+
return socket.sendCancelRequestMessage(processId, secretKey);
82+
});
8683
}
8784

88-
private Future<Connection> doConnect(SocketAddress server, ContextInternal context, PgConnectOptions options) {
85+
private Future<Connection> connect(SocketAddress server, ContextInternal context, boolean sendStartupMessage, PgConnectOptions options) {
8986
SslMode sslMode = options.isUsingDomainSocket() ? SslMode.DISABLE : options.getSslMode();
9087
ConnectOptions connectOptions = new ConnectOptions()
9188
.setRemoteAddress(server);
9289
Future<Connection> connFuture;
9390
switch (sslMode) {
9491
case DISABLE:
95-
connFuture = doConnect(connectOptions, context, false, options);
92+
connFuture = connect(connectOptions, context, false, sendStartupMessage, options);
9693
break;
9794
case ALLOW:
98-
connFuture = doConnect(connectOptions, context, false, options).recover(err -> doConnect(connectOptions, context, true, options));
95+
connFuture = connect(connectOptions, context, false, sendStartupMessage, options).recover(err -> connect(connectOptions, context, true, sendStartupMessage, options));
9996
break;
10097
case PREFER:
101-
connFuture = doConnect(connectOptions, context, true, options).recover(err -> doConnect(connectOptions, context, false, options));
98+
connFuture = connect(connectOptions, context, true, sendStartupMessage, options).recover(err -> connect(connectOptions, context, false, sendStartupMessage, options));
10299
break;
103100
case REQUIRE:
104101
case VERIFY_CA:
105102
case VERIFY_FULL:
106-
connFuture = doConnect(connectOptions, context, true, options);
103+
connFuture = connect(connectOptions, context, true, sendStartupMessage, options);
107104
break;
108105
default:
109106
return context.failedFuture(new IllegalArgumentException("Unsupported SSL mode"));
110107
}
111108
return connFuture;
112109
}
113110

114-
private Future<Connection> doConnect(ConnectOptions connectOptions, ContextInternal context, boolean ssl, PgConnectOptions options) {
115-
return doConnect_(connectOptions, context, ssl, options).flatMap(conn -> {
116-
String username = options.getUser();
117-
String password = options.getPassword();
118-
String database = options.getDatabase();
119-
Map<String, String> properties = options.getProperties() != null ? Collections.unmodifiableMap(options.getProperties()) : null;
120-
PgSocketConnection socket = (PgSocketConnection) conn;
121-
socket.init();
122-
return Future.<Connection>future(p -> socket.sendStartupMessage(username, password, database, properties, p))
123-
.map(conn);
124-
});
111+
private Future<Connection> connect(ConnectOptions connectOptions, ContextInternal context, boolean ssl, boolean sendStartupMessage, PgConnectOptions options) {
112+
Future<Connection> res = doConnect(connectOptions, context, ssl, options);
113+
if (sendStartupMessage) {
114+
return res.flatMap(conn -> {
115+
PgSocketConnection socket = (PgSocketConnection) conn;
116+
socket.init();
117+
String username = options.getUser();
118+
String password = options.getPassword();
119+
String database = options.getDatabase();
120+
Map<String, String> properties = options.getProperties() != null ? Collections.unmodifiableMap(options.getProperties()) : null;
121+
return socket.sendStartupMessage(username, password, database, properties);
122+
});
123+
} else {
124+
return res;
125+
}
125126
}
126127

127-
private Future<Connection> doConnect_(ConnectOptions connectOptions, ContextInternal context, boolean ssl, PgConnectOptions options) {
128+
private Future<Connection> doConnect(ConnectOptions connectOptions, ContextInternal context, boolean ssl, PgConnectOptions options) {
128129
Future<NetSocket> soFut;
129130
try {
130131
soFut = client.connect(connectOptions);

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public Future<Void> cancelRequest() {
123123
Promise<Void> promise = context.owner().getOrCreateContext().promise();
124124
context.emit(promise, p -> {
125125
PgSocketConnection unwrap = (PgSocketConnection) conn.unwrap();
126-
((PgConnectionFactory) factory).cancelRequest(unwrap.connectOptions(), this.processId(), this.secretKey(), p);
126+
((PgConnectionFactory) factory).cancelRequest(unwrap.connectOptions(), this.processId(), this.secretKey()).onComplete(p);
127127
});
128128
return promise.future();
129129
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,29 +83,26 @@ public void init() {
8383
}
8484

8585
// TODO RETURN FUTURE ???
86-
void sendStartupMessage(String username, String password, String database, Map<String, String> properties, Promise<Connection> completionHandler) {
86+
Future<Connection> sendStartupMessage(String username, String password, String database, Map<String, String> properties) {
8787
InitCommand cmd = new InitCommand(this, username, password, database, properties);
88-
schedule(context, cmd).onComplete(completionHandler);
88+
return schedule(context, cmd);
8989
}
9090

91-
void sendCancelRequestMessage(int processId, int secretKey, Handler<AsyncResult<Void>> handler) {
91+
Future<Void> sendCancelRequestMessage(int processId, int secretKey) {
9292
Buffer buffer = Buffer.buffer(16);
9393
buffer.appendInt(16);
9494
// cancel request code
9595
buffer.appendInt(80877102);
9696
buffer.appendInt(processId);
9797
buffer.appendInt(secretKey);
9898

99-
socket.write(buffer).onComplete(ar -> {
99+
return socket.write(buffer).andThen(ar -> {
100100
if (ar.succeeded()) {
101101
// directly close this connection
102102
if (status == Status.CONNECTED) {
103103
status = Status.CLOSING;
104104
socket.close();
105105
}
106-
handler.handle(Future.succeededFuture());
107-
} else {
108-
handler.handle(Future.failedFuture(ar.cause()));
109106
}
110107
});
111108
}

0 commit comments

Comments
 (0)