Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,12 @@ class ChannelImpl : public Asyncable::IConnectable
};

struct ThreadData {
const std::thread::id threadId;
std::thread::id threadId;
std::vector<QueueData*> queues;

ThreadData(const std::thread::id& thId)
: threadId(thId) {}

inline void deleteAll(std::vector<Receiver*>& recs, Asyncable::IConnectable* conn) const
{
for (Receiver* r : recs) {
if (r->receiver) {
r->receiver->async_disconnect(conn);
}
delete r;
}
recs.clear();
}

inline void clearAll(Asyncable::IConnectable* conn)
{
deleteAll(receivers, conn);
deleteAll(pendingToAdd, conn);

for (QueueData* qdata : queues) {
delete qdata;
}
queues.clear();
}

inline bool addReceiver(const Asyncable* receiver, const Callback& f, Asyncable::Mode mode, Asyncable::IConnectable* conn)
{
bool needIncrement = false;
Expand Down Expand Up @@ -188,6 +166,11 @@ class ChannelImpl : public Asyncable::IConnectable
return needDecrement;
}

inline size_t receiverCount() const
{
return receivers.size() + pendingToAdd.size();
}

inline void receiversCall(const T&... args)
{
addPending();
Expand Down Expand Up @@ -224,6 +207,57 @@ class ChannelImpl : public Asyncable::IConnectable
addPending();
}

inline void clearReceivers(Asyncable::IConnectable* conn)
{
auto deleteAll = [](std::vector<Receiver*>& recs, Asyncable::IConnectable* conn) {
for (Receiver* r : recs) {
if (r->receiver) {
r->receiver->async_disconnect(conn);
}
delete r;
}
recs.clear();
};

deleteAll(receivers, conn);
deleteAll(pendingToAdd, conn);
}

QueueData* addQueue(size_t queue_capacity, const std::thread::id& receiveTh,
const std::function<void(const CallMsg& m)>& handler)
{
QueueData* qdata = new QueueData(queue_capacity);
qdata->receiveTh = receiveTh;
qdata->queue.port2()->onMessage(handler);

QueuePool* pool = QueuePool::instance();
pool->regPort(threadId, qdata->queue.port1()); // send
pool->regPort(receiveTh, qdata->queue.port2()); // receive

queues.push_back(qdata);

return qdata;
}

void clearAllQueue()
{
// the queue is no longer functioning or may even be destroyed
if (conf::terminated) {
return;
}

QueuePool* pool = QueuePool::instance();
for (QueueData* qdata : queues) {
qdata->queue.port2()->onMessage(nullptr);
pool->unregPort(threadId, qdata->queue.port1()); // send
pool->unregPort(qdata->receiveTh, qdata->queue.port2()); // receive

delete qdata;
}

queues.clear();
}

std::string dump() const
{
std::stringstream s;
Expand Down Expand Up @@ -322,7 +356,7 @@ class ChannelImpl : public Asyncable::IConnectable
ObjectPool<SharedReceiverCall> m_rcalls;
std::atomic<int> m_enabledReceiversCount = 0;
ChannelOpt m_opt;
std::mutex m_dumpMutex;
std::mutex m_mutex;

std::string thDataDump() const
{
Expand All @@ -347,8 +381,35 @@ class ChannelImpl : public Asyncable::IConnectable
return *thdata;
}

//! NOTE If we didn't find an empty slot, let's try looking for an unused one.
{
std::scoped_lock lock(m_mutex);
thdata = m_thdatas.tryGet(
[](ThreadData* td) {
if (td->receiverCount() > 0) {
return false;
}

for (const QueueData* qd : td->queues) {
if (qd->queue.port1()->countToSend() > 0) {
return false;
}
}

return true;
},
[thId] () { return new ThreadData(thId); }
);

if (thdata) {
thdata->threadId = thId;
thdata->clearAllQueue();
return *thdata;
}
}

{
std::scoped_lock lock(m_dumpMutex);
std::scoped_lock lock(m_mutex);
std::cout << "channel: " << (m_opt.chname.empty() ? "no name" : m_opt.chname) << std::endl;
std::cout << "thread data pool exhausted!!" << std::endl;
std::cout << "required thread: " << thId << std::endl;
Expand Down Expand Up @@ -401,47 +462,17 @@ class ChannelImpl : public Asyncable::IConnectable

// we'll create a new one if we didn't find one.
if (!qdata) {
qdata = new QueueData(m_opt.queueCapacity);
qdata->receiveTh = receiveTh;
qdata->queue.port2()->onMessage([this](const CallMsg& m) {
qdata = sendThdata.addQueue(m_opt.queueCapacity, receiveTh, [this](const CallMsg& m) {
const std::thread::id threadId = std::this_thread::get_id();
ThreadData& thdata = threadData(threadId);
thdata.receiversCall(m);
m.func->unlock();
});

QueuePool::instance()->regPort(sendThdata.threadId, qdata->queue.port1()); // send
QueuePool::instance()->regPort(receiveTh, qdata->queue.port2()); // receive

sendThdata.queues.push_back(qdata);
}

qdata->queue.port1()->send(msg);
}

void unregAllQueue()
{
// the queue is no longer functioning or may even be destroyed
if (conf::terminated) {
return;
}

QueuePool* pool = QueuePool::instance();
for (size_t i = 0; i < m_thdatas.count(); ++i) {
ThreadData* thdata = m_thdatas.at(i);
assert(thdata);
if (!thdata) {
break;
}

for (QueueData* qdata : thdata->queues) {
qdata->queue.port2()->onMessage(nullptr);
pool->unregPort(thdata->threadId, qdata->queue.port1()); // send
pool->unregPort(qdata->receiveTh, qdata->queue.port2()); // receive
}
}
}

void sendAuto(const T&... args)
{
const std::thread::id threadId = std::this_thread::get_id();
Expand Down Expand Up @@ -507,16 +538,15 @@ class ChannelImpl : public Asyncable::IConnectable

~ChannelImpl()
{
unregAllQueue();

for (size_t i = 0; i < m_thdatas.count(); ++i) {
ThreadData* thdata = m_thdatas.at(i);
assert(thdata);
if (!thdata) {
break;
}

thdata->clearAll(this);
thdata->clearAllQueue();
thdata->clearReceivers(this);
}

m_thdatas.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ class RpcPort
}
}

size_t countToSend() const
{
return m_queue.availableRead() + m_pending.size();
}

bool hasPending() const
{
return !m_pending.empty();
Expand Down
Loading