@@ -174,52 +174,20 @@ Inbound *Tcp::send(const Query &query)
174174 return result2 >= (ssize_t )query.size () ? this : nullptr ;
175175}
176176
177- /* *
178- * Size of the response -- this method only works if we have already received the frist two bytes
179- * @return uint16_t
180- */
181- uint16_t Tcp::responsesize () const
182- {
183- // result variable
184- uint16_t result;
185-
186- // get the first two bytes from the buffer
187- memcpy (&result, _buffer.data (), 2 );
188-
189- // put the bytes in the right order
190- return ntohs (result);
191- }
192-
193177/* *
194178 * Number of bytes that we expect in the next read operation
195179 * @return size_t
196180 */
197181size_t Tcp::expected () const
198182{
199183 // if we have not yet received the first two bytes, we expect those first
200- switch (_filled ) {
201- case 0 : return 2 ;
202- case 1 : return 1 ;
203- default : return responsesize () - (_filled - 2 );
184+ switch (_transferred ) {
185+ case 0 : return sizeof ( uint16_t ) ;
186+ case 1 : return sizeof ( uint16_t ) - 1 ;
187+ default : return _size - (_transferred - sizeof ( uint16_t ) );
204188 }
205189}
206190
207- /* *
208- * Reallocate the buffer if it turns out that our buffer is too small for the expected response
209- * @return bool
210- */
211- bool Tcp::reallocate ()
212- {
213- // preferred buffer size
214- size_t preferred = responsesize () + 2 ;
215-
216- // reallocate the buffer (but do not shrink)
217- _buffer.resize (std::max (_buffer.size (), preferred));
218-
219- // report result
220- return true ;
221- }
222-
223191/* *
224192 * Upgrate the socket from a _connecting_ socket to a _connected_ socket
225193 */
@@ -230,10 +198,7 @@ void Tcp::upgrade()
230198
231199 // if the connection failed
232200 if (!_connected) return fail ();
233-
234- // already allocate enough data for the first two bytes (holding the size of a response)
235- _buffer.resize (2 );
236-
201+
237202 // we no longer monitor for writability, but for readability instead
238203 _loop->update (_identifier, _fd, 1 , this );
239204
@@ -268,6 +233,26 @@ void Tcp::fail()
268233 _handler->onBuffered (this );
269234}
270235
236+ /* *
237+ * Check return value of a recv syscall
238+ * @param bytes The bytes transferred
239+ * @return true if we should leap out (an error occurred), false if not
240+ */
241+ bool Tcp::updatetransferred (ssize_t result)
242+ {
243+ // the operation would block, but don't leap out
244+ if (result < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return false ;
245+
246+ // if there is a failure we leap out as well
247+ if (result <= 0 ) return fail (), true ;
248+
249+ // update the number of transferred bytes
250+ _transferred += result;
251+
252+ // don't leap out
253+ return false ;
254+ }
255+
271256/* *
272257 * Method that is called when the socket becomes active (readable in our case)
273258 */
@@ -279,30 +264,45 @@ void Tcp::notify()
279264
280265 // if the socket is not yet connected, it might be connected right now
281266 if (!_connected) return upgrade ();
282-
283- // receive data from the socket
284- auto result = ::recv (_fd, _buffer.data () + _filled, expected (), MSG_DONTWAIT);
285-
286- // do nothing if the operation is blocking
287- if (result < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return ;
288-
289- // if there is a failure we leap out
290- if (result <= 0 ) return fail ();
291-
292- // update the number of bytes received
293- _filled += result;
294-
295- // after we've received the first two bytes, we can reallocate the buffer so that it is of sufficient size
296- if (_filled == 2 && !reallocate ()) return fail ();
297-
267+
268+ // We can be in two receive states: the first state is that we're waiting for the
269+ // size of the buffer. The second state is that we are waiting for the response content itself.
270+ // To determine in what state we're in, we can check how many bytes have been transferred.
271+ // If that's less than 2 then we're still waiting for the response size.
272+ if (_transferred < sizeof (uint16_t ))
273+ {
274+ const auto result = ::recv (_fd, (uint8_t *)&_size + _transferred, sizeof (uint16_t ) - _transferred, MSG_DONTWAIT);
275+
276+ // if there is a failure we leap out
277+ if (updatetransferred (result)) return ;
278+
279+ // if we still haven't received the two bytes we should leap out here
280+ else if (_transferred < sizeof (uint16_t )) return ;
281+
282+ // OK: the size of the rest of the frame was received, we know how much to allocate
283+ // update the size
284+ _size = ntohs (_size);
285+
286+ // size the buffer accordingly
287+ _buffer.resize (_size);
288+ }
289+
290+ // calculate offset into the buffer
291+ size_t offset = _transferred - sizeof (uint16_t );
292+
293+ // This is the second state of the Tcp state machine. At this point we know we have
294+ // received at least two bytes of the frame, and so we know we have resized the
295+ // buffer accordingly. All that's left to do is to await the full response content
296+ if (updatetransferred (::recv (_fd, _buffer.data () + offset, _buffer.size () - offset, MSG_DONTWAIT))) return ;
297+
298298 // continue waiting if we have not yet received everything there is
299299 if (expected () > 0 ) return ;
300-
301- // all data has been received, buffer the response for now
302- add (_ip, _buffer. data () + 2 , _filled - 2 );
300+
301+ // all data has been received, we can move the response content into a deferred list to be processed later
302+ add (_ip, move (_buffer) );
303303
304304 // for the next response we empty the buffer again
305- _filled = 0 ;
305+ _transferred = 0 ;
306306}
307307
308308/* *
0 commit comments