@@ -29,6 +29,8 @@ class TestConfig:
29
29
30
30
simulate : bool
31
31
pipette : Literal [200 , 1000 ]
32
+ blowout :int
33
+ blowoutnum :int
32
34
33
35
34
36
class TestData (TypedDict ):
@@ -70,6 +72,10 @@ async def main(args: argparse.Namespace, cfg: TestConfig) -> None:
70
72
stall_detection_enable = False ,
71
73
)
72
74
ax = Axis .P_L
75
+ xxx = Axis .X
76
+ yyy = Axis .Y
77
+ Z1 = Axis .Z_L
78
+ Z2 = Axis .Z_R
73
79
mount = OT3Mount .LEFT
74
80
settings = helpers_ot3 .get_gantry_load_per_axis_motion_settings_ot3 (api , ax )
75
81
settings .max_speed = 25
@@ -78,7 +84,7 @@ async def main(args: argparse.Namespace, cfg: TestConfig) -> None:
78
84
default_current = settings .run_current
79
85
default_speed = settings .max_speed
80
86
default_acceleration = 100
81
- top , bottom , _ , _ = helpers_ot3 .get_plunger_positions_ot3 (api , mount )
87
+ top , bottom , blow_out , _ = helpers_ot3 .get_plunger_positions_ot3 (api , mount )
82
88
print (f"Settings: { settings } " )
83
89
84
90
async def position_check () -> bool :
@@ -87,9 +93,9 @@ async def position_check() -> bool:
87
93
return aligned
88
94
89
95
await api .cache_instruments ()
90
- await api .home_z (OT3Mount .LEFT )
96
+ # await api.home_z(OT3Mount.LEFT)
91
97
# LOOP THROUGH CURRENTS + SPEEDS
92
- await api .home ()
98
+ await api .home ([ xxx , yyy , Z1 , Z2 ] )
93
99
today = datetime .now ().strftime ("%m-%d-%y_%H-%M" )
94
100
pip_id = api .attached_pipettes [Mount .LEFT ]["pipette_id" ]
95
101
with open (
@@ -109,51 +115,112 @@ async def position_check() -> bool:
109
115
try :
110
116
currents = list (CURRENTS_SPEEDS .keys ())
111
117
for cycle in range (1 , args .cycles + 1 ):
112
- print (f"Cycle: { cycle } " )
113
- for current in sorted (currents , reverse = True ):
114
- speed = CURRENTS_SPEEDS [current ]
115
- # HOME
116
- print ("homing..." )
117
- await api .home ([ax ])
118
-
119
- print (f"run-current set to { current } amps" )
120
- await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
121
- api ,
122
- ax ,
123
- run_current = current ,
124
- )
125
- await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
126
- api ,
127
- ax ,
128
- default_max_speed = speed ,
129
- acceleration = default_acceleration ,
130
- )
131
- # MOVE DOWN
132
- print (f"moving down { bottom } mm at { speed } mm/sec" )
133
- position_checked = await position_check ()
134
- print (f"position checked: { position_checked } " )
135
- try :
136
- await helpers_ot3 .move_plunger_absolute_ot3 (
137
- api , mount , bottom , speed = speed , motor_current = current
118
+
119
+ if cycle % cfg .blowoutnum == 0 and cfg .blowout == 2 :
120
+ print (f"Cycle: { cycle } " )
121
+ for current in sorted (currents , reverse = True ):
122
+ speed = CURRENTS_SPEEDS [current ]
123
+ # HOME
124
+ print ("homing..." )
125
+ await api .home ([ax ])
126
+
127
+ print (f"run-current set to { current } amps" )
128
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
129
+ api ,
130
+ ax ,
131
+ run_current = current ,
138
132
)
139
- down_passed = await position_check ()
140
- test_data ["time_sec" ] = time .time () - start_time
141
- test_data ["cycle" ] = cycle
142
- test_data ["position" ] = "bottom"
143
- test_data ["position_check" ] = down_passed
144
- test_data ["stall" ] = "NONE"
145
- print (test_data )
146
- writer .writerow (test_data )
147
- csvfile .flush ()
148
- except Exception as e :
149
- print ("STALL DETECTION" )
150
- down_passed = await position_check ()
151
- test_data ["position_check" ] = down_passed
152
- test_data ["stall" ] = str ("Failed to move plunger down" )
153
- test_data ["error" ] = str (e )
154
- print (test_data )
155
- writer .writerow (test_data )
156
- csvfile .flush ()
133
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
134
+ api ,
135
+ ax ,
136
+ default_max_speed = speed ,
137
+ acceleration = default_acceleration ,
138
+ )
139
+ # MOVE DOWN
140
+ print (f"moving down { blow_out } mm at { speed } mm/sec" )
141
+ position_checked = await position_check ()
142
+ print (f"position checked: { position_checked } " )
143
+ try :
144
+ await helpers_ot3 .move_plunger_absolute_ot3 (
145
+ api , mount , blow_out , speed = speed , motor_current = current
146
+ )
147
+ down_passed = await position_check ()
148
+ test_data ["time_sec" ] = time .time () - start_time
149
+ test_data ["cycle" ] = cycle
150
+ test_data ["position" ] = "blow_out"
151
+ test_data ["position_check" ] = down_passed
152
+ test_data ["stall" ] = "NONE"
153
+ print (test_data )
154
+ writer .writerow (test_data )
155
+ csvfile .flush ()
156
+ except Exception as e :
157
+ print ("STALL DETECTION" )
158
+ down_passed = await position_check ()
159
+ test_data ["position_check" ] = down_passed
160
+ test_data ["stall" ] = str ("Failed to move plunger down" )
161
+ test_data ["error" ] = str (e )
162
+ print (test_data )
163
+ writer .writerow (test_data )
164
+ csvfile .flush ()
165
+ print ("homing..." )
166
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
167
+ api , ax , run_current = default_current
168
+ )
169
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
170
+ api ,
171
+ ax ,
172
+ default_max_speed = default_speed ,
173
+ acceleration = default_acceleration ,
174
+ )
175
+ await api ._backend .set_active_current (
176
+ {Axis .P_L : default_current }
177
+ )
178
+ await api .home ([ax ])
179
+ await helpers_ot3 .move_plunger_absolute_ot3 (
180
+ api ,
181
+ mount ,
182
+ blow_out ,
183
+ speed = default_speed ,
184
+ motor_current = default_current ,
185
+ )
186
+ # MOVE UP
187
+ print (f"moving up { top } mm at { speed } mm/sec" )
188
+ position_checked = await position_check ()
189
+ print (f"position checked: { position_checked } " )
190
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
191
+ api ,
192
+ ax ,
193
+ run_current = current ,
194
+ )
195
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
196
+ api ,
197
+ ax ,
198
+ default_max_speed = speed ,
199
+ acceleration = default_acceleration ,
200
+ )
201
+ try :
202
+ await helpers_ot3 .move_plunger_absolute_ot3 (
203
+ api , mount , 0 , speed = speed , motor_current = current
204
+ )
205
+ up_passed = await position_check ()
206
+ test_data ["time_sec" ] = time .time () - start_time
207
+ test_data ["cycle" ] = cycle
208
+ test_data ["position" ] = "top"
209
+ test_data ["position_check" ] = up_passed
210
+ print (test_data )
211
+ writer .writerow (test_data )
212
+ test_data ["stall" ] = "NONE"
213
+ csvfile .flush ()
214
+ except Exception as e :
215
+ print ("STALL DETECTION" )
216
+ up_passed = await position_check ()
217
+ test_data ["stall" ] = str ("Failed to move plunger down" )
218
+ test_data ["position_check" ] = up_passed
219
+ test_data ["error" ] = str (e )
220
+ print (test_data )
221
+ writer .writerow (test_data )
222
+ csvfile .flush ()
223
+ # RESET CURRENTS AND HOME
157
224
print ("homing..." )
158
225
await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
159
226
api , ax , run_current = default_current
@@ -164,66 +231,125 @@ async def position_check() -> bool:
164
231
default_max_speed = default_speed ,
165
232
acceleration = default_acceleration ,
166
233
)
167
- await api ._backend .set_active_current (
168
- {Axis .P_L : default_current }
169
- )
234
+ await api ._backend .set_active_current ({Axis .P_L : default_current })
235
+
236
+ else :
237
+
238
+ print (f"Cycle: { cycle } " )
239
+ for current in sorted (currents , reverse = True ):
240
+ speed = CURRENTS_SPEEDS [current ]
241
+ # HOME
242
+ print ("homing..." )
170
243
await api .home ([ax ])
171
- await helpers_ot3 .move_plunger_absolute_ot3 (
244
+
245
+ print (f"run-current set to { current } amps" )
246
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
172
247
api ,
173
- mount ,
174
- bottom ,
175
- speed = default_speed ,
176
- motor_current = default_current ,
248
+ ax ,
249
+ run_current = current ,
250
+ )
251
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
252
+ api ,
253
+ ax ,
254
+ default_max_speed = speed ,
255
+ acceleration = default_acceleration ,
256
+ )
257
+ # MOVE DOWN
258
+ print (f"moving down { bottom } mm at { speed } mm/sec" )
259
+ position_checked = await position_check ()
260
+ print (f"position checked: { position_checked } " )
261
+ try :
262
+ await helpers_ot3 .move_plunger_absolute_ot3 (
263
+ api , mount , bottom , speed = speed , motor_current = current
264
+ )
265
+ down_passed = await position_check ()
266
+ test_data ["time_sec" ] = time .time () - start_time
267
+ test_data ["cycle" ] = cycle
268
+ test_data ["position" ] = "bottom"
269
+ test_data ["position_check" ] = down_passed
270
+ test_data ["stall" ] = "NONE"
271
+ print (test_data )
272
+ writer .writerow (test_data )
273
+ csvfile .flush ()
274
+ except Exception as e :
275
+ print ("STALL DETECTION" )
276
+ down_passed = await position_check ()
277
+ test_data ["position_check" ] = down_passed
278
+ test_data ["stall" ] = str ("Failed to move plunger down" )
279
+ test_data ["error" ] = str (e )
280
+ print (test_data )
281
+ writer .writerow (test_data )
282
+ csvfile .flush ()
283
+ print ("homing..." )
284
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
285
+ api , ax , run_current = default_current
286
+ )
287
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
288
+ api ,
289
+ ax ,
290
+ default_max_speed = default_speed ,
291
+ acceleration = default_acceleration ,
292
+ )
293
+ await api ._backend .set_active_current (
294
+ {Axis .P_L : default_current }
295
+ )
296
+ await api .home ([ax ])
297
+ await helpers_ot3 .move_plunger_absolute_ot3 (
298
+ api ,
299
+ mount ,
300
+ bottom ,
301
+ speed = default_speed ,
302
+ motor_current = default_current ,
303
+ )
304
+ # MOVE UP
305
+ print (f"moving up { top } mm at { speed } mm/sec" )
306
+ position_checked = await position_check ()
307
+ print (f"position checked: { position_checked } " )
308
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
309
+ api ,
310
+ ax ,
311
+ run_current = current ,
312
+ )
313
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
314
+ api ,
315
+ ax ,
316
+ default_max_speed = speed ,
317
+ acceleration = default_acceleration ,
318
+ )
319
+ try :
320
+ await helpers_ot3 .move_plunger_absolute_ot3 (
321
+ api , mount , 0 , speed = speed , motor_current = current
322
+ )
323
+ up_passed = await position_check ()
324
+ test_data ["time_sec" ] = time .time () - start_time
325
+ test_data ["cycle" ] = cycle
326
+ test_data ["position" ] = "top"
327
+ test_data ["position_check" ] = up_passed
328
+ print (test_data )
329
+ writer .writerow (test_data )
330
+ test_data ["stall" ] = "NONE"
331
+ csvfile .flush ()
332
+ except Exception as e :
333
+ print ("STALL DETECTION" )
334
+ up_passed = await position_check ()
335
+ test_data ["stall" ] = str ("Failed to move plunger down" )
336
+ test_data ["position_check" ] = up_passed
337
+ test_data ["error" ] = str (e )
338
+ print (test_data )
339
+ writer .writerow (test_data )
340
+ csvfile .flush ()
341
+ # RESET CURRENTS AND HOME
342
+ print ("homing..." )
343
+ await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
344
+ api , ax , run_current = default_current
177
345
)
178
- # MOVE UP
179
- print (f"moving up { top } mm at { speed } mm/sec" )
180
- position_checked = await position_check ()
181
- print (f"position checked: { position_checked } " )
182
- await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
183
- api ,
184
- ax ,
185
- run_current = current ,
186
- )
187
- await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
188
- api ,
189
- ax ,
190
- default_max_speed = speed ,
191
- acceleration = default_acceleration ,
192
- )
193
- try :
194
- await helpers_ot3 .move_plunger_absolute_ot3 (
195
- api , mount , 0 , speed = speed , motor_current = current
346
+ await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
347
+ api ,
348
+ ax ,
349
+ default_max_speed = default_speed ,
350
+ acceleration = default_acceleration ,
196
351
)
197
- up_passed = await position_check ()
198
- test_data ["time_sec" ] = time .time () - start_time
199
- test_data ["cycle" ] = cycle
200
- test_data ["position" ] = "top"
201
- test_data ["position_check" ] = up_passed
202
- print (test_data )
203
- writer .writerow (test_data )
204
- test_data ["stall" ] = "NONE"
205
- csvfile .flush ()
206
- except Exception as e :
207
- print ("STALL DETECTION" )
208
- up_passed = await position_check ()
209
- test_data ["stall" ] = str ("Failed to move plunger down" )
210
- test_data ["position_check" ] = up_passed
211
- test_data ["error" ] = str (e )
212
- print (test_data )
213
- writer .writerow (test_data )
214
- csvfile .flush ()
215
- # RESET CURRENTS AND HOME
216
- print ("homing..." )
217
- await helpers_ot3 .set_gantry_load_per_axis_current_settings_ot3 (
218
- api , ax , run_current = default_current
219
- )
220
- await helpers_ot3 .set_gantry_load_per_axis_motion_settings_ot3 (
221
- api ,
222
- ax ,
223
- default_max_speed = default_speed ,
224
- acceleration = default_acceleration ,
225
- )
226
- await api ._backend .set_active_current ({Axis .P_L : default_current })
352
+ await api ._backend .set_active_current ({Axis .P_L : default_current })
227
353
228
354
except Exception as e :
229
355
test_data ["error" ] = str (e )
@@ -237,7 +363,9 @@ async def position_check() -> bool:
237
363
parser .add_argument ("--cycles" , type = int , default = 100000 )
238
364
parser .add_argument ("--simulate" , action = "store_true" )
239
365
parser .add_argument ("--pipette" , type = int , choices = [200 , 1000 ], default = 200 )
366
+ parser .add_argument ("--blowout" , type = int , default = 1 )
367
+ parser .add_argument ("--blowoutnum" ,type = int ,default = 100 )
240
368
args = parser .parse_args ()
241
- _config = TestConfig (simulate = args .simulate , pipette = args .pipette )
369
+ _config = TestConfig (simulate = args .simulate , pipette = args .pipette , blowout = args . blowout , blowoutnum = args . blowoutnum )
242
370
243
371
asyncio .run (main (args , _config ))
0 commit comments