@@ -23,23 +23,20 @@ function Sink(db::DB, schema::Data.Schema; name::AbstractString="julia_"*randstr
23
23
return Sink (schema, db, name, stmt, " " )
24
24
end
25
25
26
- " constructs a new SQLite.Sink from the given `SQLite.Source`; uses `source` schema to create the SQLite table"
27
- function Sink (source:: SQLite.Source ; name:: AbstractString = " julia_" * randstring (), temp:: Bool = false , ifnotexists:: Bool = true )
28
- return Sink (source. db, source. schema; name= name, temp= temp, ifnotexists= ifnotexists)
29
- end
30
- " constructs a new SQLite.Sink from the given `Data.Source`; uses `source` schema to create the SQLite table"
31
- function Sink (db:: DB , source; name:: AbstractString = " julia_" * randstring (), temp:: Bool = false , ifnotexists:: Bool = true )
32
- return Sink (db, Data. schema (source); name= name, temp= temp, ifnotexists= ifnotexists)
33
- end
34
-
35
26
# DataStreams interface
36
27
Data. streamtypes {T<:SQLite.Sink} (:: Type{T} ) = [Data. Field]
37
28
38
- function Sink {T} (source, :: Type{T} , append:: Bool , db:: DB , name:: AbstractString = " julia_" * randstring ())
39
- sink = Sink (db, Data. schema (source); name= name, append= append)
29
+ function Sink {T} (sch:: Data.Schema , :: Type{T} , append:: Bool , ref:: Vector{UInt8} , db:: DB , name:: AbstractString = " julia_" * randstring (); kwargs... )
30
+ sink = Sink (db, sch; name= name, append= append, kwargs... )
31
+ execute! (sink. db," PRAGMA synchronous = OFF;" )
32
+ sink. transaction = string (" SQLITE" ,randstring (10 ))
33
+ transaction (sink. db, sink. transaction)
40
34
return sink
41
35
end
42
- function Sink {T} (sink, source, :: Type{T} , append:: Bool )
36
+ function Sink {T} (sink, sch:: Data.Schema , :: Type{T} , append:: Bool , ref:: Vector{UInt8} )
37
+ execute! (sink. db," PRAGMA synchronous = OFF;" )
38
+ sink. transaction = string (" SQLITE" ,randstring (10 ))
39
+ transaction (sink. db, sink. transaction)
43
40
! append && execute! (sink. db, " delete from $(esc_id (sink. tablename)) " )
44
41
return sink
45
42
end
65
62
getbind! {T} (val:: T , col, stmt) = SQLite. bind! (stmt, col, val)
66
63
end
67
64
68
- function Data. open! (sink:: SQLite.Sink , source)
69
- execute! (sink. db," PRAGMA synchronous = OFF;" )
70
- sink. transaction = string (" SQLITE" ,randstring (10 ))
71
- transaction (sink. db, sink. transaction)
72
- return nothing
73
- end
74
-
75
- function Data. streamfield! {T} (sink:: SQLite.Sink , source, :: Type{T} , row, col, cols)
76
- val = Data. getfield (source, T, row ,col)
65
+ function Data. stream! {T} (sink:: SQLite.Sink , :: Type{Data.Field} , val:: T , row, col, cols)
77
66
getbind! (val, col, sink. stmt)
78
67
if col == cols
79
68
SQLite. sqlite3_step (sink. stmt. handle)
84
73
85
74
function Data. cleanup! (sink:: SQLite.Sink )
86
75
rollback (sink. db, sink. transaction)
76
+ commit (sink. db, sink. transaction)
87
77
execute! (sink. db, " PRAGMA synchronous = ON;" )
88
78
return nothing
89
79
end
@@ -101,26 +91,16 @@ Load a Data.Source `source` into an SQLite table that will be named `tablename`
101
91
`temp=true` will create a temporary SQLite table that will be destroyed automatically when the database is closed
102
92
`ifnotexists=false` will throw an error if `tablename` already exists in `db`
103
93
"""
104
- function load {T} (db, name, :: Type{T} , args... ;
105
- temp:: Bool = false ,
106
- ifnotexists:: Bool = true ,
107
- append:: Bool = false )
108
- source = T (args... )
109
- schema = Data. schema (source)
110
- sink = Sink (db, schema; name= name, temp= temp, ifnotexists= ifnotexists, append= append)
111
- Data. stream! (source, sink, append)
94
+ function load {T} (db, name, :: Type{T} , args... ; append:: Bool = false , transforms:: Dict = Dict {Int,Function} (), kwargs... )
95
+ sink = Data. stream! (T (args... ), SQLite. Sink, append, transforms, db, name; kwargs... )
112
96
Data. close! (sink)
113
97
return sink
114
98
end
115
- function load {T} (db, name, source:: T ;
116
- temp:: Bool = false ,
117
- ifnotexists:: Bool = true ,
118
- append:: Bool = false )
119
- sink = Sink (db, Data. schema (source); name= name, temp= temp, ifnotexists= ifnotexists, append= append)
120
- Data. stream! (source, sink, append)
99
+ function load {T} (db, name, source:: T ; append:: Bool = false , transforms:: Dict = Dict {Int,Function} (), kwargs... )
100
+ sink = Data. stream! (source, SQLite. Sink, append, transforms, db, name; kwargs... )
121
101
Data. close! (sink)
122
102
return sink
123
103
end
124
104
125
- load {T} (sink:: Sink , :: Type{T} , args... ; append:: Bool = false ) = (sink = Data. stream! (T (args... ), sink, append); Data. close! (sink); return sink)
126
- load (sink:: Sink , source; append:: Bool = false ) = (sink = Data. stream! (source, sink, append); Data. close! (sink); return sink)
105
+ load {T} (sink:: Sink , :: Type{T} , args... ; append:: Bool = false , transforms :: Dict = Dict {Int,Function} ()) = (sink = Data. stream! (T (args... ), sink, append, transforms ); Data. close! (sink); return sink)
106
+ load (sink:: Sink , source; append:: Bool = false , transforms :: Dict = Dict {Int,Function} ()) = (sink = Data. stream! (source, sink, append, transforms ); Data. close! (sink); return sink)
0 commit comments