Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/network/NetworkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,17 @@ NetworkManager::create_receiver(std::vector<ConnectionInfo> connections, Connect
TLOG_DEBUG(12) << "Creating plugin of type " << plugin_type;
auto plugin = dunedaq::ipm::make_ipm_receiver(plugin_type);

nlohmann::json config_json;
ipm::Receiver::ConnectionInfo conn_info(conn_id.uid);
if (is_pubsub) {
std::vector<std::string> uris = get_pubsub_connection_strings(connections);
if (uris.size() == 0) {
return nullptr;
}
config_json["connection_strings"] = uris;
conn_info.connection_strings = uris;
} else {
config_json["connection_string"] = connections[0].uri;
conn_info.connection_string = connections[0].uri;
}
auto newCs = plugin->connect_for_receives(config_json);
auto newCs = plugin->connect_for_receives(conn_info);
TLOG_DEBUG(12) << "Receiver reports connected to URI " << newCs;

// Replace with resolved if there are wildcards (host and/or port)
Expand Down Expand Up @@ -371,8 +371,9 @@ NetworkManager::create_sender(ConnectionInfo connection)
TLOG_DEBUG(11) << "Creating sender plugin of type " << plugin_type;
auto plugin = dunedaq::ipm::make_ipm_sender(plugin_type);
TLOG_DEBUG(11) << "Connecting sender plugin to " << connection.uri;
ipm::Sender::ConnectionInfo conn_info(connection.uid, connection.uri, connection.capacity);
auto newCs =
plugin->connect_for_sends({ { "connection_string", connection.uri }, { "capacity", connection.capacity } });
plugin->connect_for_sends(conn_info);
TLOG_DEBUG(11) << "Sender Plugin connected, reports URI " << newCs;

// Replace with resolved if there are wildcards (host and/or port)
Expand Down Expand Up @@ -410,15 +411,16 @@ NetworkManager::update_subscribers()
try {
auto response = get_connections(subscriber_pair.first, false);

nlohmann::json config_json;
ipm::Receiver::ConnectionInfo conn_info;
conn_info.connection_name = subscriber_pair.first.uid;
std::vector<std::string> uris = get_pubsub_connection_strings(response.connections);
if (uris.size() == 0) {
TLOG_DEBUG(14) << "No valid connection strings found, is the Connectivity Service running?!";
continue;
}
config_json["connection_strings"] = uris;
conn_info.connection_strings = uris;

subscriber_pair.second->connect_for_receives(config_json);
subscriber_pair.second->connect_for_receives(conn_info);
} catch (ers::Issue&) {
}
}
Expand Down