Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
(package (name mina_wire_types))
(package (name missing_blocks_auditor))
(package (name monad_lib))
(package (name multi_key_file_storage))
(package (name network_peer))
(package (name network_pool))
(package (name node_addrs_and_ports))
Expand Down
9 changes: 9 additions & 0 deletions src/lib/multi-key-file-storage/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(library
(name multi_key_file_storage)
(public_name multi_key_file_storage)
(libraries core_kernel bin_prot mina_stdlib)
(preprocess
(pps ppx_jane ppx_version))
(modules_without_implementation intf)
(instrumentation
(backend bisect_ppx)))
68 changes: 68 additions & 0 deletions src/lib/multi-key-file-storage/intf.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module type S = sig
(** Tag representing the location and metadata of a stored value *)
type 'a tag

(** Writer object used to write values to the single-file database *)
type writer_t

(** Type that represents the key used to identify the file *)
type filename_key

(** Write a value to the database.

[write_value writer bin_prot_module value] serializes [value] using the
provided bin_prot serializer and returns a [tag] that can be used to read the value later.

Example (assuming the default implementation with [type filename_key = string]):
{[
write_values_exn "my.db" ~f:(fun writer ->
let tag1 = write_value writer (module Int) 42 in
let tag2 = write_value writer (module String) "hello" in
(* ... store tags for later use ... *)
)
]}
*)
val write_value :
writer_t -> (module Bin_prot.Binable.S with type t = 'a) -> 'a -> 'a tag

(** Write multiple keys to a database file.

The [filename] parameter specifies the target file.
The file will be overwritten if exists (note, it is not appending).

The [f] parameter is a callback that receives a [write_value] function which can be
called multiple times to write different key-value pairs to the database.

Each call to [write_value bin_prot_module value] serializes [value] using the
provided bin_prot serializer and returns a [tag] that can be used to read the value later.

Example (assuming the default implementation with [type filename_key = string]):
{[
write_values_exn "my.db" ~f:(fun writer ->
let tag1 = write_value writer (module Int) 42 in
let tag2 = write_value writer (module String) "hello" in
(* ... store tags for later use ... *)
)
]}
*)
val write_values_exn : f:(writer_t -> 'a) -> filename_key -> 'a

(** Read a value from the database using a tag.

[read m tag] takes a [tag] (obtained from a previous [write] operation)
and a bin_prot module [m] to deserialize the stored bytes back into a typed value.

Returns [Ok value] on success, or [Error msg] if reading or deserialization fails.

Example:
{[
match read (module Int) tag1 with
| Ok value -> Printf.printf "Read value: %d\n" value
| Error msg -> Printf.eprintf "Error: %s\n" msg
]}
*)
val read :
(module Bin_prot.Binable.S with type t = 'a)
-> 'a tag
-> 'a Core_kernel.Or_error.t
end
128 changes: 128 additions & 0 deletions src/lib/multi-key-file-storage/multi_key_file_storage.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
open Core_kernel

(** Buffer size for writing: 128 KB *)
let buffer_size = 131072

module type S = Intf.S

module Tag = struct
[%%versioned
module Stable = struct
module V1 = struct
type ('filename_key, 'a) t =
{ filename_key : 'filename_key; offset : int64; size : int }
end
end]
end

module Make_custom (Inputs : sig
type filename_key

val filename : filename_key -> string
end) :
S
with type 'a tag = (Inputs.filename_key, 'a) Tag.t
and type filename_key = Inputs.filename_key = struct
type 'a tag = (Inputs.filename_key, 'a) Tag.t

type filename_key = Inputs.filename_key

type writer_t =
{ f : 'a. (module Bin_prot.Binable.S with type t = 'a) -> 'a -> 'a tag }

let write_value { f } = f

(* Flush buffer to file when it exceeds threshold *)
let flush_buffer oc buffer =
Out_channel.output_string oc (Buffer.contents buffer)

(* Write key function provided to the callback *)
let make_writer ~oc ~filename_key ~buffer : writer_t =
{ f =
(fun (type a) (module B : Bin_prot.Binable.S with type t = a)
(value : a) ->
(* Serialize the value to a bigstring *)
let serialized_size = B.bin_size_t value in
let buf = Bigstring.create serialized_size in
let written = B.bin_write_t buf ~pos:0 value in
assert (written = serialized_size) ;

(* Convert bigstring to string for writing *)
let data = Bigstring.to_string buf in

(* Create tag before writing *)
let tag =
{ Tag.filename_key
; offset = Int64.of_int @@ Buffer.length buffer
; size = serialized_size
}
in

(* Add to buffer *)
Buffer.add_string buffer data ;

(* Flush if buffer is large enough *)
if Buffer.length buffer >= buffer_size then (
flush_buffer oc buffer ; Buffer.clear buffer ) ;

tag )
}

(** Write multiple keys to a database file with buffered I/O *)
let write_values_exn ~f filename_key =
let do_writing oc =
(* Buffer for accumulating writes *)
let buffer = Buffer.create buffer_size in
let writer = make_writer ~oc ~filename_key ~buffer in

(* Call user function with write_value *)
let result = f writer in

(* Flush any remaining data *)
if Buffer.length buffer > 0 then flush_buffer oc buffer ;

result
in
Out_channel.with_file
(Inputs.filename filename_key)
~binary:true ~f:do_writing

(** Read a value from the database using a tag *)
let read :
type a.
(module Bin_prot.Binable.S with type t = a) -> a tag -> a Or_error.t =
fun (module B : Bin_prot.Binable.S with type t = a) tag ->
let do_reading ic =
(* Seek to the specified offset *)
In_channel.seek ic tag.offset ;

(* Read the exact number of bytes *)
let buffer = Bytes.create tag.size in
In_channel.really_input_exn ic ~buf:buffer ~pos:0 ~len:tag.size ;

(* Deserialize using bin_prot *)
let bigstring = Bigstring.of_bytes buffer in
let pos_ref = ref 0 in
let%bind.Or_error value =
Or_error.try_with ~backtrace:true
@@ fun () -> B.bin_read_t bigstring ~pos_ref
in
if !pos_ref <> tag.size then
Or_error.error_string
(sprintf "Size mismatch: expected %d bytes, read %d bytes" tag.size
!pos_ref )
else Ok value
in
Or_error.tag ~tag:(Inputs.filename tag.filename_key)
@@ Or_error.try_with_join ~backtrace:true
@@ fun () ->
In_channel.with_file
(Inputs.filename tag.filename_key)
~binary:true ~f:do_reading
end

include Make_custom (struct
type filename_key = string

let filename = ident
end)
20 changes: 20 additions & 0 deletions src/lib/multi-key-file-storage/multi_key_file_storage.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(** Multi-key file storage - stores multiple keys with heterogeneous types in a single file *)

module Tag : sig
[%%versioned:
module Stable : sig
module V1 : sig
type ('filename_key, 'a) t
end
end]
end

module type S = Intf.S

include S with type 'a tag = (string, 'a) Tag.t and type filename_key = string

module Make_custom (Inputs : sig
type filename_key

val filename : filename_key -> string
end) : S with type filename_key = Inputs.filename_key
15 changes: 15 additions & 0 deletions src/lib/multi-key-file-storage/tests/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
(test
(name test_multi_key_file_storage)
(libraries
;; opam libraries
alcotest
core
core_kernel
bin_prot
;; local libraries
multi_key_file_storage
data_hash_lib)
(instrumentation
(backend bisect_ppx))
(preprocess
(pps ppx_jane ppx_version)))
Loading