11
11
from configuration .config_schema import ConfigSchema
12
12
from configuration .subscriber_config import SubscriberConfig , MessagePathConfig
13
13
from .ingester import Ingester
14
+ from .decorators import lock_decorator , handle_value_error
14
15
from ros2_utils .qos import QOS_PROFILES , qos_profile_system_default
15
16
from ros2_utils .topic_type_provider import TopicTypeProvider
16
17
from utils .logger import get_logger
23
24
FORMANT_OVERRIDE_TIMESTAMP = (
24
25
os .getenv ("FORMANT_OVERRIDE_TIMESTAMP" , "" ).lower () == "true"
25
26
)
27
+ SCHEDULE_SUBSCRIPTIONS_INTERVAL = 1.0
26
28
27
29
28
30
class BasicSubscriberCoordinator :
@@ -38,22 +40,44 @@ def __init__(
38
40
self ._ingester = ingester
39
41
self ._topic_type_provider = topic_type_provider
40
42
self ._subscriptions : Dict [str , List [Subscription ]] = {}
43
+ self ._queued_topics : Dict = {}
41
44
self ._logger = get_logger ()
42
45
self ._config_lock = threading .RLock ()
46
+ self ._subscribe_lock = threading .Lock ()
43
47
48
+ @lock_decorator ("_config_lock" )
44
49
def setup_with_config (self , config : ConfigSchema ):
45
- with self ._config_lock :
46
- self ._config = config
47
- self ._cleanup ()
48
- if self ._config .subscribers :
49
- for subscriber_config in self ._config .subscribers :
50
- try :
51
- self ._setup_subscription_for_config (subscriber_config )
52
- except ValueError as value_error :
53
- self ._logger .warn (value_error )
54
- continue
55
-
56
- def _setup_subscription_for_config (self , subscriber_config : SubscriberConfig ):
50
+ self ._config = config
51
+ self ._cleanup ()
52
+
53
+ has_subscribers = self ._config .subscribers
54
+ if not has_subscribers :
55
+ return
56
+
57
+ self ._queued_topics = {
58
+ subscriber .topic : subscriber for subscriber in self ._config .subscribers
59
+ }
60
+
61
+ self ._schedule_subscriptions ()
62
+
63
+ def _schedule_subscriptions (self ):
64
+ t = threading .Timer (SCHEDULE_SUBSCRIPTIONS_INTERVAL , self ._setup_subscribers )
65
+ t .daemon = True
66
+ t .start ()
67
+
68
+ @lock_decorator ("_subscriber_lock" )
69
+ def _setup_subscribers (self ):
70
+ active_topics = set (self ._get_active_topics ())
71
+
72
+ backlog_topics = active_topics .intersection (self ._queued_topics .keys )
73
+
74
+ for topic in backlog_topics :
75
+ topic_subscriber = self ._queued_topics [topic ]
76
+ self ._setup_subscription (topic_subscriber )
77
+ _ = self ._queued_topics .pop (topic )
78
+
79
+ @handle_value_error
80
+ def _setup_subscription (self , subscriber_config : SubscriberConfig ):
57
81
topic = subscriber_config .topic
58
82
qos_profile = QOS_PROFILES .get (
59
83
subscriber_config .qos_profile , qos_profile_system_default
@@ -62,8 +86,6 @@ def _setup_subscription_for_config(self, subscriber_config: SubscriberConfig):
62
86
ros2_type = subscriber_config .message_type
63
87
if ros2_type is None :
64
88
ros2_type = self ._topic_type_provider .get_type_for_topic (topic )
65
- if ros2_type is None :
66
- raise ValueError ("No ROS2 type found for %s" % topic )
67
89
68
90
self ._logger .debug (
69
91
"Setting up subscription %s, %s, %s"
@@ -82,6 +104,13 @@ def _setup_subscription_for_config(self, subscriber_config: SubscriberConfig):
82
104
self ._subscriptions [topic ] = []
83
105
self ._subscriptions [topic ].append (new_subscriber )
84
106
107
+ def _get_active_topics (self ):
108
+ return [
109
+ topic_name
110
+ for topic_name , _ in self ._node .get_topic_names_and_types ()
111
+ if self ._node .get_publishers_info_by_topic (topic_name )
112
+ ]
113
+
85
114
def _handle_message (
86
115
self ,
87
116
msg ,
0 commit comments