diff --git a/dune-project b/dune-project
index e97ed395..aaa4c9e0 100644
--- a/dune-project
+++ b/dune-project
@@ -30,6 +30,7 @@
     (and (>= 4.14.0)))
   (yojson (>= 1.6.0))
   (stdint (>= 0.7.2))
+  (zipc (>= 0.2.0))
   (checkseum (>= 0.4.0))
   (odoc :with-doc)
   (ounit2 :with-test)
diff --git a/examples/dune b/examples/dune
index f110b852..2d80a4cf 100644
--- a/examples/dune
+++ b/examples/dune
@@ -4,12 +4,6 @@
  (ocamlopt_flags (:standard -O3))
  (libraries zarr-eio camlzip))
 
-(executable
- (name inmemory_zipstore)
- (modules inmemory_zipstore)
- (ocamlopt_flags (:standard -O3))
- (libraries zarr-lwt zipc))
-
 (executable
  (name picos_fs_store)
  (modules picos_fs_store)
diff --git a/examples/inmemory_zipstore.ml b/examples/inmemory_zipstore.ml
deleted file mode 100644
index 089024a6..00000000
--- a/examples/inmemory_zipstore.ml
+++ /dev/null
@@ -1,203 +0,0 @@
-(* This module implements a Zip file zarr store that is Lwt-aware.
-   It supports both read and write operations. This is because the
-   underlying Zip library used reads all Zip file bytes into memory. All
-   store updates are done in-memory and thus to update the actual zip file
-   we must write the update bytes to disk. The `with_open` convenience
-   function serves this purpose; it ensures that any updates to the store
-   are written to the zip file upon exit.
-
-   The main requirement is to implement the signature of Zarr.Types.IO.
-   We use Zarr_lwt's Deferred module for `Deferred` so that the store can be
-   Lwt-aware.
-
-   To compile & run this example execute the command
-     dune exec -- examples/inmemory_zipstore.exe
-   in your shell at the root of this project. *)
-
-module ZipStore : sig
-  include Zarr.Storage.STORE with module Deferred = Zarr_lwt.Deferred
-  val with_open : ?level:Zipc_deflate.level -> Unix.file_perm -> string -> (t -> 'a Deferred.t) -> 'a Deferred.t
-end = struct
-  module M = Map.Make(String)
-
-  module Z = struct
-    module Deferred = Zarr_lwt.Deferred
-    open Deferred.Syntax
-
-    type t = {ic : Zipc.t Atomic.t; level : Zipc_deflate.level}
-
-    let is_member t key =
-      Deferred.return @@ Zipc.mem key @@ Atomic.get t.ic
-
-    let size t key =
-      Deferred.return @@
-      match Zipc.find key @@ Atomic.get t.ic with
-      | None -> 0
-      | Some m ->
-        match Zipc.Member.kind m with
-        | Zipc.Member.Dir -> 0
-        | Zipc.Member.File f -> Zipc.File.decompressed_size f
-
-    let get t key =
-      Deferred.return @@
-      match Zipc.find key @@ Atomic.get t.ic with
-      | None -> raise (Zarr.Storage.Key_not_found key)
-      | Some m ->
-        match Zipc.Member.kind m with
-        | Zipc.Member.Dir -> failwith "A chunk key cannot be a directory."
-        | Zipc.Member.File f ->
-          Result.fold ~error:failwith ~ok:Fun.id @@ Zipc.File.to_binary_string f
-
-    let get_partial_values t key ranges =
-      let+ data = get t key in
-      let size = String.length data in
-      ranges |> List.map @@ fun (ofs, len) ->
-      let f v = String.sub data ofs v in
-      Option.fold ~none:(f (size - ofs)) ~some:f len
-
-    let list t =
-      Deferred.return @@ Zipc.fold
-        (fun m acc ->
-          match Zipc.Member.kind m with
-          | Zipc.Member.Dir -> acc
-          | Zipc.Member.File _ -> Zipc.Member.path m :: acc) (Atomic.get t.ic) []
-
-    let list_dir t prefix =
-      let module S = Set.Make(String) in
-      let n = String.length prefix in
-      let m = Zipc.to_string_map @@ Atomic.get t.ic in
-      let prefs, keys =
-        M.fold
-          (fun key v ((l, r) as acc) ->
-            match Zipc.Member.kind v with
-            | Zipc.Member.Dir -> acc
-            | Zipc.Member.File _ ->
-              let pred = String.starts_with ~prefix key in
-              match key with
-              | k when pred && String.contains_from k n '/' ->
-                S.add String.(sub k 0 @@ 1 + index_from k n '/') l, r
-              | k when pred -> l, k :: r
-              | _ -> acc) m (S.empty, [])
-      in Deferred.return (keys, S.elements prefs)
-
-    let rec set t key value =
-      match Zipc.File.deflate_of_binary_string ~level:t.level value with
-      | Error e -> failwith e
-      | Ok f ->
-        match Zipc.Member.(make ~path:key @@ File f) with
-        | Error e -> failwith e
-        | Ok m ->
-          let z = Atomic.get t.ic in
-          if Atomic.compare_and_set t.ic z @@ Zipc.add m z
-          then Deferred.return_unit else set t key value
-
-    let rec set_partial_values t key ?(append=false) rv =
-      let z = Atomic.get t.ic in
-      let mem = match Zipc.find key z with
-        | Some m -> m
-        | None ->
-          let empty = Result.fold
-            ~error:failwith ~ok:Fun.id @@ Zipc.File.stored_of_binary_string String.empty in
-          Result.fold
-            ~error:failwith ~ok:Fun.id @@ Zipc.Member.make ~path:key (Zipc.Member.File empty)
-      in
-      match Zipc.Member.kind mem with
-      | Zipc.Member.Dir -> Deferred.return_unit
-      | Zipc.Member.File file ->
-        match Zipc.File.to_binary_string file with
-        | Error e -> failwith e
-        | Ok s ->
-          let f = if append || s = String.empty then
-            fun acc (_, v) -> Deferred.return @@ acc ^ v else
-            fun acc (rs, v) ->
-              let s = Bytes.unsafe_of_string acc in
-              String.(length v |> Bytes.blit_string v 0 s rs);
-              Deferred.return @@ Bytes.unsafe_to_string s
-          in
-          let* value = Deferred.fold_left f s rv in
-          match Zipc.File.deflate_of_binary_string ~level:t.level value with
-          | Error e -> failwith e
-          | Ok f ->
-            match Zipc.Member.(make ~path:key @@ File f) with
-            | Error e -> failwith e
-            | Ok m ->
-              if Atomic.compare_and_set t.ic z @@ Zipc.add m z
-              then Deferred.return_unit else set_partial_values t key ~append rv
-
-    let rec erase t key =
-      let z = Atomic.get t.ic in
-      let z' = Zipc.remove key z in
-      if Atomic.compare_and_set t.ic z z'
-      then Deferred.return_unit else erase t key
-
-    let rec erase_prefix t prefix =
-      let z = Atomic.get t.ic in
-      let m = Zipc.to_string_map z in
-      let m' = M.filter_map
-        (fun k v -> if String.starts_with ~prefix k then None else Some v) m in
-      let z' = Zipc.of_string_map m' in
-      if Atomic.compare_and_set t.ic z z'
-      then Deferred.return_unit else erase_prefix t prefix
-
-    (* Adapted from: https://github.com/dbuenzli/zipc/issues/8#issuecomment-2392417890 *)
-    let rec rename t prefix new_prefix =
-      let rename_member ~prefix ~new_prefix m =
-        let path = Zipc.Member.path m in
-        if not (String.starts_with ~prefix path) then m else
-        let l = String.length prefix in
-        let path = new_prefix ^ String.sub path l (String.length path - l) in
-        let mtime = Zipc.Member.mtime m in
-        let mode = Zipc.Member.mode m in
-        let kind = Zipc.Member.kind m in
-        match Zipc.Member.make ~mtime ~mode ~path kind with
-        | Ok m' -> m' | Error e -> failwith e
-      in
-      let z = Atomic.get t.ic in
-      let add m acc = Zipc.add (rename_member ~prefix ~new_prefix m) acc in
-      let z' = Zipc.fold add z Zipc.empty in
-      if Atomic.compare_and_set t.ic z z'
-      then Deferred.return_unit else rename t prefix new_prefix
-  end
-  (* this functor generates the public signature of our Zip file store. *)
-  include Zarr.Storage.Make(Z)
-
-  let with_open ?(level=`Default) perm path f =
-    let s = In_channel.(with_open_bin path input_all) in
-    let x = match Zipc.of_binary_string s with
-      | Ok z -> Z.{ic = Atomic.make z; level}
-      | Error e -> failwith e
-    in
-    let open Deferred.Syntax in
-    let+ out = f x in
-    let flags = [Open_wronly; Open_trunc; Open_creat] in
-    match Zipc.to_binary_string @@ Atomic.get x.ic with
-    | Error e -> failwith e
-    | Ok v ->
-      Out_channel.with_open_gen flags perm path @@ fun oc ->
-      Out_channel.output_string oc v;
-      Out_channel.flush oc;
-      out
-end
-
-let _ =
-  Lwt_main.run @@ begin
-    let open Zarr in
-    let open Zarr.Ndarray in
-    let open Zarr.Indexing in
-    let open ZipStore.Deferred.Syntax in
-
-    ZipStore.with_open 0o700 "examples/data/testdata.zip" @@ fun store ->
-    let* xs, _ = ZipStore.hierarchy store in
-    let anode = List.hd @@ List.filter
-      (fun node -> Node.Array.to_path node = "/some/group/name") xs in
-    let slice = [|R [|0; 20|]; I 10; R [||]|] in
-    let* x = ZipStore.Array.read store anode slice Char in
-    let x' = x |> Zarr.Ndarray.map @@ fun _ -> Random.int 256 |> Char.chr in
-    let* () = ZipStore.Array.write store anode slice x' in
-    let* y = ZipStore.Array.read store anode slice Char in
-    assert (Zarr.Ndarray.equal x' y);
-    let* () = ZipStore.Array.rename store anode "name2" in
-    let+ exists = ZipStore.Array.exists store @@ Node.Array.of_path "/some/group/name2" in
-    assert exists
-  end;
-  print_endline "Zip store has been updated."
diff --git a/zarr-eio/src/storage.ml b/zarr-eio/src/storage.ml
index 5779d8ff..1d36f294 100644
--- a/zarr-eio/src/storage.ml
+++ b/zarr-eio/src/storage.ml
@@ -3,6 +3,12 @@ module MemoryStore = struct
   let create = Zarr.Memory.create
 end
 
+module ZipStore = struct
+  module Z = Zarr.Zip.Make(Deferred)
+  include Zarr.Storage.Make(Z)
+  let with_open = Z.with_open
+end
+
 module FilesystemStore = struct
   module FS = struct
     module Deferred = Deferred
diff --git a/zarr-eio/src/storage.mli b/zarr-eio/src/storage.mli
index b324eb5d..c1586971 100644
--- a/zarr-eio/src/storage.mli
+++ b/zarr-eio/src/storage.mli
@@ -7,6 +7,9 @@ module MemoryStore : sig
   (** [create ()] returns a new In-memory Zarr store type. *)
 end
 
+(** An Eio-aware Zip file storage backend for a Zarr v3 hierarchy. *)
+module ZipStore : sig include Zarr.Zip.S with type 'a Deferred.t = 'a end
+
 module FilesystemStore : sig
   (** A local filesystem storage backend for a Zarr V3 hierarchy. *)
 
diff --git a/zarr-eio/test/test_eio.ml b/zarr-eio/test/test_eio.ml
index 33f1968a..ab4fed8e 100644
--- a/zarr-eio/test/test_eio.ml
+++ b/zarr-eio/test/test_eio.ml
@@ -143,6 +143,11 @@ let _ =
           (Zarr.Storage.Not_a_filesystem_store fn)
           (fun () -> FilesystemStore.open_store ~env fn);
 
+        (* test with non-existent archive *)
+        let zpath = tmp_dir ^ ".zip" in
+        ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z);
+        (* test just opening the now existing archive created by the previous test. *)
+        ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit);
         test_storage (module MemoryStore) @@ MemoryStore.create ();
         test_storage (module FilesystemStore) s)
     ])
diff --git a/zarr-lwt/src/storage.ml b/zarr-lwt/src/storage.ml
index 0c952934..705288af 100644
--- a/zarr-lwt/src/storage.ml
+++ b/zarr-lwt/src/storage.ml
@@ -3,6 +3,12 @@ module MemoryStore = struct
   let create = Zarr.Memory.create
 end
 
+module ZipStore = struct
+  module Z = Zarr.Zip.Make(Deferred)
+  include Zarr.Storage.Make(Z)
+  let with_open = Z.with_open
+end
+
 module FilesystemStore = struct
   module FS = struct
     module Deferred = Deferred
diff --git a/zarr-lwt/src/storage.mli b/zarr-lwt/src/storage.mli
index e55b1b29..e790e8c0 100644
--- a/zarr-lwt/src/storage.mli
+++ b/zarr-lwt/src/storage.mli
@@ -7,6 +7,9 @@ module MemoryStore : sig
   (** [create ()] returns a new In-memory Zarr store type. *)
 end
 
+(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
+module ZipStore : sig include Zarr.Zip.S with type 'a Deferred.t = 'a Lwt.t end
+
 module FilesystemStore : sig
   (** A local filesystem storage backend for a Zarr V3 hierarchy. *)
 
diff --git a/zarr-lwt/test/test_lwt.ml b/zarr-lwt/test/test_lwt.ml
index 1fa91b1b..e3bf3f98 100644
--- a/zarr-lwt/test/test_lwt.ml
+++ b/zarr-lwt/test/test_lwt.ml
@@ -141,8 +141,11 @@ let _ =
           (Zarr.Storage.Not_a_filesystem_store fn)
           (fun () -> FilesystemStore.open_store fn);
 
-        Lwt_main.run @@
-        Lwt.join
-          [test_storage (module MemoryStore) @@ MemoryStore.create ()
+        let zpath = tmp_dir ^ ".zip" in
+        Lwt_main.run @@ Lwt.join
+          [Lwt.bind (ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z))
+             (* sequenced with bind: as a sibling of Lwt.join this open could run before the archive is written. *)
+             (fun () -> ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit))
+          ;test_storage (module MemoryStore) @@ MemoryStore.create ()
           ;test_storage (module FilesystemStore) s])
     ])
diff --git a/zarr-sync/src/storage.ml b/zarr-sync/src/storage.ml
index 26044138..fbef48a9 100644
--- a/zarr-sync/src/storage.ml
+++ b/zarr-sync/src/storage.ml
@@ -3,6 +3,12 @@ module MemoryStore = struct
   let create = Zarr.Memory.create
 end
 
+module ZipStore = struct
+  module Z = Zarr.Zip.Make(Deferred)
+  include Zarr.Storage.Make(Z)
+  let with_open = Z.with_open
+end
+
 module FilesystemStore = struct
   module F = struct
     module Deferred = Deferred
diff --git a/zarr-sync/src/storage.mli b/zarr-sync/src/storage.mli
index 3935f041..914d879f 100644
--- a/zarr-sync/src/storage.mli
+++ b/zarr-sync/src/storage.mli
@@ -7,6 +7,9 @@ module MemoryStore : sig
   (** [create ()] returns a new In-memory Zarr store type. *)
 end
 
+(** A blocking I/O Zip file storage backend for a Zarr v3 hierarchy. *)
+module ZipStore : sig include Zarr.Zip.S with type 'a Deferred.t = 'a end
+
 module FilesystemStore : sig
   (** A local filesystem storage backend for a Zarr V3 hierarchy. *)
 
diff --git a/zarr-sync/test/test_sync.ml b/zarr-sync/test/test_sync.ml
index 28eaf4bd..bd5f9e54 100644
--- a/zarr-sync/test/test_sync.ml
+++ b/zarr-sync/test/test_sync.ml
@@ -203,6 +203,11 @@ let _ =
           (Zarr.Storage.Not_a_filesystem_store fn)
           (fun () -> FilesystemStore.open_store fn);
 
+        (* test with non-existent archive *)
+        let zpath = tmp_dir ^ ".zip" in
+        ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z);
+        (* test just opening the now existing archive created by the previous test. *)
+        ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit);
         test_storage (module MemoryStore) @@ MemoryStore.create ();
         test_storage (module FilesystemStore) s)
     ])
diff --git a/zarr.opam b/zarr.opam
index 75d2ebc3..b70d30c7 100644
--- a/zarr.opam
+++ b/zarr.opam
@@ -18,6 +18,7 @@ depends: [
   "ocaml" {>= "4.14.0"}
   "yojson" {>= "1.6.0"}
   "stdint" {>= "0.7.2"}
+  "zipc" {>= "0.2.0"}
   "checkseum" {>= "0.4.0"}
   "odoc" {with-doc}
   "ounit2" {with-test}
diff --git a/zarr/src/dune b/zarr/src/dune
index f7998cfb..27d3b773 100644
--- a/zarr/src/dune
+++ b/zarr/src/dune
@@ -5,6 +5,7 @@
   yojson
   bytesrw.zstd
   bytesrw.zlib
+  zipc
   stdint
   checkseum)
  (ocamlopt_flags
diff --git a/zarr/src/storage/zip.ml b/zarr/src/storage/zip.ml
new file mode 100644
index 00000000..e7651a10
--- /dev/null
+++ b/zarr/src/storage/zip.ml
@@ -0,0 +1,200 @@
+(* Zip archive backed Zarr store. The archive is kept in memory as a [Zipc.t]
+   value inside an [Atomic.t]; each update derives a new archive value and
+   installs it with compare-and-set, retrying when the swap fails. *)
+
+module type S = sig
+  include Storage.STORE
+
+  val with_open :
+    ?level:[ `None | `Fast | `Default | `Best ] ->
+    ?perm:int ->
+    [< `Read_only | `Read_write ] ->
+    string ->
+    (t -> 'a Deferred.t) ->
+    'a Deferred.t
+  (** [with_open mode p f] opens the zip archive at path [p] and applies
+      function [f] to its open handle and writes any changes back to the zip
+      archive if [mode] is [`Read_write], otherwise discards them at exit.
+      If [p] does not exist, a handle to an empty zip archive is opened.
+      Note that this function loads the entire zip archive bytes into memory,
+      so care must be taken to ensure that these bytes can fit into the local
+      machine's available memory. For now it does not handle ZIP64. ZIP64 is
+      needed if your ZIP archive or decompressed file sizes exceed 2{^32}-1
+      bytes or if you need more than 65535 archive members.
+
+      {ul
+      {- [level] is the DEFLATE algorithm compression level used when writing
+        data to the store and defaults to [`Default]. Choose [`None] for no
+        compression, [`Fast] for best speed, [`Best] for high compression rate
+        and [`Default] for a mix of good speed and compression rate.}
+      {- [perm] is the file permission given to the zip file if it has to be
+        created when changes are written back, and defaults to [0o700].}
+      } *)
+end
+
+module Make (Deferred : Types.Deferred) = struct
+  module Deferred = Deferred
+  open Deferred.Syntax
+
+  (* [ic] is the current in-memory archive; [level] is the DEFLATE level used
+     for every member written through this handle. *)
+  type t = {ic : Zipc.t Atomic.t; level : Zipc_deflate.level}
+
+  (* Case analysis on a member kind: [dir] for a directory, [file f] for a file. *)
+  let fold_kind ~dir ~file = function
+    | Zipc.Member.Dir -> dir
+    | Zipc.Member.File f -> file f
+
+  (* Applies [ok] to an [Ok] payload; [Error msg] is raised as [Failure msg]. *)
+  let fold_result ~ok res = Result.fold ~error:failwith ~ok res
+
+  let with_open ?(level=`Default) ?(perm=0o700) mode path f =
+    let write_to_disk ~perm ~path str =
+      let write ~str oc = Out_channel.output_string oc str; flush oc in
+      let flags = [Open_wronly; Open_trunc; Open_creat] in
+      Out_channel.with_open_gen flags perm path (write ~str)
+    in
+    let make z = {ic = Atomic.make z; level} in
+    let x = if not (Sys.file_exists path) then make Zipc.empty else
+      let s = In_channel.(with_open_bin path input_all) in
+      fold_result ~ok:make (Zipc.of_binary_string s)
+    in
+    match mode with
+    | `Read_only -> f x
+    | `Read_write ->
+      (* the archive is serialized and written back only once [f] has finished. *)
+      let+ out = f x in
+      let str = Zipc.to_binary_string (Atomic.get x.ic) in
+      fold_result ~ok:(write_to_disk ~perm ~path) str;
+      out
+
+  let is_member t key =
+    let z = Atomic.get t.ic in
+    Deferred.return (Zipc.mem key z)
+
+  (* Decompressed size of [key]; 0 for a missing key or a directory member. *)
+  let size t key =
+    let decompressed_size = function
+      | Some m -> fold_kind ~dir:0 ~file:Zipc.File.decompressed_size (Zipc.Member.kind m)
+      | None -> 0
+    in
+    let z = Atomic.get t.ic in
+    let entry_opt = Zipc.find key z in
+    Deferred.return (decompressed_size entry_opt)
+
+  (* Decompressed bytes of [key] (the empty string for a directory member).
+     Raises [Storage.Key_not_found] when [key] is not in the archive. *)
+  let get t key =
+    let to_string f = fold_result ~ok:Fun.id (Zipc.File.to_binary_string f) in
+    let decompressed_value = function
+      | Some m -> fold_kind ~dir:String.empty ~file:to_string (Zipc.Member.kind m)
+      | None -> raise (Storage.Key_not_found key)
+    in
+    let z = Atomic.get t.ic in
+    let entry_opt = Zipc.find key z in
+    Deferred.return (decompressed_value entry_opt)
+
+  (* One substring of [key]'s value per [(offset, length option)] range; a
+     [None] length reads through to the end of the value. *)
+  let get_partial_values t key ranges =
+    let read_range ~data ~size (ofs, len) = match len with
+      | None -> String.sub data ofs (size - ofs)
+      | Some l -> String.sub data ofs l
+    in
+    let+ data = get t key in
+    let size = String.length data in
+    List.map (read_range ~data ~size) ranges
+
+  let list t =
+    let z = Atomic.get t.ic in
+    Deferred.return (Zipc.fold (fun m acc -> Zipc.Member.path m :: acc) z [])
+
+  (* Splits the member paths under [prefix] into direct children (first
+     component) and immediate sub-prefixes ending in '/' (second component). *)
+  let list_dir t prefix =
+    let module S = Set.Make(String) in
+    let accumulate ~prefix m ((l, r) as acc) =
+      let key = Zipc.Member.path m in
+      if not (String.starts_with ~prefix key) then acc else
+      let n = String.length prefix in
+      if not (String.contains_from key n '/') then key :: l, r else
+      l, S.add String.(sub key 0 @@ 1 + index_from key n '/') r
+    in
+    let z = Atomic.get t.ic in
+    let ks, ps = Zipc.fold (accumulate ~prefix) z ([], S.empty) in
+    Deferred.return (ks, S.elements ps)
+
+  (* Stores [value] deflated under [key]; the whole operation is retried when
+     the archive was replaced between the read and the compare-and-set. *)
+  let rec set t key value =
+    let res = Zipc.File.deflate_of_binary_string ~level:t.level value in
+    let f = Zipc.Member.File (fold_result ~ok:Fun.id res) in
+    let m = fold_result ~ok:Fun.id Zipc.Member.(make ~path:key f) in
+    let z = Atomic.get t.ic in
+    if Atomic.compare_and_set t.ic z (Zipc.add m z)
+    then Deferred.return_unit else set t key value
+
+  (* Applies the [(offset, bytes)] writes in [rv] to the current value of [key]
+     (an empty value when [key] is absent), then stores the result deflated.
+     With [append], or when the current value is empty, offsets are ignored
+     and each chunk is appended to the value in list order. *)
+  let rec set_partial_values t key ?(append=false) rv =
+    let to_string f = fold_result ~ok:Fun.id (Zipc.File.to_binary_string f) in
+    let empty =
+      let res = Zipc.File.deflate_of_binary_string ~level:t.level String.empty in
+      let res' = Zipc.Member.File (fold_result ~ok:Fun.id res) in
+      fold_result ~ok:Fun.id Zipc.Member.(make ~path:key res')
+    in
+    let z = Atomic.get t.ic in
+    let mem = Option.fold ~none:empty ~some:Fun.id (Zipc.find key z) in
+    let ov = fold_kind ~dir:String.empty ~file:to_string (Zipc.Member.kind mem) in
+    let f = if append || ov = String.empty then
+      fun acc (_, v) -> acc ^ v else
+      fun acc (rs, v) ->
+        (* overwrites the bytes of [acc] in place without copying; the blit
+           raises [Invalid_argument] if the write does not fit inside [acc]. *)
+        let s = Bytes.unsafe_of_string acc in
+        Bytes.blit_string v 0 s rs String.(length v);
+        Bytes.unsafe_to_string s
+    in
+    let ov' = List.fold_left f ov rv in
+    let res = Zipc.File.deflate_of_binary_string ~level:t.level ov' in
+    let file = Zipc.Member.File (fold_result ~ok:Fun.id res) in
+    let m = fold_result ~ok:Fun.id Zipc.Member.(make ~path:key file) in
+    if Atomic.compare_and_set t.ic z (Zipc.add m z)
+    then Deferred.return_unit else set_partial_values t key ~append rv
+
+  let rec erase t key =
+    let z = Atomic.get t.ic in
+    if Atomic.compare_and_set t.ic z (Zipc.remove key z)
+    then Deferred.return_unit else erase t key
+
+  (* Rebuilds the archive without the members whose path starts with [prefix]. *)
+  let rec erase_prefix t prefix =
+    let accumulate ~prefix m acc =
+      if String.starts_with ~prefix (Zipc.Member.path m)
+      then acc else Zipc.add m acc
+    in
+    let z = Atomic.get t.ic in
+    let z' = Zipc.fold (accumulate ~prefix) z Zipc.empty in
+    if Atomic.compare_and_set t.ic z z'
+    then Deferred.return_unit else erase_prefix t prefix
+
+  (* Adapted from: https://github.com/dbuenzli/zipc/issues/8#issuecomment-2392417890 *)
+  let rec rename t prefix new_prefix =
+    let accumulate ~prefix ~new_prefix m acc =
+      let path = Zipc.Member.path m in
+      if not (String.starts_with ~prefix path) then Zipc.add m acc else
+      let l = String.length prefix in
+      let path = new_prefix ^ String.sub path l (String.length path - l) in
+      let mtime = Zipc.Member.mtime m in
+      let mode = Zipc.Member.mode m in
+      let kind = Zipc.Member.kind m in
+      let m' = Zipc.Member.make ~mtime ~mode ~path kind in
+      Zipc.add (fold_result ~ok:Fun.id m') acc
+    in
+    let z = Atomic.get t.ic in
+    let z' = Zipc.fold (accumulate ~prefix ~new_prefix) z Zipc.empty in
+    if Atomic.compare_and_set t.ic z z'
+    then Deferred.return_unit else rename t prefix new_prefix
+end
diff --git a/zarr/src/zarr.ml b/zarr/src/zarr.ml
index 9033dc8c..330a7962 100644
--- a/zarr/src/zarr.ml
+++ b/zarr/src/zarr.ml
@@ -5,5 +5,6 @@ module Metadata = Metadata
 module Storage = Storage
 module Codecs = Codecs
 module Memory = Memory
+module Zip = Zip
 module Types = Types
 module Ndarray = Ndarray
diff --git a/zarr/src/zarr.mli b/zarr/src/zarr.mli
index faf43d3d..fb74a7e1 100644
--- a/zarr/src/zarr.mli
+++ b/zarr/src/zarr.mli
@@ -30,6 +30,7 @@ module Metadata = Metadata
 
 module Storage = Storage
 module Memory = Memory
+module Zip = Zip
 module Types = Types
 
 (** {1 Codecs} *)