Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion canopen/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def transmit(self, timestamp: Optional[float] = None):
:param float timestamp:
Optional Unix timestamp to use, otherwise the current time is used.
"""
delta = timestamp or time.time() - OFFSET
delta = (timestamp or time.time()) - OFFSET
days, seconds = divmod(delta, ONE_DAY)
data = TIME_OF_DAY_STRUCT.pack(int(seconds * 1000), int(days))
self.network.send_message(self.cob_id, data)
40 changes: 35 additions & 5 deletions test/test_time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import time
import unittest
from datetime import datetime
from unittest.mock import patch

import canopen
import canopen.timestamp


class TestTime(unittest.TestCase):
Expand All @@ -10,12 +14,38 @@ def test_time_producer(self):
network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
network.connect(interface="virtual", receive_own_messages=True)
producer = canopen.timestamp.TimeProducer(network)
producer.transmit(1486236238)
msg = network.bus.recv(1)

# Test that the epoch is correct
epoch = datetime.strptime(
"1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z"
).timestamp()
self.assertEqual(int(epoch), canopen.timestamp.OFFSET)

current = time.time()
with patch("canopen.timestamp.time.time", return_value=current):
current_in_epoch = current - epoch

# Test it looking up the current time
producer.transmit()
msg = network.bus.recv(1)
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data)
self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY)
self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000))

# Test providing a timestamp
faketime = 1_927_999_438 # 2031-02-04 20:23:58
producer.transmit(faketime)
msg = network.bus.recv(1)
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data)
current_in_epoch = faketime - epoch
self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY)
self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000))

network.disconnect()
self.assertEqual(msg.arbitration_id, 0x100)
self.assertEqual(msg.dlc, 6)
self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43")


if __name__ == "__main__":
Expand Down
Loading