Skip to content

Commit c7f534e

Browse files
committed
In-memory and file datastores
1 parent 771d524 commit c7f534e

File tree

12 files changed

+156
-57
lines changed

12 files changed

+156
-57
lines changed

src/BigData/BigData.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
include("entities/AbstractDataStore.jl")
2-
include("entities/DataEntries.jl")
2+
include("entities/AbstractBigDataEntries.jl")
33
include("entities/InMemoryDataStore.jl")
44
include("entities/FileDataStore.jl")
55

src/BigData/entities/DataEntries.jl renamed to src/BigData/entities/AbstractBigDataEntries.jl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ reproducible key using userId_robotId_sessionId_variableId_key.
77
"""
88
mutable struct GeneralBigDataEntry <: AbstractBigDataEntry
99
key::Symbol
10-
originalKey::Symbol
10+
storeKey::Symbol # Could swap this to string, but using it as an index later, it's better as a symbol I believe.
1111
createdTimestamp::DateTime
1212
lastUpdatedTimestamp::DateTime
1313
mimeType::String
@@ -16,7 +16,7 @@ end
1616
## TODO: Move this
1717
function ==(a::GeneralBigDataEntry, b::GeneralBigDataEntry)
1818
return a.key == b.key &&
19-
a.originalKey == b.originalKey &&
19+
a.storeKey == b.storeKey &&
2020
a.mimeType == b.mimeType &&
2121
Dates.value(a.createdTimestamp - b.createdTimestamp) < 1000 &&
2222
Dates.value(a.lastUpdatedTimestamp - b.lastUpdatedTimestamp) < 1000 #1 second
@@ -27,24 +27,24 @@ end
2727
"""
2828
$(SIGNATURES)
2929
Generates a unique key for the entry - userId_robotId_sessionId_variable_key.
30-
Simple.
30+
Simple symbol.
3131
"""
32-
function uniqueKey(dfg::G, v::V, key::Symbol) where {G <: AbstractDFG, V <: AbstractDFGVariable}
32+
function uniqueKey(dfg::G, v::V, key::Symbol)::Symbol where {G <: AbstractDFG, V <: AbstractDFGVariable}
3333
key = join(String.([dfg.userId, dfg.robotId, dfg.sessionId, label(v), String(key)]), "_")
3434
return Symbol(key)
3535
end
3636

3737
GeneralBigDataEntry(key::Symbol,
38-
originalKey::Symbol;
38+
storeKey::Symbol;
3939
mimeType::String="application/octet-stream") =
40-
GeneralBigDataEntry(key, originalKey, now(), now(), mimeType)
40+
GeneralBigDataEntry(key, storeKey, now(), now(), mimeType)
4141

4242
function GeneralBigDataEntry(dfg::G,
4343
var::V,
44-
originalKey::Symbol;
44+
key::Symbol;
4545
mimeType::String="application/octet-stream") where {G <: AbstractDFG, V <: AbstractDFGVariable}
46-
return GeneralBigDataEntry(uniqueKey(dfg, var, originalKey),
47-
originalKey,
46+
return GeneralBigDataEntry(key,
47+
uniqueKey(dfg, var, key),
4848
mimeType=mimeType)
4949
end
5050

src/BigData/services/FileDataStore.jl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
searchdir(path,key) = filter(x->occursin(key,x), readdir(path))
2-
filename(store::FileDataStore, entry::E) where E <: AbstractBigDataEntry = "$(store.folder)/$(entry.key).dat"
2+
filename(store::FileDataStore, entry::E) where E <: AbstractBigDataEntry = "$(store.folder)/$(entry.storeKey).dat"
33
function readentry(store::FileDataStore, entry::E) where E <: AbstractBigDataEntry
44
open(filename(store, entry)) do f
55
return read(f)
@@ -12,32 +12,34 @@ function writeentry(store::FileDataStore, entry::E, data::Vector{UInt8}) where E
1212
end
1313

1414
function getBigData(store::FileDataStore, entry::E)::Union{Vector{UInt8}, Nothing} where {E <: AbstractBigDataEntry}
15-
length(searchdir(store.folder, String(entry.key))) !=1 && (@warn "Could not find unique file for key '$(entry.key)'."; return nothing)
15+
length(searchdir(store.folder, String(entry.storeKey)*".dat")) !=1 && (@warn "Could not find unique file for key '$(entry.storeKey)'."; return nothing)
1616
return readentry(store, entry)
1717
end
1818

1919
function addBigData!(store::FileDataStore, entry::E, data::Vector{UInt8})::Vector{UInt8} where {E <: AbstractBigDataEntry}
20-
length(searchdir(store.folder, String(entry.key))) !=0 && @warn "Key '$(entry.key)' already exists, overwriting!."
20+
length(searchdir(store.folder, String(entry.storeKey)*".dat")) !=0 && @warn "Key '$(entry.storeKey)' already exists, overwriting!."
2121
writeentry(store, entry, data)
2222
# Update timestamp
2323
entry.lastUpdatedTimestamp = now()
2424
return getBigData(store, entry)
2525
end
2626

2727
function updateBigData!(store::FileDataStore, entry::E, data::Vector{UInt8})::Union{Vector{UInt8}, Nothing} where {E <: AbstractBigDataEntry}
28-
length(searchdir(store.folder, String(entry.key))) !=1 && (@warn "Could not find unique file for key '$(entry.key)'."; return nothing)
28+
length(searchdir(store.folder, String(entry.storeKey)*".dat")) !=1 && (@warn "Could not find unique file for key '$(entry.storeKey)'."; return nothing)
2929
writeentry(store, entry, data)
3030
# Update timestamp
3131
entry.lastUpdatedTimestamp = now()
3232
return getBigData(store, entry)
3333
end
3434

3535
function deleteBigData!(store::FileDataStore, entry::E)::Vector{UInt8} where {E <: AbstractBigDataEntry}
36-
data = getData(store, v, entry.key)
36+
data = getBigData(store, entry)
3737
data == nothing && return nothing
3838
rm(filename(store, entry))
3939
return data
4040
end
41+
42+
# TODO: Manifest file
4143
#
4244
# function listStoreEntries(store::FileDataStore)::Vector{E} where {E <: AbstractBigDataEntry}
4345
# return collect(values(store.entries))

src/BigData/services/InMemoryDataStore.jl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
function getBigData(store::InMemoryDataStore{T, E}, entry::E)::Union{T, Nothing} where {T, E <: AbstractBigDataEntry}
2-
!haskey(store.data, entry.key) && return nothing
3-
return store.data[entry.key]
2+
!haskey(store.data, entry.storeKey) && return nothing
3+
return store.data[entry.storeKey]
44
end
55

66
function addBigData!(store::InMemoryDataStore{T, E}, entry::E, data::T)::T where {T, E <: AbstractBigDataEntry}
7-
haskey(store.entries, entry.key) && @warn "Key '$(entry.key)' already exists in the data store, overwriting!"
8-
store.entries[entry.key] = entry
7+
haskey(store.entries, entry.storeKey) && @warn "Key '$(entry.storeKey)' already exists in the data store, overwriting!"
8+
store.entries[entry.storeKey] = entry
99
# Update timestamp
1010
entry.lastUpdatedTimestamp = now()
11-
return store.data[entry.key] = data
11+
return store.data[entry.storeKey] = data
1212
end
1313

1414
function updateBigData!(store::InMemoryDataStore{T, E}, entry::E, data::T)::Union{T, Nothing} where {T, E <: AbstractBigDataEntry}
15-
!haskey(store.entries, entry.key) && (@error "Key '$(entry.key)' doesn't exist in the data store!"; return nothing)
16-
store.entries[entry.key] = entry
15+
!haskey(store.entries, entry.storeKey) && (@error "Key '$(entry.storeKey)' doesn't exist in the data store!"; return nothing)
16+
store.entries[entry.storeKey] = entry
1717
# Update timestamp
1818
entry.lastUpdatedTimestamp = now()
19-
return store.data[entry.key] = data
19+
return store.data[entry.storeKey] = data
2020
end
2121

2222
function deleteBigData!(store::InMemoryDataStore{T, E}, entry::E)::T where {T, E <: AbstractBigDataEntry}
2323
data = getBigData(store, entry)
2424
data == nothing && return nothing
25-
delete!(store.data, entry.key)
26-
delete!(store.entries, entry.key)
25+
delete!(store.data, entry.storeKey)
26+
delete!(store.entries, entry.storeKey)
2727
return data
2828
end
2929

src/BigData/services/MongoDataStore.jl

Whitespace-only changes.

src/FileDFG/services/FileDFG.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11

2+
"""
3+
@(SIGNATURES)
4+
Save a DFG to a folder. Will create/overwrite folder if it exists.
5+
"""
26
function saveDFG(dfg::G, folder::String) where G <: AbstractDFG
37
variables = getVariables(dfg)
48
factors = getFactors(dfg)
@@ -30,6 +34,19 @@ function saveDFG(dfg::G, folder::String) where G <: AbstractDFG
3034
end
3135
end
3236

37+
"""
38+
@(SIGNATURES)
39+
Load a DFG from a saved folder. You will need to provide the IIFModule as the second
40+
parameter.
41+
42+
Example:
43+
using DistributedFactorGraphs, IncrementalInference
44+
# Create a DFG - can make one directly, e.g. GraphsDFG{NoSolverParams}() or use IIF:
45+
dfg = initfg()
46+
# Load the graph
47+
loadDFG("/tmp/savedgraph", IncrementalInference, dfg)
48+
# Use it as normal.
49+
"""
3350
function loadDFG(folder::String, iifModule, dfgLoadInto::G) where G <: AbstractDFG
3451
variables = DFGVariable[]
3552
factors = DFGFactor[]

src/services/DFGVariable.jl

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ end
171171

172172
"""
173173
$(SIGNATURES)
174-
Add Big Data Entry to distrubuted factor graph.
174+
Add Big Data Entry to distributed factor graph.
175175
Should be extended if DFG variable is not returned by reference.
176176
"""
177177
function addBigDataEntry!(dfg::AbstractDFG, label::Symbol, bde::AbstractBigDataEntry)::AbstractDFGVariable
@@ -194,27 +194,34 @@ end
194194
$(SIGNATURES)
195195
Update big data entry
196196
"""
197-
function updateBigDataEntry!(var::AbstractDFGVariable, bde::AbstractBigDataEntry)::AbstractDFGVariable
198-
!haskey(var.bigData,bde.key) && (@error "$(bde.key) does not exist in variable!"; return false)
197+
function updateBigDataEntry!(var::AbstractDFGVariable, bde::AbstractBigDataEntry)::Union{Nothing, AbstractDFGVariable}
198+
!haskey(var.bigData,bde.key) && (@error "$(bde.key) does not exist in variable!"; return nothing)
199199
var.bigData[bde.key] = bde
200200
return var
201201
end
202-
function updateBigDataEntry!(dfg::AbstractDFG, label::Symbol, bde::AbstractBigDataEntry)::AbstractDFGVariable
202+
function updateBigDataEntry!(dfg::AbstractDFG, label::Symbol, bde::AbstractBigDataEntry)::Union{Nothing, AbstractDFGVariable}
203+
!isVariable(dfg, label) && return nothing
203204
return updateBigDataEntry!(getVariable(dfg, label), bde)
204205
end
205206

206207
"""
207208
$(SIGNATURES)
208-
Delete big data entry
209+
Delete big data entry from the factor graph.
210+
Note this doesn't remove it from any data stores.
209211
"""
210-
function deleteBigDataEntry!(var::AbstractDFGVariable, key::Symbol)::AbstractBigDataEntry #users responsibility to delete big data in db before deleting entry
212+
function deleteBigDataEntry!(var::AbstractDFGVariable, key::Symbol)::Union{Nothing, AbstractDFGVariable} #users responsibility to delete big data in db before deleting entry
211213
bde = getBigDataEntry(var, key)
214+
bde == nothing && return nothing
212215
delete!(var.bigData, key)
213-
return bde
216+
return var
217+
end
218+
function deleteBigDataEntry!(dfg::AbstractDFG, label::Symbol, key::Symbol)::Union{Nothing, AbstractDFGVariable} #users responsibility to delete big data in db before deleting entry
219+
!isVariable(dfg, label) && return nothing
220+
return deleteBigDataEntry!(getVariable(dfg, label), key)
214221
end
215222

216-
function deleteBigDataEntry!(dfg::AbstractDFG, label::Symbol, key::Symbol)::AbstractBigDataEntry #users responsibility to delete big data in db before deleting entry
217-
deleteBigDataEntry!(getVariable(dfg, label), key)
223+
function deleteBigDataEntry!(var::AbstractDFGVariable, entry::AbstractBigDataEntry)::Union{Nothing, AbstractDFGVariable} #users responsibility to delete big data in db before deleting entry
224+
return deleteBigDataEntry!(var, entry.key)
218225
end
219226

220227
"""
@@ -225,7 +232,8 @@ function getBigDataEntries(var::AbstractDFGVariable)::Vector{AbstractBigDataEntr
225232
#or should we return the iterator, Base.ValueIterator{Dict{Symbol,AbstractBigDataEntry}}?
226233
collect(values(var.bigData))
227234
end
228-
function getBigDataEntries(dfg::AbstractDFG, label::Symbol)::Vector{AbstractBigDataEntry}
235+
function getBigDataEntries(dfg::AbstractDFG, label::Symbol)::Union{Nothing, Vector{AbstractBigDataEntry}}
236+
!isVariable(dfg, label) && return nothing
229237
#or should we return the iterator, Base.ValueIterator{Dict{Symbol,AbstractBigDataEntry}}?
230238
getBigDataEntries(getVariable(dfg, label))
231239
end
@@ -238,6 +246,7 @@ getBigDataKeys
238246
function getBigDataKeys(var::AbstractDFGVariable)::Vector{Symbol}
239247
collect(keys(var.bigData))
240248
end
241-
function getBigDataKeys(dfg::AbstractDFG, label::Symbol)::Vector{Symbol}
249+
function getBigDataKeys(dfg::AbstractDFG, label::Symbol)::Union{Nothing, Vector{Symbol}}
250+
!isVariable(dfg, label) && return nothing
242251
getBigDataKeys(getVariable(dfg, label))
243252
end

test/DataStoreTests.jl

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using DistributedFactorGraphs
2+
using Test
3+
4+
# Build a basic graph.
5+
6+
dfg = GraphsDFG{NoSolverParams}()
7+
#add types for softtypes
8+
struct TestInferenceVariable1 <: InferenceVariable end
9+
v1 = DFGVariable(:a, TestInferenceVariable1())
10+
v2 = DFGVariable(:b, TestInferenceVariable1())
11+
f1 = DFGFactor{Int, :Symbol}(:f1)
12+
#add tags for filters
13+
append!(v1.tags, [:VARIABLE, :POSE])
14+
append!(v2.tags, [:VARIABLE, :LANDMARK])
15+
append!(f1.tags, [:FACTOR])
16+
# @testset "Creating Graphs" begin
17+
addVariable!(dfg, v1)
18+
addVariable!(dfg, v2)
19+
addFactor!(dfg, [v1, v2], f1)
20+
21+
# Stores to test
22+
testStores = [InMemoryDataStore(), FileDataStore("/tmp/dfgFilestore")]
23+
24+
for testStore in testStores
25+
# Create a data store and a dataset
26+
ds = testStore
27+
# ds = FileDataStore("/tmp/filestore")
28+
dataset = rand(UInt8, 1000)
29+
dataset2 = rand(UInt8, 1000)
30+
31+
entry1 = GeneralBigDataEntry(dfg, v1, :test1)
32+
# Set it in the store
33+
@test addBigData!(ds, entry1, dataset) == dataset
34+
@test getBigData(ds, entry1) == dataset
35+
# Now add it to the variable
36+
@test addBigDataEntry!(v1, entry1) == v1
37+
@test entry1 in getBigDataEntries(v1)
38+
# Update test
39+
copyEntry = deepcopy(entry1)
40+
@test updateBigData!(ds, entry1, dataset2) == dataset2
41+
# Data updated?
42+
@test getBigData(ds, entry1) == dataset2
43+
# Timestamp updated?
44+
@test entry1.lastUpdatedTimestamp > copyEntry.lastUpdatedTimestamp
45+
# Delete data
46+
@test deleteBigData!(ds, entry1) == dataset2
47+
# Delete entry
48+
@test deleteBigDataEntry!(v1, entry1) == v1
49+
#TODO
50+
#listStoreEntries(ds)
51+
end

test/fileDFGTests.jl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@
1818
updateVariable!(dfg, verts[7])
1919
updateVariable!(dfg, verts[8])
2020

21+
# Add some bigData to x1, x2
22+
addBigDataEntry!(verts[1], GeneralBigDataEntry(:testing, :testing; mimeType="application/nuthin!"))
23+
addBigDataEntry!(verts[2], FileBigDataEntry(:testing2, "/dev/null"))
24+
#call update to set it on cloud
25+
updateVariable!(dfg, verts[1])
26+
updateVariable!(dfg, verts[2])
27+
2128
facts = map(n -> addFactor!(dfg, [verts[n], verts[n+1]], LinearConditional(Normal(50.0,2.0))), 1:(numNodes-1))
2229

2330
# Save and load the graph to test.
@@ -26,6 +33,7 @@
2633

2734
copyDfg = DistributedFactorGraphs._getDuplicatedEmptyDFG(dfg)
2835
retDFG = loadDFG(saveFolder, Main, copyDfg)
36+
2937
@test symdiff(ls(dfg), ls(retDFG)) == []
3038
@test symdiff(lsf(dfg), lsf(retDFG)) == []
3139
for var in ls(dfg)
@@ -34,4 +42,10 @@
3442
for fact in lsf(dfg)
3543
@test getFactor(dfg, fact) == getFactor(retDFG, fact)
3644
end
45+
46+
@test length(getBigDataEntries(getVariable(retDFG, :x1))) == 1
47+
@test typeof(getBigDataEntry(getVariable(retDFG, :x1),:testing)) == GeneralBigDataEntry
48+
@test length(getBigDataEntries(getVariable(retDFG, :x2))) == 1
49+
@test typeof(getBigDataEntry(getVariable(retDFG, :x2),:testing2)) == FileBigDataEntry
50+
3751
end

test/iifInterfaceTests.jl

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -209,20 +209,21 @@ end
209209

210210
#add
211211
v1 = getVariable(dfg, :a)
212-
@test addBigDataEntry!(v1, de1)
213-
@test addBigDataEntry!(dfg, :a, de2)
214-
@test addBigDataEntry!(v1, de1)
212+
@test addBigDataEntry!(v1, de1) == v1
213+
@test addBigDataEntry!(dfg, :a, de2) == v1
214+
@test addBigDataEntry!(v1, de1) == v1
215+
@test de2 in getBigDataEntries(v1)
215216

216217
#get
217218
@test deepcopy(de1) == getBigDataEntry(v1, :key1)
218219
@test deepcopy(de2) == getBigDataEntry(dfg, :a, :key2)
219-
@test_throws Any getBigDataEntry(v2, :key1)
220-
@test_throws Any getBigDataEntry(dfg, :b, :key1)
220+
@test getBigDataEntry(v2, :key1) == nothing
221+
@test getBigDataEntry(dfg, :b, :key1) == nothing
221222

222223
#update
223-
@test updateBigDataEntry!(dfg, :a, de2_update)
224+
@test updateBigDataEntry!(dfg, :a, de2_update) == v1
224225
@test deepcopy(de2_update) == getBigDataEntry(dfg, :a, :key2)
225-
@test !updateBigDataEntry!(dfg, :b, de2_update)
226+
@test updateBigDataEntry!(dfg, :b, de2_update) == nothing
226227

227228
#list
228229
entries = getBigDataEntries(dfg, :a)
@@ -234,10 +235,10 @@ end
234235
@test getBigDataKeys(dfg, :b) == Symbol[]
235236

236237
#delete
237-
@test deepcopy(de1) == deleteBigDataEntry!(v1, :key1)
238+
@test deleteBigDataEntry!(v1, :key1) == v1
238239
@test getBigDataKeys(v1) == Symbol[:key2]
239240
#delete from dfg
240-
@test deepcopy(de2_update) == deleteBigDataEntry!(dfg, :a, :key2)
241+
@test deleteBigDataEntry!(dfg, :a, :key2) == v1
241242
@test getBigDataKeys(v1) == Symbol[]
242243
end
243244
end

0 commit comments

Comments
 (0)