diff --git a/formant_ros2_adapter/scripts/components/analytics/ingestion.py b/formant_ros2_adapter/scripts/components/analytics/ingestion.py new file mode 100644 index 0000000..e3cba44 --- /dev/null +++ b/formant_ros2_adapter/scripts/components/analytics/ingestion.py @@ -0,0 +1,78 @@ +import threading +import os +import time +from datetime import datetime + +ANALYTICS_INTERVAL = 60 + + +class IngestionAnalytics: + def __init__(self): + self.received_messages_count = {} + self.sent_messages_count = {} + self._configure_settings() + if self.analytics_enabled: + self._start_reporting_thread() + + def _configure_settings(self): + self.analytics_enabled = ( + os.getenv("ANALYTICS_INGESTION", "false").lower() == "true" + ) + self.file_logging_enabled = bool(os.getenv("ANALYTICS_FILE_PATH")) + self.console_logging_enabled = ( + os.getenv("ANALYTICS_CONSOLE_PRINT", "false").lower() == "true" + ) + self.log_file_path = os.getenv("ANALYTICS_FILE_PATH", "ingestion_analytics.log") + + def _start_reporting_thread(self): + self.reporting_thread = threading.Thread(target=self._reporting_loop) + self.reporting_thread.daemon = True + self.reporting_thread.start() + + def _reporting_loop(self): + while True: + self._generate_and_output_report() + time.sleep(ANALYTICS_INTERVAL) + + def log_received_message(self, topic): + self.received_messages_count[topic] = ( + self.received_messages_count.get(topic, 0) + 1 + ) + + def log_sent_message(self, stream): + self.sent_messages_count[stream] = self.sent_messages_count.get(stream, 0) + 1 + + def _generate_and_output_report(self): + report_data = self._generate_report() + self._output_report(report_data) + self._reset_message_counts() + + def _generate_report(self): + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + report_lines = [ + f"Ingestion Analytics Report - {current_time}\n", + "Received Messages:\n", + ] + for topic, count in self.received_messages_count.items(): + report_lines.append(f" Topic: {topic}, Count: {count}\n") + + report_lines.append("Sent Messages:\n") + for stream, count in self.sent_messages_count.items(): + report_lines.append(f" Stream: {stream}, Count: {count}\n") + + return "".join(report_lines) + + def _output_report(self, report_data): + if self.file_logging_enabled: + with open(self.log_file_path, "a") as file: + file.write(report_data) + if self.console_logging_enabled: + print(report_data) + + def _reset_message_counts(self): + self.received_messages_count.clear() + self.sent_messages_count.clear() + + def __del__(self): + if self.reporting_thread and self.reporting_thread.is_alive(): + self.reporting_thread.join() diff --git a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py index a018763..cd4ef76 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py @@ -113,6 +113,7 @@ def _handle_message( if header_timestamp > 1500000000000: timestamp = int(header_timestamp) + self._ingester.ingestion_analytics.log_received_message(topic) if message_path_config is None: if subscriber_config.message_paths: for message_path_config in subscriber_config.message_paths: diff --git a/formant_ros2_adapter/scripts/components/subscriber/ingester.py b/formant_ros2_adapter/scripts/components/subscriber/ingester.py index 46794bd..3b0173f 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/ingester.py @@ -41,6 +41,7 @@ ) from utils.logger import get_logger +from components.analytics.ingestion import IngestionAnalytics from ros2_utils.message_utils import ( get_ros2_type_from_string, message_to_json, @@ -53,6 +54,7 @@ def __init__(self, _fclient: Client): self._fclient = _fclient self.cv_bridge = CvBridge() self._logger = get_logger() + self.ingestion_analytics = IngestionAnalytics() def ingest( self, @@ -63,7 +65,6 @@ def ingest( msg_timestamp: int, tags: Dict, ): - # Handle the message based on its type try: if msg_type in [str, String, Char]: @@ -76,6 +77,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type in [Bool, bool]: if hasattr(msg, "data"): @@ -87,6 +89,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type in [ int, @@ -111,6 +114,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type == NavSatFix: # Convert NavSatFix to a Formant location @@ -122,6 +126,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type == Image: # Convert Image to a Formant image @@ -134,6 +139,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type == CompressedImage: # Post the compressed image @@ -151,6 +157,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type == BatteryState: self._fclient.post_battery( @@ -162,6 +169,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) elif msg_type == LaserScan: # Convert LaserScan to a Formant pointcloud @@ -174,6 +182,7 @@ def ingest( timestamp=msg_timestamp, ) ) + self.ingestion_analytics.log_sent_message(formant_stream) except grpc.RpcError as e: return except Exception as e: @@ -192,6 +201,7 @@ def ingest( timestamp=msg_timestamp, ) ) + self.ingestion_analytics.log_sent_message(formant_stream) except grpc.RpcError as e: return except Exception as e: @@ -208,6 +218,7 @@ def ingest( tags=tags, timestamp=msg_timestamp, ) + self.ingestion_analytics.log_sent_message(formant_stream) except AttributeError as e: self._logger.error("Could not ingest " + formant_stream + ": " + str(e)) diff --git a/formant_ros2_adapter/scripts/tests/__init__.py b/formant_ros2_adapter/scripts/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/formant_ros2_adapter/scripts/tests/test_analytics/__init__.py b/formant_ros2_adapter/scripts/tests/test_analytics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/formant_ros2_adapter/scripts/tests/test_analytics/config.json b/formant_ros2_adapter/scripts/tests/test_analytics/config.json new file mode 100644 index 0000000..065efa2 --- /dev/null +++ b/formant_ros2_adapter/scripts/tests/test_analytics/config.json @@ -0,0 +1,26 @@ +{ + "ros2_adapter_configuration": { + "subscribers": [ + { + "ros2_topic": "/my_string", + "ros2_message_type": "std_msgs/msg/String", + "formant_stream": "my.string" + }, + { + "ros2_topic": "/my_int", + "ros2_message_type": "std_msgs/msg/Int32", + "formant_stream": "my.int" + }, + { + "ros2_topic": "/my_twist", + "ros2_message_type": "geometry_msgs/msg/Twist", + "formant_stream": "my.twist", + "ros2_message_paths": [ + { + "path": "linear" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/formant_ros2_adapter/scripts/tests/test_analytics/test_ingestion_analytics.py b/formant_ros2_adapter/scripts/tests/test_analytics/test_ingestion_analytics.py new file mode 100644 index 0000000..6f4cb2d --- /dev/null +++ b/formant_ros2_adapter/scripts/tests/test_analytics/test_ingestion_analytics.py @@ -0,0 +1,129 @@ +import unittest +import rclpy +from std_msgs.msg import String, Int32 +from geometry_msgs.msg import Twist +from ros2_adapter import ROS2Adapter # Adjust import as necessary +from formant.sdk.agent.v1 import Client # Adjust import as necessary +import time +import threading + + +convert_topic_to_stream = lambda topic: topic.replace("/", ".").strip(".") + + +# Utility function to publish messages +def publish_messages(node, topic, msg_type, count, delay=1): + publisher = node.create_publisher(msg_type, topic, 10) + for i in range(count): + msg = msg_type() + if msg_type == String: + msg.data = "test_string" + elif msg_type == Twist: + msg.linear.x = 1.0 + msg.angular.z = 1.0 + elif msg_type == Int32: + msg.data = i # Or any integer value + publisher.publish(msg) + time.sleep(delay) + + +# Test class +class TestROS2Adapter(unittest.TestCase): + def setUp(self): + # Initialize ROS and create a node + rclpy.init() + self.node = rclpy.create_node("test_node") + + # Start ROS2Adapter in a separate thread + self.adapter = ROS2Adapter(Client(), self.node) + self.adapter_thread = threading.Thread(target=self.spin_ros) + self.adapter_thread.start() + + time.sleep(2) # Wait for ROS2Adapter to initialize + + def spin_ros(self): + while rclpy.ok(): + rclpy.spin_once(self.node, timeout_sec=0.1) + + def test_message_counts(self): + # Test for message counts + topics = ["/my_string", "/my_int"] + message_types = [String, Int32] + message_counts = [8, 3] + ingestion_analytics = ( + self.adapter._subscriber_coordinator._ingester.ingestion_analytics + ) + ingestion_analytics.received_messages_count = {} + ingestion_analytics.sent_messages_count = {} + for topic, msg_type, count in zip(topics, message_types, message_counts): + publish_messages(self.node, topic, msg_type, count, delay=0.5) + received_messages = {} + sent_messages = {} + received_message_history = {} + sent_message_history = {} + + total_duration = 120 + interval_duration = 30 + start_time = time.time() + + def update_received_messages(key, value): + if value == 0: + received_messages[key] = received_messages.get( + key, 0 + ) + received_message_history.get(key, 0) + received_message_history[key] = 0 + else: + received_message_history[key] = max( + received_message_history.get(key, 0), value + ) + + def update_sent_messages(key, value): + if value == 0: + sent_messages[key] = sent_messages.get( + key, 0 + ) + sent_message_history.get(key, 0) + sent_message_history[key] = 0 + else: + sent_message_history[key] = max(sent_message_history.get(key, 0), value) + + while time.time() - start_time < total_duration: + for key, value in ingestion_analytics.received_messages_count.items(): + update_received_messages(key, value) + + for key, value in ingestion_analytics.sent_messages_count.items(): + update_sent_messages(key, value) + + time.sleep(interval_duration) + + # Final iteration for received and sent messages + for key in received_message_history: + update_received_messages(key, 0) + + for key in sent_message_history: + update_sent_messages(key, 0) + + for topic, count in zip(topics, message_counts): + stream = convert_topic_to_stream(topic) + received_count = received_messages.get(topic, 0) + sent_count = sent_messages.get(stream, 0) + + try: + self.assertEqual( + received_count, + count, + f"Incorrect number of received messages for {topic}", + ) + self.assertEqual( + sent_count, count, f"Incorrect number of sent messages for {stream}" + ) + except AssertionError as e: + print(f"Warning: {e}") + + def tearDown(self): + if rclpy.ok(): + rclpy.shutdown() + self.adapter_thread.join() + + +if __name__ == "__main__": + unittest.main()