Skip to content

Commit c304c8e

Browse files
authored
async fixes
1 parent d07979e commit c304c8e

File tree

1 file changed

+259
-16
lines changed

1 file changed

+259
-16
lines changed

dbdimp.c

Lines changed: 259 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3972,7 +3972,12 @@ int dbd_st_finish (SV * sth, imp_sth_t * imp_sth)
39723972
}
39733973

39743974
imp_sth->async_status = 0;
3975-
imp_dbh->async_sth = NULL;
3975+
/* GitHub Issue #105 fix: Only clear async_sth if THIS statement is the current async
3976+
statement. This prevents finishing an unrelated statement from invalidating a pending async query. */
3977+
if (imp_dbh->async_sth == imp_sth) {
3978+
imp_dbh->async_sth = NULL;
3979+
imp_dbh->async_status = 0;
3980+
}
39763981

39773982
DBIc_ACTIVE_off(imp_sth);
39783983
if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_st_finish\n", THEADER_slow);
@@ -4117,7 +4122,9 @@ void dbd_st_destroy (SV * sth, imp_sth_t * imp_sth)
41174122
return;
41184123
}
41194124

4120-
if (imp_dbh->async_status) {
4125+
/* GitHub Issue #105 fix: Only call handle_old_async if THIS statement owns the async query.
4126+
This prevents destroying one statement from cancelling another statement's async query. */
4127+
if (imp_dbh->async_status && imp_dbh->async_sth == imp_sth) {
41214128
handle_old_async(aTHX_ sth, imp_dbh, PG_OLDQUERY_WAIT);
41224129
}
41234130

@@ -4184,8 +4191,10 @@ void dbd_st_destroy (SV * sth, imp_sth_t * imp_sth)
41844191
}
41854192
imp_sth->ph = NULL;
41864193

4187-
if (NULL != imp_dbh->async_sth)
4194+
/* GitHub Issue #105 fix: Only clear async_sth if THIS statement owns it */
4195+
if (imp_dbh->async_sth == imp_sth) {
41884196
imp_dbh->async_sth = NULL;
4197+
}
41894198

41904199
DBIc_IMPSET_off(imp_sth); /* let DBI know we've done it */
41914200

@@ -5283,11 +5292,101 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
52835292

52845293
if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_result\n", THEADER_slow);
52855294

5286-
if (1 != imp_dbh->async_status) {
5295+
/* Determine if we were called from a statement handle or database handle */
5296+
/* The Pg.xs passes sth/dbh as h, but imp_dbh is always the database handle */
5297+
D_imp_xxh(h);
5298+
imp_sth_t *imp_sth = NULL;
5299+
if (DBIc_TYPE(imp_xxh) == DBIt_ST) {
5300+
/* Called from statement handle - get imp_sth */
5301+
imp_sth = (imp_sth_t*)imp_xxh;
5302+
}
5303+
/* Ownership will be checked below for statement-based calls when needed */
5304+
5305+
/* Check if this statement had auto-retrieved results from PG_OLDQUERY_WAIT */
5306+
if (imp_sth && imp_sth->async_status == 100) {
5307+
/* Results were auto-retrieved - proceed to full processing below */
5308+
if (TRACE3_slow) {
5309+
TRC(DBILOGFP, "%sProcessing auto-retrieved results (rows: %ld)\n",
5310+
THEADER_slow, imp_sth->rows);
5311+
}
5312+
/* Don't return early - let the full processing happen below */
5313+
}
5314+
5315+
if (imp_sth && imp_sth->async_status == -99) {
5316+
pg_error(aTHX_ h, PGRES_FATAL_ERROR,
5317+
"Results for this query were discarded by PG_OLDQUERY_WAIT. "
5318+
"To preserve results, call pg_result() before executing another async query.\n");
5319+
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (error: results discarded)\n", THEADER_slow);
5320+
return -2;
5321+
}
5322+
5323+
/* Skip async status check for auto-retrieved results and errors */
5324+
if (1 != imp_dbh->async_status && !(imp_sth && (imp_sth->async_status == 100 || imp_sth->async_status == -1))) {
52875325
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running\n");
52885326
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (error: no async)\n", THEADER_slow);
52895327
return -2;
5290-
}
5328+
}
5329+
5330+
/* Special case: Statement has auto-retrieved results */
5331+
if (imp_sth && imp_sth->async_status == 100) {
5332+
/* This statement has auto-retrieved results from PG_OLDQUERY_WAIT */
5333+
if (!imp_sth->result) {
5334+
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Auto-retrieved results already consumed");
5335+
return -2;
5336+
}
5337+
5338+
result = imp_sth->result;
5339+
status = _sqlstate(aTHX_ imp_dbh, result);
5340+
5341+
/* Handle the auto-retrieved result based on its status */
5342+
if (status == PGRES_TUPLES_OK) {
5343+
rows = imp_sth->rows;
5344+
/* Result already stored, just return the row count */
5345+
imp_sth->async_status = 0; /* Mark as consumed */
5346+
}
5347+
else if (status == PGRES_COMMAND_OK) {
5348+
rows = imp_sth->rows;
5349+
imp_sth->async_status = 0; /* Mark as consumed */
5350+
}
5351+
else {
5352+
/* Error result - report the error */
5353+
TRACE_PQERRORMESSAGE;
5354+
pg_error(aTHX_ h, status, PQerrorMessage(imp_dbh->conn));
5355+
rows = -2;
5356+
imp_sth->async_status = 0; /* Mark as consumed */
5357+
}
5358+
5359+
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (auto-retrieved, %ld rows)\n", THEADER_slow, rows);
5360+
return rows;
5361+
}
5362+
else if (imp_sth && imp_sth->async_status == -1) {
5363+
/* This statement has auto-retrieved error result from PG_OLDQUERY_WAIT */
5364+
if (!imp_sth->result) {
5365+
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Auto-retrieved error already reported");
5366+
return -2;
5367+
}
5368+
5369+
result = imp_sth->result;
5370+
status = _sqlstate(aTHX_ imp_dbh, result);
5371+
5372+
/* Report the error from the stored result */
5373+
TRACE_PQERRORMESSAGE;
5374+
pg_error(aTHX_ h, status, PQresultErrorMessage(result));
5375+
rows = -2;
5376+
imp_sth->async_status = 0; /* Mark as consumed */
5377+
5378+
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (auto-retrieved error)\n", THEADER_slow);
5379+
return rows;
5380+
}
5381+
5382+
/* Check ownership BEFORE consuming the result - but only for statement calls */
5383+
if (imp_sth && imp_dbh->async_sth && imp_sth != imp_dbh->async_sth &&
5384+
!(imp_sth->async_status == 100 || imp_sth->async_status == -1)) {
5385+
/* Wrong statement handle and not auto-retrieved - don't allow */
5386+
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "pg_result() called on wrong statement handle - this statement does not own the async query\n");
5387+
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (wrong statement)\n", THEADER_slow);
5388+
return -2;
5389+
}
52915390

52925391
imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise */
52935392

@@ -5300,11 +5399,30 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
53005399
TRACE_PQNTUPLES;
53015400
rows = PQntuples(result);
53025401

5303-
if (NULL != imp_dbh->async_sth) {
5402+
/* Store metadata in the appropriate statement handle */
5403+
/* CRITICAL: Only store results if the calling statement owns the async query */
5404+
if (imp_sth && imp_sth == imp_dbh->async_sth) {
5405+
/* Called via $sth->pg_result AND this statement owns the async query */
5406+
imp_sth->cur_tuple = 0;
5407+
TRACE_PQNFIELDS;
5408+
DBIc_NUM_FIELDS(imp_sth) = PQnfields(result);
5409+
DBIc_ACTIVE_on(imp_sth);
5410+
}
5411+
else if (!imp_sth && imp_dbh->async_sth) {
5412+
/* Called via $dbh->pg_result - store in the async statement */
53045413
imp_dbh->async_sth->cur_tuple = 0;
53055414
TRACE_PQNFIELDS;
53065415
DBIc_NUM_FIELDS(imp_dbh->async_sth) = PQnfields(result);
53075416
DBIc_ACTIVE_on(imp_dbh->async_sth);
5417+
if (TRACE3_slow) {
5418+
TRC(DBILOGFP, "%sUsing imp_dbh->async_sth for $dbh->pg_result()\n", THEADER_slow);
5419+
}
5420+
}
5421+
else if (imp_sth && imp_sth != imp_dbh->async_sth) {
5422+
/* ERROR: Called from wrong statement handle */
5423+
if (TRACEWARN_slow) {
5424+
TRC(DBILOGFP, "%sWARNING: pg_result called from wrong statement handle!\n", THEADER_slow);
5425+
}
53085426
}
53095427

53105428
break;
@@ -5353,7 +5471,33 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
53535471
break;
53545472
}
53555473

5356-
if (NULL != imp_dbh->async_sth) {
5474+
/* Store the result in the appropriate statement handle
5475+
Support both $sth->pg_result and $dbh->pg_result patterns
5476+
CRITICAL: Only store if the calling statement owns the async query */
5477+
if (imp_sth && imp_sth == imp_dbh->async_sth) {
5478+
/* Called via $sth->pg_result AND this statement owns the async query */
5479+
/* Free the last_result as needed */
5480+
if (imp_dbh->last_result && imp_dbh->result_clearable) {
5481+
TRACE_PQCLEAR;
5482+
#if DEBUG_LAST_RESULT
5483+
fprintf(stderr, "CLEAR last_result of %ld at line %d of %s\n", (long int)imp_dbh->last_result,__LINE__,__func__);
5484+
#endif
5485+
PQclear(imp_dbh->last_result);
5486+
imp_dbh->last_result = NULL;
5487+
}
5488+
if (imp_sth->result) {
5489+
TRACE_PQCLEAR;
5490+
#if DEBUG_LAST_RESULT
5491+
fprintf(stderr, "CLEAR imp_sth->result of %ld at line %d of %s\n", (long int)imp_sth->result,__LINE__,__func__);
5492+
#endif
5493+
PQclear(imp_sth->result);
5494+
imp_sth->result = NULL;
5495+
}
5496+
imp_dbh->last_result = imp_sth->result = result;
5497+
imp_dbh->result_clearable = DBDPG_FALSE;
5498+
}
5499+
else if (!imp_sth && imp_dbh->async_sth) {
5500+
/* $dbh->pg_result() case - store in the async statement */
53575501
/* Free the last_result as needed */
53585502
if (imp_dbh->last_result && imp_dbh->result_clearable) {
53595503
TRACE_PQCLEAR;
@@ -5375,17 +5519,35 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
53755519

53765520
imp_dbh->last_result = imp_dbh->async_sth->result = result;
53775521
imp_dbh->result_clearable = DBDPG_FALSE;
5522+
5523+
if (TRACE3_slow) {
5524+
TRC(DBILOGFP, "%sStored result in async_sth for $dbh->pg_result()\n", THEADER_slow);
5525+
}
53785526
}
53795527
else {
5528+
/* No statement to store results - this is a problem */
5529+
if (TRACEWARN_slow) {
5530+
TRC(DBILOGFP, "%sWARNING: Discarding query results - no statement handle available!\n", THEADER_slow);
5531+
}
53805532
TRACE_PQCLEAR;
53815533
PQclear(result);
5534+
/* Return error to notify user */
5535+
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Query results discarded - no statement handle available to store results\n");
5536+
rows = -2;
53825537
}
53835538
}
53845539

5385-
if (NULL != imp_dbh->async_sth) {
5540+
/* Update row count and clear async status for the statement handle */
5541+
if (imp_sth && imp_sth == imp_dbh->async_sth) {
5542+
imp_sth->rows = rows;
5543+
imp_sth->async_status = 0;
5544+
}
5545+
else if (!imp_sth && imp_dbh->async_sth) {
53865546
imp_dbh->async_sth->rows = rows;
53875547
imp_dbh->async_sth->async_status = 0;
53885548
}
5549+
5550+
/* Let the application know we are not active anymore */
53895551
imp_dbh->async_status = 0;
53905552
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (rows: %ld)\n", THEADER_slow, rows);
53915553
return rows;
@@ -5588,11 +5750,83 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
55885750
else if (asyncflag & PG_OLDQUERY_WAIT || imp_dbh->async_status == -1) {
55895751
/* Finish up the outstanding query and throw out the result, unless an error */
55905752
if (TRACE3_slow) { TRC(DBILOGFP, "%sWaiting for old async command to finish\n", THEADER_slow); }
5753+
5754+
/* PG_OLDQUERY_WAIT will auto-retrieve results instead of discarding */
5755+
if ((asyncflag & PG_OLDQUERY_WAIT) && imp_dbh->async_sth) {
5756+
if (TRACE3_slow) {
5757+
TRC(DBILOGFP, "%sPG_OLDQUERY_WAIT: Will auto-retrieve results for original statement\n", THEADER_slow);
5758+
}
5759+
/* Note: We'll mark as auto-retrieved (100) when we actually store the results below */
5760+
}
5761+
55915762
TRACE_PQGETRESULT;
55925763
while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
55935764
status = _sqlstate(aTHX_ imp_dbh, result);
5594-
TRACE_PQCLEAR;
5595-
PQclear(result);
5765+
5766+
/* AUTO-RETRIEVE: Store results in the original statement instead of discarding */
5767+
if ((asyncflag & PG_OLDQUERY_WAIT) && imp_dbh->async_sth &&
5768+
(status == PGRES_TUPLES_OK || status == PGRES_COMMAND_OK)) {
5769+
5770+
imp_sth_t *orig_sth = imp_dbh->async_sth;
5771+
5772+
/* Free any old result */
5773+
if (orig_sth->result) {
5774+
TRACE_PQCLEAR;
5775+
PQclear(orig_sth->result);
5776+
}
5777+
5778+
/* Store this result for the original statement */
5779+
orig_sth->result = result;
5780+
orig_sth->rows = (status == PGRES_TUPLES_OK) ? PQntuples(result) : 0;
5781+
5782+
if (status == PGRES_TUPLES_OK) {
5783+
orig_sth->cur_tuple = 0;
5784+
DBIc_NUM_FIELDS(orig_sth) = PQnfields(result);
5785+
DBIc_ACTIVE_on(orig_sth);
5786+
}
5787+
5788+
/* Mark as auto-retrieved, not discarded */
5789+
orig_sth->async_status = 100; /* Special value: auto-retrieved */
5790+
5791+
if (TRACE3_slow) {
5792+
TRC(DBILOGFP, "%sPG_OLDQUERY_WAIT: Auto-retrieved %ld rows for original statement\n",
5793+
THEADER_slow, orig_sth->rows);
5794+
}
5795+
5796+
/* Don't clear the result - we're keeping it */
5797+
result = NULL;
5798+
}
5799+
/* AUTO-RETRIEVE ERROR: Mark original statement as failed */
5800+
else if ((asyncflag & PG_OLDQUERY_WAIT) && imp_dbh->async_sth &&
5801+
status != PGRES_EMPTY_QUERY &&
5802+
status != PGRES_COMMAND_OK &&
5803+
status != PGRES_TUPLES_OK) {
5804+
5805+
imp_sth_t *orig_sth = imp_dbh->async_sth;
5806+
5807+
/* Store the error result for the original statement to discover */
5808+
if (orig_sth->result) {
5809+
TRACE_PQCLEAR;
5810+
PQclear(orig_sth->result);
5811+
}
5812+
5813+
/* Store error result - pg_result will handle the error */
5814+
orig_sth->result = result;
5815+
orig_sth->rows = -1; /* Indicate error */
5816+
orig_sth->async_status = -1; /* Mark as error */
5817+
5818+
if (TRACE3_slow) {
5819+
TRC(DBILOGFP, "%sPG_OLDQUERY_WAIT: Stored error result for original statement\n", THEADER_slow);
5820+
}
5821+
5822+
/* Don't clear the result - we're keeping it for error reporting */
5823+
result = NULL;
5824+
}
5825+
else {
5826+
/* Normal case - discard the result */
5827+
TRACE_PQCLEAR;
5828+
PQclear(result);
5829+
}
55965830
if (status == PGRES_COPY_IN) { /* In theory, this should be caught by copystate, but we'll be careful */
55975831
TRACE_PQPUTCOPYEND;
55985832
if (-1 == PQputCopyEnd(imp_dbh->conn, NULL)) {
@@ -5610,10 +5844,13 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
56105844
else if (status != PGRES_EMPTY_QUERY
56115845
&& status != PGRES_COMMAND_OK
56125846
&& status != PGRES_TUPLES_OK) {
5613-
TRACE_PQERRORMESSAGE;
5614-
pg_error(aTHX_ handle, status, PQerrorMessage(imp_dbh->conn));
5615-
if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async (error: bad status)\n", THEADER_slow);
5616-
return -2;
5847+
/* Only report error to current handle if not PG_OLDQUERY_WAIT */
5848+
if (!(asyncflag & PG_OLDQUERY_WAIT)) {
5849+
TRACE_PQERRORMESSAGE;
5850+
pg_error(aTHX_ handle, status, PQerrorMessage(imp_dbh->conn));
5851+
if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async (error: bad status)\n", THEADER_slow);
5852+
return -2;
5853+
}
56175854
}
56185855
}
56195856
}
@@ -5625,8 +5862,14 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
56255862

56265863
/* If we made it this far, safe to assume there is no running query */
56275864
imp_dbh->async_status = 0;
5628-
if (NULL != imp_dbh->async_sth)
5629-
imp_dbh->async_sth->async_status = 0;
5865+
if (NULL != imp_dbh->async_sth) {
5866+
if (imp_dbh->async_sth->async_status != 100 && imp_dbh->async_sth->async_status != -1) {
5867+
/* Don't clear if it's auto-retrieved (100) or error (-1) */
5868+
imp_dbh->async_sth->async_status = 0;
5869+
}
5870+
/* Clear the async_sth pointer since query is complete */
5871+
imp_dbh->async_sth = NULL;
5872+
}
56305873

56315874
if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async\n", THEADER_slow);
56325875
return 0;

0 commit comments

Comments
 (0)