diff --git a/src/main/java/io/nats/client/impl/NatsConnectionReader.java b/src/main/java/io/nats/client/impl/NatsConnectionReader.java index 91a63fcbe..970d20714 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionReader.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionReader.java @@ -39,13 +39,30 @@ enum Mode { GATHER_DATA }; + enum Op { + OK(OP_OK), + MSG(OP_MSG), + PING(OP_PING), + PONG(OP_PONG), + ERR(OP_ERR), + HMSG(OP_HMSG), + INFO(OP_INFO), + UNKNOWN(UNKNOWN_OP); + + public final String text; + + Op(String text) { + this.text = text; + } + } + private final NatsConnection connection; private ByteBuffer protocolBuffer; // use a byte buffer to assist character decoding private boolean gotCR; - private String op; + private Op op; private final char[] opArray; private int opPos; @@ -208,7 +225,7 @@ void gatherOp(int maxPos) throws IOException { } else if (b == SP || b == TAB) { // Got a space, get the rest of the protocol line this.op = opFor(opArray, opPos); this.opPos = 0; - if (this.op.equals(OP_MSG) || this.op.equals(OP_HMSG)) { + if (op.equals(Op.MSG) || op.equals(Op.HMSG)) { this.msgLinePosition = 0; this.mode = Mode.GATHER_MSG_HMSG_PROTO; } else { @@ -352,13 +369,13 @@ void gatherMessageData(int maxPos) throws IOException { NatsMessage m = incoming.getMessage(); this.connection.deliverMessage(m); if (readListener != null) { - readListener.message(op, m); + readListener.message(op.text, m); } msgData = null; msgDataPosition = 0; incoming = null; gotCR = false; - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; break; } else { @@ -394,51 +411,64 @@ public String grabNextMessageLineElement(int max) { return new String(this.msgLineChars, start, this.msgLinePosition-start); } - static String opFor(char[] chars, int length) { + static Op opFor(char[] chars, int length) { if (length == 3) { - if ((chars[0] == 'M' || chars[0] == 'm') && - (chars[1] == 'S' || chars[1] == 's') && - (chars[2] == 'G' || chars[2] == 'g')) { - return OP_MSG; - } else if (chars[0] == '+' && - (chars[1] == 'O' || chars[1] == 'o') && + if ((chars[0] == 'M' || chars[0] == 'm')) { + if ((chars[1] == 'S' || chars[1] == 's') && + (chars[2] == 'G' || chars[2] == 'g')) + { + return Op.MSG; + } + return Op.UNKNOWN; + } + if (chars[0] == '+' && + (chars[1] == 'O' || chars[1] == 'o') && (chars[2] == 'K' || chars[2] == 'k')) { - return OP_OK; - } else { - return UNKNOWN_OP; + return Op.OK; } - } else if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap - if ((chars[1] == 'I' || chars[1] == 'i') && - (chars[0] == 'P' || chars[0] == 'p') && + return Op.UNKNOWN; + } + if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap + if ((chars[1] == 'I' || chars[1] == 'i')) { + if ((chars[0] == 'P' || chars[0] == 'p') && (chars[2] == 'N' || chars[2] == 'n') && (chars[3] == 'G' || chars[3] == 'g')) { - return OP_PING; - } else if ((chars[1] == 'O' || chars[1] == 'o') && - (chars[0] == 'P' || chars[0] == 'p') && - (chars[2] == 'N' || chars[2] == 'n') && - (chars[3] == 'G' || chars[3] == 'g')) { - return OP_PONG; - } else if (chars[0] == '-' && - (chars[1] == 'E' || chars[1] == 'e') && - (chars[2] == 'R' || chars[2] == 'r') && - (chars[3] == 'R' || chars[3] == 'r')) { - return OP_ERR; - } else if ((chars[0] == 'I' || chars[0] == 'i') && - (chars[1] == 'N' || chars[1] == 'n') && + return Op.PING; + } + return Op.UNKNOWN; + } + if ((chars[1] == 'O' || chars[1] == 'o')) { + if ((chars[0] == 'P' || chars[0] == 'p') && + (chars[2] == 'N' || chars[2] == 'n') && + (chars[3] == 'G' || chars[3] == 'g')) { + return Op.PONG; + } + return Op.UNKNOWN; + } + if (chars[0] == '-') { + if ((chars[1] == 'E' || chars[1] == 'e') && + (chars[2] == 'R' || chars[2] == 'r') && + (chars[3] == 'R' || chars[3] == 'r')) { + return Op.ERR; + } + return Op.UNKNOWN; + } + if ((chars[0] == 'I' || chars[0] == 'i')) { + if ((chars[1] == 'N' || chars[1] == 'n') && (chars[2] == 'F' || chars[2] == 'f') && (chars[3] == 'O' || chars[3] == 'o')) { - return OP_INFO; - } else if ((chars[0] == 'H' || chars[0] == 'h') && - (chars[1] == 'M' || chars[1] == 'm') && - (chars[2] == 'S' || chars[2] == 's') && - (chars[3] == 'G' || chars[3] == 'g')) { - return OP_HMSG; - } else { - return UNKNOWN_OP; + return Op.INFO; + } + return Op.UNKNOWN; + } + if ((chars[0] == 'H' || chars[0] == 'h') && + (chars[1] == 'M' || chars[1] == 'm') && + (chars[2] == 'S' || chars[2] == 's') && + (chars[3] == 'G' || chars[3] == 'g')) { + return Op.HMSG; } - } else { - return UNKNOWN_OP; } + return Op.UNKNOWN; } private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000}; @@ -467,8 +497,8 @@ public static int parseLength(String s) throws NumberFormatException { void parseProtocolMessage() throws IOException { try { - switch (this.op) { - case OP_MSG: + switch (op) { + case MSG: int protocolLength = this.msgLinePosition; //This is just after the last character int protocolLineLength = protocolLength + 4; // 4 for the "MSG " @@ -505,7 +535,7 @@ void parseProtocolMessage() throws IOException { this.msgDataPosition = 0; this.msgLinePosition = 0; break; - case OP_HMSG: + case HMSG: int hProtocolLength = this.msgLinePosition; //This is just after the last character int hProtocolLineLength = hProtocolLength + 5; // 5 for the "HMSG " @@ -549,50 +579,50 @@ void parseProtocolMessage() throws IOException { this.msgDataPosition = 0; this.msgLinePosition = 0; break; - case OP_OK: + case OK: this.connection.processOK(); if (readListener != null) { - readListener.protocol(op, null); + readListener.protocol(op.text, null); } - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; break; - case OP_ERR: + case ERR: String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", ""); this.connection.processError(errorText); if (readListener != null) { - readListener.protocol(op, errorText); + readListener.protocol(op.text, errorText); } - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; break; - case OP_PING: + case PING: this.connection.sendPong(); if (readListener != null) { - readListener.protocol(op, null); + readListener.protocol(op.text, null); } - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; break; - case OP_PONG: + case PONG: this.connection.handlePong(); if (readListener != null) { - readListener.protocol(op, null); + readListener.protocol(op.text, null); } - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; break; - case OP_INFO: + case INFO: String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString(); this.connection.handleInfo(info); if (readListener != null) { - readListener.protocol(op, info); + readListener.protocol(op.text, info); } - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; break; default: - throw new IllegalStateException("Unknown protocol operation "+op); + throw new IllegalStateException("Unknown protocol operation " + op.text); } } catch (IllegalStateException | NumberFormatException | NullPointerException ex) { @@ -608,11 +638,11 @@ void encounteredProtocolError(Exception ex) throws IOException { void fakeReadForTest(byte[] bytes) { System.arraycopy(bytes, 0, this.buffer, 0, bytes.length); this.bufferPosition = 0; - this.op = UNKNOWN_OP; + this.op = Op.UNKNOWN; this.mode = Mode.GATHER_OP; } String currentOp() { - return this.op; + return op.text; } } \ No newline at end of file diff --git a/src/test/java/io/nats/client/impl/ParseTests.java b/src/test/java/io/nats/client/impl/ParseTests.java index 5e1008f03..541a4a20b 100644 --- a/src/test/java/io/nats/client/impl/ParseTests.java +++ b/src/test/java/io/nats/client/impl/ParseTests.java @@ -226,34 +226,40 @@ public void testProtocolStrings() throws Exception { } @Test - public void testOpFor_ForCoverage() { - coverOpFor(OP_MSG, "MSG"); - coverOpFor(OP_OK, "+OK"); - coverOpFor(OP_PING, "PING"); - coverOpFor(OP_PONG, "PONG"); - coverOpFor(OP_ERR, "-ERR"); - coverOpFor(OP_INFO, "INFO"); - coverOpFor(OP_HMSG, "HMSG"); + public void testOpForCoverage() { + coverOpFor(NatsConnectionReader.Op.MSG, OP_MSG); + coverOpFor(NatsConnectionReader.Op.OK, OP_OK); + coverOpFor(NatsConnectionReader.Op.PING, OP_PING); + coverOpFor(NatsConnectionReader.Op.PONG, OP_PONG); + coverOpFor(NatsConnectionReader.Op.ERR, OP_ERR); + coverOpFor(NatsConnectionReader.Op.INFO, OP_INFO); + coverOpFor(NatsConnectionReader.Op.HMSG, OP_HMSG); assertUnknownOpFor(1, "X".toCharArray()); + assertUnknownOpFor(2, "Xx".toCharArray()); + assertUnknownOpFor(5, "Xxxxx".toCharArray()); } - private void coverOpFor(String op, String s) { + private void coverOpFor(NatsConnectionReader.Op op, String s) { _coverOpFor(op, s.toUpperCase()); _coverOpFor(op, s.toLowerCase()); } - private void _coverOpFor(String op, String s) { + private void _coverOpFor(NatsConnectionReader.Op op, String s) { int len = s.length(); assertEquals(op, NatsConnectionReader.opFor(s.toCharArray(), len)); for (int x = 0; x < len; x++) { char[] chars = s.toCharArray(); + chars[x] = Character.toLowerCase(chars[x]); + assertEquals(op, NatsConnectionReader.opFor(chars, len)); + + chars = s.toCharArray(); chars[x] = 'X'; assertUnknownOpFor(len, chars); } } private void assertUnknownOpFor(int len, char[] chars) { - assertEquals(UNKNOWN_OP, NatsConnectionReader.opFor(chars, len)); + assertEquals(NatsConnectionReader.Op.UNKNOWN, NatsConnectionReader.opFor(chars, len)); } } \ No newline at end of file