Skip to content

Commit ca1cefc

Browse files
authored
Atomic write of compressed files (#85)
* writes compressed files in a tmp file first * adds test for tmp wkw writing * tests * ci
1 parent f1cdb9e commit ca1cefc

File tree

5 files changed

+94
-18
lines changed

5 files changed

+94
-18
lines changed

.github/workflows/python-module.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,20 @@ jobs:
171171
user: __token__
172172
password: ${{ secrets.PYPI_PASSWORD }}
173173
skip-existing: true
174+
175+
# Aggregate gate job: depends on every test/build job and fails if any of
# them failed or was cancelled.
complete:
  needs: [test_rust, build_lin, build_mac, build_win]
  # always() keeps this job from being skipped when a needed job did not succeed,
  # so the failure check below actually gets a chance to run.
  if: always()
  runs-on: ubuntu-latest
  steps:
    - name: Check failure
      if: |
        contains(needs.*.result, 'failure') ||
        contains(needs.*.result, 'cancelled')
      run: exit 1
    - name: Success
      run: echo Success!

python/tests/test_wkw.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from pathlib import Path
23
import wkw
34
import numpy as np
@@ -384,6 +385,35 @@ def test_dataset_with_invalid_jumptable():
384385
assert "Corrupt jump table" in str(excinfo)
385386

386387

388+
def test_readwrite_compressed_tmp():
    """Compressed writes must not leave a `*.wkw_tmp` file behind.

    First performs a normal write and checks that only the final `.wkw` file
    exists. Then fakes an interrupted write by moving the file to the
    temporary name and overwriting some of its bytes, and checks that a
    subsequent write still produces a readable `.wkw` file with no leftover
    temporary file.
    """
    # The file header is 16 bytes long; the 8 bytes after it are zeroed to
    # damage the file (presumably the first jump-table entry — confirm
    # against the WKW file layout).
    header_len = 16
    corrupted_len = 8

    with wkw.Dataset.create(
        "tests/tmp",
        wkw.Header(np.uint8, block_type=wkw.Header.BLOCK_TYPE_LZ4, file_len=1),
    ) as dataset:
        test_data = generate_test_data(dataset.header.voxel_type)

        # `os.path` is used explicitly so the test only relies on `import os`.
        wkw_path = os.path.join("tests/tmp", "z0", "y0", "x0.wkw")
        tmp_path = wkw_path + "_tmp"

        # write data, should run through cleanly
        dataset.write(POSITION, test_data)
        assert np.array_equiv(dataset.read(POSITION, SIZE), test_data)
        assert os.path.exists(wkw_path)
        assert not os.path.exists(tmp_path)

        # simulate broken write
        os.rename(wkw_path, tmp_path)
        wkw_file_bytes = bytearray(Path(tmp_path).read_bytes())
        zeros = b"\x00" * corrupted_len
        wkw_file_bytes[header_len : (header_len + corrupted_len)] = zeros
        Path(tmp_path).write_bytes(wkw_file_bytes)

        # try another write, should clean up broken write
        dataset.write(POSITION, test_data)
        assert os.path.exists(wkw_path)
        assert not os.path.exists(tmp_path)
        assert np.array_equiv(dataset.read(POSITION, SIZE), test_data)
415+
416+
387417
def generate_test_data(dtype, size=SIZE, order="C"):
388418
return np.array(
389419
np.random.uniform(np.iinfo(dtype).min, np.iinfo(dtype).max, size).astype(dtype),

rust/src/dataset.rs

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::fs;
22
use std::path::{Path, PathBuf};
3-
use {BlockType, Box3, File, Header, Mat, Result, Vec3};
3+
use {Box3, File, Header, Mat, Result, Vec3};
44

55
#[derive(Debug, Clone)]
66
pub struct Dataset {
@@ -140,7 +140,7 @@ impl Dataset {
140140
}
141141

142142
let file_len_vx_log2 = self.header.file_len_vx_log2() as u32;
143-
if self.header.block_type == BlockType::LZ4 || self.header.block_type == BlockType::LZ4HC {
143+
if self.header.is_compressed() {
144144
let file_len_vec = Vec3::from(1 << file_len_vx_log2);
145145
let is_dst_aligned = dst_pos % file_len_vec == Vec3::from(0);
146146
let is_shape_aligned = mat.shape % file_len_vec == Vec3::from(0);
@@ -170,6 +170,11 @@ impl Dataset {
170170
cur_path.push(format!("y{}", cur_y));
171171
cur_path.push(format!("x{}.wkw", cur_x));
172172

173+
// writing compressed file into temporary file first
174+
if self.header.is_compressed() {
175+
cur_path.set_extension("wkw_tmp");
176+
}
177+
173178
// bounding box
174179
let cur_file_ids = Vec3 {
175180
x: cur_x,
@@ -186,22 +191,38 @@ impl Dataset {
186191
let cur_src_pos = cur_box.min() - dst_pos;
187192
let cur_dst_pos = cur_box.min() - cur_file_box.min();
188193

189-
let mut file = match File::open_or_create(&cur_path, &self.header) {
190-
Ok(file) => file,
191-
Err(err) => {
192-
return Err(format!(
193-
"Error while open file {:?} for writing: {}",
194-
&cur_path, err
195-
));
194+
{
195+
let mut file = match File::open_or_create(&cur_path, &self.header) {
196+
Ok(file) => file,
197+
Err(err) => {
198+
return Err(format!(
199+
"Error while open file {:?} for writing: {}",
200+
&cur_path, err
201+
));
202+
}
203+
};
204+
match file.write_mat(cur_dst_pos, mat, cur_src_pos) {
205+
Ok(_) => {}
206+
Err(err) => {
207+
return Err(format!(
208+
"Error while writing to file {:?}: {}",
209+
&cur_path, err
210+
));
211+
}
196212
}
197-
};
198-
match file.write_mat(cur_dst_pos, mat, cur_src_pos) {
199-
Ok(_) => {}
200-
Err(err) => {
201-
return Err(format!(
202-
"Error while writing to file {:?}: {}",
203-
&cur_path, err
204-
));
213+
}
214+
// moving compressed file into final file
215+
if self.header.is_compressed() {
216+
let mut new_path = cur_path.clone();
217+
new_path.set_extension("wkw");
218+
match File::rename(&cur_path, &new_path) {
219+
Ok(_) => {}
220+
Err(err) => {
221+
return Err(format!(
222+
"Error while renaming temporary file {:?} to {:?}: {}",
223+
&cur_path, &new_path, err
224+
));
225+
}
205226
}
206227
}
207228
}

rust/src/file.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl File {
5151

5252
let mut file = open_opts
5353
.open(path)
54-
.or(Err(format!("Could not open file {:?}", path)))?;
54+
.or(Err(format!("Could not open file {:?}", &path)))?;
5555

5656
// check if file was created
5757
let (header, created) = match Header::read(&mut file) {
@@ -70,6 +70,10 @@ impl File {
7070
Ok(file)
7171
}
7272

73+
pub(crate) fn rename(old_path: &path::Path, new_path: &path::Path) -> Result<()> {
74+
fs::rename(old_path, new_path).or_else(|err| Err(err.to_string()))
75+
}
76+
7377
pub(crate) fn read_mat(
7478
&mut self,
7579
src_pos: Vec3,

rust/src/header.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ impl Header {
250250
self.voxel_size as usize > self.voxel_type_size()
251251
}
252252

253+
pub fn is_compressed(&self) -> bool {
254+
return self.block_type == BlockType::LZ4 || self.block_type == BlockType::LZ4HC;
255+
}
256+
253257
fn from_bytes(buf: [u8; 16]) -> Result<Header> {
254258
let raw: HeaderRaw = unsafe { mem::transmute(buf) };
255259

0 commit comments

Comments
 (0)