Skip to content

Commit 771d524

Browse files
committed
WIP on big data stores
1 parent 21054f3 commit 771d524

16 files changed

+394
-146
lines changed

src/BigData.jl

Lines changed: 0 additions & 139 deletions
This file was deleted.

src/BigData/BigData.jl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
include("entities/AbstractDataStore.jl")
2+
include("entities/DataEntries.jl")
3+
include("entities/InMemoryDataStore.jl")
4+
include("entities/FileDataStore.jl")
5+
6+
include("services/AbstractDataStore.jl")
7+
include("services/InMemoryDataStore.jl")
8+
include("services/FileDataStore.jl")
9+
10+
export AbstractDataStore
11+
12+
export GeneralBigDataEntry, MongodbBigDataEntry, FileBigDataEntry
13+
export InMemoryDataStore, FileDataStore
14+
15+
export getBigData, addBigData!, updateBigData!, deleteBigData!, listStoreEntries
16+
export copyStore
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
"""
3+
$(TYPEDEF)
4+
Superclass of all key-value datastores.
5+
"""
6+
abstract type AbstractDataStore{T}
7+
end

src/BigData/entities/DataEntries.jl

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import Base: ==
2+
3+
"""
4+
$(TYPEDEF)
5+
GeneralBigDataEntry is a generic multipurpose data entry that creates a unique
6+
reproducible key using userId_robotId_sessionId_variableId_key.
7+
"""
8+
mutable struct GeneralBigDataEntry <: AbstractBigDataEntry
9+
key::Symbol
10+
originalKey::Symbol
11+
createdTimestamp::DateTime
12+
lastUpdatedTimestamp::DateTime
13+
mimeType::String
14+
end
15+
16+
## TODO: Move this
17+
function ==(a::GeneralBigDataEntry, b::GeneralBigDataEntry)
18+
return a.key == b.key &&
19+
a.originalKey == b.originalKey &&
20+
a.mimeType == b.mimeType &&
21+
Dates.value(a.createdTimestamp - b.createdTimestamp) < 1000 &&
22+
Dates.value(a.lastUpdatedTimestamp - b.lastUpdatedTimestamp) < 1000 #1 second
23+
end
24+
25+
26+
# Clean up and move this.
27+
"""
28+
$(SIGNATURES)
29+
Generates a unique key for the entry - userId_robotId_sessionId_variable_key.
30+
Simple.
31+
"""
32+
function uniqueKey(dfg::G, v::V, key::Symbol) where {G <: AbstractDFG, V <: AbstractDFGVariable}
33+
key = join(String.([dfg.userId, dfg.robotId, dfg.sessionId, label(v), String(key)]), "_")
34+
return Symbol(key)
35+
end
36+
37+
GeneralBigDataEntry(key::Symbol,
38+
originalKey::Symbol;
39+
mimeType::String="application/octet-stream") =
40+
GeneralBigDataEntry(key, originalKey, now(), now(), mimeType)
41+
42+
function GeneralBigDataEntry(dfg::G,
43+
var::V,
44+
originalKey::Symbol;
45+
mimeType::String="application/octet-stream") where {G <: AbstractDFG, V <: AbstractDFGVariable}
46+
return GeneralBigDataEntry(uniqueKey(dfg, var, originalKey),
47+
originalKey,
48+
mimeType=mimeType)
49+
end
50+
51+
# Types <: AbstractBigDataEntry
52+
"""
53+
$(TYPEDEF)
54+
BigDataEntry in MongoDB.
55+
"""
56+
struct MongodbBigDataEntry <: AbstractBigDataEntry
57+
key::Symbol
58+
oid::NTuple{12, UInt8} #mongodb object id
59+
#maybe other fields such as:
60+
#flags::Bool ready, valid, locked, permissions
61+
#MIMEType::Symbol
62+
end
63+
64+
65+
"""
66+
$(TYPEDEF)
67+
BigDataEntry in a file.
68+
"""
69+
struct FileBigDataEntry <: AbstractBigDataEntry
70+
key::Symbol
71+
filename::String
72+
end

src/BigData/entities/FileDataStore.jl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""
2+
$(TYPEDEF)
3+
Simple file data store with a specified data type and a specified key type.
4+
"""
5+
struct FileDataStore <: AbstractDataStore{UInt8}
6+
folder::String
7+
FileDataStore(folder::String) = begin
8+
if !isdir(folder)
9+
@warn "Folder '$folder' doesn't exist - creating."
10+
mkpath(folder)
11+
end
12+
return new(folder)
13+
end
14+
end
15+
#
16+
# """
17+
# $(SIGNATURES)
18+
# Create a file data store using a specific data type.
19+
# """
20+
# function FileDataStore(folder::String)
21+
# if !isdir(folder)
22+
# @warn "Folder '$folder' doesn't exist - creating."
23+
# end
24+
# return FileDataStore(folder)
25+
# end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""
2+
$(TYPEDEF)
3+
Simple in-memory data store with a specified data type and a specified key type.
4+
"""
5+
struct InMemoryDataStore{T, E <: AbstractBigDataEntry} <: AbstractDataStore{T}
6+
data::Dict{Symbol, T}
7+
entries::Dict{Symbol, E}
8+
end
9+
10+
"""
11+
$(SIGNATURES)
12+
Create an in-memory store using a specific data type.
13+
"""
14+
function InMemoryDataStore{T, E}() where {T, E <: AbstractBigDataEntry}
15+
return InMemoryDataStore{T, E}(Dict{Symbol, T}(), Dict{Symbol, E}())
16+
end
17+
18+
"""
19+
$(SIGNATURES)
20+
Create an in-memory store using binary data (UInt8) as a type.
21+
"""
22+
function InMemoryDataStore()
23+
return InMemoryDataStore{Vector{UInt8}, GeneralBigDataEntry}()
24+
end
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""
2+
$(SIGNATURES)
3+
Get the data for the specified entry, returns the data or Nothing.
4+
"""
5+
function getBigData(store::D, entry::E)::Union{Nothing, T} where {T, D <: AbstractDataStore{T}, E <: AbstractBigDataEntry}
6+
error("$(typeof(store)) doesn't override 'getData'.")
7+
end
8+
9+
"""
10+
$(SIGNATURES)
11+
Adds the data to the store with the given entry. The function will warn if the entry already
12+
exists and will overwrite it.
13+
"""
14+
function addBigData!(store::D, entry::E, data::T)::Union{Nothing, T} where {T, D <: AbstractDataStore{T}, E <: AbstractBigDataEntry}
15+
error("$(typeof(store)) doesn't override 'addData!'.")
16+
end
17+
18+
"""
19+
$(SIGNATURES)
20+
Update the data in the store. The function will error and return nothing if
21+
the entry does not exist.
22+
"""
23+
function updateBigData!(store::D, entry::E, data::T)::Union{Nothing, T} where {T, D <: AbstractDataStore{T}, E <: AbstractBigDataEntry}
24+
error("$(typeof(store)) doesn't override 'updateData!'.")
25+
end
26+
27+
"""
28+
$(SIGNATURES)
29+
Delete the data in the store for the given entry. The function will error and return nothing if
30+
the entry does not exist.
31+
"""
32+
function deleteBigData!(store::D, entry::E)::Union{Nothing, T} where {T, D <: AbstractDataStore{T}, E <: AbstractBigDataEntry}
33+
error("$(typeof(store)) doesn't override 'deleteData!'.")
34+
end
35+
36+
"""
37+
$(SIGNATURES)
38+
List all entries in the data store.
39+
"""
40+
function listStoreEntries(store::D)::Vector{E} where {D <: AbstractDataStore, E <: AbstractBigDataEntry}
41+
error("$(typeof(store)) doesn't override 'listEntries'.")
42+
end
43+
44+
"""
45+
$(SIGNATURES)
46+
Copies all the entries from the source into the destination.
47+
Can specify which entries to copy with the `sourceEntries` parameter.
48+
Returns the list of copied entries.
49+
"""
50+
function copyStore(sourceStore::D1, destStore::D2; sourceEntries=listEntries(sourceStore))::Vector{E} where {T, D1 <: AbstractDataStore{T}, D2 <: AbstractDataStore{T}, E <: AbstractBigDataEntry}
51+
# Quick check
52+
destEntries = listEntries(destStore)
53+
typeof(sourceEntries) != typeof(destEntries) && error("Can't copy stores, source has entries of type $(typeof(sourceEntries)), destination has entries of type $(typeof(destEntries)).")
54+
# Same source/destination check
55+
sourceStore == destStore && error("Can't specify same store for source and destination.")
56+
# Otherwise, continue
57+
for sourceEntry in sourceEntries
58+
addData!(destStore, deepcopy(sourceEntry), getData(sourceStore, sourceEntry))
59+
end
60+
return sourceEntries
61+
end

src/BigData/services/FileDataStore.jl

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
searchdir(path,key) = filter(x->occursin(key,x), readdir(path))
2+
filename(store::FileDataStore, entry::E) where E <: AbstractBigDataEntry = "$(store.folder)/$(entry.key).dat"
3+
function readentry(store::FileDataStore, entry::E) where E <: AbstractBigDataEntry
4+
open(filename(store, entry)) do f
5+
return read(f)
6+
end
7+
end
8+
function writeentry(store::FileDataStore, entry::E, data::Vector{UInt8}) where E <: AbstractBigDataEntry
9+
open(filename(store, entry), "w") do f
10+
write(f, data)
11+
end
12+
end
13+
14+
function getBigData(store::FileDataStore, entry::E)::Union{Vector{UInt8}, Nothing} where {E <: AbstractBigDataEntry}
15+
length(searchdir(store.folder, String(entry.key))) !=1 && (@warn "Could not find unique file for key '$(entry.key)'."; return nothing)
16+
return readentry(store, entry)
17+
end
18+
19+
function addBigData!(store::FileDataStore, entry::E, data::Vector{UInt8})::Vector{UInt8} where {E <: AbstractBigDataEntry}
20+
length(searchdir(store.folder, String(entry.key))) !=0 && @warn "Key '$(entry.key)' already exists, overwriting!."
21+
writeentry(store, entry, data)
22+
# Update timestamp
23+
entry.lastUpdatedTimestamp = now()
24+
return getBigData(store, entry)
25+
end
26+
27+
function updateBigData!(store::FileDataStore, entry::E, data::Vector{UInt8})::Union{Vector{UInt8}, Nothing} where {E <: AbstractBigDataEntry}
28+
length(searchdir(store.folder, String(entry.key))) !=1 && (@warn "Could not find unique file for key '$(entry.key)'."; return nothing)
29+
writeentry(store, entry, data)
30+
# Update timestamp
31+
entry.lastUpdatedTimestamp = now()
32+
return getBigData(store, entry)
33+
end
34+
35+
function deleteBigData!(store::FileDataStore, entry::E)::Vector{UInt8} where {E <: AbstractBigDataEntry}
36+
data = getData(store, v, entry.key)
37+
data == nothing && return nothing
38+
rm(filename(store, entry))
39+
return data
40+
end
41+
#
42+
# function listStoreEntries(store::FileDataStore)::Vector{E} where {E <: AbstractBigDataEntry}
43+
# return collect(values(store.entries))
44+
# end

0 commit comments

Comments
 (0)