Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions formant_ros2_adapter/scripts/components/analytics/ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import threading
import os
import time
from datetime import datetime

ANALYTICS_INTERVAL = 60


class IngestionAnalytics:
def __init__(self):
self.received_messages_count = {}
self.sent_messages_count = {}
self._configure_settings()
if self.analytics_enabled:
self._start_reporting_thread()

def _configure_settings(self):
self.analytics_enabled = (
os.getenv("ANALYTICS_INGESTION", "false").lower() == "true"
)
self.file_logging_enabled = bool(os.getenv("ANALYTICS_FILE_PATH"))
self.console_logging_enabled = (
os.getenv("ANALYTICS_CONSOLE_PRINT", "false").lower() == "true"
)
self.log_file_path = os.getenv("ANALYTICS_FILE_PATH", "ingestion_analytics.log")

def _start_reporting_thread(self):
self.reporting_thread = threading.Thread(target=self._reporting_loop)
self.reporting_thread.daemon = True
self.reporting_thread.start()

def _reporting_loop(self):
while True:
self._generate_and_output_report()
time.sleep(ANALYTICS_INTERVAL)

def log_received_message(self, topic):
self.received_messages_count[topic] = (
self.received_messages_count.get(topic, 0) + 1
)

def log_sent_message(self, stream):
self.sent_messages_count[stream] = self.sent_messages_count.get(stream, 0) + 1

def _generate_and_output_report(self):
report_data = self._generate_report()
self._output_report(report_data)
self._reset_message_counts()

def _generate_report(self):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
report_lines = [
f"Ingestion Analytics Report - {current_time}\n",
"Received Messages:\n",
]
for topic, count in self.received_messages_count.items():
report_lines.append(f" Topic: {topic}, Count: {count}\n")

report_lines.append("Sent Messages:\n")
for stream, count in self.sent_messages_count.items():
report_lines.append(f" Stream: {stream}, Count: {count}\n")

return "".join(report_lines)

def _output_report(self, report_data):
if self.file_logging_enabled:
with open(self.log_file_path, "a") as file:
file.write(report_data)
if self.console_logging_enabled:
print(report_data)

def _reset_message_counts(self):
self.received_messages_count.clear()
self.sent_messages_count.clear()

def __del__(self):
if self.reporting_thread and self.reporting_thread.is_alive():
self.reporting_thread.join()
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def _handle_message(
if header_timestamp > 1500000000000:
timestamp = int(header_timestamp)

self._ingester.ingestion_analytics.log_received_message(topic)
if message_path_config is None:
if subscriber_config.message_paths:
for message_path_config in subscriber_config.message_paths:
Expand Down
13 changes: 12 additions & 1 deletion formant_ros2_adapter/scripts/components/subscriber/ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
)

from utils.logger import get_logger
from components.analytics.ingestion import IngestionAnalytics
from ros2_utils.message_utils import (
get_ros2_type_from_string,
message_to_json,
Expand All @@ -53,6 +54,7 @@ def __init__(self, _fclient: Client):
self._fclient = _fclient
self.cv_bridge = CvBridge()
self._logger = get_logger()
self.ingestion_analytics = IngestionAnalytics()

def ingest(
self,
Expand All @@ -63,7 +65,6 @@ def ingest(
msg_timestamp: int,
tags: Dict,
):

# Handle the message based on its type
try:
if msg_type in [str, String, Char]:
Expand All @@ -76,6 +77,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type in [Bool, bool]:
if hasattr(msg, "data"):
Expand All @@ -87,6 +89,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type in [
int,
Expand All @@ -111,6 +114,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type == NavSatFix:
# Convert NavSatFix to a Formant location
Expand All @@ -122,6 +126,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type == Image:
# Convert Image to a Formant image
Expand All @@ -134,6 +139,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type == CompressedImage:
# Post the compressed image
Expand All @@ -151,6 +157,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type == BatteryState:
self._fclient.post_battery(
Expand All @@ -162,6 +169,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

elif msg_type == LaserScan:
# Convert LaserScan to a Formant pointcloud
Expand All @@ -174,6 +182,7 @@ def ingest(
timestamp=msg_timestamp,
)
)
self.ingestion_analytics.log_sent_message(formant_stream)
except grpc.RpcError as e:
return
except Exception as e:
Expand All @@ -192,6 +201,7 @@ def ingest(
timestamp=msg_timestamp,
)
)
self.ingestion_analytics.log_sent_message(formant_stream)
except grpc.RpcError as e:
return
except Exception as e:
Expand All @@ -208,6 +218,7 @@ def ingest(
tags=tags,
timestamp=msg_timestamp,
)
self.ingestion_analytics.log_sent_message(formant_stream)

except AttributeError as e:
self._logger.error("Could not ingest " + formant_stream + ": " + str(e))
Empty file.
Empty file.
26 changes: 26 additions & 0 deletions formant_ros2_adapter/scripts/tests/test_analytics/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"ros2_adapter_configuration": {
"subscribers": [
{
"ros2_topic": "/my_string",
"ros2_message_type": "std_msgs/msg/String",
"formant_stream": "my.string"
},
{
"ros2_topic": "/my_int",
"ros2_message_type": "std_msgs/msg/Int32",
"formant_stream": "my.int"
},
{
"ros2_topic": "/my_twist",
"ros2_message_type": "geometry_msgs/msg/Twist",
"formant_stream": "my.twist",
"ros2_message_paths": [
{
"path": "linear"
}
]
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import unittest
import rclpy
from std_msgs.msg import String, Int32
from geometry_msgs.msg import Twist
from ros2_adapter import ROS2Adapter # Adjust import as necessary
from formant.sdk.agent.v1 import Client # Adjust import as necessary
import time
import threading


convert_topic_to_stream = lambda topic: topic.replace("/", ".").strip(".")


# Utility function to publish messages
def publish_messages(node, topic, msg_type, count, delay=1):
publisher = node.create_publisher(msg_type, topic, 10)
for i in range(count):
msg = msg_type()
if msg_type == String:
msg.data = "test_string"
elif msg_type == Twist:
msg.linear.x = 1.0
msg.angular.z = 1.0
elif msg_type == Int32:
msg.data = i # Or any integer value
publisher.publish(msg)
time.sleep(delay)


# Test class
class TestROS2Adapter(unittest.TestCase):
def setUp(self):
# Initialize ROS and create a node
rclpy.init()
self.node = rclpy.create_node("test_node")

# Start ROS2Adapter in a separate thread
self.adapter = ROS2Adapter(Client(), self.node)
self.adapter_thread = threading.Thread(target=self.spin_ros)
self.adapter_thread.start()

time.sleep(2) # Wait for ROS2Adapter to initialize

def spin_ros(self):
while rclpy.ok():
rclpy.spin_once(self.node, timeout_sec=0.1)

def test_message_counts(self):
# Test for message counts
topics = ["/my_string", "/my_int"]
message_types = [String, Int32]
message_counts = [8, 3]
ingestion_analytics = (
self.adapter._subscriber_coordinator._ingester.ingestion_analytics
)
ingestion_analytics.received_messages_count = {}
ingestion_analytics.sent_messages_count = {}
for topic, msg_type, count in zip(topics, message_types, message_counts):
publish_messages(self.node, topic, msg_type, count, delay=0.5)
received_messages = {}
sent_messages = {}
received_message_history = {}
sent_message_history = {}

total_duration = 120
interval_duration = 30
start_time = time.time()

def update_received_messages(key, value):
if value == 0:
received_messages[key] = received_messages.get(
key, 0
) + received_message_history.get(key, 0)
received_message_history[key] = 0
else:
received_message_history[key] = max(
received_message_history.get(key, 0), value
)

def update_sent_messages(key, value):
if value == 0:
sent_messages[key] = sent_messages.get(
key, 0
) + sent_message_history.get(key, 0)
sent_message_history[key] = 0
else:
sent_message_history[key] = max(sent_message_history.get(key, 0), value)

while time.time() - start_time < total_duration:
for key, value in ingestion_analytics.received_messages_count.items():
update_received_messages(key, value)

for key, value in ingestion_analytics.sent_messages_count.items():
update_sent_messages(key, value)

time.sleep(interval_duration)

# Final iteration for received and sent messages
for key in received_message_history:
update_received_messages(key, 0)

for key in sent_message_history:
update_sent_messages(key, 0)

for topic, count in zip(topics, message_counts):
stream = convert_topic_to_stream(topic)
received_count = received_messages.get(topic, 0)
sent_count = sent_messages.get(stream, 0)

try:
self.assertEqual(
received_count,
count,
f"Incorrect number of received messages for {topic}",
)
self.assertEqual(
sent_count, count, f"Incorrect number of sent messages for {stream}"
)
except AssertionError as e:
print(f"Warning: {e}")

def tearDown(self):
if rclpy.ok():
rclpy.shutdown()
self.adapter_thread.join()


if __name__ == "__main__":
unittest.main()