1
+ import numpy as np
1
2
from pydantic import validate_arguments
2
3
3
4
from kmm import CarDirection
4
5
from kmm .positions .positions import Positions
5
6
6
7
7
8
@validate_arguments (config = dict (arbitrary_types_allowed = True ))
8
- def wire_camera_positions (positions : Positions , direction : CarDirection ):
9
- ascending = kmm_ascending (positions .dataframe )
10
-
11
- if (
12
- (direction == CarDirection .A and ascending )
13
- or (direction == CarDirection .B and not ascending )
14
- ):
15
- correction = - 8
16
-
17
- elif (
18
- (direction == CarDirection .A and not ascending )
19
- or (direction == CarDirection .B and ascending )
20
- ):
21
- correction = 8
9
+ def wire_camera_positions (positions : Positions , car_direction : CarDirection ):
22
10
11
+ if car_direction == CarDirection .A :
12
+ corrections = kmm_directions (positions .dataframe ) * - 8
13
+ elif car_direction == CarDirection .B :
14
+ corrections = kmm_directions (positions .dataframe ) * 8
23
15
else :
24
- raise ValueError
16
+ raise ValueError ( car_direction )
25
17
26
18
return positions .replace (
27
19
dataframe = (
28
20
positions .dataframe
29
- .assign (meter = lambda df : df ["meter" ] + correction )
21
+ .assign (meter = lambda df : df ["meter" ] + corrections )
30
22
)
31
23
)
32
24
33
25
34
- def kmm_ascending (dataframe ):
35
- total_meter = dataframe ["kilometer" ] * 1000 + dataframe ["meter" ]
36
- diff = total_meter .values [:- 1 ] - total_meter .values [1 :]
37
- descending = (diff < 0 ).mean ()
38
- ascending = (diff > 0 ).mean ()
26
+ def kmm_directions (df ):
39
27
40
- if descending < 0.9 and ascending < 0.9 :
41
- raise ValueError ("Unable to determine ascending/descending kmm numbers" )
28
+ total_meter = (df ["kilometer" ] * 1000 + df ["meter" ]).values
42
29
43
- else :
44
- return ascending > 0.9
30
+ track_section_changes = np .argwhere (np .concatenate ([
31
+ np .array ([True ]),
32
+ df ["track_section" ].values [1 :] != df ["track_section" ].values [:- 1 ],
33
+ ])).squeeze (1 ).tolist () + [len (df )]
34
+
35
+ return np .concatenate ([
36
+ (
37
+ np .ones (to_index - from_index , dtype = np .uint8 )
38
+ * kmm_direction (total_meter [from_index : to_index ])
39
+ )
40
+ for from_index , to_index in zip (
41
+ track_section_changes [:- 1 ],
42
+ track_section_changes [1 :],
43
+ )
44
+ ])
45
+
46
+
47
+ def kmm_direction (total_meter ):
48
+ diffs = np .clip (total_meter [1 :] - total_meter [:- 1 ], - 1 , 1 )
49
+ if len (diffs ) >= 10 and (diffs > 0 ).mean () < 0.9 and (diffs < 0 ).mean () < 0.9 :
50
+ raise ValueError ("Unable to determine direction of kmm numbers." , diffs )
51
+ return int (np .sign (diffs .sum ()))
45
52
46
53
47
54
def test_camera_positions_kmm ():
@@ -52,7 +59,7 @@ def test_camera_positions_kmm():
52
59
positions_calibrated = wire_camera_positions (positions , header .car_direction )
53
60
assert (
54
61
positions_calibrated .dataframe ["meter" ].iloc [0 ]
55
- == positions .dataframe ["meter" ].iloc [0 ] - 8
62
+ == positions .dataframe ["meter" ].iloc [0 ] + 8
56
63
)
57
64
58
65
@@ -64,5 +71,5 @@ def test_camera_positions_kmm2():
64
71
positions_calibrated = wire_camera_positions (positions , header .car_direction )
65
72
assert (
66
73
positions_calibrated .dataframe ["meter" ].iloc [0 ]
67
- == positions .dataframe ["meter" ].iloc [0 ] - 8
74
+ == positions .dataframe ["meter" ].iloc [0 ] + 8
68
75
)
0 commit comments