Skip to content

Commit 5977a9f

Browse files
Improved data publisher shutdown operations
1 parent c13b50d commit 5977a9f

File tree

3 files changed

+26
-7
lines changed

3 files changed

+26
-7
lines changed

src/lib/transport/DataPublisher.cpp

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ DataPublisher::DataPublisher() :
5353
m_cipherKeyRotationPeriod(60000),
5454
m_started(false),
5555
m_shuttingDown(false),
56+
m_stopped(false),
5657
m_userData(nullptr),
5758
m_clientAcceptor(m_commandChannelService)
5859
{
@@ -100,7 +101,7 @@ void DataPublisher::StartAccept()
100101

101102
void DataPublisher::AcceptConnection(const SubscriberConnectionPtr& connection, const ErrorCode& error)
102103
{
103-
if (m_shuttingDown)
104+
if (m_shuttingDown || m_stopped)
104105
return;
105106

106107
if (!error)
@@ -863,7 +864,7 @@ void DataPublisher::Start(const TcpEndPoint& endpoint)
863864
{
864865
m_callbackQueue.WaitForData();
865866

866-
if (m_shuttingDown)
867+
if (m_shuttingDown || m_stopped)
867868
break;
868869

869870
const CallbackDispatcher dispatcher = m_callbackQueue.Dequeue();
@@ -893,7 +894,7 @@ void DataPublisher::Start(const string& networkInterfaceIP, const uint16_t port)
893894

894895
void DataPublisher::Stop()
895896
{
896-
if (!m_started || m_shuttingDown)
897+
if (!m_started || m_shuttingDown || m_stopped)
897898
return;
898899

899900
// Stop method executes shutdown on a separate thread without stopping to prevent
@@ -903,11 +904,21 @@ void DataPublisher::Stop()
903904

904905
void DataPublisher::ShutDown(const bool joinThread)
905906
{
906-
// Notify running threads that the publisher is shutting down
907+
// Check if shutdown thread is running or publisher has already stopped
908+
if (m_shuttingDown || m_stopped)
909+
{
910+
if (joinThread && !m_stopped)
911+
m_shutdownThread.join();
912+
913+
return;
914+
}
915+
916+
// Notify running threads that the publisher is shutting down, i.e., shutdown thread is active
907917
m_shuttingDown = true;
908918
m_started = false;
909919

910-
Thread shutdownThread([this]{
920+
m_shutdownThread = Thread([this]
921+
{
911922
try
912923
{
913924
// Let any pending start operation complete before continuing stop - prevents destruction stop before start is completed
@@ -962,11 +973,12 @@ void DataPublisher::ShutDown(const bool joinThread)
962973
}
963974

964975
// Shutdown complete
976+
m_stopped = true;
965977
m_shuttingDown = false;
966978
});
967979

968980
if (joinThread)
969-
shutdownThread.join();
981+
m_shutdownThread.join();
970982
}
971983

972984
bool DataPublisher::IsStarted() const

src/lib/transport/DataPublisher.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ namespace transport
7878
uint32_t m_cipherKeyRotationPeriod;
7979
std::atomic_bool m_started;
8080
std::atomic_bool m_shuttingDown;
81+
std::atomic_bool m_stopped;
8182
sttp::Mutex m_connectActionMutex;
83+
Thread m_shutdownThread;
8284
void* m_userData;
8385

8486
// Dispatch reference - unordered map needed to manage reference
@@ -262,6 +264,7 @@ namespace transport
262264
void DisconnectSubscriber(const sttp::Guid& instanceID);
263265

264266
friend class SubscriberConnection;
267+
friend class PublisherInstance;
265268
};
266269

267270
typedef SharedPtr<DataPublisher> DataPublisherPtr;

src/lib/transport/PublisherInstance.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ PublisherInstance::PublisherInstance() :
5050
m_publisher->RegisterUserCommandCallback(&HandleReceivedUserCommand);
5151
}
5252

53-
PublisherInstance::~PublisherInstance() = default;
53+
PublisherInstance::~PublisherInstance()
54+
{
55+
if (m_publisher != nullptr)
56+
m_publisher->ShutDown(true);
57+
}
5458

5559
void PublisherInstance::HandleStatusMessage(DataPublisher* source, const string& message)
5660
{

0 commit comments

Comments
 (0)