1414import socket
1515import struct
1616import time
17+ import threading
1718import errno
1819
1920log = logging .getLogger (__name__ )
@@ -162,7 +163,7 @@ def build_can_frame(msg: Message) -> bytes:
162163 __u8 data[CANFD_MAX_DLEN] __attribute__((aligned(8)));
163164 };
164165 """
165- can_id = _add_flags_to_can_id (msg )
166+ can_id = _compose_arbitration_id (msg )
166167 flags = 0
167168 if msg .bitrate_switch :
168169 flags |= CANFD_BRS
@@ -286,7 +287,7 @@ def send_bcm(bcm_socket: socket.socket, data: bytes) -> int:
286287 raise e
287288
288289
289- def _add_flags_to_can_id (message : Message ) -> int :
290+ def _compose_arbitration_id (message : Message ) -> int :
290291 can_id = message .arbitration_id
291292 if message .is_extended_id :
292293 log .debug ("sending an extended id type message" )
@@ -297,7 +298,6 @@ def _add_flags_to_can_id(message: Message) -> int:
297298 if message .is_error_frame :
298299 log .debug ("sending error frame" )
299300 can_id |= CAN_ERR_FLAG
300-
301301 return can_id
302302
303303
@@ -310,18 +310,22 @@ class CyclicSendTask(
310310 - setting of a task duration
311311 - modifying the data
312312 - stopping then subsequent restarting of the task
313-
314313 """
315314
316315 def __init__ (
317316 self ,
318317 bcm_socket : socket .socket ,
318+ task_id : int ,
319319 messages : Union [Sequence [Message ], Message ],
320320 period : float ,
321321 duration : Optional [float ] = None ,
322322 ):
323- """
323+ """Construct and :meth:`~start` a task.
324+
324325 :param bcm_socket: An open BCM socket on the desired CAN channel.
326+ :param task_id:
327+ The identifier used to uniquely reference particular cyclic send task
328+ within Linux BCM.
325329 :param messages:
326330 The messages to be sent periodically.
327331 :param period:
@@ -336,12 +340,12 @@ def __init__(
336340 super ().__init__ (messages , period , duration )
337341
338342 self .bcm_socket = bcm_socket
343+ self .task_id = task_id
339344 self ._tx_setup (self .messages )
340345
341346 def _tx_setup (self , messages : Sequence [Message ]) -> None :
342347 # Create a low level packed frame to pass to the kernel
343348 body = bytearray ()
344- self .can_id_with_flags = _add_flags_to_can_id (messages [0 ])
345349 self .flags = CAN_FD_FRAME if messages [0 ].is_fd else 0
346350
347351 if self .duration :
@@ -353,9 +357,19 @@ def _tx_setup(self, messages: Sequence[Message]) -> None:
353357 ival1 = 0.0
354358 ival2 = self .period
355359
356- # First do a TX_READ before creating a new task, and check if we get
357- # EINVAL. If so, then we are referring to a CAN message with the same
358- # ID
360+ self ._check_bcm_task ()
361+
362+ header = build_bcm_transmit_header (
363+ self .task_id , count , ival1 , ival2 , self .flags , nframes = len (messages )
364+ )
365+ for message in messages :
366+ body += build_can_frame (message )
367+ log .debug ("Sending BCM command" )
368+ send_bcm (self .bcm_socket , header + body )
369+
370+ def _check_bcm_task (self ):
371+ # Do a TX_READ on a task ID, and check if we get EINVAL. If so,
372+ # then we are referring to a CAN message with the existing ID
359373 check_header = build_bcm_header (
360374 opcode = CAN_BCM_TX_READ ,
361375 flags = 0 ,
@@ -364,7 +378,7 @@ def _tx_setup(self, messages: Sequence[Message]) -> None:
364378 ival1_usec = 0 ,
365379 ival2_seconds = 0 ,
366380 ival2_usec = 0 ,
367- can_id = self .can_id_with_flags ,
381+ can_id = self .task_id ,
368382 nframes = 0 ,
369383 )
370384 try :
@@ -374,45 +388,33 @@ def _tx_setup(self, messages: Sequence[Message]) -> None:
374388 raise e
375389 else :
376390 raise ValueError (
377- "A periodic Task for Arbitration ID {} has already been created " .format (
378- messages [ 0 ]. arbitration_id
391+ "A periodic task for Task ID {} is already in progress by SocketCAN Linux layer " .format (
392+ self . task_id
379393 )
380394 )
381395
382- header = build_bcm_transmit_header (
383- self .can_id_with_flags ,
384- count ,
385- ival1 ,
386- ival2 ,
387- self .flags ,
388- nframes = len (messages ),
389- )
390- for message in messages :
391- body += build_can_frame (message )
392- log .debug ("Sending BCM command" )
393- send_bcm (self .bcm_socket , header + body )
394-
395396 def stop (self ) -> None :
396- """Send a TX_DELETE message to cancel this task .
397+ """Stop a task by sending TX_DELETE message to Linux kernel .
397398
398399 This will delete the entry for the transmission of the CAN-message
399- with the specified can_id CAN identifier. The message length for the command
400- TX_DELETE is {[bcm_msg_head]} (only the header).
400+ with the specified :attr:`~task_id` identifier. The message length
401+ for the command TX_DELETE is {[bcm_msg_head]} (only the header).
401402 """
402403 log .debug ("Stopping periodic task" )
403404
404- stopframe = build_bcm_tx_delete_header (self .can_id_with_flags , self .flags )
405+ stopframe = build_bcm_tx_delete_header (self .task_id , self .flags )
405406 send_bcm (self .bcm_socket , stopframe )
406407
407408 def modify_data (self , messages : Union [Sequence [Message ], Message ]) -> None :
408- """Update the contents of the periodically sent messages.
409-
410- Note: The messages must all have the same
411- :attr:`~can.Message.arbitration_id` like the first message.
409+ """Update the contents of the periodically sent CAN messages by
410+ sending TX_SETUP message to Linux kernel.
412411
413- Note: The number of new cyclic messages to be sent must be equal to the
412+ The number of new cyclic messages to be sent must be equal to the
414413 original number of messages originally specified for this task.
415414
415+ .. note:: The messages must all have the same
416+ :attr:`~can.Message.arbitration_id` like the first message.
417+
416418 :param messages:
417419 The messages with the new :attr:`can.Message.data`.
418420 """
@@ -423,14 +425,22 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
423425
424426 body = bytearray ()
425427 header = build_bcm_update_header (
426- can_id = self .can_id_with_flags , msg_flags = self .flags , nframes = len (messages )
428+ can_id = self .task_id , msg_flags = self .flags , nframes = len (messages )
427429 )
428430 for message in messages :
429431 body += build_can_frame (message )
430432 log .debug ("Sending BCM command" )
431433 send_bcm (self .bcm_socket , header + body )
432434
433435 def start (self ) -> None :
436+ """Start a periodic task by sending TX_SETUP message to Linux kernel.
437+
438+ It verifies presence of the particular BCM task through sending TX_READ
439+ message to Linux kernel prior to scheduling.
440+
441+ :raises ValueError:
442+ If the task referenced by :attr:`~task_id` is already running.
443+ """
434444 self ._tx_setup (self .messages )
435445
436446
@@ -443,16 +453,17 @@ class MultiRateCyclicSendTask(CyclicSendTask):
443453 def __init__ (
444454 self ,
445455 channel : socket .socket ,
456+ task_id : int ,
446457 messages : Sequence [Message ],
447458 count : int ,
448459 initial_period : float ,
449460 subsequent_period : float ,
450461 ):
451- super ().__init__ (channel , messages , subsequent_period )
462+ super ().__init__ (channel , task_id , messages , subsequent_period )
452463
453464 # Create a low level packed frame to pass to the kernel
454465 header = build_bcm_transmit_header (
455- self .can_id_with_flags ,
466+ self .task_id ,
456467 count ,
457468 initial_period ,
458469 subsequent_period ,
@@ -509,10 +520,10 @@ def capture_message(
509520 # Fetching the Arb ID, DLC and Data
510521 try :
511522 if get_channel :
512- cf , addr = sock .recvfrom (CANFD_MTU )
523+ cf , _ , msg_flags , addr = sock .recvmsg (CANFD_MTU )
513524 channel = addr [0 ] if isinstance (addr , tuple ) else addr
514525 else :
515- cf = sock .recv (CANFD_MTU )
526+ cf , _ , msg_flags , _ = sock .recvmsg (CANFD_MTU )
516527 channel = None
517528 except socket .error as exc :
518529 raise can .CanError ("Error receiving: %s" % exc )
@@ -539,6 +550,9 @@ def capture_message(
539550 bitrate_switch = bool (flags & CANFD_BRS )
540551 error_state_indicator = bool (flags & CANFD_ESI )
541552
553+ # Section 4.7.1: MSG_DONTROUTE: set when the received frame was created on the local host.
554+ is_rx = not bool (msg_flags & socket .MSG_DONTROUTE )
555+
542556 if is_extended_frame_format :
543557 # log.debug("CAN: Extended")
544558 # TODO does this depend on SFF or EFF?
@@ -555,6 +569,7 @@ def capture_message(
555569 is_remote_frame = is_remote_transmission_request ,
556570 is_error_frame = is_error_frame ,
557571 is_fd = is_fd ,
572+ is_rx = is_rx ,
558573 bitrate_switch = bitrate_switch ,
559574 error_state_indicator = error_state_indicator ,
560575 dlc = can_dlc ,
@@ -567,8 +582,10 @@ def capture_message(
567582
568583
569584class SocketcanBus (BusABC ):
570- """
571- Implements :meth:`can.BusABC._detect_available_configs`.
585+ """ A SocketCAN interface to CAN.
586+
587+ It implements :meth:`can.BusABC._detect_available_configs` to search for
588+ available interfaces.
572589 """
573590
574591 def __init__ (
@@ -604,6 +621,8 @@ def __init__(
604621 self .channel_info = "socketcan channel '%s'" % channel
605622 self ._bcm_sockets : Dict [str , socket .socket ] = {}
606623 self ._is_filtered = False
624+ self ._task_id = 0
625+ self ._task_id_guard = threading .Lock ()
607626
608627 # set the receive_own_messages parameter
609628 try :
@@ -718,18 +737,26 @@ def _send_periodic_internal(
718737 ) -> CyclicSendTask :
719738 """Start sending messages at a given period on this bus.
720739
721- The kernel's Broadcast Manager SocketCAN API will be used.
740+ The Linux kernel's Broadcast Manager SocketCAN API is used to schedule
741+ periodic sending of CAN messages. The wrapping 32-bit counter (see
742+ :meth:`~_get_next_task_id()`) designated to distinguish different
743+ :class:`CyclicSendTask` within BCM provides flexibility to schedule
744+ CAN messages sending with the same CAN ID, but different CAN data.
722745
723746 :param messages:
724- The messages to be sent periodically
747+ The message(s) to be sent periodically.
725748 :param period:
726749 The rate in seconds at which to send the messages.
727750 :param duration:
728751 Approximate duration in seconds to continue sending messages. If
729752 no duration is provided, the task will continue indefinitely.
730753
754+ :raises ValueError:
755+ If task identifier passed to :class:`CyclicSendTask` can't be used
756+ to schedule new task in Linux BCM.
757+
731758 :return:
732- A started task instance. This can be used to modify the data,
759+ A :class:`CyclicSendTask` task instance. This can be used to modify the data,
733760 pause/resume the transmission and to stop the transmission.
734761
735762 .. note::
@@ -738,18 +765,20 @@ def _send_periodic_internal(
738765 be exactly the same as the duration specified by the user. In
739766 general the message will be sent at the given rate until at
740767 least *duration* seconds.
741-
742768 """
743769 msgs = LimitedDurationCyclicSendTaskABC ._check_and_convert_messages (msgs )
744770
745771 msgs_channel = str (msgs [0 ].channel ) if msgs [0 ].channel else None
746772 bcm_socket = self ._get_bcm_socket (msgs_channel or self .channel )
747- # TODO: The SocketCAN BCM interface treats all cyclic tasks sharing an
748- # Arbitration ID as the same Cyclic group. We should probably warn the
749- # user instead of overwriting the old group?
750- task = CyclicSendTask (bcm_socket , msgs , period , duration )
773+ task_id = self ._get_next_task_id ()
774+ task = CyclicSendTask (bcm_socket , task_id , msgs , period , duration )
751775 return task
752776
777+ def _get_next_task_id (self ) -> int :
778+ with self ._task_id_guard :
779+ self ._task_id = (self ._task_id + 1 ) % (2 ** 32 - 1 )
780+ return self ._task_id
781+
753782 def _get_bcm_socket (self , channel : str ) -> socket .socket :
754783 if channel not in self ._bcm_sockets :
755784 self ._bcm_sockets [channel ] = create_bcm_socket (self .channel )
0 commit comments