Skip to content

Commit 90e29f5

Browse files
committed
Refactor socket based clients to produce a command message from a command.
Motivation: Commands are currently sent to every socket based encoder (e.g. PgEncoder) that then turns it into a codec object that is then encoded. There are two issues related to this: 1. the codec is actually the message to be sent and not the command 2. the command requires the handler to be executed and we want to make the command base handler private (so it can be removed later) Then: Most likely we will continue this refactor by renaming *Codec classes to *Message and make the Completable<R> handler part of the message, so we can remove it from CommandBase
1 parent 4ccee4d commit 90e29f5

27 files changed

+285
-215
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import io.vertx.core.internal.net.NetSocketInternal;
2525
import io.vertx.core.spi.metrics.ClientMetrics;
2626
import io.vertx.db2client.DB2ConnectOptions;
27+
import io.vertx.db2client.impl.codec.CommandCodec;
2728
import io.vertx.db2client.impl.codec.DB2Codec;
2829
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
2930
import io.vertx.db2client.impl.drda.ConnectionMetaData;
3031
import io.vertx.sqlclient.SqlConnectOptions;
32+
import io.vertx.sqlclient.impl.CommandMessage;
3133
import io.vertx.sqlclient.internal.Connection;
3234
import io.vertx.sqlclient.internal.QueryResultHandler;
3335
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -79,6 +81,11 @@ public void init() {
7981
super.init();
8082
}
8183

84+
@Override
85+
protected CommandMessage<?, ?> toMessage(CommandBase<?> command) {
86+
return CommandCodec.wrap(command);
87+
}
88+
8289
@Override
8390
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
8491
if (cmd instanceof TxCommand) {

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CommandCodec.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,19 @@
1717

1818
import io.netty.buffer.ByteBuf;
1919
import io.vertx.core.Handler;
20+
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
21+
import io.vertx.db2client.impl.command.PingCommand;
22+
import io.vertx.sqlclient.impl.CommandMessage;
23+
import io.vertx.sqlclient.internal.command.CloseConnectionCommand;
24+
import io.vertx.sqlclient.internal.command.CloseCursorCommand;
25+
import io.vertx.sqlclient.internal.command.CloseStatementCommand;
2026
import io.vertx.sqlclient.internal.command.CommandBase;
2127
import io.vertx.sqlclient.internal.command.CommandResponse;
28+
import io.vertx.sqlclient.internal.command.ExtendedQueryCommand;
29+
import io.vertx.sqlclient.internal.command.PrepareStatementCommand;
30+
import io.vertx.sqlclient.internal.command.SimpleQueryCommand;
2231

23-
abstract class CommandCodec<R, C extends CommandBase<R>> {
32+
public abstract class CommandCodec<R, C extends CommandBase<R>> extends CommandMessage<R, C> {
2433

2534
Handler<? super CommandResponse<R>> completionHandler;
2635
public Throwable failure;
@@ -32,6 +41,52 @@ abstract class CommandCodec<R, C extends CommandBase<R>> {
3241
this.cmd = cmd;
3342
}
3443

44+
@SuppressWarnings({ "rawtypes", "unchecked" })
45+
public static CommandCodec<?, ?> wrap(CommandBase<?> cmd) {
46+
CommandCodec<?, ?> codec = null;
47+
if (cmd instanceof InitialHandshakeCommand) {
48+
codec = new InitialHandshakeCommandCodec((InitialHandshakeCommand) cmd);
49+
} else if (cmd instanceof SimpleQueryCommand) {
50+
codec = new SimpleQueryCommandCodec((SimpleQueryCommand) cmd);
51+
} else if (cmd instanceof ExtendedQueryCommand) {
52+
ExtendedQueryCommand<?> queryCmd = (ExtendedQueryCommand<?>) cmd;
53+
if (queryCmd.isBatch()) {
54+
codec = new ExtendedBatchQueryCommandCodec<>(queryCmd);
55+
} else {
56+
codec = new ExtendedQueryCommandCodec(queryCmd);
57+
}
58+
} else if (cmd instanceof CloseConnectionCommand) {
59+
codec = new CloseConnectionCommandCodec((CloseConnectionCommand) cmd);
60+
} else if (cmd instanceof PrepareStatementCommand) {
61+
codec = new PrepareStatementCodec((PrepareStatementCommand) cmd);
62+
} else if (cmd instanceof CloseStatementCommand) {
63+
codec = new CloseStatementCommandCodec((CloseStatementCommand) cmd);
64+
} else if (cmd instanceof CloseCursorCommand) {
65+
codec = new CloseCursorCommandCodec((CloseCursorCommand) cmd);
66+
} else if (cmd instanceof PingCommand) {
67+
codec = new PingCommandCodec((PingCommand) cmd);
68+
// } else if (cmd instanceof InitDbCommand) {
69+
// codec = new InitDbCommandCodec((InitDbCommand) cmd);
70+
// } else if (cmd instanceof StatisticsCommand) {
71+
// codec = new StatisticsCommandCodec((StatisticsCommand) cmd);
72+
// } else if (cmd instanceof SetOptionCommand) {
73+
// codec = new SetOptionCommandCodec((SetOptionCommand) cmd);
74+
// } else if (cmd instanceof ResetConnectionCommand) {
75+
// codec = new ResetConnectionCommandCodec((ResetConnectionCommand) cmd);
76+
// } else if (cmd instanceof DebugCommand) {
77+
// codec = new DebugCommandCodec((DebugCommand) cmd);
78+
// } else if (cmd instanceof ChangeUserCommand) {
79+
// codec = new ChangeUserCommandCodec((ChangeUserCommand) cmd);
80+
} else {
81+
UnsupportedOperationException uoe = new UnsupportedOperationException("Unsupported command type: " + cmd);
82+
DB2Encoder.LOG.error(uoe);
83+
throw uoe;
84+
}
85+
if (DB2Encoder.LOG.isDebugEnabled())
86+
DB2Encoder.LOG.debug(">>> ENCODE " + codec);
87+
return codec;
88+
}
89+
3590
abstract void decodePayload(ByteBuf payload, int payloadLength);
3691

3792
void encode(DB2Encoder encoder) {

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java

Lines changed: 9 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,12 @@
2323
import io.vertx.core.internal.logging.Logger;
2424
import io.vertx.core.internal.logging.LoggerFactory;
2525
import io.vertx.db2client.impl.DB2SocketConnection;
26-
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
27-
import io.vertx.db2client.impl.command.PingCommand;
28-
import io.vertx.sqlclient.internal.command.CloseConnectionCommand;
29-
import io.vertx.sqlclient.internal.command.CloseCursorCommand;
30-
import io.vertx.sqlclient.internal.command.CloseStatementCommand;
3126
import io.vertx.sqlclient.internal.command.CommandBase;
3227
import io.vertx.sqlclient.internal.command.CommandResponse;
33-
import io.vertx.sqlclient.internal.command.ExtendedQueryCommand;
34-
import io.vertx.sqlclient.internal.command.PrepareStatementCommand;
35-
import io.vertx.sqlclient.internal.command.SimpleQueryCommand;
3628

3729
class DB2Encoder extends ChannelOutboundHandlerAdapter {
3830

39-
private static final Logger LOG = LoggerFactory.getLogger(DB2Encoder.class);
31+
public static final Logger LOG = LoggerFactory.getLogger(DB2Encoder.class);
4032

4133
private final ArrayDeque<CommandCodec<?, ?>> inflight;
4234
ChannelHandlerContext chctx;
@@ -55,75 +47,27 @@ public void handlerAdded(ChannelHandlerContext ctx) {
5547

5648
@Override
5749
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
58-
if (msg instanceof CommandBase<?>) {
59-
CommandBase<?> cmd = (CommandBase<?>) msg;
50+
if (msg instanceof CommandCodec<?, ?>) {
51+
CommandCodec<?, ?> cmd = (CommandCodec<?, ?>) msg;
6052
write(cmd);
6153
} else {
6254
super.write(ctx, msg, promise);
6355
}
6456
}
6557

6658
@SuppressWarnings({ "unchecked", "rawtypes" })
67-
void write(CommandBase<?> cmd) {
68-
CommandCodec<?, ?> codec = wrap(cmd);
69-
codec.completionHandler = resp -> {
59+
void write(CommandCodec<?, ?> msg) {
60+
msg.completionHandler = resp -> {
7061
CommandCodec<?, ?> c = inflight.poll();
7162
resp.cmd = (CommandBase) c.cmd;
7263
chctx.fireChannelRead(resp);
7364
};
74-
inflight.add(codec);
65+
inflight.add(msg);
7566
try {
76-
codec.encode(this);
67+
msg.encode(this);
7768
} catch (Throwable e) {
78-
LOG.error("FATAL: Unable to encode command: " + cmd, e);
79-
codec.completionHandler.handle(CommandResponse.failure(e));
69+
LOG.error("FATAL: Unable to encode command: " + msg, e);
70+
msg.completionHandler.handle(CommandResponse.failure(e));
8071
}
8172
}
82-
83-
@SuppressWarnings({ "rawtypes", "unchecked" })
84-
private CommandCodec<?, ?> wrap(CommandBase<?> cmd) {
85-
CommandCodec<?, ?> codec = null;
86-
if (cmd instanceof InitialHandshakeCommand) {
87-
codec = new InitialHandshakeCommandCodec((InitialHandshakeCommand) cmd);
88-
} else if (cmd instanceof SimpleQueryCommand) {
89-
codec = new SimpleQueryCommandCodec((SimpleQueryCommand) cmd);
90-
} else if (cmd instanceof ExtendedQueryCommand) {
91-
ExtendedQueryCommand<?> queryCmd = (ExtendedQueryCommand<?>) cmd;
92-
if (queryCmd.isBatch()) {
93-
codec = new ExtendedBatchQueryCommandCodec<>(queryCmd);
94-
} else {
95-
codec = new ExtendedQueryCommandCodec(queryCmd);
96-
}
97-
} else if (cmd instanceof CloseConnectionCommand) {
98-
codec = new CloseConnectionCommandCodec((CloseConnectionCommand) cmd);
99-
} else if (cmd instanceof PrepareStatementCommand) {
100-
codec = new PrepareStatementCodec((PrepareStatementCommand) cmd);
101-
} else if (cmd instanceof CloseStatementCommand) {
102-
codec = new CloseStatementCommandCodec((CloseStatementCommand) cmd);
103-
} else if (cmd instanceof CloseCursorCommand) {
104-
codec = new CloseCursorCommandCodec((CloseCursorCommand) cmd);
105-
} else if (cmd instanceof PingCommand) {
106-
codec = new PingCommandCodec((PingCommand) cmd);
107-
// } else if (cmd instanceof InitDbCommand) {
108-
// codec = new InitDbCommandCodec((InitDbCommand) cmd);
109-
// } else if (cmd instanceof StatisticsCommand) {
110-
// codec = new StatisticsCommandCodec((StatisticsCommand) cmd);
111-
// } else if (cmd instanceof SetOptionCommand) {
112-
// codec = new SetOptionCommandCodec((SetOptionCommand) cmd);
113-
// } else if (cmd instanceof ResetConnectionCommand) {
114-
// codec = new ResetConnectionCommandCodec((ResetConnectionCommand) cmd);
115-
// } else if (cmd instanceof DebugCommand) {
116-
// codec = new DebugCommandCodec((DebugCommand) cmd);
117-
// } else if (cmd instanceof ChangeUserCommand) {
118-
// codec = new ChangeUserCommandCodec((ChangeUserCommand) cmd);
119-
} else {
120-
UnsupportedOperationException uoe = new UnsupportedOperationException("Unsupported command type: " + cmd);
121-
LOG.error(uoe);
122-
throw uoe;
123-
}
124-
if (LOG.isDebugEnabled())
125-
LOG.debug(">>> ENCODE " + codec);
126-
return codec;
127-
}
128-
12973
}

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLSocketConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import io.vertx.core.spi.metrics.ClientMetrics;
3030
import io.vertx.mssqlclient.MSSQLConnectOptions;
3131
import io.vertx.mssqlclient.MSSQLInfo;
32+
import io.vertx.mssqlclient.impl.codec.MSSQLCommandCodec;
3233
import io.vertx.mssqlclient.impl.codec.TdsLoginSentCompletionHandler;
3334
import io.vertx.mssqlclient.impl.codec.TdsMessageCodec;
3435
import io.vertx.mssqlclient.impl.codec.TdsPacketDecoder;
3536
import io.vertx.mssqlclient.impl.codec.TdsSslHandshakeCodec;
3637
import io.vertx.mssqlclient.impl.command.PreLoginCommand;
3738
import io.vertx.sqlclient.SqlConnectOptions;
39+
import io.vertx.sqlclient.impl.CommandMessage;
3840
import io.vertx.sqlclient.internal.Connection;
3941
import io.vertx.sqlclient.internal.QueryResultHandler;
4042
import io.vertx.sqlclient.impl.SocketConnectionBase;
@@ -147,6 +149,11 @@ public void init() {
147149
super.init();
148150
}
149151

152+
@Override
153+
protected CommandMessage<?, ?> toMessage(CommandBase<?> command) {
154+
return MSSQLCommandCodec.wrap(command);
155+
}
156+
150157
@Override
151158
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
152159
if (cmd instanceof TxCommand) {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionCommandCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import io.vertx.sqlclient.internal.command.CloseConnectionCommand;
1515

1616
class CloseConnectionCommandCodec extends MSSQLCommandCodec<Void, CloseConnectionCommand> {
17-
CloseConnectionCommandCodec(TdsMessageCodec tdsMessageCodec, CloseConnectionCommand cmd) {
18-
super(tdsMessageCodec, cmd);
17+
CloseConnectionCommandCodec(CloseConnectionCommand cmd) {
18+
super(cmd);
1919
}
2020

2121
@Override

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorCommandCodec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020

2121
public class CloseCursorCommandCodec extends MSSQLCommandCodec<Void, CloseCursorCommand> {
2222

23-
private final CursorData cursorData;
23+
private CursorData cursorData;
2424
private boolean cursorClosed;
2525

26-
public CloseCursorCommandCodec(TdsMessageCodec tdsMessageCodec, CloseCursorCommand cmd) {
27-
super(tdsMessageCodec, cmd);
28-
cursorData = tdsMessageCodec.removeCursorData(cmd.id());
26+
public CloseCursorCommandCodec(CloseCursorCommand cmd) {
27+
super(cmd);
2928
}
3029

3130
@Override
3231
void encode() {
32+
cursorData = tdsMessageCodec.removeCursorData(cmd.id());
3333
if (cursorData != null && cursorData.serverCursorId > 0) {
3434
sendCursorClose();
3535
} else {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementCommandCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
class CloseStatementCommandCodec extends MSSQLCommandCodec<Void, CloseStatementCommand> {
2222

23-
CloseStatementCommandCodec(TdsMessageCodec tdsMessageCodec, CloseStatementCommand cmd) {
24-
super(tdsMessageCodec, cmd);
23+
CloseStatementCommandCodec(CloseStatementCommand cmd) {
24+
super(cmd);
2525
}
2626

2727
@Override

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ class ExtendedBatchQueryCommandCodec<T> extends ExtendedQueryCommandBaseCodec<T>
2525
private int paramsIdx;
2626
private int messageDecoded;
2727

28-
ExtendedBatchQueryCommandCodec(TdsMessageCodec tdsMessageCodec, ExtendedQueryCommand<T> cmd) {
29-
super(tdsMessageCodec, cmd);
28+
ExtendedBatchQueryCommandCodec(ExtendedQueryCommand<T> cmd) {
29+
super(cmd);
3030
paramsList = cmd.paramsList();
3131
}
3232

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryCommandCodec.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@
2121

2222
class ExtendedCursorQueryCommandCodec<T> extends ExtendedQueryCommandBaseCodec<T> {
2323

24-
private final CursorData cursorData;
24+
private CursorData cursorData;
2525

26-
ExtendedCursorQueryCommandCodec(TdsMessageCodec tdsMessageCodec, ExtendedQueryCommand<T> cmd) {
27-
super(tdsMessageCodec, cmd);
28-
cursorData = tdsMessageCodec.getOrCreateCursorData(cmd.cursorId());
26+
ExtendedCursorQueryCommandCodec(ExtendedQueryCommand<T> cmd) {
27+
super(cmd);
2928
}
3029

3130
@Override
3231
void encode() {
32+
cursorData = tdsMessageCodec.getOrCreateCursorData(cmd.cursorId());
3333
if (cursorData.preparedHandle == 0) {
3434
sendCursorPrepExec();
3535
} else {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ abstract class ExtendedQueryCommandBaseCodec<T> extends QueryCommandBaseCodec<T,
2323

2424
final MSSQLPreparedStatement ps;
2525

26-
ExtendedQueryCommandBaseCodec(TdsMessageCodec tdsMessageCodec, ExtendedQueryCommand<T> cmd) {
27-
super(tdsMessageCodec, cmd);
26+
ExtendedQueryCommandBaseCodec(ExtendedQueryCommand<T> cmd) {
27+
super(cmd);
2828
ps = (MSSQLPreparedStatement) this.cmd.preparedStatement();
2929
}
3030

31-
public static <U> MSSQLCommandCodec<?, ?> create(TdsMessageCodec tdsMessageCodec, ExtendedQueryCommand<U> queryCmd) {
31+
public static <U> MSSQLCommandCodec<?, ?> create(ExtendedQueryCommand<U> queryCmd) {
3232
if (queryCmd.isBatch()) {
33-
return new ExtendedBatchQueryCommandCodec<>(tdsMessageCodec, queryCmd);
33+
return new ExtendedBatchQueryCommandCodec<>(queryCmd);
3434
} else if (queryCmd.cursorId() != null) {
35-
return new ExtendedCursorQueryCommandCodec<>(tdsMessageCodec, queryCmd);
35+
return new ExtendedCursorQueryCommandCodec<>(queryCmd);
3636
} else {
37-
return new ExtendedQueryCommandCodec<>(tdsMessageCodec, queryCmd);
37+
return new ExtendedQueryCommandCodec<>(queryCmd);
3838
}
3939
}
4040

0 commit comments

Comments
 (0)