Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/core/core.pri
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ HEADERS += \
$$PWD/cowstring.h

HEADERS += \
$$PWD/qzmqcontext.h \
$$PWD/qzmqsocket.h \
$$PWD/qzmqvalve.h \
$$PWD/qzmqreqmessage.h \
$$PWD/qzmqreprouter.h
$$PWD/zmqcontext.h \
$$PWD/zmqsocket.h \
$$PWD/zmqvalve.h \
$$PWD/zmqreqmessage.h \
$$PWD/zmqreprouter.h

SOURCES += \
$$PWD/qzmqcontext.cpp \
$$PWD/qzmqsocket.cpp \
$$PWD/qzmqvalve.cpp \
$$PWD/qzmqreprouter.cpp
$$PWD/zmqcontext.cpp \
$$PWD/zmqsocket.cpp \
$$PWD/zmqvalve.cpp \
$$PWD/zmqreprouter.cpp

HEADERS += $$PWD/processquit.h
SOURCES += $$PWD/processquit.cpp
Expand Down
6 changes: 3 additions & 3 deletions src/core/statsmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <QDateTime>
#include <QJsonDocument>
#include <QJsonObject>
#include "qzmqsocket.h"
#include "zmqsocket.h"
#include "timerwheel.h"
#include "log.h"
#include "defercall.h"
Expand Down Expand Up @@ -401,7 +401,7 @@ class StatsManager::Private
int subscriptionTtl;
int subscriptionLinger;
int reportInterval;
std::unique_ptr<QZmq::Socket> sock;
std::unique_ptr<ZmqSocket> sock;
std::unique_ptr<SimpleHttpServer> prometheusServer;
int prometheusConnectionsMax;
QString prometheusPrefix;
Expand Down Expand Up @@ -499,7 +499,7 @@ class StatsManager::Private
{
sock.reset();

sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Pub);
sock = std::make_unique<ZmqSocket>(ZmqSocket::Pub);

sock->setHwm(OUT_HWM);
sock->setWriteQueueEnabled(false);
Expand Down
48 changes: 24 additions & 24 deletions src/core/zhttpmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include <QStringList>
#include <QHash>
#include "cowbytearray.h"
#include "qzmqsocket.h"
#include "qzmqvalve.h"
#include "zmqsocket.h"
#include "zmqvalve.h"
#include "tnetstring.h"
#include "zhttprequestpacket.h"
#include "zhttpresponsepacket.h"
Expand Down Expand Up @@ -81,17 +81,17 @@ class ZhttpManager::Private
QStringList server_in_specs;
QStringList server_in_stream_specs;
QStringList server_out_specs;
std::unique_ptr<QZmq::Socket> client_out_sock;
std::unique_ptr<QZmq::Socket> client_out_stream_sock;
std::unique_ptr<QZmq::Socket> client_in_sock;
std::unique_ptr<QZmq::Socket> client_req_sock;
std::unique_ptr<QZmq::Socket> server_in_sock;
std::unique_ptr<QZmq::Socket> server_in_stream_sock;
std::unique_ptr<QZmq::Socket> server_out_sock;
std::unique_ptr<QZmq::Valve> client_in_valve;
std::unique_ptr<QZmq::Valve> client_out_stream_valve;
std::unique_ptr<QZmq::Valve> server_in_valve;
std::unique_ptr<QZmq::Valve> server_in_stream_valve;
std::unique_ptr<ZmqSocket> client_out_sock;
std::unique_ptr<ZmqSocket> client_out_stream_sock;
std::unique_ptr<ZmqSocket> client_in_sock;
std::unique_ptr<ZmqSocket> client_req_sock;
std::unique_ptr<ZmqSocket> server_in_sock;
std::unique_ptr<ZmqSocket> server_in_stream_sock;
std::unique_ptr<ZmqSocket> server_out_sock;
std::unique_ptr<ZmqValve> client_in_valve;
std::unique_ptr<ZmqValve> client_out_stream_valve;
std::unique_ptr<ZmqValve> server_in_valve;
std::unique_ptr<ZmqValve> server_in_stream_valve;
QByteArray instanceId;
int ipcFileMode;
bool doBind;
Expand Down Expand Up @@ -157,7 +157,7 @@ class ZhttpManager::Private
client_req_sock.reset();
client_out_sock.reset();

client_out_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Push);
client_out_sock = std::make_unique<ZmqSocket>(ZmqSocket::Push);
cosConnection = client_out_sock->messagesWritten.connect(boost::bind(&Private::client_out_messagesWritten, this, boost::placeholders::_1));

client_out_sock->setHwm(OUT_HWM);
Expand All @@ -181,7 +181,7 @@ class ZhttpManager::Private
client_out_stream_valve.reset();
client_out_stream_sock.reset();

client_out_stream_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Router);
client_out_stream_sock = std::make_unique<ZmqSocket>(ZmqSocket::Router);
cossConnection = client_out_stream_sock->messagesWritten.connect(boost::bind(&Private::client_out_stream_messagesWritten, this, boost::placeholders::_1));

client_out_stream_sock->setIdentity(instanceId);
Expand All @@ -200,7 +200,7 @@ class ZhttpManager::Private
return false;
}

client_out_stream_valve = std::make_unique<QZmq::Valve>(client_out_stream_sock.get());
client_out_stream_valve = std::make_unique<ZmqValve>(client_out_stream_sock.get());
clientOutStreamConnection = client_out_stream_valve->readyRead.connect(boost::bind(&Private::client_out_stream_readyRead, this, boost::placeholders::_1));

client_out_stream_valve->open();
Expand All @@ -215,7 +215,7 @@ class ZhttpManager::Private
client_in_valve.reset();
client_in_sock.reset();

client_in_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Sub);
client_in_sock = std::make_unique<ZmqSocket>(ZmqSocket::Sub);

client_in_sock->setHwm(DEFAULT_HWM);
client_in_sock->setShutdownWaitTime(0);
Expand All @@ -228,7 +228,7 @@ class ZhttpManager::Private
return false;
}

client_in_valve = std::make_unique<QZmq::Valve>(client_in_sock.get());
client_in_valve = std::make_unique<ZmqValve>(client_in_sock.get());
clientConnection = client_in_valve->readyRead.connect(boost::bind(&Private::client_in_readyRead, this, boost::placeholders::_1));

client_in_valve->open();
Expand All @@ -244,7 +244,7 @@ class ZhttpManager::Private
client_out_stream_sock.reset();
client_in_sock.reset();

client_req_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Dealer);
client_req_sock = std::make_unique<ZmqSocket>(ZmqSocket::Dealer);
rrConnection = client_req_sock->readyRead.connect(boost::bind(&Private::client_req_readyRead, this));

client_req_sock->setHwm(OUT_HWM);
Expand All @@ -265,7 +265,7 @@ class ZhttpManager::Private
server_in_valve.reset();
server_in_sock.reset();

server_in_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Pull);
server_in_sock = std::make_unique<ZmqSocket>(ZmqSocket::Pull);

server_in_sock->setHwm(IN_HWM);

Expand All @@ -276,7 +276,7 @@ class ZhttpManager::Private
return false;
}

server_in_valve = std::make_unique<QZmq::Valve>(server_in_sock.get());
server_in_valve = std::make_unique<ZmqValve>(server_in_sock.get());
serverConnection = server_in_valve->readyRead.connect(boost::bind(&Private::server_in_readyRead, this, boost::placeholders::_1));

server_in_valve->open();
Expand All @@ -289,7 +289,7 @@ class ZhttpManager::Private
serverStreamConnection.disconnect();
server_in_stream_sock.reset();

server_in_stream_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Router);
server_in_stream_sock = std::make_unique<ZmqSocket>(ZmqSocket::Router);

server_in_stream_sock->setIdentity(instanceId);
server_in_stream_sock->setHwm(DEFAULT_HWM);
Expand All @@ -301,7 +301,7 @@ class ZhttpManager::Private
return false;
}

server_in_stream_valve = std::make_unique<QZmq::Valve>(server_in_stream_sock.get());
server_in_stream_valve = std::make_unique<ZmqValve>(server_in_stream_sock.get());
serverStreamConnection = server_in_stream_valve->readyRead.connect(boost::bind(&Private::server_in_stream_readyRead, this, boost::placeholders::_1));

server_in_stream_valve->open();
Expand All @@ -314,7 +314,7 @@ class ZhttpManager::Private
sosConnection.disconnect();
server_out_sock.reset();

server_out_sock = std::make_unique<QZmq::Socket>(QZmq::Socket::Pub);
server_out_sock = std::make_unique<ZmqSocket>(ZmqSocket::Pub);
sosConnection = server_out_sock->messagesWritten.connect(boost::bind(&Private::server_out_messagesWritten, this, boost::placeholders::_1));

server_out_sock->setWriteQueueEnabled(false);
Expand Down
10 changes: 3 additions & 7 deletions src/core/qzmqcontext.cpp → src/core/zmqcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,18 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#include "qzmqcontext.h"
#include "zmqcontext.h"

#include <assert.h>
#include "rust/bindings.h"

namespace QZmq {

Context::Context(int ioThreads)
ZmqContext::ZmqContext(int ioThreads)
{
context_ = ffi::wzmq_init(ioThreads);
assert(context_);
}

Context::~Context()
ZmqContext::~ZmqContext()
{
ffi::wzmq_term(context_);
}

}
14 changes: 5 additions & 9 deletions src/core/qzmqcontext.h → src/core/zmqcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#ifndef QZMQCONTEXT_H
#define QZMQCONTEXT_H
#ifndef ZMQCONTEXT_H
#define ZMQCONTEXT_H

namespace QZmq {

class Context
class ZmqContext
{
public:
Context(int ioThreads = 1);
~Context();
ZmqContext(int ioThreads = 1);
~ZmqContext();

// the zmq context
void *context() { return context_; }
Expand All @@ -39,6 +37,4 @@ class Context
void *context_;
};

}

#endif
39 changes: 17 additions & 22 deletions src/core/qzmqreprouter.cpp → src/core/zmqreprouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,24 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#include "qzmqreprouter.h"
#include "zmqreprouter.h"

#include "cowstring.h"
#include "qzmqsocket.h"
#include "qzmqreqmessage.h"
#include "zmqsocket.h"
#include "zmqreqmessage.h"

namespace QZmq {

class RepRouter::Private
class ZmqRepRouter::Private
{
public:
RepRouter *q;
std::unique_ptr<Socket> sock;
ZmqRepRouter *q;
std::unique_ptr<ZmqSocket> sock;
Connection mWConnection;
Connection rrConnection;

Private(RepRouter *_q) :
Private(ZmqRepRouter *_q) :
q(_q)
{
sock = std::make_unique<Socket>(Socket::Router);
sock = std::make_unique<ZmqSocket>(ZmqSocket::Router);
rrConnection = sock->readyRead.connect(boost::bind(&Private::sock_readyRead, this));
mWConnection = sock->messagesWritten.connect(boost::bind(&Private::sock_messagesWritten, this, boost::placeholders::_1));
}
Expand All @@ -56,42 +54,39 @@ class RepRouter::Private
}
};

RepRouter::RepRouter()
ZmqRepRouter::ZmqRepRouter()
{
d = std::make_unique<Private>(this);
}

RepRouter::~RepRouter() = default;
ZmqRepRouter::~ZmqRepRouter() = default;

void RepRouter::setShutdownWaitTime(int msecs)
void ZmqRepRouter::setShutdownWaitTime(int msecs)
{
d->sock->setShutdownWaitTime(msecs);
}

void RepRouter::connectToAddress(const CowString &addr)
void ZmqRepRouter::connectToAddress(const CowString &addr)
{
d->sock->connectToAddress(addr);
}

bool RepRouter::bind(const CowString &addr)
bool ZmqRepRouter::bind(const CowString &addr)
{
return d->sock->bind(addr);
}

bool RepRouter::canRead() const
bool ZmqRepRouter::canRead() const
{
return d->sock->canRead();
}

ReqMessage RepRouter::read()
ZmqReqMessage ZmqRepRouter::read()
{
return ReqMessage(d->sock->read());
return ZmqReqMessage(d->sock->read());
}

void RepRouter::write(const ReqMessage &message)
void ZmqRepRouter::write(const ZmqReqMessage &message)
{
d->sock->write(message.toRawMessage());
}

}

24 changes: 10 additions & 14 deletions src/core/qzmqreprouter.h → src/core/zmqreprouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

#ifndef QZMQREPROUTER_H
#define QZMQREPROUTER_H
#ifndef ZMQREPROUTER_H
#define ZMQREPROUTER_H

#include <boost/signals2.hpp>

Expand All @@ -32,15 +32,13 @@ using Signal = boost::signals2::signal<void()>;
using SignalInt = boost::signals2::signal<void(int)>;
using Connection = boost::signals2::scoped_connection;

namespace QZmq {
class ZmqReqMessage;

class ReqMessage;

class RepRouter
class ZmqRepRouter
{
public:
RepRouter();
~RepRouter();
ZmqRepRouter();
~ZmqRepRouter();

void setShutdownWaitTime(int msecs);

Expand All @@ -49,21 +47,19 @@ class RepRouter

bool canRead() const;

ReqMessage read();
void write(const ReqMessage &message);
ZmqReqMessage read();
void write(const ZmqReqMessage &message);

Signal readyRead;
SignalInt messagesWritten;

private:
RepRouter(const RepRouter &) = delete;
RepRouter &operator=(const RepRouter &) = delete;
ZmqRepRouter(const ZmqRepRouter &) = delete;
ZmqRepRouter &operator=(const ZmqRepRouter &) = delete;

class Private;
friend class Private;
std::unique_ptr<Private> d;
};

}

#endif
Loading
Loading