|
46 | 46 |
|
47 | 47 | namespace RabbitMQ.Client.Impl
|
48 | 48 | {
|
49 |
| - enum AssemblyState |
50 |
| - { |
51 |
| - ExpectingMethod, |
52 |
| - ExpectingContentHeader, |
53 |
| - ExpectingContentBody, |
54 |
| - Complete |
55 |
| - } |
56 |
| - |
57 |
| - class CommandAssembler |
| 49 | + internal sealed class CommandAssembler |
58 | 50 | {
|
59 | 51 | private const int MaxArrayOfBytesSize = 2_147_483_591;
|
60 | 52 |
|
61 |
| - public MethodBase m_method; |
62 |
| - public ContentHeaderBase m_header; |
63 |
| - public Memory<byte> m_body; |
64 |
| - public ProtocolBase m_protocol; |
65 |
| - public int m_remainingBodyBytes; |
| 53 | + private readonly ProtocolBase _protocol; |
| 54 | + |
| 55 | + private MethodBase _method; |
| 56 | + private ContentHeaderBase _header; |
| 57 | + private byte[] _bodyBytes; |
| 58 | + private Memory<byte> _body; |
| 59 | + private int _remainingBodyBytes; |
66 | 60 | private int _offset;
|
67 |
| - public AssemblyState m_state; |
| 61 | + private AssemblyState _state; |
68 | 62 |
|
69 | 63 | public CommandAssembler(ProtocolBase protocol)
|
70 | 64 | {
|
71 |
| - m_protocol = protocol; |
| 65 | + _protocol = protocol; |
72 | 66 | Reset();
|
73 | 67 | }
|
74 | 68 |
|
75 |
| - public Command HandleFrame(in InboundFrame f) |
| 69 | + private void Reset() |
| 70 | + { |
| 71 | + _method = null; |
| 72 | + _header = null; |
| 73 | + _bodyBytes = null; |
| 74 | + _body = Memory<byte>.Empty; |
| 75 | + _remainingBodyBytes = 0; |
| 76 | + _offset = 0; |
| 77 | + _state = AssemblyState.ExpectingMethod; |
| 78 | + } |
| 79 | + |
| 80 | + public IncomingCommand HandleFrame(in InboundFrame frame) |
76 | 81 | {
|
77 |
| - switch (m_state) |
| 82 | + switch (_state) |
78 | 83 | {
|
79 | 84 | case AssemblyState.ExpectingMethod:
|
80 |
| - if (!f.IsMethod()) |
81 |
| - { |
82 |
| - throw new UnexpectedFrameException(f.Type); |
83 |
| - } |
84 |
| - m_method = m_protocol.DecodeMethodFrom(f.Payload.Span); |
85 |
| - m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete; |
86 |
| - return CompletedCommand(); |
| 85 | + ParseMethodFrame(in frame); |
| 86 | + break; |
87 | 87 | case AssemblyState.ExpectingContentHeader:
|
88 |
| - if (!f.IsHeader()) |
89 |
| - { |
90 |
| - throw new UnexpectedFrameException(f.Type); |
91 |
| - } |
92 |
| - |
93 |
| - ReadOnlySpan<byte> span = f.Payload.Span; |
94 |
| - m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span)); |
95 |
| - m_header.ReadFrom(span.Slice(12)); |
96 |
| - ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4)); |
97 |
| - if (totalBodyBytes > MaxArrayOfBytesSize) |
98 |
| - { |
99 |
| - throw new UnexpectedFrameException(f.Type); |
100 |
| - } |
101 |
| - |
102 |
| - m_remainingBodyBytes = (int)totalBodyBytes; |
103 |
| - |
104 |
| - // Is returned by Command.Dispose in Session.HandleFrame |
105 |
| - byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes); |
106 |
| - m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes); |
107 |
| - UpdateContentBodyState(); |
108 |
| - return CompletedCommand(); |
| 88 | + ParseHeaderFrame(in frame); |
| 89 | + break; |
109 | 90 | case AssemblyState.ExpectingContentBody:
|
110 |
| - if (!f.IsBody()) |
111 |
| - { |
112 |
| - throw new UnexpectedFrameException(f.Type); |
113 |
| - } |
114 |
| - |
115 |
| - if (f.Payload.Length > m_remainingBodyBytes) |
116 |
| - { |
117 |
| - throw new MalformedFrameException($"Overlong content body received - {m_remainingBodyBytes} bytes remaining, {f.Payload.Length} bytes received"); |
118 |
| - } |
119 |
| - |
120 |
| - f.Payload.CopyTo(m_body.Slice(_offset)); |
121 |
| - m_remainingBodyBytes -= f.Payload.Length; |
122 |
| - _offset += f.Payload.Length; |
123 |
| - UpdateContentBodyState(); |
124 |
| - return CompletedCommand(); |
125 |
| - case AssemblyState.Complete: |
126 |
| - default: |
127 |
| - return null; |
| 91 | + ParseBodyFrame(in frame); |
| 92 | + break; |
| 93 | + } |
| 94 | + |
| 95 | + if (_state != AssemblyState.Complete) |
| 96 | + { |
| 97 | + return IncomingCommand.Empty; |
| 98 | + } |
| 99 | + |
| 100 | + var result = new IncomingCommand(_method, _header, _body, _bodyBytes); |
| 101 | + Reset(); |
| 102 | + return result; |
| 103 | + } |
| 104 | + |
| 105 | + private void ParseMethodFrame(in InboundFrame frame) |
| 106 | + { |
| 107 | + if (frame.Type != FrameType.FrameMethod) |
| 108 | + { |
| 109 | + throw new UnexpectedFrameException(frame.Type); |
128 | 110 | }
|
| 111 | + |
| 112 | + _method = _protocol.DecodeMethodFrom(frame.Payload.Span); |
| 113 | + _state = _method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete; |
129 | 114 | }
|
130 | 115 |
|
131 |
| - private Command CompletedCommand() |
| 116 | + private void ParseHeaderFrame(in InboundFrame frame) |
132 | 117 | {
|
133 |
| - if (m_state == AssemblyState.Complete) |
| 118 | + if (frame.Type != FrameType.FrameHeader) |
134 | 119 | {
|
135 |
| - Command result = new Command(m_method, m_header, m_body, true); |
136 |
| - Reset(); |
137 |
| - return result; |
| 120 | + throw new UnexpectedFrameException(frame.Type); |
138 | 121 | }
|
139 |
| - else |
| 122 | + |
| 123 | + ReadOnlySpan<byte> span = frame.Payload.Span; |
| 124 | + _header = _protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span)); |
| 125 | + _header.ReadFrom(span.Slice(12)); |
| 126 | + ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4)); |
| 127 | + if (totalBodyBytes > MaxArrayOfBytesSize) |
140 | 128 | {
|
141 |
| - return null; |
| 129 | + throw new UnexpectedFrameException(frame.Type); |
142 | 130 | }
|
| 131 | + |
| 132 | + _remainingBodyBytes = (int) totalBodyBytes; |
| 133 | + |
| 134 | + // Is returned by IncomingCommand.Dispose in Session.HandleFrame |
| 135 | + _bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes); |
| 136 | + _body = new Memory<byte>(_bodyBytes, 0, _remainingBodyBytes); |
| 137 | + UpdateContentBodyState(); |
143 | 138 | }
|
144 | 139 |
|
145 |
| - private void Reset() |
| 140 | + private void ParseBodyFrame(in InboundFrame frame) |
146 | 141 | {
|
147 |
| - m_state = AssemblyState.ExpectingMethod; |
148 |
| - m_method = null; |
149 |
| - m_header = null; |
150 |
| - m_body = null; |
151 |
| - _offset = 0; |
152 |
| - m_remainingBodyBytes = 0; |
| 142 | + if (frame.Type != FrameType.FrameBody) |
| 143 | + { |
| 144 | + throw new UnexpectedFrameException(frame.Type); |
| 145 | + } |
| 146 | + |
| 147 | + int payloadLength = frame.Payload.Length; |
| 148 | + if (payloadLength > _remainingBodyBytes) |
| 149 | + { |
| 150 | + throw new MalformedFrameException($"Overlong content body received - {_remainingBodyBytes} bytes remaining, {payloadLength} bytes received"); |
| 151 | + } |
| 152 | + |
| 153 | + frame.Payload.CopyTo(_body.Slice(_offset)); |
| 154 | + _remainingBodyBytes -= payloadLength; |
| 155 | + _offset += payloadLength; |
| 156 | + UpdateContentBodyState(); |
153 | 157 | }
|
154 | 158 |
|
155 | 159 | private void UpdateContentBodyState()
|
156 | 160 | {
|
157 |
| - m_state = (m_remainingBodyBytes > 0) |
158 |
| - ? AssemblyState.ExpectingContentBody |
159 |
| - : AssemblyState.Complete; |
| 161 | + _state = _remainingBodyBytes > 0 ? AssemblyState.ExpectingContentBody : AssemblyState.Complete; |
| 162 | + } |
| 163 | + |
| 164 | + private enum AssemblyState |
| 165 | + { |
| 166 | + ExpectingMethod, |
| 167 | + ExpectingContentHeader, |
| 168 | + ExpectingContentBody, |
| 169 | + Complete |
160 | 170 | }
|
161 | 171 | }
|
162 | 172 | }
|
0 commit comments