3333class DirectPoll (CollectorBase ):
3434 """
3535 Object representing an image collector.
36- The behavior is to start a coro task to waken every few seconds and scan for incoming files .
37- Files are pushed to xchembku.
36+ The behavior is to start a coro task to waken every few seconds and scan for newly created plate directories .
37+ Image files are pushed to xchembku.
3838 """
3939
4040 # ----------------------------------------------------------------------------------------
@@ -46,20 +46,21 @@ def __init__(self, specification, predefined_uuid=None):
4646 s = f"{ callsign (self )} specification" , self .specification ()
4747
4848 type_specific_tbd = require (s , self .specification (), "type_specific_tbd" )
49+
50+ # The sources for the collecting.
4951 self .__plates_directories = require (s , type_specific_tbd , "plates_directories" )
52+
53+ # The root directory of all visits.
5054 self .__visits_directory = Path (
5155 require (s , type_specific_tbd , "visits_directory" )
5256 )
57+
58+ # The subdirectory under a visit where to put subwell images that are collected.
5359 self .__visit_plates_subdirectory = Path (
5460 require (s , type_specific_tbd , "visit_plates_subdirectory" )
5561 )
56- self .__novisit_directory = Path (
57- require (s , type_specific_tbd , "novisit_directory" )
58- )
59- self .__nobarcode_directory = Path (
60- require (s , type_specific_tbd , "nobarcode_directory" )
61- )
6262
63+ # Explicit list of barcodes to process (used when testing a deployment).
6364 self .__ingest_only_barcodes = type_specific_tbd .get ("ingest_only_barcodes" )
6465
6566 # Database where we will get plate barcodes and add new wells.
@@ -70,12 +71,13 @@ def __init__(self, specification, predefined_uuid=None):
7071 self .__keep_ticking = True
7172 self .__tick_future = None
7273
74+ # This is the last formulatrix plate we have ingested, used to avoid re-handling the same plate.
7375 self .__latest_formulatrix__plate__id = 0
7476
7577 # This is the list of plates indexed by their barcode.
7678 self .__crystal_plate_models_by_barcode : Dict [CrystalPlateModel ] = {}
7779
78- # The plate names which we have already finished handling.
80+ # The plate names which we have already finished handling within the current instance .
7981 self .__handled_plate_names = []
8082
8183 # ----------------------------------------------------------------------------------------
@@ -86,18 +88,6 @@ async def activate(self) -> None:
8688 Then it starts the coro task to awaken every few seconds to scrape the directories.
8789 """
8890
89- # Make sure the novisit_directory is created.
90- try :
91- self .__novisit_directory .mkdir (parents = True )
92- except FileExistsError :
93- pass
94-
95- # Make sure the nobarcode_directory is created.
96- try :
97- self .__nobarcode_directory .mkdir (parents = True )
98- except FileExistsError :
99- pass
100-
10191 # Make the xchembku client context.
10292 s = require (
10393 f"{ callsign (self )} specification" ,
@@ -237,89 +227,32 @@ async def scrape_plates_directory(
237227 plate_barcode
238228 )
239229
240- # This plate is in the database?
241- if crystal_plate_model is not None :
242- visit_directory = None
243- try :
244- visit_directory = Path (
245- get_xchem_directory (
246- self .__visits_directory , crystal_plate_model .visit
247- )
248- )
249- except ValueError :
250- pass
251- except VisitNotFound :
252- pass
253-
254- if visit_directory is not None :
255- await self .scrape_plate_directory (
256- plates_directory / plate_name ,
257- crystal_plate_model ,
258- visit_directory ,
230+ # This plate is not in the database?
231+ if crystal_plate_model is None :
232+ continue
233+
234+ try :
235+ visit_directory = Path (
236+ get_xchem_directory (
237+ self .__visits_directory , crystal_plate_model .visit
259238 )
260- # This barcode is in the database, but the visit name
261- # is not properly formatted or the visit directory doesn't exist.
262- else :
263- # For now, don't move these out of SubwellImages since Texrank expects them here.
264- # TODO: Find out how to disable Texrank jobs from running at all.
265- # await self.__move_without_ingesting(
266- # plates_directory / plate_name,
267- # self.__novisit_directory,
268- # )
269-
270- # Remember we "handled" this one.
271- self .__handled_plate_names .append (plate_name )
272-
273- # Not in the formulatrix's database?
274- else :
275- # Move the plate directory somewhere else.
276- await self .__move_without_ingesting (
277- plates_directory / plate_name ,
278- self .__nobarcode_directory ,
279239 )
240+ # This is an improperly formatted visit name?
241+ except ValueError :
242+ continue
243+ # This visit is not found on disk?
244+ except VisitNotFound :
245+ continue
280246
281- # ----------------------------------------------------------------------------------------
282- async def __move_without_ingesting (
283- self ,
284- plate_directory : Path ,
285- target_directory : Path ,
286- ) -> None :
287- """
288- Move a plate directory's well images to the given target without ingesting it.
289-
290- Then remove the plate directory.
291- """
292-
293- target = target_directory / plate_directory .name
294- try :
295- target .mkdir (parents = True )
296- except FileExistsError :
297- pass
298-
299- # Get all the well images in the plate directory.
300- well_names = [
301- entry .name for entry in os .scandir (plate_directory ) if entry .is_file ()
302- ]
303-
304- for well_name in well_names :
305- # Move to target, replacing what might already be there.
306- # TODO: Protect against moving an image file which is currently being written by Luigi.
307- shutil .move (
308- plate_directory / well_name ,
309- target / well_name ,
247+ # Scrape the directory when all image files have arrived.
248+ await self .scrape_plate_directory_when_complete (
249+ plates_directory / plate_name ,
250+ crystal_plate_model ,
251+ visit_directory ,
310252 )
311253
312- # Remove the source directory, which should now be empty.
313- # TODO: Protect against removing an plate directory which is currently being written by Luigi.
314- try :
315- plate_directory .rmdir ()
316- except OSError :
317- pass
318-
319- logger .info (f"moved plate { plate_directory .name } to { target_directory } " )
320-
321254 # ----------------------------------------------------------------------------------------
322- async def scrape_plate_directory (
255+ async def scrape_plate_directory_when_complete (
323256 self ,
324257 plate_directory : Path ,
325258 crystal_plate_model : CrystalPlateModel ,
@@ -331,9 +264,21 @@ async def scrape_plate_directory(
331264 Adds discovered files to internal list which gets pushed when it reaches a configurable size.
332265 """
333266
334- # Update the path stem in the crystal plate record.
335- # TODO: Consider if important to report/record same barcodes on different rockmaker directories.
267+ # Name of the destination directory where we will permanently store ingested well image files.
268+ target = (
269+ visit_directory / self .__visit_plates_subdirectory / plate_directory .name
270+ )
271+
272+ # We have already put this plate directory into the visit directory and presumably also the database?
273+ if target .is_dir ():
274+ # Remember we "handled" this one.
275+ self .__handled_plate_names .append (plate_directory .stem )
276+ return
277+
278+ # This is the first time we have scraped a directory for this plate?
336279 if crystal_plate_model .rockminer_collected_stem is None :
280+ # Update the path stem in the crystal plate record.
281+ # TODO: Consider if important to report/record same barcodes on different rockmaker directories.
337282 crystal_plate_model .rockminer_collected_stem = plate_directory .stem
338283 await self .__xchembku .upsert_crystal_plates (
339284 [crystal_plate_model ], "update rockminer_collected_stem"
@@ -348,45 +293,35 @@ async def scrape_plate_directory(
348293 if len (well_names ) < 288 :
349294 return
350295
351- # Name of the destination directory where we will permanently store ingested well image files.
352- target = (
353- visit_directory / self .__visit_plates_subdirectory / plate_directory .name
354- )
355-
356- # We have already put this plate directory into the visit directory?
357- # And presumable the database?
358- if target .is_dir ():
359- # Remember we "handled" this one.
360- self .__handled_plate_names .append (plate_directory .stem )
361- return
362-
363- # Sort so that tests are deterministic.
296+ # Sort wells by name so that tests are deterministic.
364297 well_names .sort ()
365298
366299 crystal_well_models : List [CrystalWellModel ] = []
367300 for well_name in well_names :
368- # Process well's image file.
369- # TODO: Improve safety by ignoring wrongly formatted and non-jpg well filenames.
370- crystal_well_models .append (
371- await self .ingest_well (
372- plate_directory ,
373- well_name ,
374- crystal_plate_model ,
375- target ,
376- )
301+ # Make the well model, including image width/height.
302+ crystal_well_model = await self .ingest_well (
303+ plate_directory ,
304+ well_name ,
305+ crystal_plate_model ,
306+ target ,
377307 )
378308
309+ # Append well model to the list of all wells on the plate.
310+ crystal_well_models .append (crystal_well_model )
311+
379312 # Here we create or update the crystal well records into xchembku.
380- # TODO: Handle case where we upsert the crystal_well record bit the image object store fails to accept image binary.
381313 await self .__xchembku .upsert_crystal_wells (crystal_well_models )
382314
383315 # Copy scraped directory to visit, replacing what might already be there.
316+ # TODO: Handle case where we upsert the crystal_well record but then unable to copy image file.
384317 shutil .copytree (
385318 plate_directory ,
386319 target ,
387320 )
388321
389- logger .info (f"copied well images from plate { plate_directory .name } to { target } " )
322+ logger .info (
323+ f"copied { len (well_names )} well images from plate { plate_directory .name } to { target } "
324+ )
390325
391326 # Remember we "handled" this one.
392327 self .__handled_plate_names .append (plate_directory .stem )
@@ -409,6 +344,7 @@ async def ingest_well(
409344 ingested_well_filename = target / well_name
410345
411346 # Stems are like "9acx_01A_1".
347+ # TODO: Improve safety by ignoring wrongly formatted and non-jpg well filenames.
412348 parts = Path (well_name ).stem .split ("_" )
413349 if len (parts ) > 1 :
414350 # Strip off the leading 4-letter barcode and underscore.
0 commit comments