@@ -70,14 +70,14 @@ def load_env_config():
7070 "GCS_OUTPUT_BUCKET_NAME" , "your-output-bucket-name"
7171 ),
7272 "GCS_OUTPUT_PREFIX" : os .getenv ("GCS_OUTPUT_PREFIX" , "processed/" ),
73+ "NUM_UPLOAD_WORKERS" : int (os .getenv ("NUM_UPLOAD_WORKERS" , "4" )),
7374 # Local paths (used when USE_GCS_BUCKET is False)
7475 "INPUT_PATH" : Path (os .getenv ("INPUT_PATH" , "/temp/in" )),
7576 "OUTPUT_PATH" : Path (os .getenv ("OUTPUT_PATH" , "/temp/out" )),
7677 "DELETE_INPUT" : parse_bool (os .getenv ("DELETE_INPUT" , "false" )),
7778 "DELETE_OUTPUT" : parse_bool (os .getenv ("DELETE_OUTPUT" , "false" )),
7879 # Processing settings
7980 "OVERWRITE" : parse_bool (os .getenv ("OVERWRITE" , "false" )),
80- "OVERWRITE_GCS" : parse_bool (os .getenv ("OVERWRITE_GCS" , "false" )),
8181 "NUM_MIPS" : int (os .getenv ("NUM_MIPS" , "5" )),
8282 "MIP_CUTOFF" : int (os .getenv ("MIP_CUTOFF" , "0" )),
8383 "CHANNEL_LIMIT" : int (os .getenv ("CHANNEL_LIMIT" , "4" )),
@@ -107,13 +107,13 @@ def load_env_config():
 gcs_project = config["GCS_PROJECT"]
 use_gcs_output = config["USE_GCS_OUTPUT"]
 gcs_output_bucket_name = config["GCS_OUTPUT_BUCKET_NAME"]
-gcs_output_path = config["GCS_OUTPUT_PREFIX"]
+gcs_output_path = config["GCS_OUTPUT_PREFIX"].rstrip("/") + "/"
+num_upload_workers = config["NUM_UPLOAD_WORKERS"]
 input_path = config["INPUT_PATH"]
 output_path = config["OUTPUT_PATH"]
 delete_input = config["DELETE_INPUT"]
 delete_output = config["DELETE_OUTPUT"]
 overwrite_output = config["OVERWRITE"]
-overwrite_gcs = config["OVERWRITE_GCS"]
 num_mips = config["NUM_MIPS"]
 mip_cutoff = config["MIP_CUTOFF"]
 channel_limit = config["CHANNEL_LIMIT"]
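Normalizing the prefix with `.rstrip("/") + "/"` guarantees exactly one trailing slash however the env var is written, which is what lets later code concatenate names directly (e.g. `gcs_output_path + "info"` in the hunk below). A quick check of the invariant:

    # All of these normalize to "processed/"; note an empty prefix
    # would become "/", which the defaults here never produce.
    for raw in ("processed", "processed/", "processed//"):
        assert raw.rstrip("/") + "/" == "processed/"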
@@ -454,7 +454,7 @@ def sync_info_to_gcs_output():
     This uploads the info file so the bucket is ready to receive the rest of the data.
     """
     local_info_path = output_path / "info"
-    gcs_info_path = gcs_output_path.rstrip("/") + "/info"
+    gcs_info_path = gcs_output_path + "info"
     upload_file_to_gcs(local_info_path, gcs_info_path)
 
 
@@ -652,7 +652,8 @@ def compute_optimal_chunk_size(single_file_shape, num_mips, max_chunk_size=None)
 vol.commit_provenance()
 
 # Sync the info file to GCS output bucket if configured
-sync_info_to_gcs_output()
+if use_gcs_output:
+    sync_info_to_gcs_output()
 
 del vol
 
@@ -924,9 +926,15 @@ def upload_many_blobs_with_transfer_manager(
     storage_client = Client(project=gcs_project)
     bucket = storage_client.bucket(bucket_name)
 
+    # upload_many_from_filenames expects paths relative to source_directory
+    source_directory = str(source_directory)
+    output_filenames = [
+        filename[len(source_directory) + 1 :] for filename in filenames
+    ]
+
     results = transfer_manager.upload_many_from_filenames(
         bucket,
-        filenames,
+        output_filenames,
         source_directory=source_directory,
         blob_name_prefix=gcs_output_path,
         max_workers=workers,
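`transfer_manager.upload_many_from_filenames` reads each local file from `source_directory + name` and creates the object `blob_name_prefix + name`, which is why the names must be made relative first. Roughly, with hypothetical paths:

    import os

    src = "/temp/out"
    local = "/temp/out/32_32_40/0-64_0-64_0-64"
    rel = local[len(src) + 1 :]                # "32_32_40/0-64_0-64_0-64"
    assert rel == os.path.relpath(local, src)  # equivalent, slightly more robust
    blob_name = "processed/" + rel             # resulting GCS object name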
@@ -962,11 +969,12 @@ def check_and_upload_completed_chunks():
     # For each file in the output dir check if it is fully covered by the already processed bounds
     # First, we loop over all the files in the output directory
     for chunk_file in output_path_for_mip.glob("**/*"):
-        if str(chunk_file) in uploaded_files:
+        chunk_file = str(chunk_file)
+        if chunk_file in uploaded_files:
             continue
         # 1. Pull out the bounds of the chunk from the filename
         # filename format is x0-x1_y0-y1_z0-z1
-        match = re.search(r"(\d+)-(\d+)_(\d+)-(\d+)_(\d+)-(\d+)", str(chunk_file))
+        match = re.search(r"(\d+)-(\d+)_(\d+)-(\d+)_(\d+)-(\d+)", chunk_file)
         if not match:
             continue
         x0, x1, y0, y1, z0, z1 = map(int, match.groups())
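Given the `x0-x1_y0-y1_z0-z1` filename convention noted in the comment, the bounds round-trip cleanly through the regex; for instance:

    import re

    name = "0-64_0-64_0-64"
    m = re.search(r"(\d+)-(\d+)_(\d+)-(\d+)_(\d+)-(\d+)", name)
    x0, x1, y0, y1, z0, z1 = map(int, m.groups())
    assert (x0, x1, y0, y1, z0, z1) == (0, 64, 0, 64, 0, 64)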
@@ -993,7 +1001,10 @@ def check_and_upload_completed_chunks():
         print(f"Uploading {len(files_to_upload_this_batch)} completed chunks to GCS...")
         if use_gcs_output:
             upload_many_blobs_with_transfer_manager(
-                gcs_output_bucket_name, files_to_upload_this_batch, workers=8
+                gcs_output_bucket_name,
+                files_to_upload_this_batch,
+                source_directory=output_path,
+                workers=num_upload_workers,
             )
             uploaded_count += len(files_to_upload_this_batch)
         else:
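A sketch of the rewired call with hypothetical values, showing the new keyword-argument form:

    upload_many_blobs_with_transfer_manager(
        "your-output-bucket-name",
        ["/temp/out/32_32_40/0-64_0-64_0-64"],  # absolute paths, as collected above
        source_directory=output_path,  # a Path; the helper str()s it before slicing
        workers=num_upload_workers,    # NUM_UPLOAD_WORKERS, default 4
    )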