Skip to content

Commit eed2a0f

Browse files
committed
updated setting up subscribers so we only try to subscribe to things in the active runtime. adapter maintains a list of topics that are in the config but not yet subscribed to and will retry once a second to find that topic in the active runtime
1 parent 54e07b4 commit eed2a0f

File tree

1 file changed

+78
-31
lines changed
  • formant_ros2_adapter/scripts

1 file changed

+78
-31
lines changed

formant_ros2_adapter/scripts/main.py

Lines changed: 78 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python3
22

3-
3+
import threading
44
import os
55
import cv2
66
import time
@@ -14,6 +14,8 @@
1414
import traceback
1515

1616
from formant.sdk.agent.v1 import Client as FormantAgentClient
17+
18+
# from formant.common.logger import get_logger, log_thread_exceptions
1719
from formant.protos.model.v1.datapoint_pb2 import Datapoint
1820
from formant.protos.model.v1.text_pb2 import Text
1921
from formant.sdk.agent.v1.localization.types import (
@@ -172,7 +174,10 @@ def __init__(self):
172174

173175
# Subscribers and publishers are stored as a dictionary of topics containing a list of objects
174176
self.ros2_subscribers = {}
177+
self.topics_to_subscribe = []
175178
self.ros2_publishers = {}
179+
self._subscribe_mutex = threading.Lock()
180+
176181
self.ros2_service_clients = {}
177182

178183
# Set up the localization objects
@@ -281,7 +286,7 @@ def update_adapter_configuration(self, config: Dict):
281286
self.setup_ros2_params()
282287
print("INFO: Finished setting up ROS2 parameters")
283288

284-
self.setup_subscribers()
289+
self._setup_subscribers()
285290
print("INFO: Finished setting up subscribers")
286291

287292
self.setup_publishers()
@@ -348,42 +353,84 @@ def setup_ros2_params(self):
348353
print("INFO: Setting new ROS2 parameters")
349354
self.ros2_node.set_parameters(new_params)
350355

351-
def setup_subscribers(self):
356+
def get_active_topics(self):
357+
358+
topic_names_and_types = self.ros2_node.get_topic_names_and_types()
359+
360+
active_topics = []
361+
for topic_name, _ in topic_names_and_types:
362+
publishers_info = self.ros2_node.get_publishers_info_by_topic(topic_name)
363+
if publishers_info:
364+
active_topics.append(topic_name)
365+
366+
return active_topics
367+
368+
def _setup_subscribers(self):
352369
# Remove any existing subscribers
353370
for subscriber_list in self.ros2_subscribers.values():
354371
for subscriber in subscriber_list:
355372
self.ros2_node.destroy_subscription(subscriber)
356373

357374
self.ros2_subscribers = {}
375+
self.topics_to_subscribe = [
376+
_.get("ros2_topic") for _ in self.config.get("subscribers", [])
377+
]
378+
print("Topics from config:")
379+
print(self.topics_to_subscribe)
380+
self._schedule_subscribe_to_topics()
381+
382+
def _schedule_subscribe_to_topics(self):
383+
t = threading.Timer(1.0, self.setup_subscribers_inner)
384+
t.daemon = True
385+
t.start()
386+
387+
def setup_subscribers_inner(self):
388+
with self._subscribe_mutex:
389+
"""Get all active topics in ROS 2 runtime"""
390+
print("Topics in runtime")
391+
topics_in_runtime = self.get_active_topics()
392+
print(topics_in_runtime)
393+
394+
for subscriber_config in self.config.get("subscribers", []):
395+
subscriber_topic = subscriber_config["ros2_topic"]
396+
"""For each subscriber, check if it is in the list of topics that don't already have a subscriber created"""
397+
if subscriber_topic in self.topics_to_subscribe:
398+
"""If the topic is in the active topics, create a subscriber"""
399+
if subscriber_topic in topics_in_runtime:
400+
print(f"Subscriber for topic: {subscriber_topic}")
401+
self.topics_to_subscribe.remove(subscriber_topic)
402+
403+
if "ros2_qos_profile" in subscriber_config:
404+
subscriber_qos = QOS_PROFILES.get(
405+
subscriber_config["ros2_qos_profile"],
406+
qos_profile_system_default,
407+
)
408+
else:
409+
subscriber_qos = qos_profile_system_default
410+
print(f"QoS: {subscriber_qos}, {type(subscriber_qos)}")
358411

359-
# Create new subscribers based on the config
360-
for subscriber_config in self.config.get("subscribers", []):
361-
subscriber_topic = subscriber_config["ros2_topic"]
362-
print(f"Subscriber for topic: {subscriber_topic}")
363-
364-
if "ros2_qos_profile" in subscriber_config:
365-
subscriber_qos = QOS_PROFILES.get(
366-
subscriber_config["ros2_qos_profile"], qos_profile_system_default
367-
)
368-
else:
369-
subscriber_qos = qos_profile_system_default
370-
print(f"QoS: {subscriber_qos}, {type(subscriber_qos)}")
371-
372-
new_sub = self.ros2_node.create_subscription(
373-
msg_type=get_ros2_type_from_string(
374-
subscriber_config["ros2_message_type"]
375-
),
376-
topic=subscriber_topic,
377-
callback=lambda msg, subscriber_config=subscriber_config: self.handle_ros2_message(
378-
msg, subscriber_config
379-
),
380-
qos_profile=subscriber_qos,
381-
)
382-
383-
if subscriber_config["ros2_topic"] not in self.ros2_subscribers:
384-
self.ros2_subscribers[subscriber_config["ros2_topic"]] = []
412+
new_sub = self.ros2_node.create_subscription(
413+
msg_type=get_ros2_type_from_string(
414+
subscriber_config["ros2_message_type"]
415+
),
416+
topic=subscriber_topic,
417+
callback=lambda msg, subscriber_config=subscriber_config: self.handle_ros2_message(
418+
msg, subscriber_config
419+
),
420+
qos_profile=subscriber_qos,
421+
)
422+
if subscriber_config["ros2_topic"] not in self.ros2_subscribers:
423+
self.ros2_subscribers[subscriber_config["ros2_topic"]] = []
424+
self.ros2_subscribers[subscriber_config["ros2_topic"]].append(
425+
new_sub
426+
)
385427

386-
self.ros2_subscribers[subscriber_config["ros2_topic"]].append(new_sub)
428+
else:
429+
"""If the topic is not in the active topics and has not had a subscriber created in the past, log the topic"""
430+
print(
431+
f"Not subscribing to {subscriber_topic}; not in ROS runtime."
432+
)
433+
self._schedule_subscribe_to_topics()
387434

388435
def setup_publishers(self):
389436
# Remove any existing publishers
@@ -1197,7 +1244,7 @@ def handle_formant_teleop_msg(self, msg):
11971244
elif ros2_msg == "PointStamped":
11981245
ros2_msg = PointStamped(point=point)
11991246
else:
1200-
self._logger.warn("Unsupported Point Type: %s" % ros2_msg_type)
1247+
print("Unsupported Point Type: %s" % ros2_msg_type)
12011248
publisher.publish(ros2_msg)
12021249

12031250
elif msg.HasField("pose"):

0 commit comments

Comments
 (0)