Skip to content

Commit 1f7d5e4

Browse files
committed
Project Integration Tests
1 parent e39e7dc commit 1f7d5e4

File tree

4 files changed

+75
-64
lines changed

4 files changed

+75
-64
lines changed

src/SQLite.jl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,7 @@ type Source <: Data.Source
358358
end
359359

360360
"SQLite.Sink implements the `Sink` interface in the `DataStreams` framework"
361-
type Sink <: Data.Sink # <: IO
362-
schema::Data.Schema
361+
type Sink <: Data.Sink
363362
db::DB
364363
tablename::String
365364
stmt::Stmt

src/Sink.jl

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,49 @@ sqlitetype{T<:AbstractString}(::Type{T}) = "TEXT"
44
sqlitetype(::Type{NullType}) = "NULL"
55
sqlitetype(x) = "BLOB"
66

7+
function createtable!(db::DB, name::AbstractString, schema::Data.Schema; temp::Bool=false, ifnotexists::Bool=true)
8+
rows, cols = size(schema)
9+
temp = temp ? "TEMP" : ""
10+
ifnotexists = ifnotexists ? "IF NOT EXISTS" : ""
11+
header, types = Data.header(schema), Data.types(schema)
12+
columns = [string(esc_id(header[i]), ' ', sqlitetype(types[i])) for i = 1:cols]
13+
SQLite.execute!(db, "CREATE $temp TABLE $ifnotexists $(esc_id(name)) ($(join(columns, ',')))")
14+
return name
15+
end
16+
717
"""
818
independent SQLite.Sink constructor to create a new or wrap an existing SQLite table with name `name`.
919
must provide a `Data.Schema` through the `schema` argument
1020
can optionally provide an existing SQLite table name or new name that a created SQLite table will be called through the `name` argument
1121
`temp=true` will create a temporary SQLite table that will be destroyed automatically when the database is closed
1222
`ifnotexists=false` will throw an error if `name` already exists in `db`
1323
"""
14-
function Sink(db::DB, schema::Data.Schema; name::AbstractString="julia_"*randstring(), temp::Bool=false, ifnotexists::Bool=true, append::Bool=false)
15-
rows, cols = size(schema)
16-
temp = temp ? "TEMP" : ""
17-
ifnotexists = ifnotexists ? "IF NOT EXISTS" : ""
18-
columns = [string(esc_id(schema.header[i]), ' ', sqlitetype(schema.types[i])) for i = 1:cols]
19-
SQLite.execute!(db, "CREATE $temp TABLE $ifnotexists $(esc_id(name)) ($(join(columns, ',')))")
20-
!append && execute!(db, "delete from $(esc_id(name))")
24+
function Sink(db::DB, name::AbstractString, schema::Data.Schema=Data.Schema(); temp::Bool=false, ifnotexists::Bool=true, append::Bool=false)
25+
cols = size(SQLite.query(db, "pragma table_info($name)"), 1)
26+
if cols == 0
27+
createtable!(db, name, schema)
28+
cols = size(SQLite.query(db, "pragma table_info($name)"), 1)
29+
else
30+
!append && execute!(db, "delete from $(esc_id(name))")
31+
end
2132
params = chop(repeat("?,", cols))
2233
stmt = SQLite.Stmt(db, "INSERT INTO $(esc_id(name)) VALUES ($params)")
23-
return Sink(schema, db, name, stmt, "")
34+
return Sink(db, name, stmt, "")
2435
end
2536

2637
# DataStreams interface
2738
Data.streamtypes{T<:SQLite.Sink}(::Type{T}) = [Data.Field]
2839

29-
function Sink{T}(sch::Data.Schema, ::Type{T}, append::Bool, ref::Vector{UInt8}, db::DB, name::AbstractString="julia_" * randstring(); kwargs...)
30-
sink = Sink(db, sch; name=name, append=append, kwargs...)
31-
execute!(sink.db,"PRAGMA synchronous = OFF;")
40+
function Sink{T}(sch::Data.Schema, ::Type{T}, append::Bool, ref::Vector{UInt8}, db::DB, name::AbstractString; kwargs...)
41+
sink = Sink(db, name, sch; append=append, kwargs...)
42+
execute!(sink.db, "PRAGMA synchronous = OFF;")
3243
sink.transaction = string("SQLITE",randstring(10))
3344
transaction(sink.db, sink.transaction)
3445
return sink
3546
end
3647
function Sink{T}(sink, sch::Data.Schema, ::Type{T}, append::Bool, ref::Vector{UInt8})
37-
execute!(sink.db,"PRAGMA synchronous = OFF;")
38-
sink.transaction = string("SQLITE",randstring(10))
48+
execute!(sink.db, "PRAGMA synchronous = OFF;")
49+
sink.transaction = string("SQLITE", randstring(10))
3950
transaction(sink.db, sink.transaction)
4051
!append && execute!(sink.db, "delete from $(esc_id(sink.tablename))")
4152
return sink
@@ -62,9 +73,9 @@ else
6273
getbind!{T}(val::T, col, stmt) = SQLite.bind!(stmt, col, val)
6374
end
6475

65-
function Data.stream!{T}(sink::SQLite.Sink, ::Type{Data.Field}, val::T, row, col, cols)
76+
function Data.streamto!{T}(sink::SQLite.Sink, ::Type{Data.Field}, val::T, row, col, sch)
6677
getbind!(val, col, sink.stmt)
67-
if col == cols
78+
if col == size(sch, 2)
6879
SQLite.sqlite3_step(sink.stmt.handle)
6980
SQLite.sqlite3_reset(sink.stmt.handle)
7081
end

src/Source.jl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ function Source(db::DB, sql::AbstractString, values=[]; rows::Int=-1, stricttype
2525
types[i] = stricttypes ? SQLite.juliatype(stmt.handle, i) : Any
2626
end
2727
end
28-
return SQLite.Source(Data.Schema(Data.Field, header, types, rows), stmt, status)
28+
return SQLite.Source(Data.Schema(header, types, rows), stmt, status)
2929
end
3030

3131
"""
3232
`SQLite.Source(sink::SQLite.Sink, sql="select * from \$(sink.tablename)")`
3333
3434
constructs an SQLite.Source from an SQLite.Sink; selects all rows/columns from the underlying Sink table by default
3535
"""
36-
Source(sink::SQLite.Sink,sql::AbstractString="select * from $(sink.tablename)") = Source(sink.db, sql::AbstractString)
36+
Source(sink::SQLite.Sink, sql::AbstractString="select * from $(sink.tablename)") = Source(sink.db, sql::AbstractString)
3737

3838
function juliatype(handle,col)
3939
x = SQLite.sqlite3_column_type(handle, col)
@@ -61,18 +61,19 @@ function sqlitevalue{T}(::Type{T}, handle, col)
6161
end
6262

6363
# DataStreams interface
64+
Data.schema(source::SQLite.Source, ::Type{Data.Field}) = source.schema
6465
function Data.isdone(s::Source, row, col)
6566
(s.status == SQLITE_DONE || s.status == SQLITE_ROW) || sqliteerror(s.stmt.db)
6667
return s.status == SQLITE_DONE
6768
end
6869
# resets an SQLite.Source, ready to read data from at the start of the resultset
69-
Data.reset!(io::SQLite.Source) = (sqlite3_reset(io.stmt.handle); execute!(io.stmt))
70+
# Data.reset!(io::SQLite.Source) = (sqlite3_reset(io.stmt.handle); execute!(io.stmt))
7071
Data.streamtype{T<:SQLite.Source}(::Type{T}, ::Type{Data.Field}) = true
7172

7273
# `T` might be Int, Float64, String, WeakRefString, any Julia type, Any, NullType
7374
# `t` (the actual type of the value we're returning), might be SQLITE_INTEGER, SQLITE_FLOAT, SQLITE_TEXT, SQLITE_BLOB, SQLITE_NULL
74-
# `SQLite.getfield` returns the next `Nullable{T}` value from the `SQLite.Source`
75-
function Data.getfield{T}(source::SQLite.Source, ::Type{Nullable{T}}, row, col)
75+
# `SQLite.streamfrom` returns the next `Nullable{T}` value from the `SQLite.Source`
76+
function Data.streamfrom{T}(source::SQLite.Source, ::Type{Data.Field}, ::Type{Nullable{T}}, row, col)
7677
handle = source.stmt.handle
7778
t = SQLite.sqlite3_column_type(handle, col)
7879
if t == SQLite.SQLITE_NULL
@@ -84,7 +85,7 @@ function Data.getfield{T}(source::SQLite.Source, ::Type{Nullable{T}}, row, col)
8485
col == source.schema.cols && (source.status = sqlite3_step(handle))
8586
return val
8687
end
87-
function Data.getfield{T}(source::SQLite.Source, ::Type{T}, row, col)
88+
function Data.streamfrom{T}(source::SQLite.Source, ::Type{Data.Field}, ::Type{T}, row, col)
8889
handle = source.stmt.handle
8990
t = SQLite.sqlite3_column_type(handle, col)
9091
if t == SQLite.SQLITE_NULL

test/runtests.jl

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ ds5 = SQLite.query(db, selall(sink_table))
4545
@test Data.types(ds) == Data.types(ds2) && Data.header(ds) == Data.header(ds2)
4646

4747
# constructed SQLite.Sink
48-
sink = SQLite.Sink(db, Data.schema(ds5); name=sink_table)
48+
sink = SQLite.Sink(db, sink_table, Data.schema(ds5))
4949
SQLite.query(db, selall(source_table), sink)
5050
ds2 = SQLite.query(db, selall(sink_table))
5151
@test size(ds2) == (8,15)
@@ -67,7 +67,7 @@ SQLite.query(source, SQLite.Sink, db, sink_table; append=true)
6767
ds3 = SQLite.query(db, selall(sink_table))
6868
@test size(ds3) == (16,15)
6969

70-
sink = SQLite.Sink(db, Data.schema(ds); name=sink_table)
70+
sink = SQLite.Sink(db, sink_table, Data.schema(ds))
7171
source = SQLite.Source(db, selall(source_table))
7272
SQLite.query(source, sink)
7373
ds = SQLite.query(db, selall(sink_table))
@@ -94,22 +94,22 @@ si = SQLite.load(db, sink_table, source; append=true)
9494
ds2 = SQLite.query(db, selall(sink_table))
9595
@test size(ds2) == (16,15)
9696

97-
sink = SQLite.Sink(db, Data.schema(ds); name=sink_table)
97+
sink = SQLite.Sink(db, sink_table, Data.schema(ds))
9898
si = SQLite.load(sink, SQLite.Source, db, selall(source_table))
9999
ds = SQLite.query(db, selall(sink_table))
100100
@test size(ds) == (8,15)
101-
sink = SQLite.Sink(db, Data.schema(ds); name=sink_table, append=true)
101+
sink = SQLite.Sink(db, sink_table, Data.schema(ds); append=true)
102102
si = SQLite.load(sink, SQLite.Source, db, selall(source_table); append=true)
103103
ds2 = SQLite.query(db, selall(sink_table))
104104
@test size(ds2) == (16,15)
105105

106106
source = SQLite.Source(db, selall(source_table))
107-
sink = SQLite.Sink(db, Data.schema(ds); name=sink_table)
107+
sink = SQLite.Sink(db, sink_table, Data.schema(ds))
108108
si = SQLite.load(sink, source)
109109
ds = SQLite.query(db, selall(sink_table))
110110
@test size(ds) == (8,15)
111111
source = SQLite.Source(db, selall(source_table))
112-
sink = SQLite.Sink(db, Data.schema(ds); name=sink_table, append=true)
112+
sink = SQLite.Sink(db, sink_table, Data.schema(ds); append=true)
113113
si = SQLite.load(sink, source; append=true)
114114
ds2 = SQLite.query(db, selall(sink_table))
115115
@test size(ds2) == (16,15)
@@ -163,8 +163,8 @@ r = SQLite.query(db,"select * from temp limit 10")
163163
@test all(Bool[get(x) == Date(2014,1,1) for x in r[:,5]])
164164
@test length(SQLite.query(db,"drop table temp").columns) == 0
165165

166-
dt = DataFrame(Data.Schema(Data.Field, [Float64,Float64,Float64,Float64,Float64],5))
167-
sink = SQLite.Sink(db,Data.schema(dt, Data.Field))
166+
dt = DataFrame(Data.Schema([Float64,Float64,Float64,Float64,Float64],5))
167+
sink = SQLite.Sink(db, "temp", Data.schema(dt, Data.Field))
168168
Data.stream!(dt,sink)
169169
Data.close!(sink)
170170
r = SQLite.query(db,"select * from $(sink.tablename)")
@@ -173,45 +173,45 @@ r = SQLite.query(db,"select * from $(sink.tablename)")
173173
SQLite.drop!(db,"$(sink.tablename)")
174174

175175
dt = DataFrame(zeros(5,5))
176-
sink = SQLite.Sink(db,Data.schema(dt, Data.Field))
177-
Data.stream!(dt,sink)
176+
sink = SQLite.Sink(db, "temp", Data.schema(dt, Data.Field))
177+
Data.stream!(dt, sink)
178178
Data.close!(sink)
179-
r = SQLite.query(db,"select * from $(sink.tablename)")
179+
r = SQLite.query(db, "select * from $(sink.tablename)")
180180
@test size(r) == (5,5)
181181
@test all([get(i) for i in r.columns[1]] .== 0.0)
182182
@test all([eltype(i) for i in r.columns[1]] .== Float64)
183-
SQLite.drop!(db,"$(sink.tablename)")
183+
SQLite.drop!(db, "$(sink.tablename)")
184184

185185
dt = DataFrame(zeros(Int,5,5))
186-
sink = SQLite.Sink(db,Data.schema(dt, Data.Field))
187-
Data.stream!(dt,sink)
186+
sink = SQLite.Sink(db, "temp", Data.schema(dt, Data.Field))
187+
Data.stream!(dt, sink)
188188
Data.close!(sink)
189-
r = SQLite.query(db,"select * from $(sink.tablename)")
189+
r = SQLite.query(db, "select * from $(sink.tablename)")
190190
@test size(r) == (5,5)
191191
@test all([get(i) for i in r.columns[1]] .== 0)
192192
@test all([eltype(i) for i in r.columns[1]] .== Int)
193193

194194
dt = DataFrame(ones(Int,5,5))
195-
Data.stream!(dt,sink,true) # stream to an existing Sink
195+
Data.stream!(dt, sink, true) # stream to an existing Sink
196196
Data.close!(sink)
197-
r = SQLite.query(db,"select * from $(sink.tablename)")
197+
r = SQLite.query(db, "select * from $(sink.tablename)")
198198
@test size(r) == (10,5)
199199
@test [get(i) for i in r.columns[1]] == [0,0,0,0,0,1,1,1,1,1]
200200
@test all([eltype(i) for i in r.columns[1]] .== Int)
201-
SQLite.drop!(db,"$(sink.tablename)")
201+
SQLite.drop!(db, "$(sink.tablename)")
202202

203203
rng = Date(2013):Date(2013,1,5)
204204
dt = DataFrame([i for i = rng, j = rng])
205-
sink = SQLite.Sink(db,Data.schema(dt, Data.Field))
206-
Data.stream!(dt,sink)
205+
sink = SQLite.Sink(db, "temp", Data.schema(dt, Data.Field))
206+
Data.stream!(dt, sink)
207207
Data.close!(sink)
208-
r = SQLite.query(db,"select * from $(sink.tablename)")
208+
r = SQLite.query(db, "select * from $(sink.tablename)")
209209
@test size(r) == (5,5)
210210
@test all([get(i) for i in r.columns[1]] .== rng)
211211
@test all([eltype(i) for i in r.columns[1]] .== Date)
212-
SQLite.drop!(db,"$(sink.tablename)")
212+
SQLite.drop!(db, "$(sink.tablename)")
213213

214-
SQLite.query(db,"CREATE TABLE temp AS SELECT * FROM Album")
214+
SQLite.query(db, "CREATE TABLE temp AS SELECT * FROM Album")
215215
r = SQLite.query(db, "SELECT * FROM temp LIMIT ?"; values=[3])
216216
@test size(r) == (3,3)
217217
r = SQLite.query(db, "SELECT * FROM temp WHERE Title LIKE ?"; values=["%time%"])
@@ -227,11 +227,11 @@ binddb = SQLite.DB()
227227
SQLite.query(binddb, "CREATE TABLE temp (n NULL, i6 INT, f REAL, s TEXT, a BLOB)")
228228
SQLite.query(binddb, "INSERT INTO temp VALUES (?1, ?2, ?3, ?4, ?5)"; values=Any[SQLite.NULL, convert(Int64,6), 6.4, "some text", b"bytearray"])
229229
r = SQLite.query(binddb, "SELECT * FROM temp")
230-
@test isa(get(r.columns[1][1],SQLite.NULL),SQLite.NullType)
231-
@test isa(get(r.columns[2][1]),Int)
232-
@test isa(get(r.columns[3][1]),Float64)
233-
@test isa(get(r.columns[4][1]),AbstractString)
234-
@test isa(get(r.columns[5][1]),Vector{UInt8})
230+
@test isa(get(r.columns[1][1], SQLite.NULL), SQLite.NullType)
231+
@test isa(get(r.columns[2][1]), Int)
232+
@test isa(get(r.columns[3][1]), Float64)
233+
@test isa(get(r.columns[4][1]), AbstractString)
234+
@test isa(get(r.columns[5][1]), Vector{UInt8})
235235
SQLite.query(binddb, "CREATE TABLE blobtest (a BLOB, b BLOB)")
236236
SQLite.query(binddb, "INSERT INTO blobtest VALUES (?1, ?2)"; values=Any[b"a", b"b"])
237237
SQLite.query(binddb, "INSERT INTO blobtest VALUES (?1, ?2)"; values=Any[b"a", BigInt(2)])
@@ -253,7 +253,7 @@ for (v1, v2) in zip(r.columns[2], Any[b"b", BigInt(2), p1, p2])
253253
end
254254
############################################
255255

256-
SQLite.query(db,"CREATE TABLE temp AS SELECT * FROM Album")
256+
SQLite.query(db, "CREATE TABLE temp AS SELECT * FROM Album")
257257
r = SQLite.query(db, "SELECT * FROM temp LIMIT :a"; values=Dict(:a => 3))
258258
@test size(r) == (3,3)
259259
r = SQLite.query(db, "SELECT * FROM temp WHERE Title LIKE @word"; values=Dict(:word => "%time%"))
@@ -356,12 +356,12 @@ SQLite.drop!(db2, "tab2", ifexists=true)
356356
SQLite.drop!(db, "sqlite_stat1")
357357
@test size(SQLite.tables(db)) == (11,1)
358358

359-
source = SQLite.Source(db,"select * from album")
359+
source = SQLite.Source(db, "select * from album")
360360
temp = tempname()
361361
sink = CSV.Sink(temp)
362362
Data.stream!(source,sink)
363363
Data.close!(sink)
364-
dt = Data.stream!(CSV.Source(sink),DataFrame)
364+
dt = Data.stream!(CSV.Source(sink), DataFrame)
365365
@test get(dt[1,1]) == 1
366366
@test get(dt[1,2]) == "For Those About To Rock We Salute You"
367367
@test get(dt[1,3]) == 1
@@ -370,20 +370,20 @@ db = nothing; gc(); gc();
370370

371371
db = SQLite.DB()
372372
source = CSV.Source(temp)
373-
sink = SQLite.Sink(db,Data.schema(source); name="temp")
374-
Data.stream!(source,sink)
373+
sink = SQLite.Sink(db, "temp", Data.schema(source))
374+
Data.stream!(source, sink)
375375
Data.close!(sink)
376376
source2 = SQLite.Source(sink)
377-
dt = Data.stream!(source2,DataFrame)
377+
dt = Data.stream!(source2, DataFrame)
378378
@test get(dt[1,1]) == 1
379379
@test string(get(dt[1,2])) == "For Those About To Rock We Salute You"
380380
@test get(dt[1,3]) == 1
381381

382-
sink = SQLite.Sink(db, Data.schema(dt, Data.Field); name="temp2")
383-
Data.stream!(dt,sink)
382+
sink = SQLite.Sink(db, "temp2", Data.schema(dt, Data.Field))
383+
Data.stream!(dt, sink)
384384
Data.close!(sink)
385385
source3 = SQLite.Source(sink)
386-
dt = Data.stream!(source3,DataFrame)
386+
dt = Data.stream!(source3, DataFrame)
387387
@test get(dt[1,1]) == 1
388388
@test string(get(dt[1,2])) == "For Those About To Rock We Salute You"
389389
@test get(dt[1,3]) == 1
@@ -394,7 +394,7 @@ arr = Array(String,2)
394394
arr[1] = "1" #Now an array with the second value undefined
395395
dt = DataFrame(Any[NullableArrays.NullableArray(arr, [false, true])])
396396
SQLite.drop!(db, "temp", ifexists=true)
397-
sink = SQLite.Sink(db, Data.schema(dt, Data.Field); name="temp")
397+
sink = SQLite.Sink(db, "temp", Data.schema(dt, Data.Field))
398398
Data.stream!(dt, sink)
399399
Data.close!(sink)
400400
dt2 = SQLite.query(db, "Select * from temp")
@@ -409,12 +409,12 @@ strs = String["A", "A", "B", "C", "C"]
409409
nvInts = NullableArrays.NullableArray(ints)
410410
nvStrs = NullableArrays.NullableArray(strs)
411411
d = Any[nvInts, nvStrs]
412-
dt = DataFrame(d, [:ints,:strs])
412+
dt = DataFrame(d, [:ints, :strs])
413413
SQLite.drop!(db, "temp", ifexists=true)
414-
sink = SQLite.Sink(db, Data.schema(dt, Data.Field); name="temp")
414+
sink = SQLite.Sink(db, "temp", Data.schema(dt, Data.Field))
415415
Data.stream!(dt, sink)
416416
Data.close!(sink)
417-
SQLite.removeduplicates!(db, "temp", ["ints","strs"]) #New format
417+
SQLite.removeduplicates!(db, "temp", ["ints", "strs"]) #New format
418418
dt3 = SQLite.query(db, "Select * from temp")
419419
@test get(dt3[1,1]) == 1
420420
@test get(dt3[1,2]) == "A"

0 commit comments

Comments
 (0)