Skip to content

Commit 56d80b1

Browse files
committed
feat: continue to link up parts
1 parent ff26aee commit 56d80b1

File tree

3 files changed

+47
-5
lines changed

3 files changed

+47
-5
lines changed

create_downsampled/chunking.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ def process(
105105
input_path,
106106
all_files,
107107
delete_input,
108+
gcs_project,
109+
num_channels,
108110
):
109111
x_i, y_i, z_i = args
110112

@@ -125,13 +127,22 @@ def process(
125127

126128
# Use the new load_file function that handles download/caching
127129
print(f"Loading file for coordinates ({x_i}, {y_i}, {z_i})")
128-
loaded_zarr_store = load_file(x_i, y_i)
130+
loaded_zarr_store = load_file(
131+
x_i,
132+
y_i,
133+
use_gcs_bucket,
134+
input_path,
135+
total_rows,
136+
total_cols,
137+
all_files,
138+
gcs_project,
139+
)
129140

130141
if loaded_zarr_store is None:
131142
print(f"Warning: Could not load file for row {x_i}, col {y_i}. Skipping...")
132143
return
133144

134-
rawdata = load_data_from_zarr_store(loaded_zarr_store)
145+
rawdata = load_data_from_zarr_store(loaded_zarr_store, num_channels)
135146

136147
# Process all mip levels
137148
for mip_level in reversed(range(mip_cutoff, num_mips)):

create_downsampled/gcs_local_io.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ def check_and_upload_completed_chunks(
225225
use_gcs_output,
226226
gcs_project,
227227
gcs_output_bucket_name,
228+
gcs_output_path,
228229
num_upload_workers,
229230
delete_output,
230231
already_uploaded_path,
@@ -283,6 +284,10 @@ def check_and_upload_completed_chunks(
283284
gcs_output_bucket_name,
284285
files_to_upload_this_batch,
285286
source_directory=output_path,
287+
gcs_output_path=gcs_output_path,
288+
gcs_project=gcs_project,
289+
uploaded_files=uploaded_files,
290+
failed_files=failed_files,
286291
workers=num_upload_workers,
287292
)
288293
uploaded_count += len(files_to_upload_this_batch)

create_downsampled/main.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
from load_config import load_env_config
55
from gcs import list_gcs_files, sync_info_to_gcs_output
66
from wells import compute_grid_dimensions, get_grid_coords
7-
from gcs_local_io import load_file, check_and_upload_completed_chunks, check_any_remaining_chunks
7+
from gcs_local_io import (
8+
get_uploaded_files,
9+
load_file,
10+
check_and_upload_completed_chunks,
11+
check_any_remaining_chunks,
12+
)
813
from chunking import compute_volume_and_chunk_size, process
914
from volume import create_cloudvolume_info
1015

@@ -89,6 +94,9 @@ def main():
8994
# Process each well into chunks
9095
iter_coords = list(get_grid_coords(num_chunks_per_dim))
9196

97+
# Look up which files were already uploaded and keep track of them
98+
uploaded_files = get_uploaded_files(output_path)
99+
92100
processed_chunks = []
93101
failed_chunks = []
94102
total_uploads = 0
@@ -110,18 +118,36 @@ def main():
110118
input_path=input_path,
111119
all_files=all_files,
112120
delete_input=delete_input,
121+
gcs_project=gcs_project,
122+
num_channels=num_channels,
113123
)
114124
start, end = bounds
115125
processed_chunks.append((start, end))
116-
total_uploads += check_and_upload_completed_chunks()
126+
total_uploads += check_and_upload_completed_chunks(
127+
num_mips=num_mips,
128+
output_path=output_path,
129+
volume_size=volume_size,
130+
processed_chunks_bounds=processed_chunks,
131+
use_gcs_output=use_gcs_output,
132+
gcs_project=gcs_project,
133+
gcs_output_bucket_name=gcs_output_bucket_name,
134+
gcs_output_path=gcs_output_path,
135+
num_upload_workers=num_upload_workers,
136+
delete_output=delete_output,
137+
already_uploaded_path=output_path / "uploaded_to_gcs_chunks.txt",
138+
uploaded_files=uploaded_files,
139+
failed_files=failed_chunks,
140+
)
117141
print(f"Total chunks uploaded so far: {total_uploads}")
118142

119143
if failed_chunks:
120144
print(f"Failed to process {len(failed_chunks)} chunks:")
121145
for chunk in failed_chunks:
122146
print(f" {chunk}")
123147

124-
remaining_files = check_any_remaining_chunks()
148+
remaining_files = check_any_remaining_chunks(
149+
num_mips=num_mips, output_path=output_path, uploaded_files=uploaded_files
150+
)
125151
if remaining_files:
126152
print(f"Remaining chunks: {remaining_files}")
127153

0 commit comments

Comments
 (0)