Skip to content
1 change: 1 addition & 0 deletions deps/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a:
cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch
cd postgresql/postgresql && patch -p0 < ../fmt_err_msg.patch
cd postgresql/postgresql && patch -p0 < ../bind_fmt_text.patch
cd postgresql/postgresql && patch -p0 < ../pqsendpipelinesync.patch
#cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0"
cd postgresql/postgresql && LD_LIBRARY_PATH="$(SSL_LDIR)" ./configure --with-ssl=openssl --with-includes="$(SSL_IDIR)" --with-libraries="$(SSL_LDIR)" --without-readline
cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0
Expand Down
84 changes: 84 additions & 0 deletions deps/postgresql/pqsendpipelinesync.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
diff --git src/interfaces/libpq/fe-exec.c src/interfaces/libpq/fe-exec.c
index b833e76..51ad8d8 100644
--- src/interfaces/libpq/fe-exec.c
+++ src/interfaces/libpq/fe-exec.c
@@ -4558,3 +4558,65 @@ int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) {
return psHandleRowData(conn, is_first_packet, result);
}

+int
+PQsendPipelineSync(PGconn *conn)
+{
+ PGcmdQueueEntry *entry;
+
+ if (!conn)
+ return 0;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode");
+ return 0;
+ }
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ /* should be unreachable */
+ appendPQExpBufferStr(&conn->errorMessage,
+ "internal error: cannot send pipeline while in COPY\n");
+ return 0;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
+ /* OK to send sync */
+ break;
+ }
+
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
+ entry->queryclass = PGQUERY_SYNC;
+ entry->query = NULL;
+
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ /*
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
+ */
+ if (pqPipelineFlush(conn) < 0)
+ goto sendFailed;
+
+ /* OK, it's launched! */
+ pqAppendCmdQueueEntry(conn, entry);
+
+ return 1;
+
+sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
+ /* error message should be set up already */
+ return 0;
+}
diff --git src/interfaces/libpq/libpq-fe.h src/interfaces/libpq/libpq-fe.h
index 47f25e0..b769b64 100644
--- src/interfaces/libpq/libpq-fe.h
+++ src/interfaces/libpq/libpq-fe.h
@@ -688,6 +688,9 @@ extern const PGresult *PQgetResultFromPGconn(PGconn *conn);
/* ProxySQL special handler function */
extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result);

+/* Send a pipeline sync message without flushing the send buffer */
+extern int PQsendPipelineSync(PGconn *conn);
+
#ifdef __cplusplus
}
#endif
19 changes: 14 additions & 5 deletions include/PgSQL_ExplicitTxnStateMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct TxnCmd {
*/
class PgSQL_TxnCmdParser {
public:
PgSQL_TxnCmdParser() noexcept { tokens.reserve(16); }
~PgSQL_TxnCmdParser() noexcept = default;

TxnCmd parse(std::string_view input, bool in_transaction_mode) noexcept;

private:
Expand All @@ -67,14 +70,20 @@ class PgSQL_TxnCmdParser {
TxnCmd parse_start(size_t& pos) noexcept;

// Helpers
static std::string to_lower(std::string_view s) noexcept {
std::string s_copy(s);
std::transform(s_copy.begin(), s_copy.end(), s_copy.begin(), ::tolower);
return s_copy;
inline static bool iequals(std::string_view a, std::string_view b) noexcept {
if (a.size() != b.size()) return false;
for (size_t i = 0; i < a.size(); ++i) {
char ca = a[i];
char cb = b[i];
if (ca >= 'A' && ca <= 'Z') ca += 32;
if (cb >= 'A' && cb <= 'Z') cb += 32;
if (ca != cb) return false;
}
return true;
}

inline static bool contains(std::vector<std::string_view>&& list, std::string_view value) noexcept {
for (const auto& item : list) if (item == value) return true;
for (const auto& item : list) if (iequals(item, value)) return true;
return false;
}
};
Expand Down
31 changes: 30 additions & 1 deletion include/gen_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,33 @@ std::unique_ptr<SQLite3_result> get_SQLite3_resulset(MYSQL_RES* resultset);

std::vector<std::string> split_string(const std::string& str, char delimiter);

#endif /* __GEN_FUNCTIONS */
inline constexpr bool fast_isspace(unsigned char c) noexcept
{
// Matches: '\t' (0x09) through '\r' (0x0D), and ' ' (0x20)
// That is: '\t', '\n', '\v', '\f', '\r', ' '
//
// (c - '\t') < 5 -> true for 0x09-0x0D inclusive
// (c == ' ') -> true for space
//
// Use bitwise OR `|` (not logical `||`) to keep it branchless.
return (c == ' ') | (static_cast<unsigned char>(c - '\t') < 5);
}

inline constexpr char* fast_uint32toa(uint32_t value, char* out) noexcept {
char* p = out;
do {
*p++ = '0' + (value % 10);
value /= 10;
} while (value);
*p = '\0';
char* start = out;
char* end = p - 1;
while (start < end) {
char t = *start;
*start++ = *end;
*end-- = t;
}
return p;
}

#endif /* __GEN_FUNCTIONS */
15 changes: 5 additions & 10 deletions lib/PgSQL_Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1663,8 +1663,7 @@ void PgSQL_Connection::stmt_prepare_start() {
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
Expand Down Expand Up @@ -1730,8 +1729,7 @@ void PgSQL_Connection::stmt_describe_start() {
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
Expand All @@ -1755,8 +1753,7 @@ void PgSQL_Connection::resync_start() {

PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this);

// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
proxy_error("Failed to send pipeline sync.\n");
resync_failed = true;
return;
Expand Down Expand Up @@ -1878,8 +1875,7 @@ void PgSQL_Connection::stmt_execute_start() {
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
Expand All @@ -1905,8 +1901,7 @@ void PgSQL_Connection::reset_session_start() {

reset_session_in_pipeline = is_pipeline_active();
if (reset_session_in_pipeline) {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
Expand Down
3 changes: 2 additions & 1 deletion lib/PgSQL_Data_Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ void PgSQL_Data_Stream::shut_hard() {
}

void PgSQL_Data_Stream::check_data_flow() {
if ((PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
// This does not apply to frontend data streams because response data is buffered during extended queries.
if ((myds_type != MYDS_FRONTEND) && (PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
// there is data at both sides of the data stream: this is considered a fatal error
proxy_error("Session=%p, DataStream=%p -- Data at both ends of a PgSQL data stream: IN <%d bytes %d packets> , OUT <%d bytes %d packets>\n", sess, this, PSarrayIN->len, queue_data(queueIN), PSarrayOUT->len, queue_data(queueOUT));
shut_soft();
Expand Down
Loading