Skip to content
28 changes: 17 additions & 11 deletions src/asyncresults.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ function _consume(jl_conn::Connection)
# this is important?
# https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266
# if we used non-blocking connections we would need to check for `1` as well
if libpq_c.PQflush(jl_conn.conn) < 0
error(LOGGER, Errors.PQConnectionError(jl_conn))
end
# See flush(jl_conn::Connection) in connections.jl
flush(jl_conn)

async_result = jl_conn.async_result
result_ptrs = Ptr{libpq_c.PGresult}[]
Expand Down Expand Up @@ -231,7 +230,7 @@ end

function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query)
_async_submit(jl_conn, query)
end

return async_result
Expand All @@ -252,9 +251,10 @@ function async_execute(
string_params = string_parameters(parameters)
pointer_params = parameter_pointers(string_params)

async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
async_result =
_async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
GC.@preserve string_params _async_submit(
jl_conn.conn, query, pointer_params; binary_format=binary_format
jl_conn, query, pointer_params; binary_format=binary_format
)
end

Expand Down Expand Up @@ -289,16 +289,22 @@ function _async_execute(
return async_result
end

function _async_submit(conn_ptr::Ptr{libpq_c.PGconn}, query::AbstractString)
return libpq_c.PQsendQuery(conn_ptr, query) == 1
function _async_submit(jl_conn::Connection, query::AbstractString)
send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query)
if isnonblocking(jl_conn) == 0
return send_status == 1
else
return flush(jl_conn)
end
end

function _async_submit(
conn_ptr::Ptr{libpq_c.PGconn},
jl_conn::Connection,
query::AbstractString,
parameters::Vector{Ptr{UInt8}};
binary_format::Bool=false,
)
conn_ptr::Ptr{libpq_c.PGconn} = jl_conn.conn
num_params = length(parameters)

send_status = libpq_c.PQsendQueryParams(
Expand All @@ -311,6 +317,6 @@ function _async_submit(
zeros(Cint, num_params), # all parameters in text format
Cint(binary_format), # return result in text or binary format
)

return send_status == 1
# send_status must be 1, if nonblock, we also want to flush
return send_status == 1 && (isnonblocking(jl_conn) == 0 || flush(jl_conn))
end
63 changes: 62 additions & 1 deletion src/connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ end
"""
ConnectionOption(pq_opt::libpq_c.PQconninfoOption) -> ConnectionOption

Construct a `ConnectionOption` from a `libpg_c.PQconninfoOption`.
Construct a `ConnectionOption` from a `libpq_c.PQconninfoOption`.
"""
function ConnectionOption(pq_opt::libpq_c.PQconninfoOption)
return ConnectionOption(
Expand Down Expand Up @@ -789,3 +789,64 @@ function socket(conn::Ptr{libpq_c.PGconn})
end

socket(jl_conn::Connection) = socket(jl_conn.conn)

"""
Sets the nonblocking connection status of the PG connections.
While async_execute is non-blocking on the receiving side,
the sending side is still nonblockign without this
Returns true on success, false on failure

https://www.postgresql.org/docs/current/libpq-async.html
"""
function setnonblocking(jl_conn::Connection; nonblock=true)
return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblock)) == 0
end

"""
Checks whether the connection is non-blocking.
Returns true if the connection is set to non-blocking, false otherwise

https://www.postgresql.org/docs/current/libpq-async.html
"""
function isnonblocking(jl_conn)
return libpq_c.PQisnonblocking(jl_conn.conn) == 1
end

"""
Do the flush dance described in the libpq docs. Required when the
connections are set to nonblocking and we want do send queries/data
without blocking.

https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFlush
"""
function flush(jl_conn)
watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes
try
while true # Iterators.repeated(true) # would make me more comfotable I think
flushstatus = libpq_c.PQflush(jl_conn.conn)
# 0 indicates success
flushstatus == 0 && return true
# -1 indicates error
flushstatus < 0 && error(LOGGER, Errors.PQConnectionError(jl_conn))
# Could not send all data without blocking, need to wait FD
flushstatus == 1 && begin
wait(watcher) # Wait for the watcher
# If it becomes write-ready, call PQflush again.
if watcher.mask.writable
continue # Call PGflush again, to send more data
end
if watcher.mask.readable
# if the stream is readable, we have to consume data from the server first.
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
!success && error(LOGGER, Errors.PQConnectionError(jl_conn))
end
end
end
catch
# We don't want to manage anything here
rethrow()
finally
# Just close the watcher
close(watcher)
end
end
Loading