Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 91 additions & 61 deletions src/main/java/io/nats/client/impl/NatsConnectionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,30 @@ enum Mode {
GATHER_DATA
};

enum Op {
OK(OP_OK),
MSG(OP_MSG),
PING(OP_PING),
PONG(OP_PONG),
ERR(OP_ERR),
HMSG(OP_HMSG),
INFO(OP_INFO),
UNKNOWN(UNKNOWN_OP);

public final String text;

Op(String text) {
this.text = text;
}
}

private final NatsConnection connection;

private ByteBuffer protocolBuffer; // use a byte buffer to assist character decoding

private boolean gotCR;

private String op;
private Op op;
private final char[] opArray;
private int opPos;

Expand Down Expand Up @@ -208,7 +225,7 @@ void gatherOp(int maxPos) throws IOException {
} else if (b == SP || b == TAB) { // Got a space, get the rest of the protocol line
this.op = opFor(opArray, opPos);
this.opPos = 0;
if (this.op.equals(OP_MSG) || this.op.equals(OP_HMSG)) {
if (op.equals(Op.MSG) || op.equals(Op.HMSG)) {
this.msgLinePosition = 0;
this.mode = Mode.GATHER_MSG_HMSG_PROTO;
} else {
Expand Down Expand Up @@ -352,13 +369,13 @@ void gatherMessageData(int maxPos) throws IOException {
NatsMessage m = incoming.getMessage();
this.connection.deliverMessage(m);
if (readListener != null) {
readListener.message(op, m);
readListener.message(op.text, m);
}
msgData = null;
msgDataPosition = 0;
incoming = null;
gotCR = false;
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
} else {
Expand Down Expand Up @@ -394,51 +411,64 @@ public String grabNextMessageLineElement(int max) {
return new String(this.msgLineChars, start, this.msgLinePosition-start);
}

static String opFor(char[] chars, int length) {
static Op opFor(char[] chars, int length) {
if (length == 3) {
if ((chars[0] == 'M' || chars[0] == 'm') &&
(chars[1] == 'S' || chars[1] == 's') &&
(chars[2] == 'G' || chars[2] == 'g')) {
return OP_MSG;
} else if (chars[0] == '+' &&
(chars[1] == 'O' || chars[1] == 'o') &&
if ((chars[0] == 'M' || chars[0] == 'm')) {
if ((chars[1] == 'S' || chars[1] == 's') &&
(chars[2] == 'G' || chars[2] == 'g'))
{
return Op.MSG;
}
return Op.UNKNOWN;
}
if (chars[0] == '+' &&
(chars[1] == 'O' || chars[1] == 'o') &&
(chars[2] == 'K' || chars[2] == 'k')) {
return OP_OK;
} else {
return UNKNOWN_OP;
return Op.OK;
}
} else if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap
if ((chars[1] == 'I' || chars[1] == 'i') &&
(chars[0] == 'P' || chars[0] == 'p') &&
return Op.UNKNOWN;
}
if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap
if ((chars[1] == 'I' || chars[1] == 'i')) {
if ((chars[0] == 'P' || chars[0] == 'p') &&
(chars[2] == 'N' || chars[2] == 'n') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return OP_PING;
} else if ((chars[1] == 'O' || chars[1] == 'o') &&
(chars[0] == 'P' || chars[0] == 'p') &&
(chars[2] == 'N' || chars[2] == 'n') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return OP_PONG;
} else if (chars[0] == '-' &&
(chars[1] == 'E' || chars[1] == 'e') &&
(chars[2] == 'R' || chars[2] == 'r') &&
(chars[3] == 'R' || chars[3] == 'r')) {
return OP_ERR;
} else if ((chars[0] == 'I' || chars[0] == 'i') &&
(chars[1] == 'N' || chars[1] == 'n') &&
return Op.PING;
}
return Op.UNKNOWN;
}
if ((chars[1] == 'O' || chars[1] == 'o')) {
if ((chars[0] == 'P' || chars[0] == 'p') &&
(chars[2] == 'N' || chars[2] == 'n') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return Op.PONG;
}
return Op.UNKNOWN;
}
if (chars[0] == '-') {
if ((chars[1] == 'E' || chars[1] == 'e') &&
(chars[2] == 'R' || chars[2] == 'r') &&
(chars[3] == 'R' || chars[3] == 'r')) {
return Op.ERR;
}
return Op.UNKNOWN;
}
if ((chars[0] == 'I' || chars[0] == 'i')) {
if ((chars[1] == 'N' || chars[1] == 'n') &&
(chars[2] == 'F' || chars[2] == 'f') &&
(chars[3] == 'O' || chars[3] == 'o')) {
return OP_INFO;
} else if ((chars[0] == 'H' || chars[0] == 'h') &&
(chars[1] == 'M' || chars[1] == 'm') &&
(chars[2] == 'S' || chars[2] == 's') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return OP_HMSG;
} else {
return UNKNOWN_OP;
return Op.INFO;
}
return Op.UNKNOWN;
}
if ((chars[0] == 'H' || chars[0] == 'h') &&
(chars[1] == 'M' || chars[1] == 'm') &&
(chars[2] == 'S' || chars[2] == 's') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return Op.HMSG;
}
} else {
return UNKNOWN_OP;
}
return Op.UNKNOWN;
}

private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};
Expand Down Expand Up @@ -467,8 +497,8 @@ public static int parseLength(String s) throws NumberFormatException {

void parseProtocolMessage() throws IOException {
try {
switch (this.op) {
case OP_MSG:
switch (op) {
case MSG:
int protocolLength = this.msgLinePosition; //This is just after the last character
int protocolLineLength = protocolLength + 4; // 4 for the "MSG "

Expand Down Expand Up @@ -505,7 +535,7 @@ void parseProtocolMessage() throws IOException {
this.msgDataPosition = 0;
this.msgLinePosition = 0;
break;
case OP_HMSG:
case HMSG:
int hProtocolLength = this.msgLinePosition; //This is just after the last character
int hProtocolLineLength = hProtocolLength + 5; // 5 for the "HMSG "

Expand Down Expand Up @@ -549,50 +579,50 @@ void parseProtocolMessage() throws IOException {
this.msgDataPosition = 0;
this.msgLinePosition = 0;
break;
case OP_OK:
case OK:
this.connection.processOK();
if (readListener != null) {
readListener.protocol(op, null);
readListener.protocol(op.text, null);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_ERR:
case ERR:
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
this.connection.processError(errorText);
if (readListener != null) {
readListener.protocol(op, errorText);
readListener.protocol(op.text, errorText);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_PING:
case PING:
this.connection.sendPong();
if (readListener != null) {
readListener.protocol(op, null);
readListener.protocol(op.text, null);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_PONG:
case PONG:
this.connection.handlePong();
if (readListener != null) {
readListener.protocol(op, null);
readListener.protocol(op.text, null);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_INFO:
case INFO:
String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString();
this.connection.handleInfo(info);
if (readListener != null) {
readListener.protocol(op, info);
readListener.protocol(op.text, info);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
default:
throw new IllegalStateException("Unknown protocol operation "+op);
throw new IllegalStateException("Unknown protocol operation " + op.text);
}

} catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
Expand All @@ -608,11 +638,11 @@ void encounteredProtocolError(Exception ex) throws IOException {
void fakeReadForTest(byte[] bytes) {
System.arraycopy(bytes, 0, this.buffer, 0, bytes.length);
this.bufferPosition = 0;
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
}

String currentOp() {
return this.op;
return op.text;
}
}
28 changes: 17 additions & 11 deletions src/test/java/io/nats/client/impl/ParseTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,34 +226,40 @@ public void testProtocolStrings() throws Exception {
}

@Test
public void testOpFor_ForCoverage() {
coverOpFor(OP_MSG, "MSG");
coverOpFor(OP_OK, "+OK");
coverOpFor(OP_PING, "PING");
coverOpFor(OP_PONG, "PONG");
coverOpFor(OP_ERR, "-ERR");
coverOpFor(OP_INFO, "INFO");
coverOpFor(OP_HMSG, "HMSG");
public void testOpForCoverage() {
coverOpFor(NatsConnectionReader.Op.MSG, OP_MSG);
coverOpFor(NatsConnectionReader.Op.OK, OP_OK);
coverOpFor(NatsConnectionReader.Op.PING, OP_PING);
coverOpFor(NatsConnectionReader.Op.PONG, OP_PONG);
coverOpFor(NatsConnectionReader.Op.ERR, OP_ERR);
coverOpFor(NatsConnectionReader.Op.INFO, OP_INFO);
coverOpFor(NatsConnectionReader.Op.HMSG, OP_HMSG);

assertUnknownOpFor(1, "X".toCharArray());
assertUnknownOpFor(2, "Xx".toCharArray());
assertUnknownOpFor(5, "Xxxxx".toCharArray());
}

private void coverOpFor(String op, String s) {
private void coverOpFor(NatsConnectionReader.Op op, String s) {
_coverOpFor(op, s.toUpperCase());
_coverOpFor(op, s.toLowerCase());
}

private void _coverOpFor(String op, String s) {
private void _coverOpFor(NatsConnectionReader.Op op, String s) {
int len = s.length();
assertEquals(op, NatsConnectionReader.opFor(s.toCharArray(), len));
for (int x = 0; x < len; x++) {
char[] chars = s.toCharArray();
chars[x] = Character.toLowerCase(chars[x]);
assertEquals(op, NatsConnectionReader.opFor(chars, len));

chars = s.toCharArray();
chars[x] = 'X';
assertUnknownOpFor(len, chars);
}
}

private void assertUnknownOpFor(int len, char[] chars) {
assertEquals(UNKNOWN_OP, NatsConnectionReader.opFor(chars, len));
assertEquals(NatsConnectionReader.Op.UNKNOWN, NatsConnectionReader.opFor(chars, len));
}
}
Loading