Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/core/qzmq/src/qzmqvalve.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2012-2020 Justin Karneges
* Copyright (C) 2025 Fastly, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
Expand All @@ -25,6 +26,7 @@

#include <QPointer>
#include "qzmqsocket.h"
#include "defercall.h"

namespace QZmq {

Expand All @@ -39,6 +41,7 @@ class Valve::Private : public QObject
bool pendingRead;
int maxReadsPerEvent;
boost::signals2::scoped_connection rrConnection;
DeferCall deferCall;

Private(Valve *_q) :
QObject(_q),
Expand All @@ -62,7 +65,7 @@ class Valve::Private : public QObject
return;

pendingRead = true;
QMetaObject::invokeMethod(this, "queuedRead", Qt::QueuedConnection);
deferCall.defer([=] { queuedRead(); });
}

void tryRead()
Expand Down Expand Up @@ -99,7 +102,6 @@ class Valve::Private : public QObject
tryRead();
}

private slots:
void queuedRead()
{
pendingRead = false;
Expand Down
7 changes: 4 additions & 3 deletions src/core/zrpcrequest.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2014-2015 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
* Copyright (C) 2024-2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -31,6 +31,7 @@
#include "uuidutil.h"
#include "log.h"
#include "rtimer.h"
#include "defercall.h"

using Connection = boost::signals2::scoped_connection;

Expand All @@ -52,6 +53,7 @@ class ZrpcRequest::Private : public QObject
QByteArray conditionString;
std::unique_ptr<RTimer> timer;
Connection timerConnection;
DeferCall deferCall;

Private(ZrpcRequest *_q) :
QObject(_q),
Expand Down Expand Up @@ -136,7 +138,6 @@ class ZrpcRequest::Private : public QObject
q->finished();
}

private slots:
void doStart()
{
if(!manager->canWriteImmediately())
Expand Down Expand Up @@ -238,7 +239,7 @@ void ZrpcRequest::start(const QString &method, const QVariantHash &args)
{
d->method = method;
d->args = args;
QMetaObject::invokeMethod(d, "doStart", Qt::QueuedConnection);
d->deferCall.defer([=] { d->doStart(); });
}

void ZrpcRequest::respond(const QVariant &result)
Expand Down
9 changes: 4 additions & 5 deletions src/core/zwebsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ZWebSocket::Private : public QObject
bool multi;
Connection expireTimerConnection;
Connection keepAliveTimerConnection;
DeferCall deferCall;

Private(ZWebSocket *_q) :
QObject(_q),
Expand Down Expand Up @@ -266,7 +267,7 @@ class ZWebSocket::Private : public QObject
if(!pendingUpdate)
{
pendingUpdate = true;
QMetaObject::invokeMethod(this, "doUpdate", Qt::QueuedConnection);
deferCall.defer([=] { doUpdate(); });
}
}

Expand Down Expand Up @@ -298,7 +299,7 @@ class ZWebSocket::Private : public QObject

state = Idle;
cleanup();
QMetaObject::invokeMethod(this, "doClosed", Qt::QueuedConnection);
deferCall.defer([=] { doClosed(); });
}

Frame readFrame()
Expand Down Expand Up @@ -338,7 +339,7 @@ class ZWebSocket::Private : public QObject
// if peer was already closed, then we're done!
state = Idle;
cleanup();
QMetaObject::invokeMethod(this, "doClosed", Qt::QueuedConnection);
deferCall.defer([=] { doClosed(); });
}
else
{
Expand Down Expand Up @@ -977,7 +978,6 @@ class ZWebSocket::Private : public QObject
return ErrorGeneric;
}

public slots:
void doClosed()
{
q->closed();
Expand Down Expand Up @@ -1067,7 +1067,6 @@ public slots:
}
}

public:
void expire_timeout()
{
state = Idle;
Expand Down
3 changes: 2 additions & 1 deletion src/handler/deferred.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 Fanout, Inc.
* Copyright (C) 2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -42,7 +43,7 @@ void Deferred::setFinished(bool ok, const QVariant &value)
result_.success = ok;
result_.value = value;

QMetaObject::invokeMethod(this, "doFinish", Qt::QueuedConnection);
deferCall_.defer([=] { doFinish(); });
}

void Deferred::doFinish()
Expand Down
8 changes: 5 additions & 3 deletions src/handler/deferred.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 Fanout, Inc.
* Copyright (C) 2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand All @@ -26,6 +27,7 @@
#include <QVariant>
#include <QObject>
#include <boost/signals2.hpp>
#include "defercall.h"

class DeferredResult
{
Expand Down Expand Up @@ -63,11 +65,11 @@ class Deferred : public QObject

void setFinished(bool ok, const QVariant &value = QVariant());

private slots:
void doFinish();

private:
DeferredResult result_;
DeferCall deferCall_;

void doFinish();
};

#endif
6 changes: 3 additions & 3 deletions src/handler/handlermain.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2016 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
* Copyright (C) 2024-2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand All @@ -22,7 +22,6 @@
*/

#include <QCoreApplication>
#include <QTimer>
#include "rtimer.h"
#include "defercall.h"
#include "handlerapp.h"
Expand Down Expand Up @@ -53,7 +52,8 @@ int handler_main(int argc, char **argv)
QCoreApplication qapp(argc, argv);

HandlerAppMain appMain;
QTimer::singleShot(0, [&appMain]() {appMain.start();});
DeferCall deferCall;
deferCall.defer([&] { appMain.start(); });
int ret = qapp.exec();

// ensure deferred deletes are processed
Expand Down
5 changes: 2 additions & 3 deletions src/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class HttpSession::Private : public QObject
Connection timerConnection;
Connection retryTimerConnection;
Connection messageFiltersFinishedConnection;
DeferCall deferCall;

Private(HttpSession *_q, ZhttpRequest *_req, const HttpSession::AcceptData &_adata, const Instruct &_instruct, ZhttpManager *_outZhttp, StatsManager *_stats, RateLimiter *_updateLimiter, PublishLastIds *_publishLastIds, HttpSessionUpdateManager *_updateManager, int _connectionSubscriptionMax) :
QObject(_q),
Expand Down Expand Up @@ -1164,7 +1165,7 @@ class HttpSession::Private : public QObject
if(!outZhttp)
{
errorMessage = "Instruct contained link, but handler not configured for outbound requests.";
QMetaObject::invokeMethod(this, "doError", Qt::QueuedConnection);
deferCall.defer([=] { doError(); });
return;
}

Expand Down Expand Up @@ -1457,7 +1458,6 @@ class HttpSession::Private : public QObject
}
}

private slots:
void doError()
{
if(instruct.holdMode == Instruct::ResponseHold)
Expand Down Expand Up @@ -1621,7 +1621,6 @@ private slots:
}
}

private:
void timer_timeout()
{
if(instruct.holdMode == Instruct::ResponseHold)
Expand Down
15 changes: 8 additions & 7 deletions src/proxy/domainmap.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2012-2022 Fanout, Inc.
* Copyright (C) 2023-2024 Fastly, Inc.
* Copyright (C) 2023-2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -205,6 +205,7 @@ class DomainMap::Worker : public QObject
RTimer t;
Connection tConnection;
QFileSystemWatcher watcher;
DeferCall deferCall;

Worker() :
watcher(this)
Expand Down Expand Up @@ -292,7 +293,7 @@ class DomainMap::Worker : public QObject

log_info("routes loaded with %d entries", allRules.count());

QMetaObject::invokeMethod(this, "doChanged", Qt::QueuedConnection);
deferCall.defer([=] { doChanged(); });
}

// mutex must be locked when calling this method
Expand All @@ -312,11 +313,6 @@ class DomainMap::Worker : public QObject
Signal changed;

public slots:
void doChanged()
{
changed();
}

void start()
{
if(!fileName.isEmpty())
Expand Down Expand Up @@ -719,6 +715,11 @@ public slots:

return AddRuleOk;
}

void doChanged()
{
changed();
}
};

class DomainMap::Thread : public QThread
Expand Down
22 changes: 14 additions & 8 deletions src/proxy/main.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016 Fanout, Inc.
* Copyright (C) 2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand All @@ -22,15 +23,13 @@

#include <QCoreApplication>
#include "app.h"
#include "defercall.h"

class AppMain : public QObject
class AppMain
{
Q_OBJECT

public:
App *app;

public slots:
void start()
{
app = new App;
Expand All @@ -53,10 +52,17 @@ int proxy_main(int argc, char **argv)
QCoreApplication qapp(argc, argv);

AppMain appMain;
QMetaObject::invokeMethod(&appMain, "start", Qt::QueuedConnection);
return qapp.exec();
}
DeferCall deferCall;
deferCall.defer([&] { appMain.start(); });
int ret = qapp.exec();

// ensure deferred deletes are processed
QCoreApplication::instance()->sendPostedEvents();

// deinit here, after all event loop activity has completed
DeferCall::cleanup();

return ret;
}

#include "main.moc"
}
11 changes: 6 additions & 5 deletions src/proxy/requestsession.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) 2012-2023 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
* Copyright (C) 2024-2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -36,6 +36,7 @@
#include "qtcompat.h"
#include "bufferlist.h"
#include "log.h"
#include "defercall.h"
#include "layertracker.h"
#include "sockjsmanager.h"
#include "inspectdata.h"
Expand Down Expand Up @@ -200,6 +201,7 @@ class RequestSession::Private : public QObject
ZhttpReqConnections zhttpReqConnections;
Connection inspectFinishedConnection;
Connection acceptFinishedConnection;
DeferCall deferCall;

Private(RequestSession *_q, int _workerId, DomainMap *_domainMap = 0, SockJsManager *_sockJsManager = 0, ZrpcManager *_inspectManager = 0, ZrpcChecker *_inspectChecker = 0, ZrpcManager *_acceptManager = 0, StatsManager *_stats = 0) :
QObject(_q),
Expand Down Expand Up @@ -308,7 +310,7 @@ class RequestSession::Private : public QObject
isSockJs = true;
sockJsManager->giveRequest(zhttpRequest, route.sockJsPath.length(), route.sockJsAsPath, route);
zhttpRequest = 0;
QMetaObject::invokeMethod(this, "doFinished", Qt::QueuedConnection);
deferCall.defer([=] { doFinished(); });
return;
}
}
Expand Down Expand Up @@ -480,7 +482,7 @@ class RequestSession::Private : public QObject
if(!inspectRequest)
{
log_debug("inspect not available");
QMetaObject::invokeMethod(this, "doInspectError", Qt::QueuedConnection);
deferCall.defer([=] { doInspectError(); });
}
}
}
Expand Down Expand Up @@ -559,7 +561,7 @@ class RequestSession::Private : public QObject
if(!pendingResponseUpdate)
{
pendingResponseUpdate = true;
QMetaObject::invokeMethod(this, "doResponseUpdate", Qt::QueuedConnection);
deferCall.defer([=] { doResponseUpdate(); });
}
}

Expand Down Expand Up @@ -973,7 +975,6 @@ class RequestSession::Private : public QObject
}
}

public slots:
void doResponseUpdate()
{
pendingResponseUpdate = false;
Expand Down
Loading