@@ -184,9 +184,11 @@ class HttpSession::Private : public QObject
184184 Priority needUpdatePriority;
185185 UpdateAction *pendingAction;
186186 QList<QueuedItem> publishQueue;
187+ bool processingSendQueue;
187188 QByteArray retryToAddress;
188189 RetryRequestPacket retryPacket;
189190 LogUtil::Config logConfig;
191+ std::unique_ptr<Filter::MessageFilterStack> messageFilters;
190192 FilterStack *responseFilters;
191193 QSet<QString> activeChannels;
192194 int connectionSubscriptionMax;
@@ -202,6 +204,7 @@ class HttpSession::Private : public QObject
202204 Connection errorOutConnection;
203205 Connection timerConnection;
204206 Connection retryTimerConnection;
207+ Connection messageFiltersFinishedConnection;
205208
206209 Private (HttpSession *_q, ZhttpRequest *_req, const HttpSession::AcceptData &_adata, const Instruct &_instruct, ZhttpManager *_outZhttp, StatsManager *_stats, RateLimiter *_updateLimiter, PublishLastIds *_publishLastIds, HttpSessionUpdateManager *_updateManager, int _connectionSubscriptionMax) :
207210 QObject (_q),
@@ -218,6 +221,7 @@ class HttpSession::Private : public QObject
218221 retries (0 ),
219222 needUpdate (false ),
220223 pendingAction (0 ),
224+ processingSendQueue (false ),
221225 responseFilters (0 ),
222226 connectionSubscriptionMax (_connectionSubscriptionMax),
223227 connectionStatsActive (true )
@@ -744,14 +748,17 @@ class HttpSession::Private : public QObject
744748
745749 void trySendQueue ()
746750 {
747- while (!publishQueue.isEmpty () && req->writeBytesAvailable () > 0 )
751+ processingSendQueue = true ;
752+
753+ while (!publishQueue.isEmpty () && req->writeBytesAvailable () > 0 && !messageFilters)
748754 {
749- const QueuedItem &qi = publishQueue.takeFirst ();
755+ const QueuedItem &qi = publishQueue.first ();
750756 const PublishItem &item = qi.item ;
751757
752758 if (!channels.contains (item.channel ))
753759 {
754760 log_debug (" httpsession: received publish for channel with no subscription, dropping" );
761+ publishQueue.removeFirst ();
755762 continue ;
756763 }
757764
@@ -786,12 +793,22 @@ class HttpSession::Private : public QObject
786793 }
787794 if (contentFilters != f.contentFilters )
788795 {
796+ publishQueue.removeFirst ();
789797 errorMessage = QString (" content filter mismatch: subscription=%1 message=%2" ).arg (contentFilters.join (" ," ), f.contentFilters .join (" ," ));
790798 doError ();
791799 break ;
792800 }
793801 }
794802
803+ QByteArray body;
804+ if (f.type == PublishFormat::HttpResponse && f.haveBodyPatch )
805+ body = applyBodyPatch (instruct.response .body , f.bodyPatch );
806+ else
807+ body = f.body ;
808+
809+ messageFilters = std::make_unique<Filter::MessageFilterStack>(channels[item.channel ].filters );
810+ messageFiltersFinishedConnection = messageFilters->finished .connect (boost::bind (&Private::messageFiltersFinished, this , boost::placeholders::_1));
811+
795812 QHash<QString, QString> prevIds;
796813 QHashIterator<QString, Instruct::Channel> it (channels);
797814 while (it.hasNext ())
@@ -810,30 +827,13 @@ class HttpSession::Private : public QObject
810827 fc.route = adata.route ;
811828 fc.trusted = adata.trusted ;
812829
813- FilterStack fs (fc, channels[item.channel ].filters );
814-
815- QByteArray body;
816-
817- if (f.action == PublishFormat::Send && fs.sendAction () == Filter::Send)
818- {
819- if (f.type == PublishFormat::HttpResponse && f.haveBodyPatch )
820- body = applyBodyPatch (instruct.response .body , f.bodyPatch );
821- else
822- body = f.body ;
823-
824- body = fs.process (body);
825- if (body.isNull ())
826- {
827- errorMessage = QString (" filter error: %1" ).arg (fs.errorMessage ());
828- doError ();
829- return ;
830- }
831- }
832-
833- processItem (item, fs.sendAction (), body, qi.exposeHeaders );
830+ // may call messageFiltersFinished immediately. if it does, queue
831+ // processing will continue. else, the loop will end and queue
832+ // processing will resume after the filters finish
833+ messageFilters->start (fc, body);
834834 }
835835
836- if (instruct.holdMode == Instruct::StreamHold)
836+ if (!messageFilters && instruct.holdMode == Instruct::StreamHold)
837837 {
838838 // the queue is empty or client buffer is full
839839
@@ -852,6 +852,8 @@ class HttpSession::Private : public QObject
852852 }
853853 }
854854 }
855+
856+ processingSendQueue = false ;
855857 }
856858
857859 void sendQueueDone ()
@@ -1374,6 +1376,27 @@ class HttpSession::Private : public QObject
13741376 req->writeBody (body);
13751377 }
13761378
1379+ void messageFiltersFinished (const Filter::MessageFilter::Result &result)
1380+ {
1381+ QueuedItem qi = publishQueue.takeFirst ();
1382+
1383+ messageFiltersFinishedConnection.disconnect ();
1384+ messageFilters.reset ();
1385+
1386+ if (!result.errorMessage .isNull ())
1387+ {
1388+ errorMessage = QString (" filter error: %1" ).arg (result.errorMessage );
1389+ doError ();
1390+ return ;
1391+ }
1392+
1393+ processItem (qi.item , result.sendAction , result.content , qi.exposeHeaders );
1394+
1395+ // if filters finished asynchronously then we need to resume processing
1396+ if (!processingSendQueue)
1397+ trySendQueue ();
1398+ }
1399+
13771400 void processItem (const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList<QByteArray> &exposeHeaders)
13781401 {
13791402 const PublishFormat &f = item.format ;
0 commit comments