@@ -276,65 +276,88 @@ def convert_to_robodm(self,
276276 traj .close ()
277277 return traj
278278
279- def discover_trajectories (self , trajectory_type : str = "success" , limit : int = None ) -> List [str ]:
279+ def discover_trajectories (self , trajectory_type : str = "success" , limit : int = None , labs : List [ str ] = None ) -> List [str ]:
280280 """
281- Discover available trajectories from GCS using gsutil.
281+ Discover available trajectories from GCS using gsutil across all labs .
282282
283283 Args:
284284 trajectory_type: Either "success" or "failure"
285285 limit: Maximum number of trajectories to return (None for all)
286+ labs: List of lab names to search (None for all available labs)
286287
287288 Returns:
288289 List of trajectory paths
289290 """
290- base_path = f"{ self .base_path } AUTOLab/{ trajectory_type } /"
291+ # Get all available labs if not specified
292+ if labs is None :
293+ try :
294+ result = subprocess .run (
295+ ["gsutil" , "ls" , self .base_path ],
296+ capture_output = True ,
297+ text = True ,
298+ check = True
299+ )
300+
301+ labs = [line .strip ().rstrip ('/' ).split ('/' )[- 1 ] for line in result .stdout .strip ().split ('\n ' )
302+ if line .strip ().endswith ('/' ) and not line .strip ().endswith ('1.0.1/' )]
303+
304+ except subprocess .CalledProcessError as e :
305+ print (f"Error discovering labs: { e } " )
306+ return []
291307
292- try :
293- # Get date directories
294- result = subprocess .run (
295- ["gsutil" , "ls" , base_path ],
296- capture_output = True ,
297- text = True ,
298- check = True
299- )
300-
301- date_dirs = [line .strip () for line in result .stdout .strip ().split ('\n ' )
302- if line .strip ().endswith ('/' ) and line .strip () != base_path ]
308+ trajectories = []
309+
310+ for lab in labs :
311+ lab_path = f"{ self .base_path } { lab } /{ trajectory_type } /"
303312
304- # Get individual trajectories from each date directory
305- trajectories = []
306- for date_dir in date_dirs :
307- try :
308- date_result = subprocess .run (
309- ["gsutil" , "ls" , date_dir ],
310- capture_output = True ,
311- text = True ,
312- check = True
313- )
314-
315- date_trajectories = [line .strip () for line in date_result .stdout .strip ().split ('\n ' )
316- if line .strip ().endswith ('/' )]
317-
318- trajectories .extend (date_trajectories )
319-
320- if limit and len (trajectories ) >= limit :
321- break
313+ try :
314+ # Check if this lab has the trajectory type directory
315+ result = subprocess .run (
316+ ["gsutil" , "ls" , lab_path ],
317+ capture_output = True ,
318+ text = True ,
319+ check = True
320+ )
321+
322+ date_dirs = [line .strip () for line in result .stdout .strip ().split ('\n ' )
323+ if line .strip ().endswith ('/' ) and line .strip () != lab_path ]
324+
325+ # Get individual trajectories from each date directory
326+ for date_dir in date_dirs :
327+ try :
328+ date_result = subprocess .run (
329+ ["gsutil" , "ls" , date_dir ],
330+ capture_output = True ,
331+ text = True ,
332+ check = True
333+ )
334+
335+ date_trajectories = [line .strip () for line in date_result .stdout .strip ().split ('\n ' )
336+ if line .strip ().endswith ('/' )]
337+
338+ trajectories .extend (date_trajectories )
339+
340+ if limit and len (trajectories ) >= limit :
341+ break
342+
343+ except subprocess .CalledProcessError :
344+ continue
322345
323- except subprocess . CalledProcessError :
324- continue
346+ if limit and len ( trajectories ) >= limit :
347+ break
325348
326- return trajectories [: limit ] if limit else trajectories
327-
328- except subprocess . CalledProcessError as e :
329- print ( f"Error discovering { trajectory_type } trajectories: { e } " )
330- return []
349+ except subprocess . CalledProcessError :
350+ # Lab doesn't have this trajectory type, skip
351+ continue
352+
353+ return trajectories [: limit ] if limit else trajectories
331354
332355 def download_sample_trajectories (self ,
333356 output_dir : str ,
334- num_success : int = 2 ,
335- num_failure : int = 2 ):
357+ num_success : int = 300 ,
358+ num_failure : int = 100 ):
336359 """
337- Download and convert sample successful and failed trajectories in parallel.
360+ Download and convert successful and failed trajectories in parallel from all labs .
338361
339362 Args:
340363 output_dir: Directory to save RoboDM trajectories
@@ -351,21 +374,24 @@ def download_sample_trajectories(self,
351374 temp_dir = tempfile .mkdtemp (prefix = "droid_download_" )
352375
353376 try :
354- # Discover available trajectories
355- print ("Discovering available trajectories..." )
356- success_trajectories = self .discover_trajectories ("success" , limit = max ( num_success , 10 ))
357- failure_trajectories = self .discover_trajectories ("failure" , limit = max ( num_failure , 10 ))
377+ # Discover available trajectories from all labs
378+ print ("Discovering available trajectories across all labs ..." )
379+ success_trajectories = self .discover_trajectories ("success" , limit = num_success * 2 ) # Get more than needed
380+ failure_trajectories = self .discover_trajectories ("failure" , limit = num_failure * 2 ) # Get more than needed
358381
359382 print (f"Found { len (success_trajectories )} success trajectories" )
360383 print (f"Found { len (failure_trajectories )} failure trajectories" )
361384
385+ # Curate the exact number requested
386+ selected_success = success_trajectories [:num_success ]
387+ selected_failure = failure_trajectories [:num_failure ]
388+
362389 # Combine trajectories to process
363- trajectories_to_process = (
364- success_trajectories [:num_success ] +
365- failure_trajectories [:num_failure ]
366- )
390+ trajectories_to_process = selected_success + selected_failure
367391
368392 print (f"Processing { len (trajectories_to_process )} trajectories in parallel..." )
393+ print (f" - { len (selected_success )} success trajectories" )
394+ print (f" - { len (selected_failure )} failure trajectories" )
369395
370396 # Submit all download and conversion tasks to Ray
371397 futures = []
@@ -501,17 +527,22 @@ def convert_single_trajectory(traj_dir: str, output_dir: str) -> Tuple[bool, str
501527 output_dir = "./robodm_trajectories"
502528
503529 try :
504- # New parallel download and conversion approach
530+ # Parallel download and conversion with 300 success + 100 failure trajectories
505531 print ("Starting parallel download and conversion..." )
506532 successful_paths = processor .download_sample_trajectories (
507533 output_dir = output_dir ,
508- num_success = 20 ,
509- num_failure = 20
534+ num_success = 300 ,
535+ num_failure = 100
510536 )
511537
512538 print (f"\n Successfully processed { len (successful_paths )} trajectories:" )
513- for path in successful_paths :
514- print (f" - { path } " )
539+ print (f"Output directory: { output_dir } " )
540+
541+ # Count success/failure trajectories
542+ success_count = len ([p for p in successful_paths if "success_" in p ])
543+ failure_count = len ([p for p in successful_paths if "failure_" in p ])
544+ print (f" - { success_count } success trajectories" )
545+ print (f" - { failure_count } failure trajectories" )
515546
516547 except Exception as e :
517548 print (f"Error during processing: { e } " )
0 commit comments