Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.impl.codec.CommandCodec;
import io.vertx.db2client.DB2Exception;
import io.vertx.db2client.impl.codec.ConnectionState;
import io.vertx.db2client.impl.codec.DB2CommandMessage;
import io.vertx.db2client.impl.codec.DB2Codec;
import io.vertx.db2client.impl.codec.DB2PreparedStatement;
import io.vertx.db2client.impl.codec.ExtendedBatchQueryCommandCodec;
import io.vertx.db2client.impl.codec.ExtendedQueryCommandCodec;
import io.vertx.db2client.impl.codec.ExtendedBatchQueryDB2CommandMessage;
import io.vertx.db2client.impl.codec.ExtendedQueryDB2CommandMessage;
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
import io.vertx.db2client.impl.drda.ConnectionMetaData;
import io.vertx.db2client.impl.drda.SQLState;
import io.vertx.db2client.impl.drda.SqlCode;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.codec.CommandMessage;
import io.vertx.sqlclient.spi.connection.Connection;
Expand All @@ -51,6 +55,7 @@ public class DB2SocketConnection extends SocketConnectionBase {
private DB2Codec codec;
private Handler<Void> closeHandler;
public final ConnectionMetaData connMetadata = new ConnectionMetaData();
public ConnectionState status = ConnectionState.CONNECTING;

public DB2SocketConnection(NetSocketInternal socket,
ClientMetrics clientMetrics,
Expand All @@ -64,6 +69,19 @@ public DB2SocketConnection(NetSocketInternal socket,
this.connectOptions = connectOptions;
}

@Override
protected <R> void fail(CommandBase<R> command, Completable<R> handler, Throwable err) {
if (status == ConnectionState.CONNECTING && command instanceof InitialHandshakeCommand) {
// Sometimes DB2 closes the connection when sending an invalid Database name.
// -4499 = A fatal error occurred that resulted in a disconnect from the data
// source.
// 08001 = "The connection was unable to be established"
err = new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED,
SQLState.AUTH_DATABASE_CONNECTION_REFUSED);
}
super.fail(command, handler, err);
}

// TODO RETURN FUTURE ???
void sendStartupMessage(String username,
String password,
Expand All @@ -90,15 +108,15 @@ public void init() {
@Override
protected CommandMessage<?, ?> toMessage(ExtendedQueryCommand<?> command, PreparedStatement preparedStatement) {
if (command.isBatch()) {
return new ExtendedBatchQueryCommandCodec<>(command, (DB2PreparedStatement) preparedStatement);
return new ExtendedBatchQueryDB2CommandMessage<>(command, (DB2PreparedStatement) preparedStatement);
} else {
return new ExtendedQueryCommandCodec(command, (DB2PreparedStatement) preparedStatement);
return new ExtendedQueryDB2CommandMessage(command, (DB2PreparedStatement) preparedStatement);
}
}

@Override
protected CommandMessage<?, ?> toMessage(CommandBase<?> command) {
return CommandCodec.wrap(command);
return DB2CommandMessage.wrap(command);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import io.vertx.db2client.impl.command.AuthenticationCommandBase;

abstract class AuthenticationCommandBaseCodec<R, C extends AuthenticationCommandBase<R>> extends CommandCodec<R, C> {
abstract class AuthenticationDB2CommandBaseMessage<R, C extends AuthenticationCommandBase<R>> extends DB2CommandMessage<R, C> {

AuthenticationCommandBaseCodec(C cmd) {
AuthenticationDB2CommandBaseMessage(C cmd) {
super(cmd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.vertx.db2client.impl.drda.DRDAQueryResponse;
import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand;

class CloseConnectionCommandCodec extends CommandCodec<Void, CloseConnectionCommand> {
class CloseConnectionDB2CommandMessage extends DB2CommandMessage<Void, CloseConnectionCommand> {

CloseConnectionCommandCodec(CloseConnectionCommand cmd) {
CloseConnectionDB2CommandMessage(CloseConnectionCommand cmd) {
super(cmd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import io.vertx.sqlclient.spi.protocol.CloseCursorCommand;
import io.vertx.sqlclient.codec.CommandResponse;

class CloseCursorCommandCodec extends CommandCodec<Void, CloseCursorCommand> {
class CloseCursorDB2CommandMessage extends DB2CommandMessage<Void, CloseCursorCommand> {

private static final Logger LOG = LoggerFactory.getLogger(CloseCursorCommandCodec.class);
private static final Logger LOG = LoggerFactory.getLogger(CloseCursorDB2CommandMessage.class);

CloseCursorCommandCodec(CloseCursorCommand cmd) {
CloseCursorDB2CommandMessage(CloseCursorCommand cmd) {
super(cmd);
}

Expand All @@ -52,6 +52,6 @@ void encode(DB2Encoder encoder) {
void decodePayload(ByteBuf payload, int payloadLength) {
DRDAQueryResponse closeCursor = new DRDAQueryResponse(payload, encoder.socketConnection.connMetadata);
closeCursor.readCursorClose();
completionHandler.handle(CommandResponse.success(null));
fireCommandSuccess(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import io.vertx.sqlclient.spi.protocol.CloseStatementCommand;
import io.vertx.sqlclient.codec.CommandResponse;

class CloseStatementCommandCodec extends CommandCodec<Void, CloseStatementCommand> {
class CloseStatementDB2CommandMessage extends DB2CommandMessage<Void, CloseStatementCommand> {

CloseStatementCommandCodec(CloseStatementCommand cmd) {
CloseStatementDB2CommandMessage(CloseStatementCommand cmd) {
super(cmd);
}

@Override
void encode(DB2Encoder encoder) {
super.encode(encoder);
DB2PreparedStatement statement = (DB2PreparedStatement) cmd.statement();
statement.close();
completionHandler.handle(CommandResponse.success(null));
fireCommandSuccess(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.vertx.db2client.impl.codec;

public enum ConnectionState {
CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package io.vertx.db2client.impl.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.vertx.db2client.impl.DB2SocketConnection;
import io.vertx.sqlclient.ClosedConnectionException;

import java.util.ArrayDeque;

Expand All @@ -27,23 +25,11 @@ public class DB2Codec extends CombinedChannelDuplexHandler<DB2Decoder, DB2Encode
// TODO @AGG check what packet length limit actually is for DB2
static final int PACKET_PAYLOAD_LENGTH_LIMIT = 0xFFFFFF;

private final ArrayDeque<CommandCodec<?, ?>> inflight = new ArrayDeque<>();
private final ArrayDeque<DB2CommandMessage<?, ?>> inflight = new ArrayDeque<>();

public DB2Codec(DB2SocketConnection db2SocketConnection) {
DB2Encoder encoder = new DB2Encoder(inflight, db2SocketConnection);
DB2Decoder decoder = new DB2Decoder(inflight);
init(decoder, encoder);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
clearInflightCommands(ClosedConnectionException.INSTANCE);
super.channelInactive(ctx);
}

private void clearInflightCommands(Throwable failure) {
for (CommandCodec<?, ?> commandCodec : inflight) {
commandCodec.fail(failure);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,41 @@
import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand;
import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand;

public abstract class CommandCodec<R, C extends CommandBase<R>> extends CommandMessage<R, C> {
public abstract class DB2CommandMessage<R, C extends CommandBase<R>> extends CommandMessage<R, C> {

Handler<? super CommandResponse<R>> completionHandler;
public Throwable failure;
public R result;
DB2Encoder encoder;

CommandCodec(C cmd) {
DB2CommandMessage(C cmd) {
super(cmd);
}

void fireCommandFailure(Throwable err) {
encoder.fireCommandFailure(this, err);
}

void fireCommandSuccess(R result) {
encoder.fireCommandSuccess(this, result);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public static CommandCodec<?, ?> wrap(CommandBase<?> cmd) {
CommandCodec<?, ?> codec = null;
public static DB2CommandMessage<?, ?> wrap(CommandBase<?> cmd) {
DB2CommandMessage<?, ?> codec = null;
if (cmd instanceof InitialHandshakeCommand) {
codec = new InitialHandshakeCommandCodec((InitialHandshakeCommand) cmd);
codec = new InitialHandshakeDB2CommandMessage((InitialHandshakeCommand) cmd);
} else if (cmd instanceof SimpleQueryCommand) {
codec = new SimpleQueryCommandCodec((SimpleQueryCommand) cmd);
codec = new SimpleQueryDB2CommandMessage((SimpleQueryCommand) cmd);
} else if (cmd instanceof CloseConnectionCommand) {
codec = new CloseConnectionCommandCodec((CloseConnectionCommand) cmd);
codec = new CloseConnectionDB2CommandMessage((CloseConnectionCommand) cmd);
} else if (cmd instanceof PrepareStatementCommand) {
codec = new PrepareStatementCodec((PrepareStatementCommand) cmd);
codec = new PrepareStatementDB2CommandMessage((PrepareStatementCommand) cmd);
} else if (cmd instanceof CloseStatementCommand) {
codec = new CloseStatementCommandCodec((CloseStatementCommand) cmd);
codec = new CloseStatementDB2CommandMessage((CloseStatementCommand) cmd);
} else if (cmd instanceof CloseCursorCommand) {
codec = new CloseCursorCommandCodec((CloseCursorCommand) cmd);
codec = new CloseCursorDB2CommandMessage((CloseCursorCommand) cmd);
} else if (cmd instanceof PingCommand) {
codec = new PingCommandCodec((PingCommand) cmd);
codec = new PingDB2CommandMessage((PingCommand) cmd);
// } else if (cmd instanceof InitDbCommand) {
// codec = new InitDbCommandCodec((InitDbCommand) cmd);
// } else if (cmd instanceof StatisticsCommand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class DB2Decoder extends ByteToMessageDecoder {

private static final Logger LOG = LoggerFactory.getLogger(DB2Decoder.class);

private final ArrayDeque<CommandCodec<?, ?>> inflight;
private final ArrayDeque<DB2CommandMessage<?, ?>> inflight;

DB2Decoder(ArrayDeque<CommandCodec<?, ?>> inflight) {
DB2Decoder(ArrayDeque<DB2CommandMessage<?, ?>> inflight) {
this.inflight = inflight;
}

Expand Down Expand Up @@ -71,7 +71,7 @@ private int computeLength(ByteBuf in) {
}

private void decodePayload(ByteBuf payload, int payloadLength) {
CommandCodec<?, ?> ctx = inflight.peek();
DB2CommandMessage<?, ?> ctx = inflight.peek();
int startIndex = payload.readerIndex();
try {
if (LOG.isDebugEnabled())
Expand All @@ -93,7 +93,7 @@ private void decodePayload(ByteBuf payload, int payloadLength) {
LOG.error("Command failed with: " + t.getMessage() + "\nCommand=" + ctx, t);
}
}
ctx.completionHandler.handle(CommandResponse.failure(t));
ctx.fireCommandFailure(t);
} finally {
payload.clear();
payload.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ class DB2Encoder extends ChannelOutboundHandlerAdapter {

public static final Logger LOG = LoggerFactory.getLogger(DB2Encoder.class);

private final ArrayDeque<CommandCodec<?, ?>> inflight;
private final ArrayDeque<DB2CommandMessage<?, ?>> inflight;
ChannelHandlerContext chctx;

final DB2SocketConnection socketConnection;

DB2Encoder(ArrayDeque<CommandCodec<?, ?>> inflight, DB2SocketConnection db2SocketConnection) {
DB2Encoder(ArrayDeque<DB2CommandMessage<?, ?>> inflight, DB2SocketConnection db2SocketConnection) {
this.inflight = inflight;
this.socketConnection = db2SocketConnection;
}
Expand All @@ -47,27 +47,41 @@ public void handlerAdded(ChannelHandlerContext ctx) {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof CommandCodec<?, ?>) {
CommandCodec<?, ?> cmd = (CommandCodec<?, ?>) msg;
if (msg instanceof DB2CommandMessage<?, ?>) {
DB2CommandMessage<?, ?> cmd = (DB2CommandMessage<?, ?>) msg;
write(cmd);
} else {
super.write(ctx, msg, promise);
}
}

void fireCommandFailure(DB2CommandMessage msg, Throwable err) {
DB2CommandMessage<?, ?> c = inflight.poll();
if (c == msg) {
chctx.fireChannelRead(CommandResponse.failure(err));
} else {
throw new IllegalStateException();
}
}

<R> void fireCommandSuccess(DB2CommandMessage msg, R result) {
DB2CommandMessage<?, ?> c = inflight.poll();
if (c == msg) {
chctx.fireChannelRead(CommandResponse.success(result));
} else {
throw new IllegalStateException();
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
void write(CommandCodec<?, ?> msg) {
msg.completionHandler = resp -> {
CommandCodec<?, ?> c = inflight.poll();
resp.handler = (Completable) c.handler;
chctx.fireChannelRead(resp);
};
inflight.add(msg);
void write(DB2CommandMessage<?, ?> msg) {
try {
inflight.add(msg);
msg.encode(this);
} catch (Throwable e) {
inflight.pollLast();
LOG.error("FATAL: Unable to encode command: " + msg, e);
msg.completionHandler.handle(CommandResponse.failure(e));
chctx.fireChannelRead(CommandResponse.failure(e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.stream.Collectors;

import io.netty.buffer.ByteBuf;
import io.vertx.core.VertxException;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.db2client.impl.codec.DB2PreparedStatement.QueryInstance;
Expand All @@ -31,15 +32,15 @@
import io.vertx.sqlclient.codec.CommandResponse;
import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand;

public class ExtendedBatchQueryCommandCodec<R> extends ExtendedQueryCommandBaseCodec<R, ExtendedQueryCommand<R>> {
public class ExtendedBatchQueryDB2CommandMessage<R> extends ExtendedQueryDB2CommandBaseMessage<R, ExtendedQueryCommand<R>> {

private static final Logger LOG = LoggerFactory.getLogger(ExtendedBatchQueryCommandCodec.class);
private static final Logger LOG = LoggerFactory.getLogger(ExtendedBatchQueryDB2CommandMessage.class);

private final List<TupleBase> params;
private final List<QueryInstance> queryInstances;
private final String baseCursorId;

public ExtendedBatchQueryCommandCodec(ExtendedQueryCommand<R> cmd, DB2PreparedStatement statement) {
public ExtendedBatchQueryDB2CommandMessage(ExtendedQueryCommand<R> cmd, DB2PreparedStatement statement) {
super(cmd, statement);
params = cmd.paramsList();
queryInstances = new ArrayList<>(params.size());
Expand All @@ -49,7 +50,7 @@ public ExtendedBatchQueryCommandCodec(ExtendedQueryCommand<R> cmd, DB2PreparedSt
@Override
void encode(DB2Encoder encoder) {
if (params.isEmpty()) {
completionHandler.handle(CommandResponse.failure("Can not execute batch query with 0 sets of batch parameters."));
fireCommandFailure(VertxException.noStackTrace("Can not execute batch query with 0 sets of batch parameters."));
return;
}

Expand Down Expand Up @@ -88,7 +89,7 @@ void decodeQuery(ByteBuf payload) {
hasMoreResults &= !queryComplete;
handleQueryResult(decoder);
}
completionHandler.handle(CommandResponse.success(hasMoreResults));
fireCommandSuccess(hasMoreResults);
}

void decodeUpdate(ByteBuf payload) {
Expand All @@ -99,7 +100,7 @@ void decodeUpdate(ByteBuf payload) {
if (cmd.autoCommit()) {
updateResponse.readLocalCommit();
}
completionHandler.handle(CommandResponse.success(true));
fireCommandSuccess(true);
}

@Override
Expand Down
Loading