3
3
import os
4
4
from pathlib import Path
5
5
6
- from typing import List
6
+ from typing import List , Tuple
7
7
8
- from dataframes import repair_dropped_frames
8
+ from dataframes import repair_dropped_frames , compute_time_range
9
+ from PostProcessVideos import scan_session_dir
9
10
10
11
import pandas as pd
11
12
15
16
RECSYNCH_SESSION_DIR = os .environ .get (RECSYNCH_SESSION_DIR_VAR )
16
17
print ("Data root set at '{}'." .format (RECSYNCH_SESSION_DIR ))
17
18
18
- #@pytest.fixture(autouse=True)
19
- #def client_dir() -> str:
20
- # if RECSYNCH_SESSION_DIR is None:
21
- # raise Exception(f"Environment variable {RECSYNCH_SESSION_DIR_VAR} not defined")
22
19
20
+ @pytest .fixture (scope = "session" , autouse = True )
21
+ def session_data () -> Tuple [List [str ], List [pd .DataFrame ], List [str ]]:
23
22
24
- def client_IDs () -> List [str ]:
23
+ assert RECSYNCH_SESSION_DIR is not None , "Variable RECSYNCH_SESSION_DIR is None."
24
+ assert os .path .exists (RECSYNCH_SESSION_DIR )
25
+ assert os .path .isdir (RECSYNCH_SESSION_DIR )
26
+
27
+ clienIDs , dataframes , video_paths = scan_session_dir (Path (RECSYNCH_SESSION_DIR ))
28
+
29
+ return clienIDs , dataframes , video_paths
30
+
31
+
32
+ def test_session_data (session_data ):
33
+
34
+ clienIDs , dataframes , video_paths = session_data
35
+
36
+ assert len (clienIDs ) > 0
37
+ assert len (clienIDs ) == len (dataframes ) == len (video_paths )
38
+
39
+ for df in dataframes :
40
+ assert len (df ) > 0
41
+ assert len (df .columns ) == 1
42
+
43
+ for vp in video_paths :
44
+ assert os .path .exists (vp )
45
+ assert os .path .isfile (vp )
46
+
47
+
48
+ def session_data_list () -> List [Tuple [str , pd .DataFrame , str ]]:
49
+
50
+ clienIDs , dataframes , video_paths = scan_session_dir (Path (RECSYNCH_SESSION_DIR ))
51
+
52
+ for clientID , df , video_path in zip (clienIDs , dataframes , video_paths ):
53
+ yield (clientID , df , video_path )
54
+
55
+
56
+ def client_IDs () -> List [Path ]:
25
57
26
58
out = []
27
59
@@ -48,15 +80,44 @@ def CSVs() -> List[str]:
48
80
return out
49
81
50
82
51
- @pytest .mark .parametrize ("csv_file" , CSVs ())
52
- def test_df_reparation (csv_file ):
83
+ # @pytest.mark.parametrize("csv_file", CSVs())
84
+ # def test_df_reparation(csv_file):
85
+ #
86
+ # # Load the test dataframes
87
+ # csv_path = Path(RECSYNCH_SESSION_DIR) / csv_file
88
+ # df = pd.read_csv(csv_path)
89
+ #
90
+ # assert len(df) >= 2
91
+ #
92
+ # repaired_df = repair_dropped_frames(df)
93
+ #
94
+ # assert len(repaired_df) >= len(df)
95
+
96
+ @pytest .mark .parametrize ("client_data" , session_data_list ())
97
+ def test_df_reparation (client_data ):
53
98
54
- # Load the test dataframes
55
- csv_path = Path (RECSYNCH_SESSION_DIR ) / csv_file
56
- df = pd .read_csv (csv_path )
99
+ _ , df , _ = client_data
57
100
58
101
assert len (df ) >= 2
59
102
60
103
repaired_df = repair_dropped_frames (df )
61
104
62
105
assert len (repaired_df ) >= len (df )
106
+ assert df [0 ].iloc [0 ] == repaired_df [0 ].iloc [0 ]
107
+ assert df [0 ].iloc [- 1 ] == repaired_df [0 ].iloc [- 1 ]
108
+
109
+
110
+ def test_df_trimming (session_data ):
111
+ _ , dataframes , _ = session_data
112
+
113
+ min_common , max_common = compute_time_range (dataframes )
114
+ assert min_common <= max_common
115
+
116
+ for df in dataframes :
117
+ # Get the first element of the first column
118
+ ts_start = df [0 ].iloc [0 ]
119
+ assert ts_start <= min_common
120
+
121
+ # Get the last element of the first column
122
+ ts_end = df [0 ].iloc [- 1 ]
123
+ assert ts_end >= max_common
0 commit comments