Skip to content

Commit 19bda0c

Browse files
committed
feat: more checking on already uploaded
1 parent 3ec6253 commit 19bda0c

File tree

1 file changed

+29
-48
lines changed

1 file changed

+29
-48
lines changed

examples/create_downampled.py

Lines changed: 29 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -869,12 +869,13 @@ def is_chunk_fully_covered(chunk_bounds, processed_chunks_bounds):
869869
# All corners are covered
870870
return True
871871

872+
872873
# %% Read in the files that were already processed
873874

874875
already_uploaded_path = output_path / "uploaded_to_gcs_chunks.txt"
875876
if already_uploaded_path.exists():
876877
with open(already_uploaded_path, "r") as f:
877-
uploaded_files = f.readlines()
878+
uploaded_files = [line.strip() for line in f.readlines() if line.strip()]
878879
else:
879880
uploaded_files = []
880881

@@ -961,8 +962,7 @@ def check_and_upload_completed_chunks():
961962
# For each file in the output dir check if it is fully covered by the already processed bounds
962963
# First, we loop over all the files in the output directory
963964
for chunk_file in output_path_for_mip.glob("**/*"):
964-
# TODO probably need to use remote files here
965-
if chunk_file in uploaded_files:
965+
if str(chunk_file) in uploaded_files:
966966
continue
967967
# 1. Pull out the bounds of the chunk from the filename
968968
# filename format is x0-x1_y0-y1_z0-z1
@@ -991,10 +991,15 @@ def check_and_upload_completed_chunks():
991991

992992
if files_to_upload_this_batch:
993993
print(f"Uploading {len(files_to_upload_this_batch)} completed chunks to GCS...")
994-
upload_many_blobs_with_transfer_manager(
995-
gcs_output_bucket_name, files_to_upload_this_batch, workers=8
996-
)
997-
uploaded_count += len(files_to_upload_this_batch)
994+
if use_gcs_output:
995+
upload_many_blobs_with_transfer_manager(
996+
gcs_output_bucket_name, files_to_upload_this_batch, workers=8
997+
)
998+
uploaded_count += len(files_to_upload_this_batch)
999+
else:
1000+
print("GCS output not configured, skipping upload")
1001+
uploaded_count += len(files_to_upload_this_batch)
1002+
uploaded_files.extend([str(x) for x in files_to_upload_this_batch])
9981003

9991004
# Remove local chunks to save space
10001005
if use_gcs_output and delete_output:
@@ -1006,41 +1011,35 @@ def check_and_upload_completed_chunks():
10061011
chunk_file.unlink()
10071012
except Exception as e:
10081013
print(f"Error deleting local chunk file {chunk_file}: {e}")
1014+
1015+
# Append to the list of uploaded files
1016+
with open(already_uploaded_path, "a") as f:
1017+
for file in files_to_upload_this_batch:
1018+
if file not in failed_files:
1019+
f.write(f"{file}\n")
1020+
10091021
return uploaded_count
10101022

10111023

1012-
def upload_any_remaining_chunks():
1024+
def check_any_remaining_chunks():
10131025
"""
1014-
Upload any remaining chunks in the output directory to GCS.
1026+
Check for any remaining chunks in the output directory that were not uploaded to GCS.
10151027
This is called at the end of processing to ensure all data is uploaded.
10161028
1017-
Returns:
1018-
int: Number of chunks uploaded
10191029
"""
1020-
uploaded_count = 0
1030+
non_uploaded_files = []
10211031

10221032
for mip_level in range(num_mips):
10231033
factor = 2**mip_level
10241034
dir_name = f"{factor}_{factor}_{factor}"
10251035
output_path_for_mip = output_path / dir_name
10261036
# For each file in the output dir
10271037
for chunk_file in output_path_for_mip.glob("**/*"):
1028-
if chunk_file in uploaded_files:
1038+
if str(chunk_file) in uploaded_files:
10291039
continue
1030-
relative_path = chunk_file.relative_to(output_path)
1031-
gcs_chunk_path = (
1032-
gcs_output_path.rstrip("/")
1033-
+ "/"
1034-
+ str(relative_path).replace("\\", "/")
1035-
)
1036-
if upload_file_to_gcs(chunk_file, gcs_chunk_path, overwrite=overwrite_gcs):
1037-
uploaded_count += 1
1038-
# Remove local chunk to save space
1039-
if use_gcs_output and delete_output:
1040-
chunk_file.unlink()
1041-
uploaded_files.append((chunk_file, gcs_chunk_path))
1040+
non_uploaded_files.append(str(chunk_file))
10421041

1043-
return uploaded_count
1042+
return non_uploaded_files
10441043

10451044

10461045
# %% Move the data across with a single worker
@@ -1060,33 +1059,15 @@ def upload_any_remaining_chunks():
10601059
total_uploaded_files += check_and_upload_completed_chunks()
10611060
print(f"Total uploaded chunks so far: {total_uploaded_files}")
10621061

1063-
# Write files that were written before that final upload check
1064-
with open(output_path / "processed_chunks.txt", "w") as f:
1065-
for local_path, gcs_path in uploaded_files:
1066-
f.write(f"{local_path} -> {gcs_path}\n")
1067-
1068-
# %% Show any failed uploads
1062+
# %% Show any failed uploads or files left over
10691063
if failed_files:
10701064
print("The following files failed to upload to GCS:")
10711065
for f in failed_files:
10721066
print(f)
10731067

1074-
# %% Final upload of any remaining chunks - hopefully should be none here, but maybe some failed
1075-
# This is not something we always want to run, so putting an input prompt here just in case
1076-
continue_upload = input(
1077-
"Do you want to upload any remaining chunks in the output directory to GCS? (y/n): "
1078-
)
1079-
if continue_upload.lower() != "y":
1080-
print("Skipping final upload of remaining chunks.")
1081-
else:
1082-
print("Processing complete, uploading any remaining chunks...")
1083-
total_uploaded_files += upload_any_remaining_chunks()
1084-
print(f"Final upload completed: {total_uploaded_files} chunks uploaded")
1085-
1086-
# Write the list of uploaded files to a text file for reference
1087-
with open(output_path / "uploaded_files.txt", "w") as f:
1088-
for local_path, gcs_path in uploaded_files:
1089-
f.write(f"{local_path} -> {gcs_path}\n")
1068+
remaining_files = check_any_remaining_chunks()
1069+
if remaining_files:
1070+
print(f"The following files were not uploaded yet: {remaining_files}")
10901071

10911072
# %% Serve the dataset to be used in neuroglancer
10921073
vols[0].viewer(port=1337)

0 commit comments

Comments
 (0)