@@ -41,6 +41,7 @@ def __init__(
4141 debug : bool = False ,
4242 num_threads : int = DEFAULT_THREAD_POOL_SIZE ,
4343 shm_channels : Optional [List [str ]] = None ,
44+ jpeg_shm_channels : Optional [List [str ]] = None ,
4445 ):
4546 self .host = host
4647 self .port = port
@@ -57,8 +58,10 @@ def __init__(
5758 self .num_threads = num_threads
5859
5960 self .shm_channels = shm_channels or []
61+ self .jpeg_shm_channels = jpeg_shm_channels or []
6062 self .shm_subscribers : Dict [str , Callable [[], None ]] = {} # topic -> unsubscribe function
6163 self .shm_thread : Optional [threading .Thread ] = None
64+ self .jpeg_shm_thread : Optional [threading .Thread ] = None
6265
6366 # For cross-thread communication
6467 self .topic_queue : asyncio .Queue = asyncio .Queue ()
@@ -104,6 +107,12 @@ async def run(self):
104107 self .shm_thread .daemon = True
105108 self .shm_thread .start ()
106109
110+ # Start JPEG SHM handling thread if we have JPEG SHM topics
111+ if self .jpeg_shm_channels :
112+ self .jpeg_shm_thread = threading .Thread (target = self ._jpeg_shm_thread_func )
113+ self .jpeg_shm_thread .daemon = True
114+ self .jpeg_shm_thread .start ()
115+
107116 # Create and start Foxglove WebSocket server as a context manager
108117 async with FoxgloveServer (
109118 host = self .host ,
@@ -282,22 +291,42 @@ def _shm_thread_func(self):
282291 """Thread for handling SharedMemory topics"""
283292 try :
284293 from dimos .protocol .pubsub .shmpubsub import PickleSharedMemory
285-
294+
286295 logger .info (f"Starting SHM thread for topics: { self .shm_channels } " )
287-
296+
288297 shm = PickleSharedMemory ()
289298 shm .start ()
290-
299+
291300 for channel in self .shm_channels :
292- self ._subscribe_to_shm_channel (shm , channel )
293-
301+ self ._subscribe_to_shm_channel (shm , channel , is_jpeg = False )
302+
294303 while self .running :
295304 time .sleep (0.1 )
296-
305+
297306 except Exception as e :
298307 logger .error (f"Error in SHM thread: { e } " )
299308 traceback .print_exc ()
300309
310+ def _jpeg_shm_thread_func (self ):
311+ """Thread for handling JPEG SharedMemory topics"""
312+ try :
313+ from dimos .protocol .pubsub .jpeg_shm import JpegSharedMemory
314+
315+ logger .info (f"Starting JPEG SHM thread for topics: { self .jpeg_shm_channels } " )
316+
317+ shm = JpegSharedMemory ()
318+ shm .start ()
319+
320+ for channel in self .jpeg_shm_channels :
321+ self ._subscribe_to_shm_channel (shm , channel , is_jpeg = True )
322+
323+ while self .running :
324+ time .sleep (0.1 )
325+
326+ except Exception as e :
327+ logger .error (f"Error in JPEG SHM thread: { e } " )
328+ traceback .print_exc ()
329+
301330 def _add_topic (self , topic_name : str ):
302331 try :
303332 logger .info (f"Discovered topic: { topic_name } " )
@@ -368,18 +397,18 @@ def _on_topic_discovered(self, topic_name: str):
368397 logger .error (f"Error processing topic { topic_name } : { e } " )
369398 traceback .print_exc ()
370399
371- def _subscribe_to_shm_channel (self , shm , channel : str ):
400+ def _subscribe_to_shm_channel (self , shm , channel : str , is_jpeg : bool = False ):
372401 try :
373402 self ._add_topic (channel )
374403 base_topic = channel .split ("#" , 1 )[0 ]
375404 self .shm_topic_to_topic [base_topic ] = channel
376405
377406 def shm_callback (msg , topic_name ):
378- self ._on_shm_message (topic_name , msg )
379-
407+ self ._on_shm_message (topic_name , msg , is_jpeg = is_jpeg )
408+
380409 unsub = shm .subscribe (base_topic , shm_callback )
381410 self .shm_subscribers [base_topic ] = unsub
382-
411+
383412 except Exception as e :
384413 logger .error (f"Error subscribing to SHM topic { channel } : { e } " )
385414 traceback .print_exc ()
@@ -436,7 +465,7 @@ def _on_lcm_message(self, channel: str, data: bytes):
436465 except Exception as e :
437466 logger .error (f"Error queuing message on { channel } : { e } " )
438467
439- def _on_shm_message (self , topic : str , msg ):
468+ def _on_shm_message (self , topic : str , msg , is_jpeg : bool = False ):
440469 topic_info = self .topics .get (self .shm_topic_to_topic .get (topic , '' ))
441470
442471 if not topic_info :
@@ -448,11 +477,23 @@ def _on_shm_message(self, topic: str, msg):
448477 if topic_info .channel_id is None :
449478 return
450479
451- # Use the existing message converter - SHM already gives us the typed object
452- # This takes an average of 0.005062 seconds
453- msg = topic_info .lcm_class .lcm_decode (msg .lcm_encode ())
454- # This takes an average of 0.013835 seconds
455- msg_dict = self .message_processor .converter .convert_message (topic_info , msg )
480+ if is_jpeg :
481+ # For JPEG SHM, msg is already a sensor_msgs.Image object
482+ # We need to convert it back to LCM format, then use the standard converter
483+ if topic_info .lcm_class :
484+ # Convert Image object to LCM bytes and back
485+ lcm_msg = topic_info .lcm_class .lcm_decode (msg .lcm_encode ())
486+ msg_dict = self .message_processor .converter .convert_message (topic_info , lcm_msg )
487+ else :
488+ logger .error (f"No LCM class available for JPEG SHM topic { topic } " )
489+ return
490+ else :
491+ # Use the existing message converter - SHM already gives us the typed object
492+ # This takes an average of 0.005062 seconds
493+ msg = topic_info .lcm_class .lcm_decode (msg .lcm_encode ())
494+ # This takes an average of 0.013835 seconds
495+ msg_dict = self .message_processor .converter .convert_message (topic_info , msg )
496+
456497 if not msg_dict :
457498 logger .error (f"Failed to convert SHM message for { topic } " )
458499 return
0 commit comments