Skip to content

Commit 595ae4f

Browse files
committed
Implemented last(::PesistentDict) and first(::PersistentDict) methods
1 parent d583aef commit 595ae4f

File tree

3 files changed

+100
-47
lines changed

3 files changed

+100
-47
lines changed

src/PersistentCollections.jl

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,65 +13,82 @@ module PersistentCollections
1313

1414
Base.show(io::IO, d::PersistentDict) = print(io, typeof(d), "(", isempty(d.id) ? "" : repr(e.id), ")")
1515

16-
function Base.get(d::PersistentDict{K,V}, key::K, default::D) where {K,V,D}
16+
function rotxn(func::Function, d::PersistentDict)
1717
isopen(d.env) || error("Environment is closed")
1818
txn = d.env.rotxn[Threads.threadid()]
1919
LMDB.mdb_txn_renew(txn)
2020
try
2121
dbi = LMDB.mdb_dbi_open(txn, d.id, zero(Cuint))
22-
mdbkey, mdbval = convert(LMDB.MDBValue, key), LMDB.MDBValue()
23-
found = GC.@preserve mdbkey LMDB.mdb_get!(txn, dbi, pointer(mdbkey), pointer(mdbval))
24-
found || return default
25-
# try converting if possible
26-
V == Any || return convert(V, mdbval)
27-
D == Nothing || return convert(D, mdbval)
28-
return mdbval # return unconverted
22+
return func(txn, dbi)
2923
finally
3024
LMDB.mdb_txn_reset(txn)
3125
end
3226
end
3327

28+
function rwtxn(func::Function, d::PersistentDict, sync::Bool)
29+
isopen(d.env) || error("Environment is closed")
30+
txn, commit = C_NULL, false
31+
lock(d.env.wlock) # need this lock otherwise it will deadlock
32+
try
33+
txn = LMDB.mdb_txn_begin(d.env.handle, sync ? zero(Cuint) : LMDB.MDB_NOSYNC)
34+
dbi = LMDB.mdb_dbi_open(txn, d.id, LMDB.MDB_CREATE)
35+
commit = func(txn, dbi)
36+
commit == false || LMDB.mdb_txn_commit(txn)
37+
return commit
38+
finally
39+
(commit || txn == C_NULL) || LMDB.mdb_txn_abort(txn)
40+
unlock(d.env.wlock)
41+
end
42+
end
43+
44+
function cursor(func::Function, dict::PersistentDict)
45+
isopen(dict.env) || error("Environment is closed")
46+
txn, cur = LMDB.mdb_txn_begin(dict.env.handle, LMDB.DEFAULT_ROTXN_FLAGS), C_NULL
47+
try
48+
dbi = LMDB.mdb_dbi_open(txn, dict.id, zero(Cuint))
49+
cur = LMDB.mdb_cursor_open(txn, dbi)
50+
func(cur)
51+
catch e
52+
e.code == LMDB.MDB_NOTFOUND || rethrow(e)
53+
return false
54+
finally
55+
cur == C_NULL || LMDB.mdb_cursor_close(cur)
56+
LMDB.mdb_txn_abort(txn)
57+
end
58+
end
59+
60+
function Base.get(d::PersistentDict{K,V}, key::K, default::D) where {K,V,D}
61+
mdbkey, mdbval = convert(LMDB.MDBValue, key), LMDB.MDBValue()
62+
found = rotxn(d) do txn, dbi
63+
GC.@preserve mdbkey LMDB.mdb_get!(txn, dbi, pointer(mdbkey), pointer(mdbval))
64+
end
65+
found || return default
66+
# try converting if possible
67+
V == Any || return convert(V, mdbval)
68+
D == Nothing || return convert(D, mdbval)
69+
return mdbval # return unconverted
70+
end
71+
3472
function Base.getindex(d::PersistentDict{K,V}, key::K) where {K,V}
3573
val = get(d, key, nothing)
3674
isnothing(val) || return val
3775
throw(KeyError(key))
3876
end
3977

4078
function Base.setindex!(d::PersistentDict{K,V}, val::V, key::K; sync=true) where {K,V}
41-
isopen(d.env) || error("Environment is closed")
42-
txn, committed = C_NULL, false
43-
try
44-
lock(d.env.wlock) # need this lock otherwise it will deadlock
45-
txn = LMDB.mdb_txn_begin(d.env.handle, sync ? zero(Cuint) : LMDB.MDB_NOSYNC)
46-
dbi = LMDB.mdb_dbi_open(txn, d.id, LMDB.MDB_CREATE)
47-
mdbkey, mdbval = convert(LMDB.MDBValue, key), convert(LMDB.MDBValue, val)
79+
mdbkey, mdbval = convert(LMDB.MDBValue, key), convert(LMDB.MDBValue, val)
80+
rwtxn(d, sync) do txn, dbi
4881
GC.@preserve mdbkey mdbval LMDB.mdb_put(txn, dbi, pointer(mdbkey), pointer(mdbval), zero(Cuint))
49-
LMDB.mdb_txn_commit(txn)
50-
committed = true
51-
finally
52-
(committed || txn == C_NULL) || LMDB.mdb_txn_abort(txn)
53-
unlock(d.env.wlock)
82+
true
5483
end
5584
return val
5685
end
5786

58-
function Base.delete!(d::PersistentDict{K,V}, key::K) where {K,V,D}
59-
isopen(d.env) || error("Environment is closed")
60-
txn, committed = C_NULL, false
61-
try
62-
lock(d.env.wlock) # need this lock otherwise it will deadlock
63-
txn = LMDB.mdb_txn_begin(d.env.handle, zero(Cuint))
64-
dbi = LMDB.mdb_dbi_open(txn, d.id, zero(Cuint))
65-
mdbkey = convert(LMDB.MDBValue, key)
66-
if GC.@preserve mdbkey LMDB.mdb_del(txn, dbi, pointer(mdbkey), C_NULL)
67-
LMDB.mdb_txn_commit(txn)
68-
committed = true
69-
end
70-
finally
71-
(committed || txn == C_NULL) || LMDB.mdb_txn_abort(txn)
72-
unlock(d.env.wlock)
87+
function Base.delete!(d::PersistentDict{K,V}, key::K; sync=true) where {K,V}
88+
mdbkey = convert(LMDB.MDBValue, key)
89+
return rwtxn(d, sync) do txn, dbi
90+
GC.@preserve mdbkey LMDB.mdb_del(txn, dbi, pointer(mdbkey), C_NULL)
7391
end
74-
return committed
7592
end
7693

7794
abstract type AbstractMDBCursor end
@@ -94,7 +111,8 @@ module PersistentCollections
94111
cur = LMDB.mdb_cursor_open(txn, dbi)
95112
catch e
96113
LMDB.mdb_txn_abort(txn)
97-
rethrow(e)
114+
# return C_NULL cur if database is empty
115+
e.code == LMDB.MDB_NOTFOUND || rethrow(e)
98116
end
99117
atomic = Threads.Atomic{UInt}(convert(UInt, cur))
100118
finalizer(close_atomic_cursor, atomic)
@@ -157,15 +175,27 @@ module PersistentCollections
157175
Base.values(d::PersistentDict{K,V}) where {K,V} = MDBValCursor{K,V}(create_atomic_cursor(d))
158176

159177
function Base.length(d::PersistentDict)
160-
isopen(d.env) || error("Environment is closed")
161-
txn = d.env.rotxn[Threads.threadid()]
162-
LMDB.mdb_txn_renew(txn)
163-
try
164-
dbi = LMDB.mdb_dbi_open(txn, d.id, zero(Cuint))
165-
return convert(Int, LMDB.mdb_stat(txn, dbi).ms_entries)
166-
finally
167-
LMDB.mdb_txn_reset(txn)
178+
return rotxn(d) do txn, dbi
179+
convert(Int, LMDB.mdb_stat(txn, dbi).ms_entries)
180+
end
181+
end
182+
183+
function Base.first(d::PersistentDict{K,V}) where {K,V}
184+
mdbkey, mdbval = LMDB.MDBValue(), LMDB.MDBValue()
185+
found = cursor(d) do cur
186+
LMDB.mdb_cursor_get!(cur, pointer(mdbkey), pointer(mdbval), LMDB.MDB_FIRST)
187+
end
188+
found || throw(BoundsError())
189+
return convert(K, mdbkey) => convert(V, mdbval)
190+
end
191+
192+
function Base.last(d::PersistentDict{K,V}) where {K,V}
193+
mdbkey, mdbval = LMDB.MDBValue(), LMDB.MDBValue()
194+
found = cursor(d) do cur
195+
LMDB.mdb_cursor_get!(cur, pointer(mdbkey), pointer(mdbval), LMDB.MDB_LAST)
168196
end
197+
found || throw(BoundsError())
198+
return convert(K, mdbkey) => convert(V, mdbval)
169199
end
170200

171201
end # module

src/lmdb.jl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ mutable struct MDBValue{T}
7070
end
7171
end
7272
Base.pointer(val::MDBValue) = pointer_from_objref(val)
73+
function Base.:(==)(a::MDBValue, b::MDBValue)
74+
a !== b || return true
75+
a.ptr == b.ptr || return true
76+
a.size == b.size || return true
77+
arr_a = unsafe_wrap(Array, reinterpret(Ptr{UInt8}, a.ptr). a.size)
78+
arr_b = unsafe_wrap(Array, reinterpret(Ptr{UInt8}, b.ptr). b.size)
79+
return arr_a == arr_b
80+
end
7381

7482
struct MDBStat
7583
ms_psize::Cuint
@@ -238,6 +246,10 @@ function mdb_stat(txn::Ptr{Cvoid}, dbi::Cuint)
238246
return statref[]
239247
end
240248

249+
function mdb_txn_id(txn::Ptr{Cvoid})
250+
return ccall((:mdb_stat, liblmdb), Csize_t, (Ptr{Cvoid},), txn)
251+
end
252+
241253
mutable struct Environment
242254
handle::Ptr{Cvoid}
243255
path::String

test/runtests.jl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ fastval = LMDB.MDBValue("fastval")
5050
# Open up default database as a Dict
5151
d = PersistentDict{Any,Any}(env, id="foo")
5252

53+
@test_throws BoundsError first(d)
54+
@test_throws BoundsError last(d)
55+
5356
d["stringkey"] = "stringval"
5457
d["byteskey"] = Vector{UInt8}("bytesval")
5558
d["intkey"] = 1234
@@ -61,6 +64,10 @@ d[fastkey] = fastval
6164
# Set use-case
6265
d["nothing_key"] = nothing
6366

67+
# sorted lexically
68+
@test first(d) == Pair(LMDB.MDBValue("byteskey"), LMDB.MDBValue(Vector{UInt8}("bytesval")))
69+
@test last(d) == Pair(LMDB.MDBValue("tuplekey"), LMDB.MDBValue((1, 2.5)))
70+
6471
@test get(d, "stringkey", "") == "stringval"
6572
@test get(d, "byteskey", UInt8[]) == Vector{UInt8}("bytesval")
6673
@test get(d, "intkey", 0) == 1234
@@ -90,7 +97,7 @@ notfound = UInt8[0x1]
9097
@test (d["dictkey"] = "dictval") == "dictval"
9198

9299
# manual sync
93-
unsafe_env = LMDB.Environment(UNSAFE_ENV_DIR, maxdbs=1, flags=LMDB.MDB_NOSYNC)
100+
unsafe_env = LMDB.Environment(UNSAFE_ENV_DIR, maxdbs=1, flags=LMDB.MDB_WRITEMAP | LMDB.MDB_MAPASYNC)
94101
unsafe_dict = PersistentDict{String,Vector{UInt8}}(unsafe_env, id="foo")
95102
flush(unsafe_env) do
96103
unsafe_dict["unsafe_key1"] = Vector{UInt8}("unsafe_val1")
@@ -109,6 +116,7 @@ if Threads.nthreads() > 1
109116
Threads.@threads for i in 1:Threads.nthreads()
110117
while time() < deadline
111118
d[fastkey] = randvals[i]
119+
writes[i] += 1
112120
end
113121
end
114122
@info "... $(sum(writes)) writes completed!"
@@ -134,6 +142,9 @@ if get(ENV, "BENCH", "") == "y"
134142
@info "Benchmarking itertion: iterated(::PersistentDict)) ($n entries) ..."
135143
@btime for _ in d end
136144

137-
@info "Benchmarking MDB_NOSYNC + setindex!(::PersistentDict) ..."
145+
@info "Benchmarking last(::PersistentDict) ..."
146+
@btime last(d)
147+
148+
@info "Benchmarking (MDB_WRITEMAP | MDB_MAPASYNC) with setindex!(::PersistentDict) ..."
138149
@btime setindex!(unsafe_dict, v, longkey) setup=(v=rand(UInt8, 500))
139150
end

0 commit comments

Comments
 (0)