Skip to content

Commit 25531d3

Browse files
committed
Project Sink Decouple
1 parent 73d26f5 commit 25531d3

File tree

4 files changed

+66
-46
lines changed

4 files changed

+66
-46
lines changed

src/SQLite.jl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ function transaction(db, mode="DEFERRED")
250250
end
251251
function transaction(f::Function, db)
252252
# generate a random name for the savepoint
253-
name = string("SQLITE",randstring(10))
254-
execute!(db,"PRAGMA synchronous = OFF;")
253+
name = string("SQLITE", randstring(10))
254+
execute!(db, "PRAGMA synchronous = OFF;")
255255
transaction(db, name)
256256
try
257257
f()
@@ -261,7 +261,7 @@ function transaction(f::Function, db)
261261
finally
262262
# savepoints are not released on rollback
263263
commit(db, name)
264-
execute!(db,"PRAGMA synchronous = ON;")
264+
execute!(db, "PRAGMA synchronous = ON;")
265265
end
266266
end
267267

@@ -363,6 +363,7 @@ type Sink <: Data.Sink # <: IO
363363
db::DB
364364
tablename::String
365365
stmt::Stmt
366+
transaction::String
366367
end
367368

368369
include("Source.jl")

src/Sink.jl

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,22 @@ sqlitetype(::Type{NullType}) = "NULL"
55
sqlitetype(x) = "BLOB"
66

77
"""
8-
independent SQLite.Sink constructor to create a new or wrap an existing SQLite table with name `tablename`.
8+
independent SQLite.Sink constructor to create a new or wrap an existing SQLite table with name `name`.
99
must provide a `Data.Schema` through the `schema` argument
10-
can optionally provide an existing SQLite table name or new name that a created SQLite table will be called through the `tablename` argument
10+
can optionally provide an existing SQLite table name or new name that a created SQLite table will be called through the `name` argument
1111
`temp=true` will create a temporary SQLite table that will be destroyed automatically when the database is closed
12-
`ifnotexists=false` will throw an error if `tablename` already exists in `db`
12+
`ifnotexists=false` will throw an error if `name` already exists in `db`
1313
"""
1414
function Sink(db::DB, schema::Data.Schema; name::AbstractString="julia_"*randstring(), temp::Bool=false, ifnotexists::Bool=true, append::Bool=false)
1515
rows, cols = size(schema)
1616
temp = temp ? "TEMP" : ""
1717
ifnotexists = ifnotexists ? "IF NOT EXISTS" : ""
1818
columns = [string(esc_id(schema.header[i]), ' ', sqlitetype(schema.types[i])) for i = 1:cols]
1919
SQLite.execute!(db, "CREATE $temp TABLE $ifnotexists $(esc_id(name)) ($(join(columns, ',')))")
20+
!append && execute!(db, "delete from $(esc_id(name))")
2021
params = chop(repeat("?,", cols))
2122
stmt = SQLite.Stmt(db, "INSERT INTO $(esc_id(name)) VALUES ($params)")
22-
return Sink(schema, db, name, stmt)
23+
return Sink(schema, db, name, stmt, "")
2324
end
2425

2526
"constructs a new SQLite.Sink from the given `SQLite.Source`; uses `source` schema to create the SQLite table"
@@ -35,12 +36,11 @@ end
3536
Data.streamtypes{T<:SQLite.Sink}(::Type{T}) = [Data.Field]
3637

3738
function Sink{T}(source, ::Type{T}, append::Bool, db::DB, name::AbstractString="julia_" * randstring())
38-
sink = Sink(db, Data.schema(source); name=name)
39-
!append && execute!(db, "delete from $name")
39+
sink = Sink(db, Data.schema(source); name=name, append=append)
4040
return sink
4141
end
4242
function Sink{T}(sink, source, ::Type{T}, append::Bool)
43-
!append && execute!(sink.db, "delete from $(sink.tablename)")
43+
!append && execute!(sink.db, "delete from $(esc_id(sink.tablename))")
4444
return sink
4545
end
4646

@@ -64,33 +64,35 @@ if isdefined(:isna)
6464
else
6565
getbind!{T}(val::T, col, stmt) = SQLite.bind!(stmt, col, val)
6666
end
67-
function getfield!{T}(source, ::Type{T}, stmt, row, col)
68-
val = Data.getfield(source, T, row, col)
69-
getbind!(val, col, stmt)
67+
68+
function Data.open!(sink::SQLite.Sink, source)
69+
execute!(sink.db,"PRAGMA synchronous = OFF;")
70+
sink.transaction = string("SQLITE",randstring(10))
71+
transaction(sink.db, sink.transaction)
72+
return nothing
7073
end
71-
# stream the data in `dt` into the SQLite table represented by `sink`
72-
function Data.stream!(source, ::Type{Data.Field}, sink::SQLite.Sink, append::Bool)
73-
!append && execute!(sink.db, "delete from $(sink.tablename)")
74-
rows, cols = size(source)
75-
Data.isdone(source, 1, 1) && return sink
76-
types = Data.types(source)
77-
handle = sink.stmt.handle
78-
transaction(sink.db) do
79-
row = 1
80-
while true
81-
for col = 1:cols
82-
@inbounds T = types[col]
83-
@inbounds getfield!(source, T, sink.stmt, row, col)
84-
end
85-
SQLite.sqlite3_step(handle)
86-
SQLite.sqlite3_reset(handle)
87-
row += 1
88-
Data.isdone(source, row, cols) && break
89-
end
90-
Data.setrows!(source, row - 1)
74+
75+
function Data.streamfield!{T}(sink::SQLite.Sink, source, ::Type{T}, row, col, cols)
76+
val = Data.getfield(source, T, row ,col)
77+
getbind!(val, col, sink.stmt)
78+
if col == cols
79+
SQLite.sqlite3_step(sink.stmt.handle)
80+
SQLite.sqlite3_reset(sink.stmt.handle)
9181
end
92-
SQLite.execute!(sink.db,"ANALYZE $(esc_id(sink.tablename))")
93-
return sink
82+
return nothing
83+
end
84+
85+
function Data.cleanup!(sink::SQLite.Sink)
86+
rollback(sink.db, sink.transaction)
87+
execute!(sink.db, "PRAGMA synchronous = ON;")
88+
return nothing
89+
end
90+
91+
function Data.close!(sink::SQLite.Sink)
92+
commit(sink.db, sink.transaction)
93+
execute!(sink.db, "PRAGMA synchronous = ON;")
94+
SQLite.execute!(sink.db, "ANALYZE $(esc_id(sink.tablename))")
95+
return nothing
9496
end
9597

9698
"""
@@ -105,17 +107,20 @@ function load{T}(db, name, ::Type{T}, args...;
105107
append::Bool=false)
106108
source = T(args...)
107109
schema = Data.schema(source)
108-
sink = Sink(db, schema; name=name, temp=temp, ifnotexists=ifnotexists)
109-
return Data.stream!(source, sink, append)
110+
sink = Sink(db, schema; name=name, temp=temp, ifnotexists=ifnotexists, append=append)
111+
Data.stream!(source, sink, append)
112+
Data.close!(sink)
113+
return sink
110114
end
111115
function load{T}(db, name, source::T;
112116
temp::Bool=false,
113117
ifnotexists::Bool=true,
114118
append::Bool=false)
115-
schema = Data.schema(source)
116-
sink = Sink(db, schema; name=name, temp=temp, ifnotexists=ifnotexists)
117-
return Data.stream!(source, sink, append)
119+
sink = Sink(db, Data.schema(source); name=name, temp=temp, ifnotexists=ifnotexists, append=append)
120+
Data.stream!(source, sink, append)
121+
Data.close!(sink)
122+
return sink
118123
end
119124

120-
load{T}(sink::Sink, ::Type{T}, args...; append::Bool=false) = Data.stream!(T(args...), sink, append)
121-
load(sink::Sink, source; append::Bool=false) = Data.stream!(source, sink, append)
125+
load{T}(sink::Sink, ::Type{T}, args...; append::Bool=false) = (sink = Data.stream!(T(args...), sink, append); Data.close!(sink); return sink)
126+
load(sink::Sink, source; append::Bool=false) = (sink = Data.stream!(source, sink, append); Data.close!(sink); return sink)

src/Source.jl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,19 @@ Will bind `values` to any parameters in `sql`.
108108
"""
109109
function query(db::DB, sql::AbstractString, sink=DataFrame, args...; append::Bool=false, values=[], rows::Int=-1, stricttypes::Bool=true, nullable::Bool=true)
110110
source = Source(db, sql, values; rows=rows, stricttypes=stricttypes, nullable=nullable)
111-
return Data.stream!(source, sink, append, args...)
111+
sink = Data.stream!(source, sink, append, args...)
112+
Data.close!(sink)
113+
return sink
112114
end
113115

114116
function query{T}(db::DB, sql::AbstractString, sink::T; append::Bool=false, values=[], rows::Int=-1, stricttypes::Bool=true, nullable::Bool=true)
115117
source = Source(db, sql, values; rows=rows, stricttypes=stricttypes, nullable=nullable)
116-
return Data.stream!(source, sink, append)
118+
sink = Data.stream!(source, sink, append)
119+
Data.close!(sink)
120+
return sink
117121
end
118-
query(source::SQLite.Source, sink=DataFrame, args...; append::Bool=false) = Data.stream!(source, sink, append, args...)
119-
query{T}(source::SQLite.Source, sink::T; append::Bool=false) = Data.stream!(source, sink, append)
122+
query(source::SQLite.Source, sink=DataFrame, args...; append::Bool=false) = (sink = Data.stream!(source, sink, append, args...); Data.close!(sink); return sink)
123+
query{T}(source::SQLite.Source, sink::T; append::Bool=false) = (sink = Data.stream!(source, sink, append); Data.close!(sink); return sink)
120124

121125
"""
122126
`SQLite.tables(db, sink=DataFrame)`

test/runtests.jl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ r = SQLite.query(db,"select * from temp limit 10")
166166
dt = DataFrame(Data.Schema([Float64,Float64,Float64,Float64,Float64],5))
167167
sink = SQLite.Sink(db,dt)
168168
Data.stream!(dt,sink)
169+
Data.close!(sink)
169170
r = SQLite.query(db,"select * from $(sink.tablename)")
170171
@test size(r) == (5,5)
171172
@test Data.header(r) == ["Column1","Column2","Column3","Column4","Column5"]
@@ -174,6 +175,7 @@ SQLite.drop!(db,"$(sink.tablename)")
174175
dt = DataFrame(zeros(5,5))
175176
sink = SQLite.Sink(db,dt)
176177
Data.stream!(dt,sink)
178+
Data.close!(sink)
177179
r = SQLite.query(db,"select * from $(sink.tablename)")
178180
@test size(r) == (5,5)
179181
@test all([get(i) for i in r.columns[1]] .== 0.0)
@@ -183,13 +185,15 @@ SQLite.drop!(db,"$(sink.tablename)")
183185
dt = DataFrame(zeros(Int,5,5))
184186
sink = SQLite.Sink(db,dt)
185187
Data.stream!(dt,sink)
188+
Data.close!(sink)
186189
r = SQLite.query(db,"select * from $(sink.tablename)")
187190
@test size(r) == (5,5)
188191
@test all([get(i) for i in r.columns[1]] .== 0)
189192
@test all([eltype(i) for i in r.columns[1]] .== Int)
190193

191194
dt = DataFrame(ones(Int,5,5))
192195
Data.stream!(dt,sink,true) # stream to an existing Sink
196+
Data.close!(sink)
193197
r = SQLite.query(db,"select * from $(sink.tablename)")
194198
@test size(r) == (10,5)
195199
@test [get(i) for i in r.columns[1]] == [0,0,0,0,0,1,1,1,1,1]
@@ -200,6 +204,7 @@ rng = Date(2013):Date(2013,1,5)
200204
dt = DataFrame([i for i = rng, j = rng])
201205
sink = SQLite.Sink(db,dt)
202206
Data.stream!(dt,sink)
207+
Data.close!(sink)
203208
r = SQLite.query(db,"select * from $(sink.tablename)")
204209
@test size(r) == (5,5)
205210
@test all([get(i) for i in r.columns[1]] .== rng)
@@ -355,6 +360,7 @@ source = SQLite.Source(db,"select * from album")
355360
temp = tempname()
356361
sink = CSV.Sink(temp)
357362
Data.stream!(source,sink)
363+
Data.close!(sink)
358364
dt = Data.stream!(CSV.Source(sink),DataFrame)
359365
@test get(dt[1,1]) == 1
360366
@test get(dt[1,2]) == "For Those About To Rock We Salute You"
@@ -366,6 +372,7 @@ db = SQLite.DB()
366372
source = CSV.Source(temp)
367373
sink = SQLite.Sink(db,source; name="temp")
368374
Data.stream!(source,sink)
375+
Data.close!(sink)
369376
source2 = SQLite.Source(sink)
370377
dt = Data.stream!(source2,DataFrame)
371378
@test get(dt[1,1]) == 1
@@ -374,6 +381,7 @@ dt = Data.stream!(source2,DataFrame)
374381

375382
sink = SQLite.Sink(db, Data.schema(dt); name="temp2")
376383
Data.stream!(dt,sink)
384+
Data.close!(sink)
377385
source3 = SQLite.Source(sink)
378386
dt = Data.stream!(source3,DataFrame)
379387
@test get(dt[1,1]) == 1
@@ -391,6 +399,7 @@ dt = DataFrame(d)
391399
SQLite.drop!(db, "temp", ifexists=true)
392400
sink = SQLite.Sink(db, dt; name="temp")
393401
Data.stream!(dt, sink)
402+
Data.close!(sink)
394403
dt2 = SQLite.query(db, "Select * from temp")
395404
#There might be a better way to check this
396405
@test dt.columns[1][1].value==dt2.columns[1][1].value
@@ -408,6 +417,7 @@ dt = DataFrame(d,[:ints,:strs])
408417
SQLite.drop!(db, "temp", ifexists=true)
409418
sink = SQLite.Sink(db, dt; name="temp")
410419
Data.stream!(dt, sink)
420+
Data.close!(sink)
411421
SQLite.removeduplicates!(db, "temp", ["ints","strs"]) #New format
412422
dt3 = SQLite.query(db, "Select * from temp")
413423
@test get(dt3[1,1]) == 1

0 commit comments

Comments
 (0)