Skip to content

Commit 70472ff

Browse files
mcmcgrath13quinnj
authored andcommitted
Streaming queries (#124)
For large datasets, you often don't want to load the full result set at once; this allows fetching and processing rows one at a time.
1 parent a0e70de commit 70472ff

File tree

6 files changed

+75
-45
lines changed

6 files changed

+75
-45
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ os:
88
- linux
99

1010
julia:
11-
- 1.0
11+
- 1.1
1212
- nightly
1313

1414
notifications:

Project.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ version = "0.7.0"
77
BinaryProvider = "b99e7846-7c00-51b0-8f62-c81ae34c0232"
88
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
99
DecFP = "55939f99-70c6-5e9b-8bb0-5071ed7d61fd"
10+
Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
1011
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
1112

1213
[compat]

README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ julia> Pkg.add("MySQL")
2828

2929
## Project Status
3030

31-
The package is tested against the current Julia `1.0` release and nightly on Linux and OS X.
31+
The package is tested against the current Julia `1.1` release and nightly on Linux and OS X.
3232

3333
## Contributing and Questions
3434

@@ -121,15 +121,9 @@ MySQL.escape(conn::MySQL.Connection, str::String) -> String
121121
```
122122
Escape an SQL statement
123123

124-
#### MySQL.Query (previously MySQL.query)
124+
#### MySQL.query (deprecated)
125125

126-
```julia
127-
MySQL.Query(conn::MySQL.Connection, sql::String; append::Bool=false) => sink
128-
```
129-
Execute an SQL statement and return the results as a MySQL.Query object (see [MySQL.Query](#mysqlquery)).
130-
131-
The results can be materialized as a data sink that implements the Tables.jl interface.
132-
E.g. `MySQL.Query(conn, sql) |> DataFrame` or `MySQL.Query(conn, sql) |> columntable`
126+
Deprecated - see [MySQL.Query](#mysqlquery)
133127

134128
#### MySQL.execute!
135129

@@ -175,10 +169,17 @@ Alternately, a source implementing the Tables.jl interface can be streamed by ex
175169
#### MySQL.Query
176170

177171
```julia
178-
MySQL.Query(conn, sql, sink=Data.Table, kwargs...) => MySQL.Query
172+
MySQL.Query(conn, sql; kwargs...) => MySQL.Query
179173
```
180174

181-
Execute an SQL statement and return a `MySQL.Query` object. Result rows can be iterated.
175+
Execute an SQL statement and return a `MySQL.Query` object. Result rows can be
176+
iterated as NamedTuples via `Tables.rows(query)` where `query` is the `MySQL.Query`
177+
object.
178+
179+
Supported keyword arguments:
180+
* `streaming` - Defaults to `false`. If `true`, rows are fetched from the server one at a time, so the number of rows in the result is not known in advance. May be more memory efficient for large results.
181+
182+
To materialize the results as a `DataFrame`, use `MySQL.Query(conn, sql) |> DataFrame`.
182183

183184
### Example
184185

@@ -189,7 +190,7 @@ using DataFrames
189190

190191
conn = MySQL.connect("localhost", "root", "password", db = "test_db")
191192

192-
foo = MySQL.query(conn, """SELECT COUNT(*) FROM my_first_table;""") |> DataFrame
193+
foo = MySQL.Query(conn, """SELECT COUNT(*) FROM my_first_table;""") |> DataFrame
193194
num_foo = foo[1,1]
194195

195196
my_stmt = MySQL.Stmt(conn, """INSERT INTO my_second_table ('foo_id','foo_name') VALUES (?,?);""")

src/api.jl

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const MYSQL_ROW = Ptr{Ptr{Cchar}} # pointer to an array of strings
1919
const MYSQL_TYPE = UInt32
2020

2121
"""
22-
The field object that contains the metadata of the table.
22+
The field object that contains the metadata of the table.
2323
Returned by mysql_fetch_fields API.
2424
"""
2525
struct MYSQL_FIELD
@@ -114,8 +114,8 @@ struct MYSQL_BIND
114114
store_param_func::Ptr{Cvoid}
115115
fetch_result::Ptr{Cvoid}
116116
skip_result::Ptr{Cvoid}
117-
buffer_length::Culong
118-
offset::Culong
117+
buffer_length::Culong
118+
offset::Culong
119119
length_value::Culong
120120
param_number::Cuint
121121
pack_length::Cuint
@@ -191,20 +191,6 @@ macro c(func, ret, args, vals...)
191191
end
192192
end
193193

194-
# function mysql_library_init(argc=0, argv=C_NULL, groups=C_NULL)
195-
# return ccall((:mysql_library_init, libmariadb),
196-
# Cint,
197-
# (Cint, Ptr{Ptr{UInt8}}, Ptr{Ptr{UInt8}}),
198-
# argc, argv, groups)
199-
# end
200-
201-
# function mysql_library_end()
202-
# return ccall((:mysql_library_end, libmariadb),
203-
# Cvoid,
204-
# (),
205-
# )
206-
# end
207-
208194
"""
209195
Initializes the MYSQL object. Must be called before mysql_real_connect.
210196
Memory allocated by mysql_init can be freed with mysql_close.
@@ -428,6 +414,9 @@ function mysql_stmt_bind_result(stmtptr, bind::Ptr{MYSQL_BIND})
428414
bind)
429415
end
430416

417+
"""
418+
Submit a query to the server
419+
"""
431420
function mysql_query(mysqlptr::Ptr{Cvoid}, sql::String)
432421
return @c(:mysql_query,
433422
Cint,
@@ -436,13 +425,26 @@ function mysql_query(mysqlptr::Ptr{Cvoid}, sql::String)
436425
sql)
437426
end
438427

428+
"""
429+
Call after mysql_query or mysql_real_query to retrieve the entire result set and buffer it in client memory.
430+
"""
439431
function mysql_store_result(mysqlptr::Ptr{Cvoid})
440432
return @c(:mysql_store_result,
441433
MYSQL_RES,
442434
(Ptr{Cvoid}, ),
443435
mysqlptr)
444436
end
445437

438+
"""
439+
Call after mysql_query or mysql_real_query to stream the result set, retrieving rows from the server one at a time.
440+
"""
441+
function mysql_use_result(mysqlptr::Ptr{Cvoid})
442+
return @c(:mysql_use_result,
443+
MYSQL_RES,
444+
(Ptr{Cvoid}, ),
445+
mysqlptr)
446+
end
447+
446448
"""
447449
Returns the field metadata.
448450
"""

src/types.jl

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ function metadata(result::API.MYSQL_RES)
4848
return unsafe_wrap(Array, rawfields, nfields)
4949
end
5050

51-
mutable struct Query{hasresult, names, T}
51+
# resulttype is a Symbol, one of :none, :streaming, or :default
52+
# names and T are the column names and column types returned by the query
53+
mutable struct Query{resulttype, names, T}
5254
result::Result
5355
ptr::Ptr{Ptr{Int8}}
5456
ncols::Int
@@ -62,39 +64,51 @@ function julia_type(field_type, notnullable, isunsigned)
6264
end
6365

6466
"""
65-
MySQL.Query(conn, sql, sink=Data.Table; kwargs...) => MySQL.Query
67+
MySQL.Query(conn, sql; kwargs...) => MySQL.Query
6668
6769
Execute an SQL statement and return a `MySQL.Query` object. Result rows can be
68-
iterated as NamedTuples via `Data.rows(query)` where `query` is the `MySQL.Query`
70+
iterated as NamedTuples via `Tables.rows(query)` where `query` is the `MySQL.Query`
6971
object.
7072
71-
To materialize the results as a `DataFrame`, use `MySQL.query(conn, sql) |> DataFrame`.
73+
Supported keyword arguments:
74+
* `streaming` - Defaults to `false`. If `true`, rows are fetched from the server one at a time, so the number of rows in the result is not known in advance. May be more memory efficient for large results.
75+
76+
To materialize the results as a `DataFrame`, use `MySQL.Query(conn, sql) |> DataFrame`.
7277
"""
73-
function Query(conn::Connection, sql::String; kwargs...)
78+
function Query(conn::Connection, sql::String; streaming::Bool=false, kwargs...)
7479
conn.ptr == C_NULL && throw(MySQLInterfaceError("Method called with null connection."))
7580
MySQL.API.mysql_query(conn.ptr, sql) != 0 && throw(MySQLInternalError(conn))
76-
result = MySQL.Result(MySQL.API.mysql_store_result(conn.ptr))
81+
82+
if streaming
83+
resulttype = :streaming
84+
result = MySQL.Result(MySQL.API.mysql_use_result(conn.ptr))
85+
else
86+
resulttype = :default
87+
# Non-streaming path: result comes from mysql_store_result (duplicated `result =` removed).
result = MySQL.Result(MySQL.API.mysql_store_result(conn.ptr))
88+
end
89+
7790
if result.ptr != C_NULL
7891
nrows = MySQL.API.mysql_num_rows(result.ptr)
7992
fields = MySQL.metadata(result.ptr)
8093
names = Tuple(ccall(:jl_symbol_n, Ref{Symbol}, (Ptr{UInt8}, Csize_t), x.name, x.name_length) for x in fields)
8194
T = Tuple{(julia_type(x.field_type, API.notnullable(x), API.isunsigned(x)) for x in fields)...}
82-
hasresult = true
8395
ncols = length(fields)
8496
ptr = MySQL.API.mysql_fetch_row(result.ptr)
8597
elseif API.mysql_field_count(conn.ptr) == 0
8698
result = Result(Int(API.mysql_affected_rows(conn.ptr)))
8799
nrows = ncols = 1
88100
names = (:num_rows_affected,)
89101
T = Tuple{Int}
90-
hasresult = false
102+
resulttype = :none
91103
ptr = C_NULL
92104
else
93105
throw(MySQLInterfaceError("Query expected to produce results but did not."))
94106
end
95-
return Query{hasresult, names, T}(result, ptr, ncols, nrows)
107+
return Query{resulttype, names, T}(result, ptr, ncols, nrows)
96108
end
97109

110+
Base.IteratorSize(::Type{Query{resulttype, names, T}}) where {resulttype, names, T} = resulttype == :streaming ? Base.SizeUnknown() : Base.HasLength()
111+
98112
Tables.istable(::Type{<:Query}) = true
99113
Tables.rowaccess(::Type{<:Query}) = true
100114
Tables.rows(q::Query) = q
@@ -126,9 +140,8 @@ function generate_namedtuple(::Type{NamedTuple{names, types}}, q) where {names,
126140
end
127141
end
128142

129-
function Base.iterate(q::Query{hasresult, names, types}, st=1) where {hasresult, names, types}
130-
st > length(q) && return nothing
131-
!hasresult && return (num_rows_affected=Int(q.result.ptr),), 2
143+
function Base.iterate(q::Query{resulttype, names, types}, st=1) where {resulttype, names, types}
144+
st == 1 && resulttype == :none && return (num_rows_affected=Int(q.result.ptr),), 2
132145
q.ptr == C_NULL && return nothing
133146
nt = generate_namedtuple(NamedTuple{names, types}, q)
134147
q.ptr = API.mysql_fetch_row(q.result.ptr)

test/runtests.jl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
using Test, MySQL, Tables, Dates
22

33
if haskey(ENV, "APPVEYOR_BUILD_NUMBER")
4-
pwd = "Password12!"
4+
pswd = "Password12!"
55
else
6-
pwd = ""
6+
pswd = ""
77
end
88

9-
const conn = MySQL.connect("127.0.0.1", "root", pwd; port=3306)
9+
const conn = MySQL.connect("127.0.0.1", "root", pswd; port=3306)
1010

1111
MySQL.execute!(conn, "DROP DATABASE if exists mysqltest")
1212
MySQL.execute!(conn, "CREATE DATABASE mysqltest")
@@ -58,6 +58,19 @@ expected = (
5858

5959
@test res == expected
6060

61+
# Streaming Queries
62+
sres = MySQL.Query(conn, "select * from Employee", streaming=true)
63+
64+
@test sres.nrows == 0
65+
66+
data = []
67+
for row in sres
68+
push!(data, row)
69+
end
70+
@test length(data) == 4
71+
@test length(data[1]) == 10
72+
@test data[1].Name == "John"
73+
6174
# insert null row
6275
MySQL.execute!(conn, "INSERT INTO Employee () VALUES ();")
6376

0 commit comments

Comments
 (0)