17
17
import datetime
18
18
import time
19
19
import logging
20
- from typing import List , BinaryIO , Generator , Union
20
+ from typing import List , BinaryIO , Generator , Union , Tuple , Optional , cast
21
21
22
22
from ..message import Message
23
23
from ..util import len2dlc , dlc2len , channel2int
24
24
from ..typechecking import StringPathLike
25
25
from .generic import FileIOMessageWriter , MessageReader
26
26
27
27
28
+ TSystemTime = Tuple [int , int , int , int , int , int , int , int ]
29
+
30
+
28
31
class BLFParseError (Exception ):
29
32
"""BLF file could not be parsed correctly."""
30
33
@@ -97,11 +100,11 @@ class BLFParseError(Exception):
97
100
TIME_ONE_NANS = 0x00000002
98
101
99
102
100
- def timestamp_to_systemtime (timestamp ) :
103
+ def timestamp_to_systemtime (timestamp : float ) -> TSystemTime :
101
104
if timestamp is None or timestamp < 631152000 :
102
105
# Probably not a Unix timestamp
103
- return ( 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 )
104
- t = datetime .datetime .fromtimestamp (timestamp )
106
+ return 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0
107
+ t = datetime .datetime .fromtimestamp (round ( timestamp , 3 ) )
105
108
return (
106
109
t .year ,
107
110
t .month ,
@@ -110,11 +113,11 @@ def timestamp_to_systemtime(timestamp):
110
113
t .hour ,
111
114
t .minute ,
112
115
t .second ,
113
- int ( round ( t .microsecond / 1000.0 )) ,
116
+ t .microsecond // 1000 ,
114
117
)
115
118
116
119
117
- def systemtime_to_timestamp (systemtime ) :
120
+ def systemtime_to_timestamp (systemtime : TSystemTime ) -> float :
118
121
try :
119
122
t = datetime .datetime (
120
123
systemtime [0 ],
@@ -125,7 +128,7 @@ def systemtime_to_timestamp(systemtime):
125
128
systemtime [6 ],
126
129
systemtime [7 ] * 1000 ,
127
130
)
128
- return time . mktime ( t . timetuple ()) + systemtime [ 7 ] / 1000.0
131
+ return t . timestamp ()
129
132
except ValueError :
130
133
return 0
131
134
@@ -154,8 +157,8 @@ def __init__(self, file: Union[StringPathLike, BinaryIO]) -> None:
154
157
self .file_size = header [10 ]
155
158
self .uncompressed_size = header [11 ]
156
159
self .object_count = header [12 ]
157
- self .start_timestamp = systemtime_to_timestamp (header [14 :22 ])
158
- self .stop_timestamp = systemtime_to_timestamp (header [22 :30 ])
160
+ self .start_timestamp = systemtime_to_timestamp (cast ( TSystemTime , header [14 :22 ]) )
161
+ self .stop_timestamp = systemtime_to_timestamp (cast ( TSystemTime , header [22 :30 ]) )
159
162
# Read rest of header
160
163
self .file .read (header [1 ] - FILE_HEADER_STRUCT .size )
161
164
self ._tail = b""
@@ -405,8 +408,12 @@ def __init__(
405
408
raise BLFParseError ("Unexpected file format" )
406
409
self .uncompressed_size = header [11 ]
407
410
self .object_count = header [12 ]
408
- self .start_timestamp = systemtime_to_timestamp (header [14 :22 ])
409
- self .stop_timestamp = systemtime_to_timestamp (header [22 :30 ])
411
+ self .start_timestamp : Optional [float ] = systemtime_to_timestamp (
412
+ cast (TSystemTime , header [14 :22 ])
413
+ )
414
+ self .stop_timestamp : Optional [float ] = systemtime_to_timestamp (
415
+ cast (TSystemTime , header [22 :30 ])
416
+ )
410
417
# Jump to the end of the file
411
418
self .file .seek (0 , 2 )
412
419
else :
0 commit comments