@@ -11,30 +11,121 @@ def __init__(self):
1111 super ().__init__ ('camera_decoder' )
1212 self .bridge = CvBridge ()
1313
14- self .udp_ports = {
15- 'wrist' : 5000 ,
16- 'arm_rgb' : 5002 ,
17- 'arm_ir' : 5003 ,
18- 'front_rgb' : 5004 ,
19- 'ptz' : 5005
14+ self .cameras = {
15+ 'wrist' : {
16+ 'port' : 5000 ,
17+ 'image_topic' : '/camera/wrist/image_raw'
18+ },
19+ 'arm_rgb' : {
20+ 'port' : 5002 ,
21+ 'image_topic' : '/camera/arm/rgb/image_raw'
22+ },
23+ 'arm_ir' : {
24+ 'port' : 5003 ,
25+ 'image_topic' : '/camera/arm/ir/image_raw'
26+ },
27+ 'front_rgb' : {
28+ 'port' : 5004 ,
29+ 'image_topic' : '/camera/front/rgb/image_raw'
30+ },
31+ 'ptz' : {
32+ 'port' : 5005 ,
33+ 'image_topic' : '/camera/ptz/image_raw'
34+ }
2035 }
2136
22- for name , port in self .udp_ports .items ():
23- threading .Thread (target = self .decode_loop , args = (name , port ), daemon = True ).start ()
37+ self .captures = {}
38+ self .timers = {}
39+ self .publishers = {}
2440
25- def decode_loop (self , name , port ):
26- cap = cv2 .VideoCapture (f'udp://127.0.0.1:{ port } ' , cv2 .CAP_FFMPEG )
27- pub = self .create_publisher (Image , f'/camera/{ name } /image_raw' , 10 )
41+ for name , config in self .cameras .items ():
42+ self .publishers [name ] = self .create_publisher (
43+ Image ,
44+ config ['image_topic' ],
45+ 10
46+ )
2847
29- while rclpy .ok ():
30- ret , frame = cap .read ()
31- if ret :
32- msg = self .bridge .cv2_to_imgmsg (frame , encoding = 'bgr8' )
33- pub .publish (msg )
48+ for name in self .cameras .keys ():
49+ self .toggle_decoder (name , True )
3450
35- def main ():
36- rclpy .init ()
51+ def toggle_decoder (self , name , active ):
52+ if active :
53+ if name not in self .captures :
54+ config = self .cameras [name ]
55+ udp_url = f'udp://127.0.0.1:{ config ["port" ]} '
56+ self .get_logger ().info (f"Starting decoder for { name } from { udp_url } " )
57+
58+
59+ cap = cv2 .VideoCapture (udp_url , cv2 .CAP_FFMPEG )
60+ cap .set (cv2 .CAP_PROP_BUFFERSIZE , 1 )
61+
62+ if not cap .isOpened ():
63+ self .get_logger ().error (f"Cannot open UDP stream for { name } " )
64+ return
65+
66+ self .captures [name ] = cap
67+
68+ # Create timer to decode frames at 30 FPS
69+ self .timers [name ] = self .create_timer (
70+ 0.033 , # ~30 FPS
71+ lambda n = name : self .decode_and_publish (n )
72+ )
73+
74+ self .get_logger ().info (f"Successfully started decoder for { name } " )
75+
76+
77+ else :
78+ self .get_logger ().warn (f"Decoder for { name } is already running" )
79+ else :
80+ if name in self .captures :
81+ self .get_logger ().info (f"Stopping decoder for { name } " )
82+
83+ if name in self .timers :
84+ self .timers [name ].cancel ()
85+ del self .timers [name ]
86+
87+ self .captures [name ].release ()
88+ del self .captures [name ]
89+
90+ self .get_logger ().info (f"Successfully stopped decoder for { name } " )
91+ else :
92+ self .get_logger ().warn (f"Decoder for { name } is not running" )
93+
94+ def decode_and_publish (self , name ):
95+ if name not in self .captures :
96+ return
97+
98+ cap = self .captures [name ]
99+ ret , frame = cap .read ()
100+
101+ if not ret :
102+ return
103+
104+ try :
105+ # Convert to ROS2 Image message
106+ img_msg = self .bridge .cv2_to_imgmsg (frame , encoding = 'bgr8' )
107+ img_msg .header .stamp = self .get_clock ().now ().to_msg ()
108+ img_msg .header .frame_id = name
109+
110+ # Publish
111+ self .publishers [name ].publish (img_msg )
112+
113+ except Exception as e :
114+ self .get_logger ().error (f"Failed to publish frame from { name } : { e } " )
115+
116+
117+ def main (args = None ):
118+ rclpy .init (args = args )
37119 node = CameraDecoder ()
38- rclpy .spin (node )
120+
121+ try :
122+ rclpy .spin (node )
123+ except KeyboardInterrupt :
124+ pass
125+
39126 node .destroy_node ()
40127 rclpy .shutdown ()
128+
129+
130+ if __name__ == '__main__' :
131+ main ()
0 commit comments