From 7cd9948b6497bccac07c40dc71a25c7d0b4eb101 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Mon, 7 Aug 2023 19:15:40 +0530 Subject: [PATCH 01/14] prelim version of batch_ingester: flush_once, message queue --- .../components/subscriber/batched_ingester.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py diff --git a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py new file mode 100644 index 0000000..faafa1b --- /dev/null +++ b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py @@ -0,0 +1,52 @@ +from .ingester import Ingester +from formant.sdk.agent.v1 import Client +from queue import LifoQueue +from typing import Dict + + +MAX_INGEST_SIZE = 10 + + +class Message: + def __init__(self, msg, msg_type: type, topic: str, msg_timestamp: int, tags: Dict): + self.msg = msg + self.msg_type = msg_type + self.topic = topic + self.msg_timestamp = msg_timestamp + self.tags = tags + + +class BatchIngester(Ingester): + def __init__(self, _fclient: Client): + super(BatchIngester, self).__init__(_fclient) + self._stream_queues: Dict[str, LifoQueue[Message]] = {} + self._ingest_interval = 1 + + # Basically 1 or more threads should be running the flush function + + def batch_ingest( + self, + msg, + msg_type: type, + formant_stream: str, + topic: str, + msg_timestamp: int, + tags: Dict, + ): + message = Message(msg, msg_type, topic, msg_timestamp, tags) + self._stream_queues[formant_stream].put(message) + + def _flush_once(self): + + for stream, queue in self._stream_queues.items(): + ingest_size = min(len(queue), MAX_INGEST_SIZE) + for _ in range(ingest_size): + top_message = queue.get() + self.ingest( + top_message.msg, + top_message.msg_type, + stream, + top_message.topic, + top_message.msg_timestamp, + top_message.tags, + ) From bfdebf3ba0cf74bf3fbec49bb7ca2f188ecafe46 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Mon, 7 Aug 2023 19:30:06 +0530 Subject: [PATCH 02/14] batched_ingsester with threading completed, tests TBD --- .../components/subscriber/batched_ingester.py | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py index faafa1b..cb5297a 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py @@ -1,8 +1,9 @@ from .ingester import Ingester from formant.sdk.agent.v1 import Client from queue import LifoQueue -from typing import Dict - +from typing import Dict, List +import threading +import time MAX_INGEST_SIZE = 10 @@ -17,12 +18,15 @@ def __init__(self, msg, msg_type: type, topic: str, msg_timestamp: int, tags: Di class BatchIngester(Ingester): - def __init__(self, _fclient: Client): + def __init__(self, _fclient: Client, num_threads: int = 2): super(BatchIngester, self).__init__(_fclient) self._stream_queues: Dict[str, LifoQueue[Message]] = {} self._ingest_interval = 1 + self._num_threads = num_threads + self._threads: List[threading.Thread] = [] + self._terminate_flag = False - # Basically 1 or more threads should be running the flush function + self._start() def batch_ingest( self, @@ -36,7 +40,7 @@ def batch_ingest( message = Message(msg, msg_type, topic, msg_timestamp, tags) self._stream_queues[formant_stream].put(message) - def _flush_once(self): + def _ingest_once(self): for stream, queue in self._stream_queues.items(): ingest_size = min(len(queue), MAX_INGEST_SIZE) @@ -50,3 +54,22 @@ def _flush_once(self): top_message.msg_timestamp, top_message.tags, ) + + def _ingest_continually(self): + while not self._terminate_flag: + self._ingest_once(self) + time.sleep(self._ingest_interval) + + def _start(self): + self._terminate_flag = False + for i in range(self._num_threads): + self._threads.append( + threading.Thread( + target=self._ingest_continually, + daemon=True, + ) + ) + self._threads[i].start() + + def terminate(self): + self._terminate_flag = True From ef688f72bb94cf52ae116afb31ff7c534b6d3fe9 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Mon, 7 Aug 2023 19:33:33 +0530 Subject: [PATCH 03/14] ingester -> batchingester normal tests work --- .../components/subscriber/basic_subscriber_coodinator.py | 3 ++- .../scripts/components/subscriber/subscriber_coordinator.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py index a018763..ba24415 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py @@ -11,6 +11,7 @@ from configuration.config_schema import ConfigSchema from configuration.subscriber_config import SubscriberConfig, MessagePathConfig from .ingester import Ingester +from .batched_ingester import BatchIngester from ros2_utils.qos import QOS_PROFILES, qos_profile_system_default from ros2_utils.topic_type_provider import TopicTypeProvider from utils.logger import get_logger @@ -30,7 +31,7 @@ def __init__( self, fclient: Client, node: Node, - ingester: Ingester, + ingester: BatchIngester, topic_type_provider: TopicTypeProvider, ): self._fclient = fclient diff --git a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py index c5bea3c..ccd8d13 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py @@ -5,6 +5,7 @@ from .basic_subscriber_coodinator import BasicSubscriberCoordinator from configuration.config_schema import ConfigSchema from .ingester import Ingester +from .batched_ingester import BatchIngester from .localization_subscriber_coodinator import LocalizationSubscriberCoordinator from .numeric_set_subscriber_coodinator import NumericSetSubscriberCoordinator from ros2_utils.topic_type_provider import TopicTypeProvider @@ -17,7 +18,7 @@ def __init__( ): self._logger = get_logger() self._fclient = fclient - self._ingester = Ingester(self._fclient) + self._ingester = BatchIngester(self._fclient) self._node = node self._topic_type_provider = topic_type_provider self._basic_subscriber_coodinator = BasicSubscriberCoordinator( From 3f8f7703aa4797177fd044a10353a4d876dbce8e Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Mon, 7 Aug 2023 19:37:38 +0530 Subject: [PATCH 04/14] added ingest_interval variable, tests specifcally for batched_ingester TBD --- .../scripts/components/subscriber/batched_ingester.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py index cb5297a..b919c4c 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py @@ -18,10 +18,12 @@ def __init__(self, msg, msg_type: type, topic: str, msg_timestamp: int, tags: Di class BatchIngester(Ingester): - def __init__(self, _fclient: Client, num_threads: int = 2): + def __init__( + self, _fclient: Client, ingest_interval: int = 30, num_threads: int = 2 + ): super(BatchIngester, self).__init__(_fclient) self._stream_queues: Dict[str, LifoQueue[Message]] = {} - self._ingest_interval = 1 + self._ingest_interval = ingest_interval self._num_threads = num_threads self._threads: List[threading.Thread] = [] self._terminate_flag = False From 05ca2bb47f02306955528856f9edc01fb497f30a Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Tue, 8 Aug 2023 07:39:24 +0530 Subject: [PATCH 05/14] BaseIngester class implemented; passes tests when integrated to Ingester class --- .../components/subscriber/base_ingester.py | 119 ++++++++++++++++++ .../subscriber/basic_subscriber_coodinator.py | 2 +- .../scripts/components/subscriber/ingester.py | 41 ++---- .../subscriber/subscriber_coordinator.py | 2 +- 4 files changed, 129 insertions(+), 35 deletions(-) create mode 100644 formant_ros2_adapter/scripts/components/subscriber/base_ingester.py diff --git a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py new file mode 100644 index 0000000..069b243 --- /dev/null +++ b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py @@ -0,0 +1,119 @@ +from cv_bridge import CvBridge +import cv2 +import grpc +from typing import Dict +from sensor_msgs.msg import ( + BatteryState, + CompressedImage, + Image, + Joy, + LaserScan, + NavSatFix, + PointCloud2, +) +from std_msgs.msg import ( + Bool, + Char, + String, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + UInt8, + UInt16, + UInt32, + UInt64, +) + +from formant.sdk.agent.v1 import Client +from formant.protos.model.v1.datapoint_pb2 import Datapoint +from formant.sdk.agent.v1.localization.types import ( + PointCloud as FPointCloud, + Map as FMap, + Path as FPath, + Transform as FTransform, + Goal as FGoal, + Odometry as FOdometry, + Vector3 as FVector3, + Quaternion as FQuaternion, +) + +from utils.logger import get_logger +from ros2_utils.message_utils import ( + get_ros2_type_from_string, + message_to_json, + get_message_path_value, +) + +""" +A Handle Exceptions Class would be nice +""" + +OTHER_DATA_TYPES = [NavSatFix, BatteryState, LaserScan, PointCloud2] + + +class BaseIngester: + def __init__(self, _fclient: Client): + self._fclient = _fclient + self.cv_bridge = CvBridge() + self._logger = get_logger() + + def prepare(self, msg, msg_type: type): + + if msg_type in [str, String, Char]: + msg = self._prepare_string(msg) + elif msg_type in [Bool, bool]: + msg = self._prepare_attr_data(msg) + elif msg_type in [ + int, + float, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + UInt8, + UInt16, + UInt32, + UInt64, + ]: + msg = self._prepare_attr_data(msg) + elif msg_type == Image: + msg = self._prepare_image(msg) + + elif msg_type == CompressedImage: + msg = self._prepare_compressed_image(msg) + + elif msg_type not in OTHER_DATA_TYPES: + # Ingest any messages without a direct mapping to a Formant type as JSON + msg = message_to_json(msg) + + return msg + + def _prepare_string(self, msg): + msg = self._prepare_attr_data(msg) + msg = str(msg) + return msg + + def _prepare_image(self, msg): + cv_image = self.cv_bridge.imgmsg_to_cv2(msg, "bgr8") + encoded_image = cv2.imencode(".jpg", cv_image)[1].tobytes() + return encoded_image + + def _prepare_compressed_image(self, msg): + if "jpg" in msg.format or "jpeg" in msg.format: + content_type = "image/jpg" + elif "png" in msg.format: + content_type = "image/png" + else: + self._logger.warn("Image format", msg.format, "not supported") + return + return {"value": bytes(msg.data), "content_type": content_type} + + def _prepare_attr_data(self, msg): + if hasattr(msg, "data"): + msg = msg.data + return msg diff --git a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py index ba24415..3238336 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py @@ -31,7 +31,7 @@ def __init__( self, fclient: Client, node: Node, - ingester: BatchIngester, + ingester: Ingester, topic_type_provider: TopicTypeProvider, ): self._fclient = fclient diff --git a/formant_ros2_adapter/scripts/components/subscriber/ingester.py b/formant_ros2_adapter/scripts/components/subscriber/ingester.py index 46794bd..9de93d1 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/ingester.py @@ -46,14 +46,10 @@ message_to_json, get_message_path_value, ) +from .base_ingester import BaseIngester -class Ingester: - def __init__(self, _fclient: Client): - self._fclient = _fclient - self.cv_bridge = CvBridge() - self._logger = get_logger() - +class Ingester(BaseIngester): def ingest( self, msg, @@ -63,24 +59,18 @@ def ingest( msg_timestamp: int, tags: Dict, ): - + msg = self.prepare(msg, msg_type) # Handle the message based on its type try: if msg_type in [str, String, Char]: - if hasattr(msg, "data"): - msg = msg.data - self._fclient.post_text( formant_stream, - str(msg), + msg, tags=tags, timestamp=msg_timestamp, ) elif msg_type in [Bool, bool]: - if hasattr(msg, "data"): - msg = msg.data - self._fclient.post_bitset( formant_stream, {topic: msg}, @@ -102,9 +92,6 @@ def ingest( UInt32, UInt64, ]: - if hasattr(msg, "data"): - msg = msg.data - self._fclient.post_numeric( formant_stream, msg, @@ -124,30 +111,18 @@ def ingest( ) elif msg_type == Image: - # Convert Image to a Formant image - cv_image = self.cv_bridge.imgmsg_to_cv2(msg, "bgr8") - encoded_image = cv2.imencode(".jpg", cv_image)[1].tobytes() - self._fclient.post_image( stream=formant_stream, - value=encoded_image, + value=msg, tags=tags, timestamp=msg_timestamp, ) elif msg_type == CompressedImage: - # Post the compressed image - if "jpg" in msg.format or "jpeg" in msg.format: - content_type = "image/jpg" - elif "png" in msg.format: - content_type = "image/png" - else: - self._logger.warn("Image format", msg.format, "not supported") - return self._fclient.post_image( formant_stream, - value=bytes(msg.data), - content_type=content_type, + value=msg["value"], + content_type=msg["content_type"], tags=tags, timestamp=msg_timestamp, ) @@ -204,7 +179,7 @@ def ingest( # Ingest any messages without a direct mapping to a Formant type as JSON self._fclient.post_json( formant_stream, - message_to_json(msg), + msg, tags=tags, timestamp=msg_timestamp, ) diff --git a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py index ccd8d13..6212e32 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py @@ -18,7 +18,7 @@ def __init__( ): self._logger = get_logger() self._fclient = fclient - self._ingester = BatchIngester(self._fclient) + self._ingester = Ingester(self._fclient) self._node = node self._topic_type_provider = topic_type_provider self._basic_subscriber_coodinator = BasicSubscriberCoordinator( From 12e85c3d2345ef91cd3800f09998d804146eff47 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Tue, 8 Aug 2023 17:04:03 +0530 Subject: [PATCH 06/14] Added types.py --- .../components/subscriber/base_ingester.py | 42 ++----------------- .../scripts/components/subscriber/types.py | 40 ++++++++++++++++++ 2 files changed, 43 insertions(+), 39 deletions(-) create mode 100644 formant_ros2_adapter/scripts/components/subscriber/types.py diff --git a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py index 069b243..e9e0103 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py @@ -3,29 +3,10 @@ import grpc from typing import Dict from sensor_msgs.msg import ( - BatteryState, CompressedImage, Image, - Joy, - LaserScan, - NavSatFix, - PointCloud2, -) -from std_msgs.msg import ( - Bool, - Char, - String, - Float32, - Float64, - Int8, - Int16, - Int32, - Int64, - UInt8, - UInt16, - UInt32, - UInt64, ) +from .types import STRING_TYPES, BOOL_TYPES, NUMERIC_TYPES, OTHER_DATA_TYPES from formant.sdk.agent.v1 import Client from formant.protos.model.v1.datapoint_pb2 import Datapoint @@ -51,8 +32,6 @@ A Handle Exceptions Class would be nice """ -OTHER_DATA_TYPES = [NavSatFix, BatteryState, LaserScan, PointCloud2] - class BaseIngester: def __init__(self, _fclient: Client): @@ -62,24 +41,9 @@ def __init__(self, _fclient: Client): def prepare(self, msg, msg_type: type): - if msg_type in [str, String, Char]: + if msg_type in STRING_TYPES: msg = self._prepare_string(msg) - elif msg_type in [Bool, bool]: - msg = self._prepare_attr_data(msg) - elif msg_type in [ - int, - float, - Float32, - Float64, - Int8, - Int16, - Int32, - Int64, - UInt8, - UInt16, - UInt32, - UInt64, - ]: + elif msg_type in BOOL_TYPES or msg_type in NUMERIC_TYPES: msg = self._prepare_attr_data(msg) elif msg_type == Image: msg = self._prepare_image(msg) diff --git a/formant_ros2_adapter/scripts/components/subscriber/types.py b/formant_ros2_adapter/scripts/components/subscriber/types.py new file mode 100644 index 0000000..98b222d --- /dev/null +++ b/formant_ros2_adapter/scripts/components/subscriber/types.py @@ -0,0 +1,40 @@ +from std_msgs.msg import ( + Bool, + Char, + String, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + UInt8, + UInt16, + UInt32, + UInt64, +) +from sensor_msgs.msg import ( + BatteryState, + LaserScan, + NavSatFix, + PointCloud2, +) + +STRING_TYPES = [str, String, Char] +BOOL_TYPES = [Bool, bool] +NUMERIC_TYPES = [ + int, + float, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + UInt8, + UInt16, + UInt32, + UInt64, +] + +OTHER_DATA_TYPES = [NavSatFix, BatteryState, LaserScan, PointCloud2] From e29e9191fb3c3f0a515f0a5f808cb8433ad86a88 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Tue, 8 Aug 2023 17:07:30 +0530 Subject: [PATCH 07/14] types.py added for ingester --- .../scripts/components/subscriber/ingester.py | 37 ++----------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/ingester.py b/formant_ros2_adapter/scripts/components/subscriber/ingester.py index 9de93d1..7dc4885 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/ingester.py @@ -6,27 +6,10 @@ BatteryState, CompressedImage, Image, - Joy, LaserScan, NavSatFix, PointCloud2, ) -from std_msgs.msg import ( - Bool, - Char, - String, - Float32, - Float64, - Int8, - Int16, - Int32, - Int64, - UInt8, - UInt16, - UInt32, - UInt64, -) - from formant.sdk.agent.v1 import Client from formant.protos.model.v1.datapoint_pb2 import Datapoint from formant.sdk.agent.v1.localization.types import ( @@ -47,6 +30,7 @@ get_message_path_value, ) from .base_ingester import BaseIngester +from .types import STRING_TYPES, BOOL_TYPES, NUMERIC_TYPES class Ingester(BaseIngester): @@ -62,7 +46,7 @@ def ingest( msg = self.prepare(msg, msg_type) # Handle the message based on its type try: - if msg_type in [str, String, Char]: + if msg_type in STRING_TYPES: self._fclient.post_text( formant_stream, msg, @@ -70,7 +54,7 @@ def ingest( timestamp=msg_timestamp, ) - elif msg_type in [Bool, bool]: + elif msg_type in BOOL_TYPES: self._fclient.post_bitset( formant_stream, {topic: msg}, @@ -78,20 +62,7 @@ def ingest( timestamp=msg_timestamp, ) - elif msg_type in [ - int, - float, - Float32, - Float64, - Int8, - Int16, - Int32, - Int64, - UInt8, - UInt16, - UInt32, - UInt64, - ]: + elif msg_type in NUMERIC_TYPES: self._fclient.post_numeric( formant_stream, msg, From b350088e60899434114ce494d0e504ad5967a778 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Sun, 13 Aug 2023 11:00:02 +0530 Subject: [PATCH 08/14] Added prepare_datapoint to base_ingester + updated ingester --- .../components/subscriber/base_ingester.py | 89 ++++++++++++- .../scripts/components/subscriber/ingester.py | 119 ++---------------- 2 files changed, 94 insertions(+), 114 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py index e9e0103..6fcea0d 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py @@ -3,8 +3,12 @@ import grpc from typing import Dict from sensor_msgs.msg import ( + BatteryState, CompressedImage, Image, + LaserScan, + NavSatFix, + PointCloud2, ) from .types import STRING_TYPES, BOOL_TYPES, NUMERIC_TYPES, OTHER_DATA_TYPES @@ -39,7 +43,89 @@ def __init__(self, _fclient: Client): self.cv_bridge = CvBridge() self._logger = get_logger() - def prepare(self, msg, msg_type: type): + def prepare( + self, + msg, + msg_type: type, + formant_stream: str, + topic: str, + msg_timestamp: int, + tags: Dict, + ): + msg = self._preprocess(msg, msg_type) + + if msg_type in STRING_TYPES: + msg = self._fclient.prepare_text(formant_stream, msg, tags, msg_timestamp) + + elif msg_type in BOOL_TYPES: + self._fclient.prepare_bitset(formant_stream, msg, tags, msg_timestamp) + elif msg_type in NUMERIC_TYPES: + self._fclient.prepare_numeric(formant_stream, msg, tags, msg_timestamp) + + elif msg_type == NavSatFix: + + self._fclient.prepare_geolocation( + formant_stream, + msg.latitude, + msg.longitude, + altitude=msg.altitude, + tags=tags, + timestamp=msg_timestamp, + ) + + elif msg_type == Image: + self._fclient.prepare_image( + formant_stream, + value=msg, + tags=tags, + timestamp=msg_timestamp, + ) + elif msg_type == CompressedImage: + self._fclient.prepare_image( + formant_stream, + value=msg["value"], + content_type=msg["content_type"], + tags=tags, + timestamp=msg_timestamp, + ) + + elif msg_type == BatteryState: + self._fclient.prepare_battery( + formant_stream, + msg.percentage, + voltage=msg.voltage, + current=msg.current, + charge=msg.charge, + tags=tags, + timestamp=msg_timestamp, + ) + + elif msg_type == LaserScan: + msg = Datapoint( + stream=formant_stream, + point_cloud=FPointCloud.from_ros_laserscan(msg).to_proto(), + tags=tags, + timestamp=msg_timestamp, + ) + + elif msg_type == PointCloud2: + Datapoint( + stream=formant_stream, + point_cloud=FPointCloud.from_ros(msg).to_proto(), + tags=tags, + timestamp=msg_timestamp, + ) + + else: + self._fclient.prepare_json( + formant_stream, + msg, + tags=tags, + timestamp=msg_timestamp, + ) + return msg + + def _preprocess(self, msg, msg_type: type): if msg_type in STRING_TYPES: msg = self._prepare_string(msg) @@ -52,7 +138,6 @@ def prepare(self, msg, msg_type: type): msg = self._prepare_compressed_image(msg) elif msg_type not in OTHER_DATA_TYPES: - # Ingest any messages without a direct mapping to a Formant type as JSON msg = message_to_json(msg) return msg diff --git a/formant_ros2_adapter/scripts/components/subscriber/ingester.py b/formant_ros2_adapter/scripts/components/subscriber/ingester.py index 7dc4885..7df8658 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/ingester.py @@ -43,117 +43,12 @@ def ingest( msg_timestamp: int, tags: Dict, ): - msg = self.prepare(msg, msg_type) - # Handle the message based on its type - try: - if msg_type in STRING_TYPES: - self._fclient.post_text( - formant_stream, - msg, - tags=tags, - timestamp=msg_timestamp, - ) - - elif msg_type in BOOL_TYPES: - self._fclient.post_bitset( - formant_stream, - {topic: msg}, - tags=tags, - timestamp=msg_timestamp, - ) - - elif msg_type in NUMERIC_TYPES: - self._fclient.post_numeric( - formant_stream, - msg, - tags=tags, - timestamp=msg_timestamp, - ) - - elif msg_type == NavSatFix: - # Convert NavSatFix to a Formant location - self._fclient.post_geolocation( - stream=formant_stream, - latitude=msg.latitude, - longitude=msg.longitude, - altitude=msg.altitude, - tags=tags, - timestamp=msg_timestamp, - ) - - elif msg_type == Image: - self._fclient.post_image( - stream=formant_stream, - value=msg, - tags=tags, - timestamp=msg_timestamp, - ) - - elif msg_type == CompressedImage: - self._fclient.post_image( - formant_stream, - value=msg["value"], - content_type=msg["content_type"], - tags=tags, - timestamp=msg_timestamp, - ) + msg = self.prepare(msg, msg_type, formant_stream, topic, msg_timestamp, tags) - elif msg_type == BatteryState: - self._fclient.post_battery( - formant_stream, - msg.percentage, - voltage=msg.voltage, - current=msg.current, - charge=msg.charge, - tags=tags, - timestamp=msg_timestamp, - ) - - elif msg_type == LaserScan: - # Convert LaserScan to a Formant pointcloud - try: - self._fclient.agent_stub.PostData( - Datapoint( - stream=formant_stream, - point_cloud=FPointCloud.from_ros_laserscan(msg).to_proto(), - tags=tags, - timestamp=msg_timestamp, - ) - ) - except grpc.RpcError as e: - return - except Exception as e: - self._logger.error( - "Could not ingest " + formant_stream + ": " + str(e) - ) - return - - elif msg_type == PointCloud2: - try: - self._fclient.agent_stub.PostData( - Datapoint( - stream=formant_stream, - point_cloud=FPointCloud.from_ros(msg).to_proto(), - tags=tags, - timestamp=msg_timestamp, - ) - ) - except grpc.RpcError as e: - return - except Exception as e: - self._logger.error( - "Could not ingest " + formant_stream + ": " + str(e) - ) - return - - else: - # Ingest any messages without a direct mapping to a Formant type as JSON - self._fclient.post_json( - formant_stream, - msg, - tags=tags, - timestamp=msg_timestamp, - ) - - except AttributeError as e: + try: + self._fclient.post_data(msg) + except grpc.RpcError as e: + return + except Exception as e: self._logger.error("Could not ingest " + formant_stream + ": " + str(e)) + return From 0bd6ab1a9115e67f15fd8571de538185a07526bd Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Sun, 13 Aug 2023 11:58:28 +0530 Subject: [PATCH 09/14] Batched Ingester works + time specific tests TBD --- .../subscriber/basic_subscriber_coodinator.py | 2 +- .../components/subscriber/batched_ingester.py | 46 ++++++++----------- .../subscriber/subscriber_coordinator.py | 2 +- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py index 3238336..ba24415 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/basic_subscriber_coodinator.py @@ -31,7 +31,7 @@ def __init__( self, fclient: Client, node: Node, - ingester: Ingester, + ingester: BatchIngester, topic_type_provider: TopicTypeProvider, ): self._fclient = fclient diff --git a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py index b919c4c..88849bc 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py @@ -1,4 +1,6 @@ -from .ingester import Ingester +from .base_ingester import BaseIngester +from formant.protos.agent.v1 import agent_pb2 +from formant.protos.model.v1 import datapoint_pb2 from formant.sdk.agent.v1 import Client from queue import LifoQueue from typing import Dict, List @@ -8,21 +10,12 @@ MAX_INGEST_SIZE = 10 -class Message: - def __init__(self, msg, msg_type: type, topic: str, msg_timestamp: int, tags: Dict): - self.msg = msg - self.msg_type = msg_type - self.topic = topic - self.msg_timestamp = msg_timestamp - self.tags = tags - - -class BatchIngester(Ingester): +class BatchIngester(BaseIngester): def __init__( self, _fclient: Client, ingest_interval: int = 30, num_threads: int = 2 ): super(BatchIngester, self).__init__(_fclient) - self._stream_queues: Dict[str, LifoQueue[Message]] = {} + self._stream_queues: Dict[str, LifoQueue] = {} self._ingest_interval = ingest_interval self._num_threads = num_threads self._threads: List[threading.Thread] = [] @@ -30,7 +23,7 @@ def __init__( self._start() - def batch_ingest( + def ingest( self, msg, msg_type: type, @@ -39,27 +32,26 @@ def batch_ingest( msg_timestamp: int, tags: Dict, ): - message = Message(msg, msg_type, topic, msg_timestamp, tags) + message = self.prepare( + msg, msg_type, formant_stream, topic, msg_timestamp, tags + ) + has_stream = formant_stream in self._stream_queues + if not has_stream: + self._stream_queues[formant_stream] = LifoQueue() + self._stream_queues[formant_stream].put(message) def _ingest_once(self): - for stream, queue in self._stream_queues.items(): - ingest_size = min(len(queue), MAX_INGEST_SIZE) - for _ in range(ingest_size): - top_message = queue.get() - self.ingest( - top_message.msg, - top_message.msg_type, - stream, - top_message.topic, - top_message.msg_timestamp, - top_message.tags, - ) + for _, queue in self._stream_queues.items(): + ingest_size = min(queue.qsize(), MAX_INGEST_SIZE) + datapoints = [queue.get() for _ in range(ingest_size)] + + self._fclient.post_data_multi(datapoints) def _ingest_continually(self): while not self._terminate_flag: - self._ingest_once(self) + self._ingest_once() time.sleep(self._ingest_interval) def _start(self): diff --git a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py index 6212e32..ccd8d13 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py @@ -18,7 +18,7 @@ def __init__( ): self._logger = get_logger() self._fclient = fclient - self._ingester = Ingester(self._fclient) + self._ingester = BatchIngester(self._fclient) self._node = node self._topic_type_provider = topic_type_provider self._basic_subscriber_coodinator = BasicSubscriberCoordinator( From 2d4f122ef3fb512b3008fd84be96ae3a2d3fc431 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Wed, 16 Aug 2023 14:57:27 +0530 Subject: [PATCH 10/14] Added function var names + updated ingest_interval --- .../components/subscriber/base_ingester.py | 30 +++++++++++-------- .../components/subscriber/batched_ingester.py | 2 +- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py index 6fcea0d..0b44872 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py @@ -55,19 +55,25 @@ def prepare( msg = self._preprocess(msg, msg_type) if msg_type in STRING_TYPES: - msg = self._fclient.prepare_text(formant_stream, msg, tags, msg_timestamp) + msg = self._fclient.prepare_text( + stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp + ) elif msg_type in BOOL_TYPES: - self._fclient.prepare_bitset(formant_stream, msg, tags, msg_timestamp) + self._fclient.prepare_bitset( + stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp + ) elif msg_type in NUMERIC_TYPES: - self._fclient.prepare_numeric(formant_stream, msg, tags, msg_timestamp) + self._fclient.prepare_numeric( + stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp + ) elif msg_type == NavSatFix: self._fclient.prepare_geolocation( - formant_stream, - msg.latitude, - msg.longitude, + stream=formant_stream, + latitude=msg.latitude, + longitude=msg.longitude, altitude=msg.altitude, tags=tags, timestamp=msg_timestamp, @@ -75,14 +81,14 @@ def prepare( elif msg_type == Image: self._fclient.prepare_image( - formant_stream, + stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp, ) elif msg_type == CompressedImage: self._fclient.prepare_image( - formant_stream, + stream=formant_stream, value=msg["value"], content_type=msg["content_type"], tags=tags, @@ -91,8 +97,8 @@ def prepare( elif msg_type == BatteryState: self._fclient.prepare_battery( - formant_stream, - msg.percentage, + stream=formant_stream, + percentage=msg.percentage, voltage=msg.voltage, current=msg.current, charge=msg.charge, @@ -118,8 +124,8 @@ def prepare( else: self._fclient.prepare_json( - formant_stream, - msg, + stream=formant_stream, + value=msg, tags=tags, timestamp=msg_timestamp, ) diff --git a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py index 88849bc..24c61bd 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/batched_ingester.py @@ -12,7 +12,7 @@ class BatchIngester(BaseIngester): def __init__( - self, _fclient: Client, ingest_interval: int = 30, num_threads: int = 2 + self, _fclient: Client, ingest_interval: int = 0.04, num_threads: int = 2 ): super(BatchIngester, self).__init__(_fclient) self._stream_queues: Dict[str, LifoQueue] = {} From 578ae56701946de97885d1d16e5106d825b25635 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Wed, 16 Aug 2023 15:02:50 +0530 Subject: [PATCH 11/14] chooses between batch_ingester and ingester based on agent functionality --- .../components/subscriber/subscriber_coordinator.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py index ccd8d13..c783d90 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py +++ b/formant_ros2_adapter/scripts/components/subscriber/subscriber_coordinator.py @@ -18,7 +18,7 @@ def __init__( ): self._logger = get_logger() self._fclient = fclient - self._ingester = BatchIngester(self._fclient) + self._ingester = self._choose_ingester() self._node = node self._topic_type_provider = topic_type_provider self._basic_subscriber_coodinator = BasicSubscriberCoordinator( @@ -37,3 +37,11 @@ def setup_with_config(self, config: ConfigSchema): self._localization_subscriber_coordinator.setup_with_config(config) self._numeric_set_subscriber_coodinator.setup_with_config(config) self._logger.info("Set up Subscriber Coordinator") + + def _choose_ingester(self): + has_batch_ingester = hasattr(self._fclient, "post_data_multi") and callable( + self._fclient.post_data_multi + ) + if has_batch_ingester: + return BatchIngester(self._fclient) + return Ingester(self._fclient) From 32da6657114784065a3286f2ea12f163fa0140d5 Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Thu, 7 Sep 2023 01:35:40 +0530 Subject: [PATCH 12/14] added test_subscribers_batched.py --- formant_ros2_adapter/scripts/README.md | 4 ++ .../scripts/tests/test_subscribers_batched.py | 43 +++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 formant_ros2_adapter/scripts/README.md create mode 100644 formant_ros2_adapter/scripts/tests/test_subscribers_batched.py diff --git a/formant_ros2_adapter/scripts/README.md b/formant_ros2_adapter/scripts/README.md new file mode 100644 index 0000000..49d287a --- /dev/null +++ b/formant_ros2_adapter/scripts/README.md @@ -0,0 +1,4 @@ +## Tests (Python) + +1. At the root folder (in formant_ros2_adapter/scripts) run the following command + - `python -m unittest ./tests/test_subscribers_batched.py` \ No newline at end of file diff --git a/formant_ros2_adapter/scripts/tests/test_subscribers_batched.py b/formant_ros2_adapter/scripts/tests/test_subscribers_batched.py new file mode 100644 index 0000000..346fc1b --- /dev/null +++ b/formant_ros2_adapter/scripts/tests/test_subscribers_batched.py @@ -0,0 +1,43 @@ +import unittest +from formant.sdk.agent.v1 import Client +from components.subscriber.batched_ingester import ( + BatchIngester, +) # Replace with the actual import +from queue import Empty +import time + + +class TestBatchIngester(unittest.TestCase): + def setUp(self): + self.fclient = Client() + self.ingester = BatchIngester(self.fclient, ingest_interval=1, num_threads=1) + + def test_message_ingest(self): + self.ingester.ingest("msg", str, "stream1", "topic", 12345, {}) + + # Checking if the message is added to the correct stream queue + self.assertEqual(self.ingester._stream_queues["stream1"].qsize(), 1) + + def test_queue_size_limit(self): + for i in range(15): # Adding 15 messages + self.ingester.ingest(f"msg{i}", str, "stream1", "topic", 12345, {}) + + # Ingest once + self.ingester._ingest_once() + + # Checking the remaining queue size (should be 15 - MAX_INGEST_SIZE) + self.assertEqual(self.ingester._stream_queues["stream1"].qsize(), 5) + + def test_queue_flush(self): + for i in range(5): # Adding 5 messages + self.ingester.ingest(f"msg{i}", str, "stream1", "topic", 12345, {}) + + # Wait for the ingest interval to pass (plus a small buffer) + time.sleep(1.2) + + # The queue should be empty after one ingest interval + self.assertRaises(Empty, self.ingester._stream_queues["stream1"].get_nowait) + + +if __name__ == "__main__": + unittest.main() From c5d29a649bd25caa44976befdf121f26a5b41e98 Mon Sep 17 00:00:00 2001 From: Niranjan-Formant Date: Wed, 8 Nov 2023 23:52:55 +0530 Subject: [PATCH 13/14] testing in progress --- formant_ros2_adapter/scripts/config.json | 32 ++++++++++++++++++- .../test_subscribers_batched.sh | 25 +++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100755 tests/test_subscribers/test_subscribers_batched.sh diff --git a/formant_ros2_adapter/scripts/config.json b/formant_ros2_adapter/scripts/config.json index 9e26dfe..c5b9c3e 100644 --- a/formant_ros2_adapter/scripts/config.json +++ b/formant_ros2_adapter/scripts/config.json @@ -1 +1,31 @@ -{} \ No newline at end of file +{ + "ros2_adapter_configuration": { + "subscribers": [ + { + "ros2_topic": "/my_string", + "ros2_message_type": "std_msgs/msg/String", + "formant_stream": "my.string" + }, + { + "ros2_topic": "/my_velocity", + "ros2_message_type": "geometry_msgs/msg/Twist", + "formant_stream": "my.velocity.linear", + "ros2_message_paths": [ + { + "path": "linear" + } + ] + }, + { + "ros2_topic": "/my_velocity", + "formant_stream": "my.velocity.angular", + "ros2_message_type": "geometry_msgs/msg/Twist", + "ros2_message_paths": [ + { + "path": "angular" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/tests/test_subscribers/test_subscribers_batched.sh b/tests/test_subscribers/test_subscribers_batched.sh new file mode 100755 index 0000000..57eb7b4 --- /dev/null +++ b/tests/test_subscribers/test_subscribers_batched.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +DEVICE="nicolas-container" +TOPIC="/my_string" +STREAM="my.string" + +source /opt/ros/*/setup.bash + +# How to get date formant required for fctl +start_time=$(date -u +"%Y-%m-%dT%H:%M:%S") + +# Publish multiple messages in quick succession to trigger batching +number_of_messages=20 +for i in $(seq 1 $number_of_messages); do + ros2 topic pub -t 2 -w 0 $TOPIC std_msgs/msg/String "data: {key: value_$i}" & + sleep 0.01 # Sleep for 10ms between messages +done + +wait # Wait for all background jobs to finish + +end_time=$(date -u +"%Y-%m-%dT%H:%M:%S") + +# Give some time for the BatchIngester to process and send the data +# This time should be greater than the ingest_interval in BatchIngester +sleep 1 \ No newline at end of file From 25f198d4220efd61ac22e55ef272fdfd296b0f8f Mon Sep 17 00:00:00 2001 From: Niranjan Krishna Date: Thu, 9 Nov 2023 03:08:31 +0530 Subject: [PATCH 14/14] adding msg = --- .../components/subscriber/base_ingester.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py index 0b44872..35c15fe 100644 --- a/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py +++ b/formant_ros2_adapter/scripts/components/subscriber/base_ingester.py @@ -60,17 +60,17 @@ def prepare( ) elif msg_type in BOOL_TYPES: - self._fclient.prepare_bitset( + msg = self._fclient.prepare_bitset( stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp ) elif msg_type in NUMERIC_TYPES: - self._fclient.prepare_numeric( + msg = self._fclient.prepare_numeric( stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp ) elif msg_type == NavSatFix: - self._fclient.prepare_geolocation( + msg = self._fclient.prepare_geolocation( stream=formant_stream, latitude=msg.latitude, longitude=msg.longitude, @@ -80,14 +80,14 @@ def prepare( ) elif msg_type == Image: - self._fclient.prepare_image( + msg = self._fclient.prepare_image( stream=formant_stream, value=msg, tags=tags, timestamp=msg_timestamp, ) elif msg_type == CompressedImage: - self._fclient.prepare_image( + msg = self._fclient.prepare_image( stream=formant_stream, value=msg["value"], content_type=msg["content_type"], @@ -96,7 +96,7 @@ def prepare( ) elif msg_type == BatteryState: - self._fclient.prepare_battery( + msg = self._fclient.prepare_battery( stream=formant_stream, percentage=msg.percentage, voltage=msg.voltage, @@ -115,7 +115,7 @@ def prepare( ) elif msg_type == PointCloud2: - Datapoint( + msg = Datapoint( stream=formant_stream, point_cloud=FPointCloud.from_ros(msg).to_proto(), tags=tags, @@ -123,7 +123,7 @@ def prepare( ) else: - self._fclient.prepare_json( + msg = self._fclient.prepare_json( stream=formant_stream, value=msg, tags=tags,