Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,10 @@ public boolean hasBrokerInitiatedShutdown() {
private void readFrame(Frame frame) throws IOException {
if (frame != null) {
_missedHeartbeats = 0;
if (frame.type == AMQP.FRAME_HEARTBEAT) {
if (frame.getType() == AMQP.FRAME_HEARTBEAT) {
// Ignore it: we've already just reset the heartbeat counter.
} else {
if (frame.channel == 0) { // the special channel
if (frame.getChannel() == 0) { // the special channel
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
Expand All @@ -705,7 +705,7 @@ private void readFrame(Frame frame) throws IOException {
if (cm != null) {
ChannelN channel;
try {
channel = cm.getChannel(frame.channel);
channel = cm.getChannel(frame.getChannel());
} catch(UnknownChannelException e) {
// this can happen if channel has been closed,
// but there was e.g. an in-flight delivery.
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/rabbitmq/client/impl/CommandAssembler.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private void updateContentBodyState() {
}

private void consumeMethodFrame(Frame f) throws IOException {
if (f.type == AMQP.FRAME_METHOD) {
if (f.getType() == AMQP.FRAME_METHOD) {
this.method = AMQImpl.readMethodFrom(f.getInputStream());
this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
} else {
Expand All @@ -97,7 +97,7 @@ private void consumeMethodFrame(Frame f) throws IOException {
}

private void consumeHeaderFrame(Frame f) throws IOException {
if (f.type == AMQP.FRAME_HEADER) {
if (f.getType() == AMQP.FRAME_HEADER) {
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
this.remainingBodyBytes = this.contentHeader.getBodySize();
updateContentBodyState();
Expand All @@ -107,7 +107,7 @@ private void consumeHeaderFrame(Frame f) throws IOException {
}

private void consumeBodyFrame(Frame f) {
if (f.type == AMQP.FRAME_BODY) {
if (f.getType() == AMQP.FRAME_BODY) {
byte[] fragment = f.getPayload();
this.remainingBodyBytes -= fragment.length;
updateContentBodyState();
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/com/rabbitmq/client/impl/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@

/**
* Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes.
* TODO: make state private
*/
public class Frame {
/** Frame type code */
public final int type;
private final int type;

/** Frame channel number, 0-65535 */
public final int channel;
private final int channel;

/** Frame payload bytes (for inbound frames) */
private final byte[] payload;
Expand Down Expand Up @@ -345,4 +344,12 @@ private static int shortStrSize(String str)
{
return str.getBytes("utf-8").length + 1;
}

public int getType() {
return type;
}

public int getChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) {

/**
* Read a frame from the network.
* This method returns null f a frame could not have been fully built from
* This method returns null if a frame could not have been fully built from
* the network. The client must then retry later (typically
* when the channel notifies it has something to read).
*
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class BrokenFramesTest {
} catch (IOException e) {
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
assertNotNull(unexpectedFrameError);
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getReceivedFrame().type);
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getReceivedFrame().getType());
assertEquals(AMQP.FRAME_METHOD, unexpectedFrameError.getExpectedFrameType());
return;
}
Expand All @@ -88,7 +88,7 @@ public class BrokenFramesTest {
} catch (IOException e) {
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
assertNotNull(unexpectedFrameError);
assertEquals(AMQP.FRAME_BODY, unexpectedFrameError.getReceivedFrame().type);
assertEquals(AMQP.FRAME_BODY, unexpectedFrameError.getReceivedFrame().getType());
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getExpectedFrameType());
return;
}
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public void buildFrameInOneGo() throws IOException {
builder = new FrameBuilder(channel, buffer);
Frame frame = builder.readFrame();
assertThat(frame, notNullValue());
assertThat(frame.type, is(1));
assertThat(frame.channel, is(0));
assertThat(frame.getType(), is(1));
assertThat(frame.getChannel(), is(0));
assertThat(frame.getPayload().length, is(3));
}

Expand All @@ -74,8 +74,8 @@ public void buildFramesInOneGo() throws IOException {
Frame frame;
while ((frame = builder.readFrame()) != null) {
assertThat(frame, notNullValue());
assertThat(frame.type, is(1));
assertThat(frame.channel, is(0));
assertThat(frame.getType(), is(1));
assertThat(frame.getChannel(), is(0));
assertThat(frame.getPayload().length, is(3));
frameCount++;
}
Expand All @@ -95,8 +95,8 @@ public void buildFrameInSeveralCalls() throws IOException {

frame = builder.readFrame();
assertThat(frame, notNullValue());
assertThat(frame.type, is(1));
assertThat(frame.channel, is(0));
assertThat(frame.getType(), is(1));
assertThat(frame.getChannel(), is(0));
assertThat(frame.getPayload().length, is(3));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public UnexpectedFrames() {
@Test public void missingHeader() throws IOException {
expectUnexpectedFrameError(new Confuser() {
public Frame confuse(Frame frame) {
if (frame.type == AMQP.FRAME_HEADER) {
if (frame.getType() == AMQP.FRAME_HEADER) {
return null;
}
return frame;
Expand All @@ -112,11 +112,11 @@ public Frame confuse(Frame frame) {
@Test public void missingMethod() throws IOException {
expectUnexpectedFrameError(new Confuser() {
public Frame confuse(Frame frame) {
if (frame.type == AMQP.FRAME_METHOD) {
if (frame.getType() == AMQP.FRAME_METHOD) {
// We can't just skip the method as that will lead us to
// send 0 bytes and hang waiting for a response.
return new Frame(AMQP.FRAME_HEADER,
frame.channel, frame.getPayload());
frame.getChannel(), frame.getPayload());
}
return frame;
}
Expand All @@ -126,7 +126,7 @@ public Frame confuse(Frame frame) {
@Test public void missingBody() throws IOException {
expectUnexpectedFrameError(new Confuser() {
public Frame confuse(Frame frame) {
if (frame.type == AMQP.FRAME_BODY) {
if (frame.getType() == AMQP.FRAME_BODY) {
return null;
}
return frame;
Expand All @@ -137,10 +137,10 @@ public Frame confuse(Frame frame) {
@Test public void wrongClassInHeader() throws IOException {
expectUnexpectedFrameError(new Confuser() {
public Frame confuse(Frame frame) {
if (frame.type == AMQP.FRAME_HEADER) {
if (frame.getType() == AMQP.FRAME_HEADER) {
byte[] payload = frame.getPayload();
Frame confusedFrame = new Frame(AMQP.FRAME_HEADER,
frame.channel, payload);
frame.getChannel(), payload);
// First two bytes = class ID, must match class ID from
// method.
payload[0] = 12;
Expand All @@ -155,8 +155,8 @@ public Frame confuse(Frame frame) {
@Test public void heartbeatOnChannel() throws IOException {
expectUnexpectedFrameError(new Confuser() {
public Frame confuse(Frame frame) {
if (frame.type == AMQP.FRAME_METHOD) {
return new Frame(AMQP.FRAME_HEARTBEAT, frame.channel);
if (frame.getType() == AMQP.FRAME_METHOD) {
return new Frame(AMQP.FRAME_HEARTBEAT, frame.getChannel());
}
return frame;
}
Expand All @@ -166,8 +166,8 @@ public Frame confuse(Frame frame) {
@Test public void unknownFrameType() throws IOException {
expectError(AMQP.FRAME_ERROR, new Confuser() {
public Frame confuse(Frame frame) {
if (frame.type == AMQP.FRAME_METHOD) {
return new Frame(0, frame.channel,
if (frame.getType() == AMQP.FRAME_METHOD) {
return new Frame(0, frame.getChannel(),
"1234567890\0001234567890".getBytes());
}
return frame;
Expand Down