Skip to content

Commit a2a250a

Browse files
committed
Big overhaul to implement the DataStreams framework
1 parent d86fcf6 commit a2a250a

File tree

9 files changed

+1455
-421
lines changed

9 files changed

+1455
-421
lines changed

src/SQLite.jl

Lines changed: 109 additions & 213 deletions
Large diffs are not rendered by default.

src/Sink.jl

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
sqlitetype{T<:Integer}(::Type{T}) = "INT"
2+
sqlitetype{T<:AbstractFloat}(::Type{T}) = "REAL"
3+
sqlitetype{T<:AbstractString}(::Type{T}) = "TEXT"
4+
sqlitetype(x) = "BLOB"
5+
6+
type Sink <: Data.Sink # <: IO
7+
schema::Data.Schema
8+
db::DB
9+
tablename::UTF8String
10+
stmt::Stmt
11+
end
12+
13+
function Source(sink::SQLite.Sink)
14+
stmt = SQLite.Stmt(sink.db,"select * from $(sink.tablename)")
15+
status = SQLite.execute!(stmt)
16+
return SQLite.Source(sink.schema, stmt, status)
17+
end
18+
19+
# independent Sink constructor for new or existing SQLite tables
20+
function Sink(db::DB,tablename::AbstractString="julia_"*randstring(),schema::Data.Schema=Data.EMPTYSCHEMA;temp::Bool=false,ifnotexists::Bool=true)
21+
rows, cols = size(schema)
22+
temp = temp ? "TEMP" : ""
23+
ifnotexists = ifnotexists ? "if not exists" : ""
24+
columns = [string(schema.header[i],' ',sqlitetype(schema.types[i])) for i = 1:cols]
25+
SQLite.execute!(db,"CREATE $temp TABLE $ifnotexists $tablename ($(join(columns,',')))")
26+
params = chop(repeat("?,",cols))
27+
stmt = SQLite.Stmt(db,"insert into $tablename values ($params)")
28+
return Sink(schema,db,utf8(tablename),stmt)
29+
end
30+
31+
# create a new SQLite table
32+
# Data.Table
33+
function getbind!{T}(dt::NullableVector{T},row,col,stmt)
34+
@inbounds SQLite.bind!(stmt,col,ifelse(dt.isnull[row], NULL, dt.values[row]::T))
35+
return
36+
end
37+
38+
function Data.stream!(dt::Data.Table,sink::SQLite.Sink)
39+
rows, cols = size(dt)
40+
types = Data.types(dt)
41+
transaction(sink.db) do
42+
if rows*cols != 0
43+
for row = 1:rows
44+
for col = 1:cols
45+
@inbounds SQLite.getbind!(Data.column(dt,col,types[col]),row,col,sink.stmt)
46+
end
47+
SQLite.execute!(sink.stmt)
48+
end
49+
end
50+
end
51+
SQLite.execute!(sink.db,"analyze $(sink.tablename)")
52+
return sink
53+
end
54+
function Sink(dt::Data.Table,db::DB,tablename::AbstractString="julia_"*randstring();temp::Bool=false,ifnotexists::Bool=false)
55+
sink = Sink(db,tablename,dt.schema;temp=temp,ifnotexists=ifnotexists)
56+
return Data.stream!(dt,sink)
57+
end
58+
# CSV.Source
59+
function getbind!{T}(io,::Type{T},opts,row,col,stmt)
60+
val, isnull = CSV.getfield(io,T,opts,row,col)
61+
SQLite.bind!(stmt,col,ifelse(isnull,NULL,val))
62+
return
63+
end
64+
function Data.stream!(source::CSV.Source,sink::SQLite.Sink)
65+
rows, cols = size(source)
66+
types = Data.types(source)
67+
io = source.data
68+
opts = source.options
69+
transaction(sink.db) do
70+
if rows*cols != 0
71+
for row = 1:rows
72+
for col = 1:cols
73+
@inbounds SQLite.getbind!(io, types[col], opts, row, col, sink.stmt)
74+
end
75+
SQLite.execute!(sink.stmt)
76+
end
77+
end
78+
end
79+
SQLite.execute!(sink.db,"analyze $(sink.tablename)")
80+
return sink
81+
end
82+
function Sink(csv::CSV.Source,db::DB,tablename::AbstractString="julia_"*randstring();temp::Bool=false,ifnotexists::Bool=false)
83+
sink = Sink(db,tablename,csv.schema;temp=temp,ifnotexists=ifnotexists)
84+
return Data.stream!(csv,sink)
85+
end

src/Source.jl

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
type Source <: Data.Source # <: IO
2+
schema::Data.Schema
3+
stmt::Stmt
4+
status::Cint
5+
end
6+
7+
function Source(db::DB,sql::AbstractString, values=[];rows::Int=0,stricttypes::Bool=true)
8+
stmt = SQLite.Stmt(db,sql)
9+
bind!(stmt, values)
10+
status = SQLite.execute!(stmt)
11+
cols = SQLite.sqlite3_column_count(stmt.handle)
12+
header = Array(UTF8String,cols)
13+
types = Array(DataType,cols)
14+
for i = 1:cols
15+
header[i] = bytestring(SQLite.sqlite3_column_name(stmt.handle,i-1))
16+
# do better column type inference; query what the column was created for?
17+
types[i] = stricttypes ? SQLite.juliatype(stmt.handle,i) : Any
18+
end
19+
# rows == -1 && count(*)?
20+
return SQLite.Source(Data.Schema(header,types,rows),stmt,status)
21+
end
22+
23+
function Base.eof(s::Source)
24+
(s.status == SQLITE_DONE || s.status == SQLITE_ROW) || sqliteerror(s.stmt.db)
25+
return s.status == SQLITE_DONE
26+
end
27+
28+
function Base.readline(s::Source,delim::Char=',',buf::IOBuffer=IOBuffer())
29+
eof(s) && return ""
30+
cols = s.schema.cols
31+
for i = 1:cols
32+
val = sqlite3_column_text(s.stmt.handle,i-1)
33+
val != C_NULL && write(buf,bytestring(val))
34+
write(buf,ifelse(i == cols,'\n',delim))
35+
end
36+
s.status = sqlite3_step(s.stmt.handle)
37+
return takebuf_string(buf)
38+
end
39+
40+
function readsplitline(s::Source)
41+
eof(s) && return UTF8String[]
42+
cols = s.schema.cols
43+
vals = Array(UTF8String, cols)
44+
for i = 1:cols
45+
val = sqlite3_column_text(s.stmt.handle,i-1)
46+
valsl[i] = val == C_NULL ? "" : bytestring(val)
47+
end
48+
s.status = sqlite3_step(s.stmt.handle)
49+
return vals
50+
end
51+
52+
reset!(io::SQLite.Source) = (sqlite3_reset(io.stmt.handle); execute!(io.stmt))
53+
54+
sqlitetypecode{T<:Integer}(::Type{T}) = SQLITE_INTEGER
55+
sqlitetypecode{T<:AbstractFloat}(::Type{T}) = SQLITE_FLOAT
56+
sqlitetypecode{T<:AbstractString}(::Type{T}) = SQLITE_TEXT
57+
sqlitetypecode(::Type{BigInt}) = SQLITE_BLOB
58+
sqlitetypecode(::Type{BigFloat}) = SQLITE_BLOB
59+
sqlitetypecode(x) = SQLITE_BLOB
60+
function juliatype(handle,col)
61+
x = SQLite.sqlite3_column_type(handle,col-1)
62+
if x == SQLITE_BLOB
63+
val = sqlitevalue(Any,handle,col)
64+
return typeof(val)
65+
else
66+
return juliatype(x)
67+
end
68+
end
69+
juliatype(x) = x == SQLITE_INTEGER ? Int : x == SQLITE_FLOAT ? Float64 : x == SQLITE_TEXT ? UTF8String : Any
70+
71+
sqlitevalue{T<:Integer}(::Type{T},handle,col) = sqlite3_column_int64(handle,col-1)
72+
sqlitevalue{T<:AbstractFloat}(::Type{T},handle,col) = sqlite3_column_double(handle,col-1)
73+
#TODO: test returning a PointerString instead of calling `bytestring`
74+
sqlitevalue{T<:AbstractString}(::Type{T},handle,col) = convert(T,bytestring(sqlite3_column_text(handle,col-1)))
75+
sqlitevalue(::Type{PointerString},handle,col) = bytestring(sqlite3_column_text(handle,col-1))
76+
sqlitevalue(::Type{BigInt},handle,col) = sqlitevalue(Any,handle,col)
77+
sqlitevalue(::Type{BigFloat},handle,col) = sqlitevalue(Any,handle,col)
78+
function sqlitevalue{T}(::Type{T},handle,col)
79+
blob = convert(Ptr{UInt8},SQLite.sqlite3_column_blob(handle,col-1))
80+
b = SQLite.sqlite3_column_bytes(handle,col-1)
81+
buf = zeros(UInt8,b) # global const?
82+
unsafe_copy!(pointer(buf), blob, b)
83+
r = SQLite.sqldeserialize(buf)::T
84+
return r
85+
end
86+
87+
function getfield{T}(source::SQLite.Source, ::Type{T}, row, col)
88+
handle = source.stmt.handle
89+
t = sqlite3_column_type(handle,col-1)
90+
if t == SQLite.SQLITE_NULL
91+
val = Nullable{T}()
92+
elseif t == SQLite.sqlitetypecode(T)
93+
val = Nullable(sqlitevalue(T,handle,col))
94+
elseif T === Any
95+
val = Nullable(sqlitevalue(juliatype(t),handle,col))
96+
else
97+
throw(SQLiteException("strict type error trying to retrieve type `$T` on row: $(row+1), col: $col; SQLite reports a type of $(sqlitetypecode(T))"))
98+
end
99+
col == source.schema.cols && (source.status = sqlite3_step(handle))
100+
return val
101+
end
102+
103+
function getfield!{T}(source::SQLite.Source, dest::NullableVector{T}, ::Type{T}, row, col)
104+
@inbounds dest[row] = SQLite.getfield(source, T, row, col)
105+
return
106+
end
107+
function pushfield!{T}(source::SQLite.Source, dest::NullableVector{T}, ::Type{T}, row, col)
108+
push!(dest, SQLite.getfield(source, T, row, col))
109+
return
110+
end
111+
112+
function Data.stream!(source::SQLite.Source,sink::Data.Table)
113+
rows, cols = size(source)
114+
types = Data.types(source)
115+
if rows == 0
116+
row = 0
117+
while !eof(source)
118+
for col = 1:cols
119+
@inbounds T = types[col]
120+
SQLite.pushfield!(source, Data.unsafe_column(sink,col,T), T, row, col) # row + datarow
121+
end
122+
row += 1
123+
end
124+
source.schema.rows = row
125+
else
126+
for row = 1:rows, col = 1:cols
127+
@inbounds T = types[col]
128+
SQLite.getfield!(source, Data.unsafe_column(sink,col,T), T, row, col) # row + datarow
129+
end
130+
end
131+
sink.schema = source.schema
132+
return sink
133+
end
134+
# creates a new DataTable according to `source` schema and streams `Source` data into it
135+
function Data.Table(source::SQLite.Source)
136+
sink = Data.Table(source.schema)
137+
return Data.stream!(source,sink)
138+
end
139+
140+
function Data.stream!(source::SQLite.Source,sink::CSV.Sink;header::Bool=true)
141+
header && CSV.writeheaders(source,sink)
142+
rows, cols = size(source)
143+
types = Data.types(source)
144+
row = 0
145+
while !eof(source)
146+
for col = 1:cols
147+
val = SQLite.getfield(source, types[col], row, col)
148+
CSV.writefield(sink, isnull(val) ? sink.null : get(val), col, cols)
149+
end
150+
row += 1
151+
end
152+
source.schema.rows = row
153+
sink.schema = source.schema
154+
close(sink)
155+
return sink
156+
end
157+
158+
function query(db::DB,sql::AbstractString, values=[];rows::Int=0,stricttypes::Bool=true)
159+
so = Source(db,sql,values;rows=rows,stricttypes=stricttypes)
160+
return Data.Table(so)
161+
end
162+
163+
tables(db::DB) = query(db,"SELECT name FROM sqlite_master WHERE type='table';")
164+
indices(db::DB) = query(db,"SELECT name FROM sqlite_master WHERE type='index';")
165+
columns(db::DB,table::AbstractString) = query(db,"pragma table_info($table)")

src/UDF.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ macro register(db, func)
187187
end
188188

189189
# User-facing method for registering a Julia function to be used within SQLite
190-
function register(db::SQLiteDB, func::Function; nargs::Int=-1, name::AbstractString=string(func), isdeterm::Bool=true)
190+
function register(db, func::Function; nargs::Int=-1, name::AbstractString=string(func), isdeterm::Bool=true)
191191
@assert nargs <= 127 "use -1 if > 127 arguments are needed"
192192
# assume any negative number means a varargs function
193193
nargs < -1 && (nargs = -1)
@@ -207,7 +207,7 @@ end
207207

208208
# as above but for aggregate functions
209209
function register(
210-
db::SQLiteDB, init, step::Function, final::Function=identity;
210+
db, init, step::Function, final::Function=identity;
211211
nargs::Int=-1, name::AbstractString=string(step), isdeterm::Bool=true
212212
)
213213
@assert nargs <= 127 "use -1 if > 127 arguments are needed"

0 commit comments

Comments
 (0)