@@ -187,18 +187,45 @@ class AMQT {
187187 }
188188
189189 while (isConnected ()) {
190- amqp_maybe_release_buffers (m_connection);
191190 amqp_envelope_t envelope;
191+ amqp_maybe_release_buffers (m_connection);
192+ // struct timeval timeout = {1, 0};
192193 auto ret = amqp_consume_message (m_connection, &envelope, NULL , 0 );
193194 if (AMQP_RESPONSE_NORMAL != ret.reply_type ) {
194- log::error (" Error consuming message: {}" , AMQErrorToString (ret));
195- break ;
195+ if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && AMQP_STATUS_UNEXPECTED_STATE == ret.library_error ) {
196+ amqp_frame_t frame;
197+ if (AMQP_STATUS_OK != amqp_simple_wait_frame (m_connection, &frame)) {
198+ log::error (" Failed to wait for frame after unexpected state: {}" , AMQErrorToString (ret));
199+ break ;
200+ }
201+ if (AMQP_FRAME_METHOD == frame.frame_type ) {
202+ if (frame.payload .method .id == AMQP_BASIC_ACK_METHOD) continue ;
203+ if (frame.payload .method .id == AMQP_BASIC_RETURN_METHOD) {
204+ {
205+ amqp_message_t message;
206+ ret = amqp_read_message (m_connection, frame.channel , &message, 0 );
207+ if (AMQP_RESPONSE_NORMAL != ret.reply_type ) {
208+ log::error (" Error reading returned message: {}" , AMQErrorToString (ret));
209+ break ;
210+ }
211+ amqp_destroy_message (&message);
212+ }
213+ continue ;
214+ } else {
215+ log::error (" (Frame ID: {}) Error consuming message: {}" , frame.payload .method .id , AMQErrorToString (ret));
216+ break ;
217+ }
218+ }
219+ } else {
220+ log::error (" Error consuming message: {}" , AMQErrorToString (ret));
221+ break ;
222+ }
223+ } else {
224+ log::info (" call rate event" );
225+ std::string data ((char *)envelope.message .body .bytes , envelope.message .body .len );
226+ msgQueue.push (data);
227+ amqp_destroy_envelope (&envelope);
196228 }
197- log::info (" call rate event" );
198- std::string data ((char *)envelope.message .body .bytes , envelope.message .body .len );
199- msgQueue.push (data);
200-
201- amqp_destroy_envelope (&envelope);
202229 }
203230 m_connected = false ;
204231 m_cv.notify_all ();
0 commit comments