Skip to content

Commit 8fbfb0e

Browse files
committed
More updates to better reflect the DataStreams interface. Also cleaned up the result-fetch typing as well
1 parent 10cff2c commit 8fbfb0e

File tree

6 files changed

+132
-162
lines changed

6 files changed

+132
-162
lines changed

src/SQLite.jl

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1+
using DataStreams
12
module SQLite
23

34
using Compat, NullableArrays, CSV, Libz, DataStreams
4-
import CSV.PointerString
5-
6-
# Deprecated exports
7-
export NULL, SQLiteDB, SQLiteStmt, ResultSet,
8-
execute, query, tables, indices, columns, droptable, dropindex,
9-
create, createindex, append, deleteduplicates
10-
11-
import Base: ==, show, convert, bind, close
5+
const PointerString = Data.PointerString
6+
const NULLSTRING = Data.NULLSTRING
127

138
type SQLiteException <: Exception
149
msg::AbstractString
@@ -17,38 +12,34 @@ end
1712
include("consts.jl")
1813
include("api.jl")
1914

20-
# Custom NULL type
2115
immutable NullType end
16+
"custom NULL value for interacting with the SQLite database"
2217
const NULL = NullType()
2318
show(io::IO,::NullType) = print(io,"#NULL")
2419

25-
"internal wrapper type to, in-effect, mark something which has been serialized"
26-
immutable Serialization
27-
object
28-
end
29-
3020
#TODO: Support sqlite3_open_v2
3121
# Normal constructor from filename
3222
sqliteopen(file,handle) = sqlite3_open(file,handle)
3323
sqliteopen(file::UTF16String,handle) = sqlite3_open16(file,handle)
3424
sqliteerror() = throw(SQLiteException(bytestring(sqlite3_errmsg())))
3525
sqliteerror(db) = throw(SQLiteException(bytestring(sqlite3_errmsg(db.handle))))
3626

27+
"represents an SQLite database, either backed by an on-disk file or in-memory"
3728
type DB
3829
file::UTF8String
3930
handle::Ptr{Void}
4031
changes::Int
4132

4233
function DB(f::UTF8String)
43-
handle = [C_NULL]
34+
handle = Ref{Ptr{Void}}()
4435
f = isempty(f) ? f : expanduser(f)
4536
if @OK sqliteopen(f,handle)
46-
db = new(f,handle[1],0)
37+
db = new(f,handle[],0)
4738
register(db, regexp, nargs=2, name="regexp")
4839
finalizer(db, _close)
4940
return db
5041
else # error
51-
sqlite3_close(handle[1])
42+
sqlite3_close(handle[])
5243
sqliteerror()
5344
end
5445
end
@@ -74,9 +65,9 @@ type Stmt
7465
handle::Ptr{Void}
7566

7667
function Stmt(db::DB,sql::AbstractString)
77-
handle = [C_NULL]
78-
sqliteprepare(db,sql,handle,[C_NULL])
79-
stmt = new(db,handle[1])
68+
handle = Ref{Ptr{Void}}()
69+
sqliteprepare(db,sql,handle,Ref{Ptr{Void}}())
70+
stmt = new(db,handle[])
8071
finalizer(stmt, _close)
8172
return stmt
8273
end
@@ -88,8 +79,7 @@ function _close(stmt::Stmt)
8879
return
8980
end
9081

91-
sqliteprepare(db,sql,stmt,null) =
92-
@CHECK db sqlite3_prepare_v2(db.handle,utf8(sql),stmt,null)
82+
sqliteprepare(db,sql,stmt,null) = @CHECK db sqlite3_prepare_v2(db.handle,utf8(sql),stmt,null)
9383

9484
# TO DEPRECATE
9585
type SQLiteDB{T<:AbstractString}
@@ -140,8 +130,14 @@ bind!(stmt::Stmt,i::Int,val::PointerString) = (sqlite3_bind_text(stmt.handle,i,
140130
bind!(stmt::Stmt,i::Int,val::UTF16String) = (sqlite3_bind_text16(stmt.handle,i,val); return nothing)
141131
# We may want to track the new ByteVec type proposed at https://github.com/JuliaLang/julia/pull/8964
142132
# as the "official" bytes type instead of Vector{UInt8}
133+
"bind a byte vector as an SQLite BLOB"
143134
bind!(stmt::Stmt,i::Int,val::Vector{UInt8}) = (sqlite3_bind_blob(stmt.handle,i,val); return nothing)
144135
# Fallback is BLOB and defaults to serializing the julia value
136+
"internal wrapper type to, in-effect, mark something which has been serialized"
137+
immutable Serialization
138+
object
139+
end
140+
145141
function sqlserialize(x)
146142
t = IOBuffer()
147143
# deserialize will sometimes return a random object when called on an array
@@ -151,8 +147,21 @@ function sqlserialize(x)
151147
serialize(t,s)
152148
return takebuf_array(t)
153149
end
154-
"bind `val` to the parameter at index `i`"
150+
"fallback method to bind arbitrary julia `val` to the parameter at index `i` (object is serialized)"
155151
bind!(stmt::Stmt,i::Int,val) = bind!(stmt,i,sqlserialize(val))
152+
153+
# magic bytes that indicate that a value is in fact a serialized julia value, instead of just a byte vector
154+
const SERIALIZATION = UInt8[0x11,0x01,0x02,0x0d,0x53,0x65,0x72,0x69,0x61,0x6c,0x69,0x7a,0x61,0x74,0x69,0x6f,0x6e,0x23]
155+
function sqldeserialize(r)
156+
ret = ccall(:memcmp, Int32, (Ptr{UInt8},Ptr{UInt8}, UInt),
157+
SERIALIZATION, r, min(18,length(r)))
158+
if ret == 0
159+
v = deserialize(IOBuffer(r))
160+
return v.object
161+
else
162+
return r
163+
end
164+
end
156165
#TODO:
157166
#int sqlite3_bind_zeroblob(sqlite3_stmt*, int, int n);
158167
#int sqlite3_bind_value(sqlite3_stmt*, int, const sqlite3_value*);
@@ -173,19 +182,6 @@ function execute!(db::DB,sql::AbstractString)
173182
return execute!(stmt)
174183
end
175184

176-
const SERIALIZATION = UInt8[0x11,0x01,0x02,0x0d,0x53,0x65,0x72,0x69,0x61,0x6c,0x69,0x7a,0x61,0x74,0x69,0x6f,0x6e,0x23]
177-
function sqldeserialize(r)
178-
ret = ccall(:memcmp, Int32, (Ptr{UInt8},Ptr{UInt8}, UInt),
179-
SERIALIZATION, r, min(18,length(r)))
180-
181-
if ret == 0
182-
v = deserialize(IOBuffer(r))
183-
return v.object
184-
else
185-
return r
186-
end
187-
end
188-
189185
# Transaction-based commands
190186
"""
191187
Begin a transaction in the spedified `mode`, default = "DEFERRED".
@@ -271,6 +267,23 @@ function removeduplicates!(db,table::AbstractString,cols::AbstractString)
271267
return
272268
end
273269

270+
"""
271+
`SQLite.Source` implementes the `DataStreams` framework for interacting with SQLite databases
272+
"""
273+
type Source <: Data.Source
274+
schema::Data.Schema
275+
stmt::Stmt
276+
status::Cint
277+
end
278+
279+
"SQLite.Sink implements the `Sink` interface in the `DataStreams` framework"
280+
type Sink <: Data.Sink # <: IO
281+
schema::Data.Schema
282+
db::DB
283+
tablename::UTF8String
284+
stmt::Stmt
285+
end
286+
274287
include("Source.jl")
275288
include("Sink.jl")
276289

src/Sink.jl

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,15 @@ sqlitetype{T<:AbstractFloat}(::Type{T}) = "REAL"
33
sqlitetype{T<:AbstractString}(::Type{T}) = "TEXT"
44
sqlitetype(::Type{NullType}) = "NULL"
55
sqlitetype(x) = "BLOB"
6-
"SQLite.Sink implements the `Sink` interface in the `DataStreams` framework"
7-
type Sink <: Data.Sink # <: IO
8-
schema::Data.Schema
9-
db::DB
10-
tablename::UTF8String
11-
stmt::Stmt
12-
end
13-
"constructs an SQLite.Source from an SQLite.Sink; selects all rows/columns from the underlying Sink table by default"
14-
function Source(sink::SQLite.Sink,sql::AbstractString="select * from $(sink.tablename)")
15-
stmt = SQLite.Stmt(sink.db,sql)
16-
status = SQLite.execute!(stmt)
17-
return SQLite.Source(sink.schema, stmt, status)
18-
end
196

207
"""
218
independent SQLite.Sink constructor to create a new or wrap an existing SQLite table with name `tablename`.
22-
can optionally provide a `Data.Schema` through the `schema` argument.
9+
must provide a `Data.Schema` through the `schema` argument
10+
can optionally provide an existing SQLite table name or new name that a created SQLite table will be called through the `tablename` argument
2311
`temp=true` will create a temporary SQLite table that will be destroyed automatically when the database is closed
2412
`ifnotexists=false` will throw an error if `tablename` already exists in `db`
2513
"""
26-
function Sink(db::DB,tablename::AbstractString="julia_"*randstring(),schema::Data.Schema=Data.EMPTYSCHEMA;temp::Bool=false,ifnotexists::Bool=true)
14+
function Sink(schema::Data.Schema,db::DB,tablename::AbstractString="julia_"*randstring();temp::Bool=false,ifnotexists::Bool=true)
2715
rows, cols = size(schema)
2816
temp = temp ? "TEMP" : ""
2917
ifnotexists = ifnotexists ? "if not exists" : ""
@@ -33,10 +21,14 @@ function Sink(db::DB,tablename::AbstractString="julia_"*randstring(),schema::Dat
3321
stmt = SQLite.Stmt(db,"insert into $tablename values ($params)")
3422
return Sink(schema,db,utf8(tablename),stmt)
3523
end
24+
25+
"constructs a new SQLite.Sink from the given `SQLite.Source`; uses `source` schema to create the SQLite table"
26+
function Sink(source::SQLite.Source, tablename::AbstractString="julia_"*randstring();temp::Bool=false,ifnotexists::Bool=true)
27+
return Sink(source.schema, source.db, tablename; temp=temp, ifnotexists=ifnotexists)
28+
end
3629
"constructs a new SQLite.Sink from the given `Data.Source`; uses `source` schema to create the SQLite table"
3730
function Sink(source::Data.Source, db::DB, tablename::AbstractString="julia_"*randstring();temp::Bool=false,ifnotexists::Bool=true)
38-
sink = Sink(db, tablename, source.schema; temp=temp, ifnotexists=ifnotexists)
39-
return Data.stream!(source,sink)
31+
return Sink(source.schema, db, tablename; temp=temp, ifnotexists=ifnotexists)
4032
end
4133

4234
# create a new SQLite table
@@ -59,7 +51,7 @@ function Data.stream!(dt::Data.Table,sink::SQLite.Sink)
5951
if rows*cols != 0
6052
for row = 1:rows
6153
for col = 1:cols
62-
@inbounds SQLite.getbind!(Data.column(dt,col,types[col]),row,col,sink.stmt)
54+
@inbounds SQLite.getbind!(Data.unsafe_column(dt,col,types[col]),row,col,sink.stmt)
6355
end
6456
SQLite.sqlite3_step(handle)
6557
SQLite.sqlite3_reset(handle)

0 commit comments

Comments
 (0)