1
1
"""Flex Stacker TOF Data Collection Script."""
2
2
3
- # TODO: Make the output of this match tools/tof-analysis/data/raw_data_frame.csv
4
3
import argparse
5
4
import asyncio
6
5
import subprocess
7
6
import re
8
7
import time
9
8
from datetime import datetime
10
9
from typing import Any , Dict , List , Optional
10
+ import uuid
11
11
12
12
from hardware_testing import data
13
13
from hardware_testing .opentrons_api .types import OT3Mount , Axis , Point
14
14
from hardware_testing .opentrons_api .helpers_ot3 import build_async_ot3_hardware_api
15
15
from opentrons .drivers .flex_stacker .types import StackerAxis , Direction , TOFSensor
16
+ from opentrons .drivers .flex_stacker .utils import NUMBER_OF_BINS
16
17
from opentrons .hardware_control .ot3api import OT3API
17
18
18
19
@@ -70,18 +71,33 @@ def __init__(
70
71
self .axes = [Axis .X , Axis .Y , Axis .Z_L , Axis .Z_R ]
71
72
self .stackers : List [str ] = []
72
73
self .test_files : List [str ] = []
74
+
73
75
self .test_data = {
74
- "Time" : "None" ,
76
+ "Hash_id" : "None" ,
77
+ "Date" : "None" ,
78
+ "Test" : "None" ,
79
+ "Labware_Name" : "None" ,
80
+ "Stacker_SN" : "None" ,
81
+ "Axis" : "None" ,
82
+ "Platform_Position" : "None" ,
83
+ "Labware_Num_X" : "None" ,
84
+ "Labware_Num_Z" : "None" ,
75
85
"Sample" : "None" ,
76
86
"Zone" : "None" ,
87
+ "Time" : "None" ,
77
88
}
89
+ # Bins 1-128
90
+ self .test_data .update (
91
+ {str (bin ): "None" for bin in range (1 , NUMBER_OF_BINS + 1 )}
92
+ )
93
+
78
94
self .tof_axes = {
79
- "X-Axis " : TOFSensor .X ,
80
- "Z-Axis " : TOFSensor .Z ,
95
+ "x " : TOFSensor .X ,
96
+ "z " : TOFSensor .Z ,
81
97
}
82
98
self .directions = {
83
- "Retract " : Direction .RETRACT ,
84
- "Extend " : Direction .EXTEND ,
99
+ "retract " : Direction .RETRACT ,
100
+ "extend " : Direction .EXTEND ,
85
101
}
86
102
87
103
async def test_setup (self ) -> None :
@@ -113,66 +129,32 @@ def file_setup(self) -> None:
113
129
self .test_date = "run-" + datetime .utcnow ().strftime ("%y-%m-%d" )
114
130
self .test_path = data .create_folder_for_test_data (self .test_name )
115
131
if self .labware_amount == 0 :
116
- self .labware_name = "LW= baseline"
132
+ self .labware_name = "baseline"
117
133
self .labware_amount_z = self .labware_amount
118
134
elif self .labware_amount == 1 :
119
- self .labware_name = "LW= nest-96-pcr"
135
+ self .labware_name = "nest-96-pcr"
120
136
self .labware_amount_z = self .labware_amount
121
137
elif self .labware_amount == 3 :
122
- self .labware_name = "LW= tiprack"
138
+ self .labware_name = "tiprack"
123
139
self .labware_amount = 1
124
140
self .labware_amount_z = 3
125
141
print ("FILE PATH = " , self .test_path )
126
142
print ("FILE NAMES = " )
127
143
for stacker in self .stackers :
128
- self .test_tag_x_ret = f"x-axis_labx{ self .labware_amount } _labz{ self .labware_amount_z } _retract_{ stacker } "
129
- self .test_tag_x_ext = f"x-axis_labx{ self .labware_amount } _labz{ self .labware_amount_z } _extend_{ stacker } "
130
- self .test_tag_z_ret = f"z-axis_labx{ self .labware_amount } _labz{ self .labware_amount_z } _retract_{ stacker } "
131
- self .test_tag_z_ext = f"z-axis_labx{ self .labware_amount } _labz{ self .labware_amount_z } _extend_{ stacker } "
132
- test_file_x_ret = data .create_file_name (
133
- self .labware_name , self .test_id , self .test_tag_x_ret
134
- )
135
- test_file_x_ext = data .create_file_name (
136
- self .labware_name , self .test_id , self .test_tag_x_ext
137
- )
138
- test_file_z_ret = data .create_file_name (
139
- self .labware_name , self .test_id , self .test_tag_z_ret
140
- )
141
- test_file_z_ext = data .create_file_name (
142
- self .labware_name , self .test_id , self .test_tag_z_ext
143
- )
144
- data .append_data_to_file (
145
- test_name = self .test_name ,
146
- run_id = self .test_date ,
147
- file_name = test_file_x_ret ,
148
- data = self .test_header ,
144
+ self .test_tag = (
145
+ f"labx{ self .labware_amount } _labz{ self .labware_amount_z } _{ stacker } "
149
146
)
150
- data .append_data_to_file (
151
- test_name = self .test_name ,
152
- run_id = self .test_date ,
153
- file_name = test_file_x_ext ,
154
- data = self .test_header ,
147
+ test_file = data .create_file_name (
148
+ self .labware_name , self .test_id , self .test_tag
155
149
)
156
150
data .append_data_to_file (
157
151
test_name = self .test_name ,
158
152
run_id = self .test_date ,
159
- file_name = test_file_z_ret ,
153
+ file_name = test_file ,
160
154
data = self .test_header ,
161
155
)
162
- data .append_data_to_file (
163
- test_name = self .test_name ,
164
- run_id = self .test_date ,
165
- file_name = test_file_z_ext ,
166
- data = self .test_header ,
167
- )
168
- self .test_files .append (test_file_x_ret )
169
- self .test_files .append (test_file_x_ext )
170
- self .test_files .append (test_file_z_ret )
171
- self .test_files .append (test_file_z_ext )
172
- print (test_file_x_ret )
173
- print (test_file_x_ext )
174
- print (test_file_z_ret )
175
- print (test_file_z_ext )
156
+ self .test_files .append (test_file )
157
+ print (test_file )
176
158
177
159
def dict_keys_to_line (self , dict : Dict [str , Any ]) -> str :
178
160
"""Convert dict keys to CSV line."""
@@ -185,6 +167,7 @@ def dict_values_to_line(self, dict: Dict[str, Any]) -> str:
185
167
async def read_stacker_tof (self ) -> None :
186
168
"""Read the stacker TOF Sensor data."""
187
169
for i in range (len (self .stackers )):
170
+ await self .api .attached_modules [i ].home_all () # type: ignore
188
171
print (f"\n >> Stacker = { self .stackers [i ]} " )
189
172
for axis , tof_axis in self .tof_axes .items ():
190
173
for pos , direction in self .directions .items ():
@@ -198,30 +181,36 @@ async def read_stacker_tof(self) -> None:
198
181
)
199
182
hist = await self .api .attached_modules [ # type: ignore
200
183
i
201
- ]._driver .get_tof_histogram (tof_axis )
184
+ ]._driver .get_tof_histogram ( # type: ignore
185
+ tof_axis
186
+ )
202
187
for zone , bins_list in hist .bins .items ():
188
+ date = datetime .now ().strftime ("%Y-%m-%d-%H-%M-%S" )
203
189
test_data = self .test_data .copy ()
204
- test_data ["Time" ] = str (elapsed_time )
190
+ test_data ["Hash_id" ] = str (uuid .uuid4 ())
191
+ test_data ["Date" ] = str (date )
192
+ test_data ["Test" ] = self .test_name
193
+ test_data ["Labware_Name" ] = self .labware_name
194
+ test_data ["Stacker_SN" ] = self .stackers [i ]
195
+ test_data ["Axis" ] = str (axis .lower ())
196
+ test_data ["Platform_Position" ] = pos .lower ()
197
+ test_data ["Labware_Num_X" ] = str (self .labware_amount )
198
+ test_data ["Labware_Num_Z" ] = str (self .labware_amount_z )
205
199
test_data ["Sample" ] = str (sample )
206
200
test_data ["Zone" ] = str (zone )
207
- bins_dict = {
208
- index : str ( value )
209
- for index , value in enumerate (bins_list )
210
- }
211
- test_data . update ( bins_dict ) # type: ignore
201
+ test_data [ "Time" ] = str ( elapsed_time )
202
+ # Add the bin values
203
+ test_data . update ({ str ( i ): str ( v ) for i , v in enumerate (bins_list , start = 1 )}) # type: ignore
204
+
205
+ # Update the csv with new values
212
206
test_data_str = self .dict_values_to_line (test_data )
213
207
for test_file in self .test_files :
214
- if (
215
- self .stackers [i ] in test_file
216
- and axis .lower () in test_file
217
- and pos .lower () in test_file
218
- ):
219
- data .append_data_to_file (
220
- test_name = self .test_name ,
221
- run_id = self .test_date ,
222
- file_name = test_file ,
223
- data = test_data_str ,
224
- )
208
+ data .append_data_to_file (
209
+ test_name = self .test_name ,
210
+ run_id = self .test_date ,
211
+ file_name = test_file ,
212
+ data = test_data_str ,
213
+ )
225
214
time .sleep (self .interval )
226
215
print ("" )
227
216
0 commit comments