Skip to content

Commit ff0dd81

Browse files
Improved reverse subscription operation
1 parent c53c920 commit ff0dd81

File tree

4 files changed

+110
-67
lines changed

4 files changed

+110
-67
lines changed

src/lib/transport/DataSubscriber.cpp

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,30 @@ using namespace sttp::transport::tssc;
4141
constexpr float32_t MissingCacheWarningInterval = 20.0;
4242
const DataSubscriberPtr DataSubscriber::NullPtr = nullptr;
4343

44+
class ReverseConnection;
45+
typedef SharedPtr<ReverseConnection> ReverseConnectionPtr;
46+
47+
class ReverseConnection : public boost::enable_shared_from_this<ReverseConnection>
48+
{
49+
private:
50+
TcpSocket m_socket;
51+
52+
public:
53+
explicit ReverseConnection(IOContext& service) :
54+
m_socket(service)
55+
{
56+
}
57+
58+
TcpSocket& GetSocket()
59+
{
60+
return m_socket;
61+
}
62+
63+
static ReverseConnectionPtr New(IOContext& service) {
64+
return NewSharedPtr<ReverseConnection, IOContext&>(service);
65+
}
66+
};
67+
4468
DataSubscriber::DataSubscriber() :
4569
m_subscriberID(Empty::Guid),
4670
m_compressPayloadData(true),
@@ -127,7 +151,8 @@ void DataSubscriber::RunCommandChannelResponseThread()
127151
ReadPayloadHeader(error, bytesTransferred);
128152
});
129153

130-
m_commandChannelService.run();
154+
if (!m_listening)
155+
m_commandChannelService.run();
131156
}
132157

133158
// Callback for async read of the payload header.
@@ -1096,7 +1121,7 @@ void DataSubscriber::Connect(const string& hostname, const uint16_t port, const
10961121
EstablishConnection(hostEndpoint, false);
10971122
}
10981123

1099-
void DataSubscriber::Listen(const sttp::TcpEndPoint& endpoint)
1124+
void DataSubscriber::Listen(const sttp::TcpEndPoint& endPoint)
11001125
{
11011126
// This function fails by exception, consumers should try/catch calls to Start
11021127
if (IsListening())
@@ -1115,14 +1140,14 @@ void DataSubscriber::Listen(const sttp::TcpEndPoint& endpoint)
11151140
#endif
11161141

11171142
// TODO: Add TLS implementation options
1118-
m_clientAcceptor = TcpAcceptor(m_commandChannelService, endpoint, false); //-V601
1143+
m_clientAcceptor = TcpAcceptor(m_commandChannelService, endPoint, false); //-V601
11191144

11201145
// Run command channel accept thread
11211146
m_commandChannelAcceptThread = Thread([this]
1122-
{
1123-
StartAccept();
1124-
m_commandChannelService.run();
1125-
});
1147+
{
1148+
StartAccept();
1149+
m_commandChannelService.run();
1150+
});
11261151

11271152
m_listening = true;
11281153
}
@@ -1139,49 +1164,53 @@ void DataSubscriber::Listen(const std::string& networkInterfaceIP, const uint16_
11391164

11401165
void DataSubscriber::StartAccept()
11411166
{
1142-
TcpSocket commandChannelSocket(m_commandChannelService);
1167+
ReverseConnectionPtr reverseConnection = ReverseConnection::New(m_commandChannelService);
11431168

1144-
m_clientAcceptor.async_accept(commandChannelSocket, [this, &commandChannelSocket]<typename T0>(T0 && error)
1169+
m_clientAcceptor.async_accept(reverseConnection->GetSocket(), [this, reverseConnection]<typename T0>(T0 && error)
11451170
{
1146-
AcceptConnection(commandChannelSocket, error);
1171+
AcceptConnection(&reverseConnection->GetSocket(), error);
11471172
});
11481173
}
11491174

1150-
void DataSubscriber::AcceptConnection(TcpSocket& commandChannelSocket, const ErrorCode& error)
1175+
void DataSubscriber::AcceptConnection(TcpSocket* commandChannelSocket, const ErrorCode& error)
11511176
{
11521177
if (!IsListening())
11531178
return;
11541179

1180+
// TODO: For secured connections, validate certificate and IP information here to assign subscriberID
1181+
const TcpEndPoint endPoint = commandChannelSocket->remote_endpoint();
1182+
string address = "<unknown>";
1183+
string errorMessage = "closed.";
1184+
1185+
auto closeSocket = [&]
1186+
{
1187+
try
1188+
{
1189+
address = ResolveDNSName(m_commandChannelService, endPoint);
1190+
commandChannelSocket->close();
1191+
}
1192+
catch (SystemError& ex)
1193+
{
1194+
errorMessage = "close error: " + string(ex.what());
1195+
}
1196+
catch (...)
1197+
{
1198+
errorMessage = "close error: " + boost::current_exception_diagnostic_information(true);
1199+
}
1200+
};
1201+
11551202
if (error)
11561203
{
1157-
DispatchErrorMessage("Error while attempting to accept DataPublisher connection for reverse connection: " + string(SystemError(error).what()));
1204+
closeSocket();
1205+
DispatchErrorMessage("Error while attempting to accept DataPublisher connection for reverse connection: " + string(SystemError(error).what()) + " - connection " + errorMessage);
11581206
}
11591207
else
11601208
{
1161-
// TODO: For secured connections, validate certificate and IP information here to assign subscriberID
1162-
const TcpEndPoint endPoint = commandChannelSocket.remote_endpoint();
1163-
11641209
// Will only accept one active connection at a time, this may be indicative
11651210
// of a rouge connection attempt - consumer should log warning
11661211
if (IsConnected())
11671212
{
1168-
string address = "<unknown>";
1169-
string errorMessage = "closed.";
1170-
1171-
try
1172-
{
1173-
address = ResolveDNSName(m_commandChannelService, endPoint);
1174-
commandChannelSocket.close();
1175-
}
1176-
catch (SystemError& ex)
1177-
{
1178-
errorMessage = "close error: " + string(ex.what());
1179-
}
1180-
catch (...)
1181-
{
1182-
errorMessage = "close error: " + boost::current_exception_diagnostic_information(true);
1183-
}
1184-
1213+
closeSocket();
11851214
DispatchErrorMessage("WARNING: Duplicate connection attempt detected from: \"" + address + "\". Existing data publisher connection already established, data subscriber will only accept one connection at a time - connection " + errorMessage);
11861215
}
11871216
else
@@ -1194,7 +1223,7 @@ void DataSubscriber::AcceptConnection(TcpSocket& commandChannelSocket, const Err
11941223
SetupConnection();
11951224

11961225
// Hold on to primary socket
1197-
m_commandChannelSocket = std::move(commandChannelSocket);
1226+
m_commandChannelSocket = std::move(*commandChannelSocket);
11981227

11991228
// Create new command channel
12001229
EstablishConnection(endPoint, true);

src/lib/transport/DataSubscriber.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ namespace sttp::transport
153153
void ReadPacket(const ErrorCode& error, size_t bytesTransferred);
154154
void WriteHandler(const ErrorCode& error, size_t bytesTransferred);
155155
void StartAccept();
156-
void AcceptConnection(TcpSocket& commandChannelSocket, const ErrorCode& error);
156+
void AcceptConnection(TcpSocket* commandChannelSocket, const ErrorCode& error);
157157

158158
// Server response handlers
159159
void ProcessServerResponse(uint8_t* buffer, uint32_t offset, uint32_t length);
@@ -274,7 +274,7 @@ namespace sttp::transport
274274
void Connect(const std::string& hostname, uint16_t port);
275275

276276
// Establish a reverse listening connection for subscriber using specified connection info
277-
void Listen(const sttp::TcpEndPoint& endpoint);
277+
void Listen(const sttp::TcpEndPoint& endPoint);
278278
void Listen(uint16_t port, bool ipV6 = false); // Bind to default NIC
279279
void Listen(const std::string& networkInterfaceIP, uint16_t port); // Bind to specified NIC IP, format determines IP version
280280

src/lib/transport/SubscriberInstance.cpp

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ SubscriberInstance::SubscriberInstance() :
6464
m_subscriber->RegisterSubscriptionUpdatedCallback(&HandleSubscriptionUpdated);
6565
m_subscriber->RegisterNewMeasurementsCallback(&HandleNewMeasurements);
6666
m_subscriber->RegisterConfigurationChangedCallback(&HandleConfigurationChanged);
67+
m_subscriber->RegisterConnectionEstablishedCallback(&HandleConnectionEstablished);
6768
m_subscriber->RegisterConnectionTerminatedCallback(&HandleConnectionTerminated);
6869
}
6970

@@ -186,6 +187,35 @@ void SubscriberInstance::SetMetadataFilters(const std::string& metadataFilters)
186187
m_metadataFilters = metadataFilters;
187188
}
188189

190+
void SubscriberInstance::HandleConnect()
191+
{
192+
// If automatically parsing metadata, request metadata upon successful connection,
193+
// after metadata is received the SubscriberInstance will then initiate subscribe;
194+
// otherwise, subscribe is initiated immediately (when auto subscribe requested)
195+
if (m_autoParseMetadata)
196+
SendMetadataRefreshCommand();
197+
else
198+
m_subscriber->Subscribe();
199+
}
200+
201+
void SubscriberInstance::SetupSubscriptionInfo()
202+
{
203+
m_subscriptionInfo = CreateSubscriptionInfo();
204+
205+
if (!m_startTime.empty() && !m_stopTime.empty())
206+
{
207+
m_subscriber->RegisterProcessingCompleteCallback(&HandleProcessingComplete);
208+
m_subscriptionInfo.StartTime = m_startTime;
209+
m_subscriptionInfo.StopTime = m_stopTime;
210+
}
211+
212+
if (m_udpPort > 0)
213+
{
214+
m_subscriptionInfo.UdpDataChannel = true;
215+
m_subscriptionInfo.DataChannelLocalPort = m_udpPort;
216+
}
217+
}
218+
189219
bool SubscriberInstance::Connect()
190220
{
191221
if (IsConnected())
@@ -206,20 +236,7 @@ bool SubscriberInstance::Connect()
206236
if (m_port == 0)
207237
throw SubscriberException("No port specified for connection: call Initialize before Connect");
208238

209-
m_subscriptionInfo = CreateSubscriptionInfo();
210-
211-
if (!m_startTime.empty() && !m_stopTime.empty())
212-
{
213-
m_subscriber->RegisterProcessingCompleteCallback(&HandleProcessingComplete);
214-
m_subscriptionInfo.StartTime = m_startTime;
215-
m_subscriptionInfo.StopTime = m_stopTime;
216-
}
217-
218-
if (m_udpPort > 0)
219-
{
220-
m_subscriptionInfo.UdpDataChannel = true;
221-
m_subscriptionInfo.DataChannelLocalPort = m_udpPort;
222-
}
239+
SetupSubscriptionInfo();
223240

224241
// Connect and subscribe to publisher
225242
const int result = connector.Connect(*m_subscriber, m_subscriptionInfo);
@@ -239,14 +256,7 @@ bool SubscriberInstance::Connect()
239256
}
240257

241258
ConnectionEstablished();
242-
243-
// If automatically parsing metadata, request metadata upon successful connection,
244-
// after metadata is handled the SubscriberInstance will then initiate subscribe;
245-
// otherwise, initiate subscribe immediately
246-
if (m_autoParseMetadata)
247-
SendMetadataRefreshCommand();
248-
else
249-
m_subscriber->Subscribe();
259+
HandleConnect();
250260

251261
return true;
252262
}
@@ -280,6 +290,12 @@ bool SubscriberInstance::Listen(const sttp::TcpEndPoint& endPoint)
280290

281291
try
282292
{
293+
m_hostname = endPoint.address().to_string();
294+
m_port = endPoint.port();;
295+
296+
SetupSubscriptionInfo();
297+
m_subscriber->SetSubscriptionInfo(m_subscriptionInfo);
298+
283299
m_subscriber->Listen(endPoint);
284300
}
285301
catch (SubscriberException& ex)
@@ -301,7 +317,7 @@ bool SubscriberInstance::Listen(const sttp::TcpEndPoint& endPoint)
301317
return m_subscriber->IsListening();
302318
}
303319

304-
bool SubscriberInstance::ListenStart(const uint16_t port, const bool ipV6)
320+
bool SubscriberInstance::Listen(const uint16_t port, const bool ipV6)
305321
{
306322
return Listen(TcpEndPoint(ipV6 ? tcp::v6() : tcp::v4(), port));
307323
}
@@ -1260,10 +1276,12 @@ void SubscriberInstance::HistoricalReadComplete()
12601276

12611277
void SubscriberInstance::ConnectionEstablished()
12621278
{
1279+
StatusMessage("Connection from \"" + GetConnectionID() + "\" established.");
12631280
}
12641281

12651282
void SubscriberInstance::ConnectionTerminated()
12661283
{
1284+
StatusMessage("Connection from \"" + GetConnectionID() + "\" terminated.");
12671285
}
12681286

12691287
void SubscriberInstance::GetAssemblyInfo(std::string& source, std::string& version, std::string& updatedOn) const
@@ -1291,14 +1309,7 @@ void SubscriberInstance::HandleResubscribe(DataSubscriber* source)
12911309
if (source->IsConnected())
12921310
{
12931311
instance->ConnectionEstablished();
1294-
1295-
// If automatically parsing metadata, request metadata upon successful connection,
1296-
// after metadata is handled the SubscriberInstance will then initiate subscribe;
1297-
// otherwise, initiate subscribe immediately
1298-
if (instance->m_autoParseMetadata)
1299-
instance->SendMetadataRefreshCommand();
1300-
else
1301-
source->Subscribe();
1312+
instance->HandleConnect();
13021313
}
13031314
else
13041315
{
@@ -1410,6 +1421,7 @@ void SubscriberInstance::HandleConnectionEstablished(const DataSubscriber* sourc
14101421
return;
14111422

14121423
instance->ConnectionEstablished();
1424+
instance->HandleConnect();
14131425
}
14141426

14151427
void SubscriberInstance::HandleConnectionTerminated(const DataSubscriber* source)

src/lib/transport/SubscriberInstance.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ namespace sttp::transport
5656
Mutex m_configurationUpdateLock;
5757
void* m_userData;
5858

59+
void HandleConnect();
60+
void SetupSubscriptionInfo();
5961
void SendMetadataRefreshCommand();
6062

6163
static void ConstructConfigurationFrames(const sttp::StringMap<DeviceMetadataPtr>& devices, const std::unordered_map<Guid, MeasurementMetadataPtr>& measurements, sttp::StringMap<ConfigurationFramePtr>& configurationFrames);
@@ -201,7 +203,7 @@ namespace sttp::transport
201203
// Returns true if subscriber was successfully started
202204
// Throws SubscriberException for implementation logic errors
203205
virtual bool Listen(const sttp::TcpEndPoint& endPoint);
204-
virtual bool ListenStart(uint16_t port, bool ipV6 = false); // Bind to default NIC
206+
virtual bool Listen(uint16_t port, bool ipV6 = false); // Bind to default NIC
205207
virtual bool Listen(const std::string& networkInterfaceIP, uint16_t port); // Bind to specified NIC IP, format determines IP version
206208

207209
// Disconnects from the STTP publisher, normal or reverse connection mode

0 commit comments

Comments
 (0)