|
56 | 56 | #define FLOAT8LABEL "float8" |
57 | 57 | #define FLOAT8OID 701 |
58 | 58 |
|
| 59 | +#define FIN_DISCARD 0x1 |
| 60 | +#define FIN_CLOSE 0x2 |
| 61 | +#define FIN_ABORT 0x4 |
59 | 62 |
|
60 | 63 |
|
61 | | -static int pgsql_stmt_dtor(pdo_stmt_t *stmt) |
| 64 | + |
| 65 | +static void pgsql_stmt_finish(pdo_pgsql_stmt *S, int fin_mode) |
62 | 66 | { |
63 | | - pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; |
64 | | - bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle) |
65 | | - && IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)]) |
66 | | - && !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED); |
| 67 | + pdo_pgsql_db_handle *H = S->H; |
| 68 | + |
| 69 | + if (S->is_running_unbuffered && S->result && (fin_mode & FIN_ABORT)) { |
| 70 | + PGcancel *cancel = PQgetCancel(H->server); |
| 71 | + char errbuf[256]; |
| 72 | + PQcancel(cancel, errbuf, 256); |
| 73 | + PQfreeCancel(cancel); |
| 74 | + S->is_running_unbuffered = false; |
| 75 | + } |
67 | 76 |
|
68 | 77 | if (S->result) { |
69 | 78 | /* free the resource */ |
70 | 79 | PQclear(S->result); |
71 | 80 | S->result = NULL; |
72 | 81 | } |
73 | 82 |
|
74 | | - if (S->stmt_name) { |
75 | | - if (S->is_prepared && server_obj_usable) { |
76 | | - pdo_pgsql_db_handle *H = S->H; |
77 | | - PGresult *res; |
| 83 | + if (S->is_running_unbuffered) { |
| 84 | + /* https://postgresql.org/docs/current/libpq-async.html: |
| 85 | + * "PQsendQuery cannot be called again until PQgetResult has returned NULL" |
| 86 | + * And as all single-row functions are connection-wise instead of statement-wise, |
| 87 | + * any new single-row query has to make sure no preceding one is still running. |
| 88 | + */ |
| 89 | + // @todo Implement !(fin_mode & FIN_DISCARD) |
| 90 | + // instead of discarding results we could store them to their statement |
| 91 | + // so that their fetch() will get them (albeit not in lazy mode anymore). |
| 92 | + while ((S->result = PQgetResult(H->server))) { |
| 93 | + PQclear(S->result); |
| 94 | + S->result = NULL; |
| 95 | + } |
| 96 | + S->is_running_unbuffered = false; |
| 97 | + } |
| 98 | + |
| 99 | + if (S->stmt_name && S->is_prepared && (fin_mode & FIN_CLOSE)) { |
| 100 | + PGresult *res; |
78 | 101 | #ifndef HAVE_PQCLOSEPREPARED |
79 | | - // TODO (??) libpq does not support close statement protocol < postgres 17 |
80 | | - // check if we can circumvent this. |
81 | | - char *q = NULL; |
82 | | - spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name); |
83 | | - res = PQexec(H->server, q); |
84 | | - efree(q); |
| 102 | + // TODO (??) libpq does not support close statement protocol < postgres 17 |
| 103 | + // check if we can circumvent this. |
| 104 | + char *q = NULL; |
| 105 | + spprintf(&q, 0, "DEALLOCATE %s", S->stmt_name); |
| 106 | + res = PQexec(H->server, q); |
| 107 | + efree(q); |
85 | 108 | #else |
86 | | - res = PQclosePrepared(H->server, S->stmt_name); |
| 109 | + res = PQclosePrepared(H->server, S->stmt_name); |
87 | 110 | #endif |
88 | | - if (res) { |
89 | | - PQclear(res); |
90 | | - } |
| 111 | + if (res) { |
| 112 | + PQclear(res); |
| 113 | + } |
| 114 | + |
| 115 | + S->is_prepared = false; |
| 116 | + if (H->running_stmt == S) { |
| 117 | + H->running_stmt = NULL; |
91 | 118 | } |
| 119 | + } |
| 120 | +} |
| 121 | + |
| 122 | +static int pgsql_stmt_dtor(pdo_stmt_t *stmt) |
| 123 | +{ |
| 124 | + pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; |
| 125 | + bool server_obj_usable = !Z_ISUNDEF(stmt->database_object_handle) |
| 126 | + && IS_OBJ_VALID(EG(objects_store).object_buckets[Z_OBJ_HANDLE(stmt->database_object_handle)]) |
| 127 | + && !(OBJ_FLAGS(Z_OBJ(stmt->database_object_handle)) & IS_OBJ_FREE_CALLED); |
| 128 | + |
| 129 | + pgsql_stmt_finish(S, FIN_DISCARD|(server_obj_usable ? FIN_CLOSE|FIN_ABORT : 0)); |
| 130 | + |
| 131 | + if (S->stmt_name) { |
92 | 132 | efree(S->stmt_name); |
93 | 133 | S->stmt_name = NULL; |
94 | 134 | } |
@@ -142,14 +182,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) |
142 | 182 | pdo_pgsql_stmt *S = (pdo_pgsql_stmt*)stmt->driver_data; |
143 | 183 | pdo_pgsql_db_handle *H = S->H; |
144 | 184 | ExecStatusType status; |
| 185 | + int dispatch_result = 1; |
145 | 186 |
|
146 | 187 | bool in_trans = stmt->dbh->methods->in_transaction(stmt->dbh); |
147 | 188 |
|
148 | | - /* ensure that we free any previous unfetched results */ |
149 | | - if(S->result) { |
150 | | - PQclear(S->result); |
151 | | - S->result = NULL; |
| 189 | + /* in unbuffered mode, finish any running statement: libpq explicitely prohibits this |
| 190 | + * and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE |
| 191 | + * was called for stmt 1 inbetween |
| 192 | + * (maybe it will change with pipeline mode in libpq 14?) */ |
| 193 | + if (S->is_unbuffered && H->running_stmt) { |
| 194 | + pgsql_stmt_finish(H->running_stmt, FIN_CLOSE); |
| 195 | + H->running_stmt = NULL; |
152 | 196 | } |
| 197 | + /* ensure that we free any previous unfetched results */ |
| 198 | + pgsql_stmt_finish(S, 0); |
153 | 199 |
|
154 | 200 | S->current_row = 0; |
155 | 201 |
|
@@ -198,6 +244,7 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) |
198 | 244 | /* it worked */ |
199 | 245 | S->is_prepared = 1; |
200 | 246 | PQclear(S->result); |
| 247 | + S->result = NULL; |
201 | 248 | break; |
202 | 249 | default: { |
203 | 250 | char *sqlstate = pdo_pgsql_sqlstate(S->result); |
@@ -227,30 +274,72 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt) |
227 | 274 | } |
228 | 275 | } |
229 | 276 | } |
230 | | - S->result = PQexecPrepared(H->server, S->stmt_name, |
| 277 | + if (S->is_unbuffered) { |
| 278 | + dispatch_result = PQsendQueryPrepared(H->server, S->stmt_name, |
| 279 | + stmt->bound_params ? |
| 280 | + zend_hash_num_elements(stmt->bound_params) : |
| 281 | + 0, |
| 282 | + (const char**)S->param_values, |
| 283 | + S->param_lengths, |
| 284 | + S->param_formats, |
| 285 | + 0); |
| 286 | + } else { |
| 287 | + S->result = PQexecPrepared(H->server, S->stmt_name, |
231 | 288 | stmt->bound_params ? |
232 | 289 | zend_hash_num_elements(stmt->bound_params) : |
233 | 290 | 0, |
234 | 291 | (const char**)S->param_values, |
235 | 292 | S->param_lengths, |
236 | 293 | S->param_formats, |
237 | 294 | 0); |
| 295 | + } |
238 | 296 | } else if (stmt->supports_placeholders == PDO_PLACEHOLDER_NAMED) { |
239 | 297 | /* execute query with parameters */ |
240 | | - S->result = PQexecParams(H->server, ZSTR_VAL(S->query), |
| 298 | + if (S->is_unbuffered) { |
| 299 | + dispatch_result = PQsendQueryParams(H->server, ZSTR_VAL(S->query), |
| 300 | + stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0, |
| 301 | + S->param_types, |
| 302 | + (const char**)S->param_values, |
| 303 | + S->param_lengths, |
| 304 | + S->param_formats, |
| 305 | + 0); |
| 306 | + } else { |
| 307 | + S->result = PQexecParams(H->server, ZSTR_VAL(S->query), |
241 | 308 | stmt->bound_params ? zend_hash_num_elements(stmt->bound_params) : 0, |
242 | 309 | S->param_types, |
243 | 310 | (const char**)S->param_values, |
244 | 311 | S->param_lengths, |
245 | 312 | S->param_formats, |
246 | 313 | 0); |
| 314 | + } |
247 | 315 | } else { |
248 | 316 | /* execute plain query (with embedded parameters) */ |
249 | | - S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string)); |
| 317 | + if (S->is_unbuffered) { |
| 318 | + dispatch_result = PQsendQuery(H->server, ZSTR_VAL(stmt->active_query_string)); |
| 319 | + } else { |
| 320 | + S->result = PQexec(H->server, ZSTR_VAL(stmt->active_query_string)); |
| 321 | + } |
250 | 322 | } |
| 323 | + |
| 324 | + H->running_stmt = S; |
| 325 | + |
| 326 | + if (S->is_unbuffered) { |
| 327 | + if (!dispatch_result) { |
| 328 | + pdo_pgsql_error_stmt(stmt, 0, NULL); |
| 329 | + H->running_stmt = NULL; |
| 330 | + return 0; |
| 331 | + } |
| 332 | + S->is_running_unbuffered = true; |
| 333 | + (void)PQsetSingleRowMode(H->server); |
| 334 | + /* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */ |
| 335 | + |
| 336 | + /* try a first fetch to at least have column names and so on */ |
| 337 | + S->result = PQgetResult(S->H->server); |
| 338 | + } |
| 339 | + |
251 | 340 | status = PQresultStatus(S->result); |
252 | 341 |
|
253 | | - if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { |
| 342 | + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) { |
254 | 343 | pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result)); |
255 | 344 | return 0; |
256 | 345 | } |
@@ -472,6 +561,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt, |
472 | 561 | return 0; |
473 | 562 | } |
474 | 563 | } else { |
| 564 | + if (S->is_running_unbuffered && S->current_row >= stmt->row_count) { |
| 565 | + ExecStatusType status; |
| 566 | + |
| 567 | + /* @todo in unbuffered mode, PQ allows multiple queries to be passed: |
| 568 | + * column_count should be recomputed on each iteration */ |
| 569 | + |
| 570 | + if(S->result) { |
| 571 | + PQclear(S->result); |
| 572 | + S->result = NULL; |
| 573 | + } |
| 574 | + |
| 575 | + S->result = PQgetResult(S->H->server); |
| 576 | + status = PQresultStatus(S->result); |
| 577 | + |
| 578 | + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE) { |
| 579 | + pdo_pgsql_error_stmt(stmt, status, pdo_pgsql_sqlstate(S->result)); |
| 580 | + return 0; |
| 581 | + } |
| 582 | + |
| 583 | + stmt->row_count = (zend_long)PQntuples(S->result); |
| 584 | + S->current_row = 0; |
| 585 | + |
| 586 | + if (!stmt->row_count) { |
| 587 | + S->is_running_unbuffered = false; |
| 588 | + /* libpq requires looping until getResult returns null */ |
| 589 | + pgsql_stmt_finish(S, 0); |
| 590 | + } |
| 591 | + } |
475 | 592 | if (S->current_row < stmt->row_count) { |
476 | 593 | S->current_row++; |
477 | 594 | return 1; |
|
0 commit comments