diff --git a/Project.toml b/Project.toml index 8223d27a..c52fb0b9 100644 --- a/Project.toml +++ b/Project.toml @@ -37,8 +37,9 @@ TimeZones = "0.9.2, 0.10, 0.11, 1" julia = "1.6" [extras] +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", "DataFrames"] +test = ["Test", "CSV", "DataFrames"] diff --git a/docs/src/index.md b/docs/src/index.md index 3d5c0121..b12803d8 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -75,9 +75,9 @@ LibPQ.load!( execute(conn, "COMMIT;") ``` -### `COPY` +### `COPY FROM` -An alternative to repeated `INSERT` queries is the PostgreSQL `COPY` query. +An alternative to repeated `INSERT` queries is the PostgreSQL `COPY FROM` query. `LibPQ.CopyIn` makes it easier to stream data to the server using a `COPY FROM STDIN` query. ```julia @@ -99,3 +99,23 @@ execute(conn, copyin) close(conn) ``` + +### `COPY TO` + +An alternative to selection for large datasets in `SELECT` queries is the PostgreSQL `COPY TO` query. +`LibPQ.CopyOut!` makes it easier to stream data out of the server using a `COPY TO STDIN` query. + +```julia +using LibPQ, CSV, DataFrames + +conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + +databuf = IOBuffer() +copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER);") + +execute(conn, copyout) + +df = DataFrame(CSV.File(databuf)) + +close(conn) +``` \ No newline at end of file diff --git a/docs/src/pages/api.md b/docs/src/pages/api.md index 7296e593..198b1021 100644 --- a/docs/src/pages/api.md +++ b/docs/src/pages/api.md @@ -43,6 +43,8 @@ LibPQ.load! ```@docs LibPQ.CopyIn execute(::LibPQ.Connection, ::LibPQ.CopyIn) +LibPQ.CopyOut! +execute(::LibPQ.Connection, ::LibPQ.CopyOut!) ``` ### Asynchronous diff --git a/src/copy.jl b/src/copy.jl index 60e8ae56..166eb50d 100644 --- a/src/copy.jl +++ b/src/copy.jl @@ -81,3 +81,77 @@ function execute( Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error ) end + +""" + CopyOut!(data, query) -> CopyOut! + +Create a `CopyOut!` query instance which can be executed to receive data from PostgreSQL via a +`COPY TO STDIN` query. + +`query` must be a `COPY TO STDIN` query as described in the [PostgreSQL documentation](https://www.postgresql.org/docs/10/sql-copy.html). +`COPY TO` queries which use a file or `PROGRAM` source can instead use the standard +[`execute`](@ref) query interface. + +`data` is an IOBuffer where strings of data received from PostgreSQL are written to. +The data is received as text in CSV format. +""" +struct CopyOut! + data::IOBuffer + query::String +end + +""" + execute(jl_conn::Connection, copyout::CopyOut!, args...; + throw_error::Bool=true, kwargs... + ) -> Result + +Runs [`execute`](@ref execute(::Connection, ::String)) on `copyout`'s query, then fills +`copyout`'s data from the server. + +All other arguments are passed through to the `execute` call for the initial query. +""" +function execute( + jl_conn::Connection, + copy::CopyOut!, + parameters=nothing; + throw_error=true, + kwargs..., +) + level = throw_error ? error : warn + if parameters !== nothing + # https://postgrespro.com/list/thread-id/1893680 + throw(ArgumentError("COPY can't take any parameter")) + end + + copy_end_result = lock(jl_conn) do + result = _execute(jl_conn.conn, copy.query) + result_status = libpq_c.PQresultStatus(result) + + if result_status != libpq_c.PGRES_COPY_OUT + level(LOGGER, Errors.JLResultError( + "Expected PGRES_COPY_OUT after COPY query, got $result_status" + )) + return result + end + + io = copy.data # store csv string + async::Cint = 0 # blocking call + rowRef = Ref{Cstring}() + status_code = Cint(0) + while (status_code = libpq_c.PQgetCopyData(jl_conn.conn, rowRef, async)) > 0 + rowPtr = rowRef[] + write(io, unsafe_string(rowPtr)) + if rowPtr != C_NULL + libpq_c.PQfreemem(convert(Ptr{Cvoid}, rowPtr)) + end + end + seekstart(io) # rewind iobuffer so future user read will begin from start + -2 == status_code && level(LOGGER, Errors.JLResultError("PQgetCopyData error: $(error_message(jl_conn))")) + + libpq_c.PQgetResult(jl_conn.conn) + end + + return handle_result( + Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error + ) +end diff --git a/test/runtests.jl b/test/runtests.jl index 07cc8478..69331212 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,6 @@ using LibPQ using Test +using CSV using Dates using DataFrames using DataFrames: eachrow @@ -444,6 +445,86 @@ end end end + @testset "COPY TO" begin + @testset "Example COPY TO" begin + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + + # make data + no_nulls = map(string, 'a':'z') + yes_nulls = Union{String, Missing}[isodd(Int(c)) ? string(c) : missing for c in 'a':'z'] + data = DataFrame(no_nulls=no_nulls, yes_nulls=yes_nulls) + + row_strings = imap(eachrow(data)) do row + if ismissing(row[:yes_nulls]) + "$(row[:no_nulls]),\n" + else + "$(row[:no_nulls]),$(row[:yes_nulls])\n" + end + end + + # setup db table + result = execute(conn, """ + CREATE TEMPORARY TABLE libpqjl_test ( + no_nulls varchar(10) PRIMARY KEY, + yes_nulls varchar(10) + ); + """) + @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK + close(result) + + # populate db table + copyin = LibPQ.CopyIn("COPY libpqjl_test FROM STDIN (FORMAT CSV);", row_strings) + + result = execute(conn, copyin) + @test isopen(result) + @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK + @test isempty(LibPQ.error_message(result)) + close(result) + + # test CopyOut! + databuf = IOBuffer() + copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER, ENCODING 'UTF8');") + + result = execute(conn, copyout) + @test isopen(result) + @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK + @test isempty(LibPQ.error_message(result)) + close(result) + + @static if VERSION >= v"1.3.0-1" + csvfile = CSV.File(databuf, stringtype=String) + else + csvfile = CSV.File(databuf) + end + df = DataFrame(csvfile) + @test isequal(data, df) + + close(conn) + end + + @testset "Wrong COPY TO" begin + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + + # test CopyOut! with an error + databuf = IOBuffer() + copyout = LibPQ.CopyOut!(databuf, "SELECT libpqjl_test;") + + result = execute(conn, copyout; throw_error=false) + @test isopen(result) + @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR + + err_msg = LibPQ.error_message(result) + @test occursin("ERROR", err_msg) + close(result) + + # parameters are not supported + copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test WHERE no_nulls = \$1) TO STDOUT (FORMAT CSV, HEADER);") + @test_throws ArgumentError execute(conn, copyout, ['z']) + + close(conn) + end + end + @testset "LibPQ.Connection" begin @testset "do" begin local saved_conn