Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/remote/pv/beaconHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
#include <pv/remote.h>
#include <pv/pvAccess.h>

namespace
{
class InternalClientContextImpl;
class BeaconCleanupHandler;
}

namespace epics {
namespace pvAccess {

Expand Down Expand Up @@ -85,6 +91,10 @@ class BeaconHandler
* First beacon flag.
*/
bool _first;
/**
* Callback for cleaning up the beacon
*/
std::shared_ptr<BeaconCleanupHandler> _callback;

/**
* Update beacon.
Expand All @@ -100,6 +110,8 @@ class BeaconHandler
ServerGUID const &guid,
epics::pvData::int16 sequentalID,
epics::pvData::int16 changeCount);

friend class ::InternalClientContextImpl;
};

}
Expand Down
99 changes: 94 additions & 5 deletions src/remoteClient/clientContextImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ using std::tr1::static_pointer_cast;
using namespace std;
using namespace epics::pvData;

static const float maxBeaconLifetime = 180.f * 2.f;
static const int maxTrackedBeacons = 2048;

namespace epics {
namespace pvAccess {

Expand Down Expand Up @@ -3038,7 +3041,43 @@ enum ContextState {
};


/**
* Handles cleanup of old beacons.
*/
class BeaconCleanupHandler
{
public:
POINTER_DEFINITIONS(BeaconCleanupHandler);

class Callback : public TimerCallback
{
public:
Callback(BeaconCleanupHandler& handler) : m_handler(handler)
{
}

virtual void callback() OVERRIDE FINAL;
virtual void timerStopped() OVERRIDE FINAL;

BeaconCleanupHandler& m_handler;
};

BeaconCleanupHandler(InternalClientContextImpl& impl, osiSockAddr addr);
~BeaconCleanupHandler();

/**
* Extend the lifetime of the beacon, resetting removal countdown to 0
*/
void touch() { epicsAtomicSetIntT(&m_count, 0); }

private:
void remove();

std::shared_ptr<BeaconCleanupHandler::Callback> m_callback;
osiSockAddr m_from;
InternalClientContextImpl& m_impl;
int m_count;
};

class InternalClientContextImpl :
public ClientContextImpl,
Expand Down Expand Up @@ -4058,6 +4097,12 @@ class InternalClientContextImpl :

m_timer->close();

// Remove all beacons
{
Lock guard(m_beaconMapMutex);
m_beaconHandlers.clear();
}

m_channelSearchManager->cancel();

// this will also close all PVA transports
Expand All @@ -4079,11 +4124,6 @@ class InternalClientContextImpl :
while ((transportCount = m_transportRegistry.size()) && tries--)
epicsThreadSleep(0.025);

{
Lock guard(m_beaconMapMutex);
m_beaconHandlers.clear();
}

if (transportCount)
LOG(logLevelDebug, "PVA client context destroyed with %u transport(s) active.", (unsigned)transportCount);
}
Expand Down Expand Up @@ -4351,12 +4391,25 @@ class InternalClientContextImpl :
BeaconHandler::shared_pointer handler;
if (it == m_beaconHandlers.end())
{
/* If we're tracking too many beacons, we'll just ignore this one */
if (m_beaconHandlers.size() >= maxTrackedBeacons)
{
char ipa[64];
sockAddrToDottedIP(&responseFrom->sa, ipa, sizeof(ipa));
LOG(logLevelDebug, "Tracked beacon limit reached (%d), ignoring %s\n", maxTrackedBeacons, ipa);
Comment on lines +4395 to +4399
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To minimize log spam it would be friendlier to only log when size()==max. So once each time the limit is reached, but not again until falling below the limit. eg. consider if some PVA server gets stuck in a reset loop.

return nullptr;
}

// stores weak_ptr
handler.reset(new BeaconHandler(internal_from_this(), responseFrom));
handler->_callback.reset(new BeaconCleanupHandler(*this, *responseFrom));
m_beaconHandlers[*responseFrom] = handler;
}
else
{
handler = it->second;
handler->_callback->touch(); /* Update the callback's latest use time */
}
return handler;
}

Expand Down Expand Up @@ -4556,8 +4609,44 @@ class InternalClientContextImpl :
Configuration::shared_pointer m_configuration;

TransportRegistry::transportVector_t m_flushTransports;

friend class BeaconCleanupHandler;
};


BeaconCleanupHandler::BeaconCleanupHandler(InternalClientContextImpl& impl, osiSockAddr addr) :
m_from(addr),
m_impl(impl),
m_count(0)
{
m_callback.reset(new Callback(*this));
m_impl.m_timer->schedulePeriodic(m_callback, maxBeaconLifetime / 4, maxBeaconLifetime / 4);
}

BeaconCleanupHandler::~BeaconCleanupHandler()
{
m_impl.m_timer->cancel(m_callback);
}

void BeaconCleanupHandler::Callback::callback()
{
if (epicsAtomicIncrIntT(&m_handler.m_count) >= 5) {
m_handler.remove();
}
}

void BeaconCleanupHandler::Callback::timerStopped()
{
m_handler.remove();
}

void BeaconCleanupHandler::remove()
{
Lock guard(m_impl.m_beaconMapMutex);
m_impl.m_timer->cancel(m_callback);
m_impl.m_beaconHandlers.erase(m_from);
}

size_t InternalClientContextImpl::num_instances;
size_t InternalClientContextImpl::InternalChannelImpl::num_instances;
size_t InternalClientContextImpl::InternalChannelImpl::num_active;
Expand Down
Loading