Skip to content
6 changes: 5 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ jobs:
arch:
- x64
- x86
nonblocking:
- true
- false
exclude:
# Don't test 32-bit on macOS
- os: macOS-latest
Expand Down Expand Up @@ -67,7 +70,7 @@ jobs:
cache-name: cache-artifacts
with:
path: ~/.julia/artifacts
key: ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
key: ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-nonblocking-${{ matrix.nonblocking }}-${{ hashFiles('**/Project.toml') }}
restore-keys: |
${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-
${{ runner.os }}-${{ matrix.arch }}-test-
Expand Down Expand Up @@ -100,6 +103,7 @@ jobs:
run: |
echo "PGUSER=$USER" >> $GITHUB_ENV
echo "LIBPQJL_DATABASE_USER=$USER" >> $GITHUB_ENV
echo "LIBPQJL_CONNECTION_NONBLOCKING=${{ matrix.nonblocking }}" >> $GITHUB_ENV
if: ${{ runner.os == 'macOS' }}
- name: Start Homebrew PostgreSQL service
run: pg_ctl -D /usr/local/var/postgresql@$(psql --version | cut -f3 -d' ' | cut -f1 -d.) start
Expand Down
31 changes: 22 additions & 9 deletions src/asyncresults.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ function _consume(jl_conn::Connection)
# this is important?
# https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266
# if we used non-blocking connections we would need to check for `1` as well
if libpq_c.PQflush(jl_conn.conn) < 0
# See _flush(jl_conn::Connection) in connections.jl
if !_flush(jl_conn)
error(LOGGER, Errors.PQConnectionError(jl_conn))
end

Expand Down Expand Up @@ -231,7 +232,7 @@ end

function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
async_result = _async_execute(jl_conn; kwargs...) do jl_conn
_async_submit(jl_conn.conn, query)
_async_submit(jl_conn, query)
end

return async_result
Expand All @@ -252,9 +253,10 @@ function async_execute(
string_params = string_parameters(parameters)
pointer_params = parameter_pointers(string_params)

async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
async_result =
_async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
GC.@preserve string_params _async_submit(
jl_conn.conn, query, pointer_params; binary_format=binary_format
jl_conn, query, pointer_params; binary_format=binary_format
)
end

Expand Down Expand Up @@ -289,16 +291,22 @@ function _async_execute(
return async_result
end

function _async_submit(conn_ptr::Ptr{libpq_c.PGconn}, query::AbstractString)
return libpq_c.PQsendQuery(conn_ptr, query) == 1
function _async_submit(jl_conn::Connection, query::AbstractString)
send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query)
if isnonblocking(jl_conn)
return _flush(jl_conn)
else
return send_status == 1
end
end

function _async_submit(
conn_ptr::Ptr{libpq_c.PGconn},
jl_conn::Connection,
query::AbstractString,
parameters::Vector{Ptr{UInt8}};
binary_format::Bool=false,
)
conn_ptr::Ptr{libpq_c.PGconn} = jl_conn.conn
num_params = length(parameters)

send_status = libpq_c.PQsendQueryParams(
Expand All @@ -311,6 +319,11 @@ function _async_submit(
zeros(Cint, num_params), # all parameters in text format
Cint(binary_format), # return result in text or binary format
)

return send_status == 1
# send_status must be 1
# if nonblock, we also want to _flush
if isnonblocking(jl_conn)
return send_status == 1 && _flush(jl_conn)
else
return send_status == 1
end
end
92 changes: 88 additions & 4 deletions src/connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ function Connection(
throw_error::Bool=true,
connect_timeout::Real=0,
options::Dict{String, String}=CONNECTION_OPTION_DEFAULTS,
nonblocking::Bool=false,
kwargs...
)
if options === CONNECTION_OPTION_DEFAULTS
Expand Down Expand Up @@ -300,7 +301,7 @@ function Connection(
)

# If password needed and not entered, prompt the user
if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1
connection = if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1
push!(keywords, "password")
user = unsafe_string(libpq_c.PQuser(jl_conn.conn))
# close this connection; will open another one below with the user-provided password
Expand All @@ -309,19 +310,33 @@ function Connection(
pass = Base.getpass(prompt)
push!(values, read(pass, String))
Base.shred!(pass)
return handle_new_connection(
handle_new_connection(
Connection(
_connect_nonblocking(keywords, values, false; timeout=connect_timeout);
kwargs...
);
throw_error=throw_error,
)
else
return handle_new_connection(
handle_new_connection(
jl_conn;
throw_error=throw_error,
)
end

if nonblocking
success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblocking)) == 0
if success
return connection
elseif throw_error
close(connection)
error(LOGGER, "Could not provide a non-blocking connection")
else
warn(LOGGER, "Could not provide a non-blocking connection")
return connection
end
end
return connection
end

# AbstractLock primitives:
Expand Down Expand Up @@ -672,7 +687,7 @@ end
"""
ConnectionOption(pq_opt::libpq_c.PQconninfoOption) -> ConnectionOption

Construct a `ConnectionOption` from a `libpg_c.PQconninfoOption`.
Construct a `ConnectionOption` from a `libpq_c.PQconninfoOption`.
"""
function ConnectionOption(pq_opt::libpq_c.PQconninfoOption)
return ConnectionOption(
Expand Down Expand Up @@ -789,3 +804,72 @@ function socket(conn::Ptr{libpq_c.PGconn})
end

socket(jl_conn::Connection) = socket(jl_conn.conn)

"""
isnonblocking(jl_conn::Connection)

Sets the nonblocking connection status of the PG connections.
While async_execute is non-blocking on the receiving side,
the sending side is still nonblocking without this
Returns true on success, false on failure

https://www.postgresql.org/docs/current/libpq-async.html
"""
function setnonblocking(jl_conn::Connection; nonblocking=true)
return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblocking)) == 0
end

"""
Checks whether the connection is non-blocking.
Returns true if the connection is set to non-blocking, false otherwise

https://www.postgresql.org/docs/current/libpq-async.html
"""
function isnonblocking(jl_conn::Connection)
return libpq_c.PQisnonblocking(jl_conn.conn) == 1
end

"""
_flush(jl_conn::Connection)

Do the _flush dance described in the libpq docs. Required when the
connections are set to nonblocking and we want do send queries/data
without blocking.

https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFLUSH
"""
function _flush(jl_conn::Connection)
local watcher = nothing
if isnonblocking(jl_conn)
watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes
end
try
while true
flushstatus = libpq_c.PQflush(jl_conn.conn)
# 0 indicates success
if flushstatus == 0
return true
# -1 indicates error
elseif flushstatus < 0
return false
# 1 indicates that we could not send all data without blocking,
elseif flushstatus == 1
# need to wait FD
# Only applicable when the connection is in nonblocking mode
wait(watcher) # Wait for the watcher
# If it becomes write-ready, call PQflush again.
if watcher.mask.writable
continue # Call PGflush again, to send more data
end
if watcher.mask.readable
# if the stream is readable, we have to consume data from the server first.
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
!success && return false
end
end
end
finally
# Just close the watcher
!isnothing(watcher) && close(watcher)
end
end
Loading