Skip to content

Commit 1805a77

Browse files
committed
Add parallelised preprocessing option, other minor fixes
1 parent 5a6725f commit 1805a77

File tree

4 files changed

+70
-14
lines changed

4 files changed

+70
-14
lines changed

README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,27 @@ Be careful using this option, as it creates a much larger number of files, and t
201201

202202
> [!WARNING]
203203
> This script in particular will use a large amount of RAM, since it loads the entire dataset into memory at once.
204-
> Processing may be done in batches by using the `--max-files` and `--skip-files` command-line arguments.
204+
> Processing may be done in batches by using the `--max-files` and `--skip-files` command-line arguments, or the script below.
205+
206+
##### np-to-tfrecord-parallel.sh
207+
208+
This script runs multiple instances of `np-to-tfrecord.py` in parallel, speeding up preprocessing and/or reducing the amount of RAM required at once.
209+
210+
Usage:
211+
```bash
212+
np-to-tfrecord-parallel.sh <NUM PROCESSES> <FILES PER PROCESS> <INPUT PATH> <OUTPUT PATH>
213+
```
214+
Where:
215+
- `INPUT PATH` contains your `.npy` files, as above.
216+
- `OUTPUT PATH` is the desired output directory.
217+
- `NUM PROCESSES` is the number of CPU cores to use.
218+
- `FILES PER PROCESS` is the number of files each process should load at once.
219+
220+
Ensure that `NUM_PROCESSES * FILES_PER_PROCESS` input files can fit comfortably in RAM.
221+
222+
> [!NOTE]
223+
> Shuffling is disabled by default in this script - if shuffled data is desired, the `--no-shuffle` flag should be removed from the script.
224+
> If this flag is removed, shuffling will only be done on a per-process level - that is, each process will shuffle the files it has loaded, but not the dataset as a whole.
205225
206226

207227
#### sqlite3-compress.py

analysis/plots-data.ipynb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@
311311
"metadata": {},
312312
"outputs": [],
313313
"source": [
314-
"data_dir = data_base + \"/filtered\"\n",
315-
"suffixes = [\"i\",\"j\",\"k\",\"l\",\"m\",\"n\",\"o\",\"p\",\"q\",\"r\",\"s\",\"t\",\"u\"]"
314+
"data_dir = data_base\n",
315+
"suffixes = [f\"{i:03d}\" for i in range(171)]"
316316
]
317317
},
318318
{
@@ -325,8 +325,8 @@
325325
" return sat_id * num_cells + sat_cell\n",
326326
"\n",
327327
"def load_data(path, suffix):\n",
328-
" file_ids = os.path.join(path, f\"ids-{suffix}.npy\")\n",
329-
" file_cells = os.path.join(path, f\"cells-{suffix}.npy\")\n",
328+
" file_ids = os.path.join(path, f\"ra_sat_{suffix}.npy\")\n",
329+
" file_cells = os.path.join(path, f\"ra_cell_{suffix}.npy\")\n",
330330
"\n",
331331
" return np.load(file_ids), np.load(file_cells)\n",
332332
"\n",
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/bin/bash
# Convert the numpy arrays to tfrecords in parallel, using a specified
# number of processes. Each round launches num_processes workers, each
# handling files_per_process input files, then waits for all of them
# before starting the next round (bounds peak RAM usage).
#
# Usage: np-to-tfrecord-parallel.sh <num_processes> <files_per_process> <path_in> <path_out>

# If not enough arguments are specified, print help and exit.
if [ $# -ne 4 ]; then
    echo "Usage: $0 <num_processes> <files_per_process> <path_in> <path_out>"
    exit 1
fi

# Take arguments from the command line.
num_processes=$1
files_per_process=$2
path_in=$3
path_out=$4

# Count input files with a glob instead of parsing `ls` output, which
# breaks on filenames containing whitespace or glob characters.
shopt -s nullglob
input_files=("$path_in"/samples_*.npy)
num_files=${#input_files[@]}

if [ "$num_files" -eq 0 ]; then
    echo "Error: no samples_*.npy files found in '$path_in'" >&2
    exit 1
fi

# Number of input files consumed by one round of parallel workers.
step_size=$((num_processes * files_per_process))

# Run the conversion in parallel, one round per outer iteration.
for start in $(seq 0 "$step_size" $((num_files - 1))); do
    end=$((start + step_size))
    if [ "$end" -gt "$num_files" ]; then
        end=$num_files
    fi
    echo "Starting processes with files $start to $end"
    for j in $(seq 0 $((num_processes - 1))); do
        skip_files=$((start + j * files_per_process))
        # Don't launch workers whose slice starts past the last file
        # (happens in the final, partial round).
        if [ "$skip_files" -ge "$num_files" ]; then
            break
        fi
        echo "  Starting process $j with files $skip_files to $((skip_files + files_per_process))"
        python3 np-to-tfrecord.py --path-in="$path_in" --path-out="$path_out" \
            --skip-files="$skip_files" --max-files="$files_per_process" --no-shuffle &
    done
    # Wait for every worker in this round before starting the next.
    wait
done

preprocessing/np-to-tfrecord.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def save_dataset(path, suffix, samples_array, ids_array, cells_array):
4242
}))
4343
writer.write(example.SerializeToString())
4444

45-
def save_dataset_batches(path, chunk_size, samples_array, ids_array, cells_array, verbose):
45+
def save_dataset_batches(path, chunk_size, samples_array, ids_array, cells_array, verbose, skip_count=None):
4646
chunk_count = 0
4747

4848
# Create directory if it doesn't exist
@@ -63,19 +63,21 @@ def save_dataset_batches(path, chunk_size, samples_array, ids_array, cells_array
6363
ids_array = ids_array[chunk_size:]
6464
cells_array = cells_array[chunk_size:]
6565

66-
save_dataset(path, str(chunk_count), s, i, c)
66+
suffix = f"{skip_count}-{chunk_count}" if skip_count is not None else f"{chunk_count}"
67+
save_dataset(path, suffix, s, i, c)
6768
chunk_count += 1
6869

6970
if samples_array.shape[0] > 0:
7071
if verbose:
7172
print(f"Saving chunk {chunk_count}...")
7273
print(f"Samples remaining: {samples_array.shape[0]}")
73-
save_dataset(path, str(chunk_count), samples_array, ids_array, cells_array)
74+
suffix = f"{skip_count}-{chunk_count}" if skip_count is not None else f"{chunk_count}"
75+
save_dataset(path, suffix, samples_array, ids_array, cells_array)
7476
chunk_count += 1
7577

7678
return chunk_count
7779

78-
def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, verbose=False, shuffle=True, by_id=False):
80+
def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=None, verbose=False, shuffle=True, by_id=False):
7981
samples_array = None
8082
ids_array = None
8183
cells_array = None
@@ -86,7 +88,8 @@ def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, ver
8688
suffixes = [ f for f in os.listdir(path_in) if f.startswith("samples_") and f.endswith(".npy") ]
8789
suffixes.sort()
8890
suffixes = [ f[8:-4] for f in suffixes ]
89-
suffixes = suffixes[skip_files:]
91+
if skip_files is not None:
92+
suffixes = suffixes[skip_files:]
9093
if max_files is not None:
9194
suffixes = suffixes[:max_files]
9295

@@ -145,7 +148,7 @@ def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, ver
145148
if verbose:
146149
print("Done")
147150

148-
save_dataset_batches(path_out_id, chunk_size, samples_array_unique_subset, ids_array_unique_subset, cells_array_unique_subset, verbose)
151+
save_dataset_batches(path_out_id, chunk_size, samples_array_unique_subset, ids_array_unique_subset, cells_array_unique_subset, verbose, skip_count=skip_files)
149152

150153
del samples_array_unique_subset
151154
del ids_array_unique_subset
@@ -180,7 +183,7 @@ def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, ver
180183
if verbose:
181184
print("Done")
182185

183-
save_dataset_batches(path_out_test, chunk_size, samples_array_unique_subset, ids_array_unique_subset, cells_array_unique_subset, verbose)
186+
save_dataset_batches(path_out_test, chunk_size, samples_array_unique_subset, ids_array_unique_subset, cells_array_unique_subset, verbose, skip_count=skip_files)
184187

185188
del samples_array_unique_subset
186189
del ids_array_unique_subset
@@ -200,7 +203,7 @@ def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, ver
200203
if verbose:
201204
print("Done")
202205

203-
chunk_count = save_dataset_batches(path_out, chunk_size, samples_array, ids_array, cells_array, verbose)
206+
chunk_count = save_dataset_batches(path_out, chunk_size, samples_array, ids_array, cells_array, verbose, skip_count=skip_files)
204207

205208
if verbose:
206209
print("Total messages: {}".format(message_count))
@@ -216,7 +219,7 @@ def process_all(chunk_size, path_in, path_out, max_files=None, skip_files=0, ver
216219
parser.add_argument("--path-in", type=str, default=path_in, help="Input directory.")
217220
parser.add_argument("--path-out", type=str, default=path_out, help="Output directory.")
218221
parser.add_argument("--max-files", type=int, default=None, help="Maximum number of input files to process.")
219-
parser.add_argument("--skip-files", type=int, default=0, help="Number of input files to skip.")
222+
parser.add_argument("--skip-files", type=int, default=None, help="Number of input files to skip.")
220223
parser.add_argument("--no-shuffle", action='store_true', help="Do not shuffle data.")
221224
parser.add_argument("--by-id", action='store_true', help="Create datasets with different percentages of the most common IDs. WARNING: This will create a lot of datasets, and take a long time!")
222225
parser.add_argument("-v", "--verbose", action='store_true', help="Display progress.")

0 commit comments

Comments
 (0)