@@ -59,6 +59,7 @@ void free_parsers(struct parse_worker *parsers, struct listener_thread_input *in
5959 free (messages [i ].msg_hdr .msg_iov -> iov_base );
6060 free (messages [i ].msg_hdr .msg_iov );
6161 free (messages [i ].msg_hdr .msg_name );
62+ free (messages [i ].msg_hdr .msg_control );
6263 }
6364 free (messages );
6465}
@@ -185,6 +186,45 @@ int create_monitoring_thread(struct monitoring_worker *monitoring, unyte_udp_que
185186 return 0 ;
186187}
187188
189+ struct sockaddr_storage * get_dest_addr (struct msghdr * mh , unyte_udp_sock_t * sock )
190+ {
191+ struct sockaddr_storage * addr = (struct sockaddr_storage * )malloc (sizeof (struct sockaddr_storage ));
192+
193+ if (addr == NULL )
194+ {
195+ printf ("Malloc failed\n" );
196+ return NULL ;
197+ }
198+
199+ memset (addr , 0 , sizeof (struct sockaddr_storage ));
200+ struct in_pktinfo * in_pktinfo ;
201+ struct in6_pktinfo * in6_pktinfo ;
202+
203+ for ( // iterate through all the control headers
204+ struct cmsghdr * cmsg = CMSG_FIRSTHDR (mh );
205+ cmsg != NULL ;
206+ cmsg = CMSG_NXTHDR (mh , cmsg ))
207+ {
208+ if (cmsg -> cmsg_level == IPPROTO_IP && cmsg -> cmsg_type == IP_PKTINFO )
209+ {
210+ in_pktinfo = (struct in_pktinfo * )CMSG_DATA (cmsg );
211+ ((struct sockaddr_in * )addr )-> sin_family = AF_INET ;
212+ ((struct sockaddr_in * )addr )-> sin_addr = in_pktinfo -> ipi_addr ;
213+ ((struct sockaddr_in * )addr )-> sin_port = ((struct sockaddr_in * )sock -> addr )-> sin_port ;
214+ break ;
215+ }
216+ if (cmsg -> cmsg_level == IPPROTO_IPV6 && cmsg -> cmsg_type == IPV6_PKTINFO )
217+ {
218+ in6_pktinfo = (struct in6_pktinfo * )CMSG_DATA (cmsg );
219+ ((struct sockaddr_in6 * )addr )-> sin6_family = AF_INET6 ;
220+ ((struct sockaddr_in6 * )addr )-> sin6_addr = in6_pktinfo -> ipi6_addr ;
221+ ((struct sockaddr_in6 * )addr )-> sin6_port = ((struct sockaddr_in6 * )sock -> addr )-> sin6_port ;
222+ break ;
223+ }
224+ }
225+ return addr ;
226+ }
227+
188228/**
189229 * Udp listener worker on PORT port.
190230 */
@@ -234,11 +274,14 @@ int listener(struct listener_thread_input *in)
234274 printf ("Malloc failed \n" );
235275 return -1 ;
236276 }
277+ int cmbuf_len = 1024 ;
237278 // Init msg_iov first to reduce mallocs every iteration of while
238279 for (uint16_t i = 0 ; i < in -> recvmmsg_vlen ; i ++ )
239280 {
240281 messages [i ].msg_hdr .msg_iov = (struct iovec * )malloc (sizeof (struct iovec ));
241282 messages [i ].msg_hdr .msg_iovlen = 1 ;
283+ messages [i ].msg_hdr .msg_control = (char * )malloc (cmbuf_len );
284+ messages [i ].msg_hdr .msg_controllen = cmbuf_len ;
242285 }
243286 // FIXME: malloc failed
244287 // TODO: malloc array of msg_hdr.msg_name and assign addresss to every messages[i]
@@ -251,12 +294,10 @@ int listener(struct listener_thread_input *in)
251294 {
252295 messages [i ].msg_hdr .msg_iov -> iov_base = (char * )malloc (UDP_SIZE * sizeof (char ));
253296 messages [i ].msg_hdr .msg_iov -> iov_len = UDP_SIZE ;
254- messages [i ].msg_hdr .msg_control = 0 ;
255- messages [i ].msg_hdr .msg_controllen = 0 ;
256- messages [i ].msg_hdr .msg_name = (struct sockaddr_in * )malloc (sizeof (struct sockaddr_in ));
257- messages [i ].msg_hdr .msg_namelen = sizeof (struct sockaddr_in );
297+ memset (messages [i ].msg_hdr .msg_control , 0 , cmbuf_len );
298+ messages [i ].msg_hdr .msg_name = (struct sockaddr_storage * )malloc (sizeof (struct sockaddr_storage ));
299+ messages [i ].msg_hdr .msg_namelen = sizeof (struct sockaddr_storage );
258300 }
259-
260301 int read_count = recvmmsg (* in -> conn -> sockfd , messages , in -> recvmmsg_vlen , 0 , NULL );
261302 if (read_count == -1 )
262303 {
@@ -273,7 +314,12 @@ int listener(struct listener_thread_input *in)
273314 // If msg_len == 0 -> message has 0 bytes -> we discard message and free the buffer
274315 if (messages [i ].msg_len > 0 )
275316 {
276- unyte_min_t * seg = minimal_parse (messages [i ].msg_hdr .msg_iov -> iov_base , ((struct sockaddr_in * )messages [i ].msg_hdr .msg_name ), in -> conn -> addr );
317+ struct sockaddr_storage * dest_addr = get_dest_addr (& (messages [i ].msg_hdr ), in -> conn );
318+
319+ if (dest_addr == NULL )
320+ return -1 ;
321+
322+ unyte_min_t * seg = minimal_parse (messages [i ].msg_hdr .msg_iov -> iov_base , ((struct sockaddr_storage * )messages [i ].msg_hdr .msg_name ), dest_addr );
277323 if (seg == NULL )
278324 {
279325 printf ("minimal_parse error\n" );
@@ -303,11 +349,6 @@ int listener(struct listener_thread_input *in)
303349 free (messages [i ].msg_hdr .msg_iov -> iov_base );
304350 }
305351 }
306-
307- for (uint16_t i = 0 ; i < in -> recvmmsg_vlen ; i ++ )
308- {
309- free (messages [i ].msg_hdr .msg_name );
310- }
311352 }
312353
313354 // Never called cause while(1)
0 commit comments