Skip to content

Commit cf37289

Browse files
committed
Add CopyOut! API
1 parent de302c7 commit cf37289

File tree

2 files changed

+138
-0
lines changed

2 files changed

+138
-0
lines changed

src/copy.jl

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,87 @@ function execute(
8181
Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error
8282
)
8383
end
84+
85+
"""
86+
CopyOut!(data, query) -> CopyOut!
87+
88+
Create a `CopyOut!` query instance which can be executed to receive data from PostgreSQL via a
89+
`COPY <table_name> TO STDIN` query.
90+
91+
`query` must be a `COPY TO STDIN` query as described in the [PostgreSQL documentation](https://www.postgresql.org/docs/10/sql-copy.html).
92+
`COPY TO` queries which use a file or `PROGRAM` source can instead use the standard
93+
[`execute`](@ref) query interface.
94+
95+
`data` is an IOBuffer where strings of data received from PostgreSQL are written to.
96+
The data is received as text in CSV format.
97+
"""
98+
struct CopyOut!
99+
data::IOBuffer
100+
query::String
101+
end
102+
103+
"""
104+
execute(jl_conn::Connection, copyout::CopyOut!, args...;
105+
throw_error::Bool=true, kwargs...
106+
) -> Result
107+
108+
Runs [`execute`](@ref execute(::Connection, ::String)) on `copyout`'s query, then fills
109+
`copyout`'s data from the server.
110+
111+
All other arguments are passed through to the `execute` call for the initial query.
112+
"""
113+
function execute(
114+
jl_conn::Connection,
115+
copy::CopyOut!,
116+
parameters=nothing;
117+
throw_error=true,
118+
kwargs...,
119+
)
120+
level = throw_error ? error : warn
121+
if parameters !== nothing
122+
string_params = string_parameters(parameters)
123+
pointer_params = parameter_pointers(string_params)
124+
end
125+
126+
copy_end_result = lock(jl_conn) do
127+
if parameters === nothing
128+
result = _execute(jl_conn.conn, copy.query)
129+
else
130+
result = _execute(jl_conn.conn, copy.query, pointer_params)
131+
end
132+
result_status = libpq_c.PQresultStatus(result)
133+
134+
if result_status != libpq_c.PGRES_COPY_OUT
135+
if !(result_status in (libpq_c.PGRES_BAD_RESPONSE, libpq_c.PGRES_FATAL_ERROR))
136+
level(LOGGER, Errors.JLResultError(
137+
"Expected PGRES_COPY_OUT after COPY query, got $result_status"
138+
))
139+
end
140+
return result
141+
end
142+
143+
io = copy.data # store csv string
144+
async::Cint = 0 # blocking call
145+
rowRef = Ref{Cstring}()
146+
status_code = Cint(0)
147+
while (status_code = libpq_c.PQgetCopyData(jl_conn.conn, rowRef, async)) > 0
148+
rowPtr = rowRef[]
149+
write(io, unsafe_string(rowPtr))
150+
if rowPtr != C_NULL
151+
libpq_c.PQfreemem(convert(Ptr{Cvoid}, rowPtr))
152+
end
153+
end
154+
seekstart(io) # rewind iobuffer so future user read will begin from start
155+
if -2 == status_code
156+
level(LOGGER, Errors.JLResultError(
157+
"PQgetCopyData error: $(error_message(jl_conn))"
158+
))
159+
end
160+
161+
libpq_c.PQgetResult(jl_conn.conn)
162+
end
163+
164+
return handle_result(
165+
Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error
166+
)
167+
end

test/runtests.jl

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using LibPQ
22
using Test
3+
using CSV
34
using Dates
45
using DataFrames
56
using DataFrames: eachrow
@@ -444,6 +445,59 @@ end
444445
end
445446
end
446447

448+
@testset "COPY TO" begin
449+
@testset "Example COPY TO" begin
450+
conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER")
451+
452+
# make data
453+
no_nulls = map(string, 'a':'z')
454+
yes_nulls = Union{String, Missing}[isodd(Int(c)) ? string(c) : missing for c in 'a':'z']
455+
data = DataFrame(no_nulls=no_nulls, yes_nulls=yes_nulls)
456+
457+
row_strings = imap(eachrow(data)) do row
458+
if ismissing(row[:yes_nulls])
459+
"$(row[:no_nulls]),\n"
460+
else
461+
"$(row[:no_nulls]),$(row[:yes_nulls])\n"
462+
end
463+
end
464+
465+
# setup db table
466+
result = execute(conn, """
467+
CREATE TEMPORARY TABLE libpqjl_test (
468+
no_nulls varchar(10) PRIMARY KEY,
469+
yes_nulls varchar(10)
470+
);
471+
""")
472+
@test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK
473+
close(result)
474+
475+
# populate db table
476+
copyin = LibPQ.CopyIn("COPY libpqjl_test FROM STDIN (FORMAT CSV);", row_strings)
477+
478+
result = execute(conn, copyin)
479+
@test isopen(result)
480+
@test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK
481+
@test isempty(LibPQ.error_message(result))
482+
close(result)
483+
484+
# test CopyOut!
485+
databuf = IOBuffer()
486+
copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER, ENCODING 'UTF8');")
487+
488+
result = execute(conn, copyout)
489+
@test isopen(result)
490+
@test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK
491+
@test isempty(LibPQ.error_message(result))
492+
close(result)
493+
494+
df = DataFrame(CSV.File(databuf, stringtype=String))
495+
@test isequal(data, df)
496+
497+
close(conn)
498+
end
499+
end
500+
447501
@testset "LibPQ.Connection" begin
448502
@testset "do" begin
449503
local saved_conn

0 commit comments

Comments
 (0)