Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 255 additions & 19 deletions dbdimp.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ typedef enum
SvRMAGICAL(SvRV(h)) && (SvMAGIC(SvRV(h)))->mg_type == 'P')

enum {
STH_ASYNC_AUTOERROR = -2, /* PG_OLDQUERY_WAIT auto-retrieved an error result */
STH_ASYNC_CANCELLED = -1,
STH_NO_ASYNC,
STH_ASYNC,
STH_ASYNC_PREPARE
STH_ASYNC_PREPARE,
STH_ASYNC_AUTORETRIEVED = 100 /* PG_OLDQUERY_WAIT auto-retrieved results */
};

enum {
Expand Down Expand Up @@ -4169,7 +4171,12 @@ int dbd_st_finish (SV * sth, imp_sth_t * imp_sth)
}

imp_sth->async_status = STH_NO_ASYNC;
imp_dbh->async_sth = NULL;
/* GitHub Issue #105 fix: Only clear async_sth if THIS statement is the current async
statement. This prevents finishing an unrelated statement from invalidating a pending async query. */
if (imp_dbh->async_sth == imp_sth) {
imp_dbh->async_sth = NULL;
imp_dbh->async_status = 0;
}

DBIc_ACTIVE_off(imp_sth);
if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_st_finish\n", THEADER_slow);
Expand Down Expand Up @@ -4314,7 +4321,9 @@ void dbd_st_destroy (SV * sth, imp_sth_t * imp_sth)
return;
}

if (imp_dbh->async_status) {
/* GitHub Issue #105 fix: Only call handle_old_async if THIS statement owns the async query.
This prevents destroying one statement from cancelling another statement's async query. */
if (imp_dbh->async_status && imp_dbh->async_sth == imp_sth) {
handle_old_async(aTHX_ sth, imp_dbh, PG_OLDQUERY_WAIT);
}

Expand Down Expand Up @@ -4381,8 +4390,10 @@ void dbd_st_destroy (SV * sth, imp_sth_t * imp_sth)
}
imp_sth->ph = NULL;

if (NULL != imp_dbh->async_sth)
/* GitHub Issue #105 fix: Only clear async_sth if THIS statement owns it */
if (imp_dbh->async_sth == imp_sth) {
imp_dbh->async_sth = NULL;
}

DBIc_IMPSET_off(imp_sth); /* let DBI know we've done it */

Expand Down Expand Up @@ -5477,15 +5488,97 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
ExecStatusType status = PGRES_FATAL_ERROR;
long rows = 0;
char *cmdStatus = NULL;
/* Determine if we were called from a statement handle or database handle */
/* The Pg.xs passes sth/dbh as h, but imp_dbh is always the database handle */
D_imp_xxh(h);
imp_sth_t *imp_sth = NULL;

if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_result\n", THEADER_slow);

if (DBH_ASYNC != imp_dbh->async_status) {
if (DBIc_TYPE(imp_xxh) == DBIt_ST) {
/* Called from statement handle - get imp_sth */
imp_sth = (imp_sth_t*)imp_xxh;
}
/* Ownership will be checked below for statement-based calls when needed */

/* Check if this statement had auto-retrieved results from PG_OLDQUERY_WAIT */
if (imp_sth && imp_sth->async_status == STH_ASYNC_AUTORETRIEVED) {
/* Results were auto-retrieved - proceed to full processing below */
if (TRACE3_slow) {
TRC(DBILOGFP, "%sProcessing auto-retrieved results (rows: %ld)\n",
THEADER_slow, imp_sth->rows);
}
/* Don't return early - let the full processing happen below */
}

/* Skip async status check for auto-retrieved results and errors */
if (DBH_ASYNC != imp_dbh->async_status && !(imp_sth && (imp_sth->async_status == STH_ASYNC_AUTORETRIEVED || imp_sth->async_status == STH_ASYNC_AUTOERROR))) {
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running\n");
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (error: no async)\n", THEADER_slow);
return -2;
}

/* Special case: Statement has auto-retrieved results */
if (imp_sth && imp_sth->async_status == STH_ASYNC_AUTORETRIEVED) {
/* This statement has auto-retrieved results from PG_OLDQUERY_WAIT */
if (!imp_sth->result) {
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Auto-retrieved results already consumed");
return -2;
}

result = imp_sth->result;
status = _sqlstate(aTHX_ imp_dbh, result);

/* Handle the auto-retrieved result based on its status */
if (status == PGRES_TUPLES_OK) {
rows = imp_sth->rows;
/* Result already stored, just return the row count */
imp_sth->async_status = STH_NO_ASYNC; /* Mark as consumed */
}
else if (status == PGRES_COMMAND_OK) {
rows = imp_sth->rows;
imp_sth->async_status = STH_NO_ASYNC; /* Mark as consumed */
}
else {
/* Error result - report the error */
TRACE_PQERRORMESSAGE;
pg_error(aTHX_ h, status, PQerrorMessage(imp_dbh->conn));
rows = -2;
imp_sth->async_status = STH_NO_ASYNC; /* Mark as consumed */
}

if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (auto-retrieved, %ld rows)\n", THEADER_slow, rows);
return rows;
}
else if (imp_sth && imp_sth->async_status == STH_ASYNC_AUTOERROR) {
/* This statement has auto-retrieved error result from PG_OLDQUERY_WAIT */
if (!imp_sth->result) {
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Auto-retrieved error already reported");
return -2;
}

result = imp_sth->result;
status = _sqlstate(aTHX_ imp_dbh, result);

/* Report the error from the stored result */
TRACE_PQERRORMESSAGE;
pg_error(aTHX_ h, status, PQresultErrorMessage(result));
rows = -2;
imp_sth->async_status = STH_NO_ASYNC; /* Mark as consumed */

if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (auto-retrieved error)\n", THEADER_slow);
return rows;
}

/* Check ownership BEFORE consuming the result - but only for statement calls */
if (imp_sth && imp_dbh->async_sth && imp_sth != imp_dbh->async_sth &&
!(imp_sth->async_status == STH_ASYNC_AUTORETRIEVED || imp_sth->async_status == STH_ASYNC_AUTOERROR)) {
/* Wrong statement handle and not auto-retrieved - don't allow */
pg_error(aTHX_ h, PGRES_FATAL_ERROR, "pg_result() called on wrong statement handle - this statement does not own the async query\n");
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (wrong statement)\n", THEADER_slow);
return -2;
}

imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise */

TRACE_PQGETRESULT;
Expand All @@ -5497,11 +5590,30 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
TRACE_PQNTUPLES;
rows = PQntuples(result);

if (NULL != imp_dbh->async_sth) {
/* Store metadata in the appropriate statement handle */
/* CRITICAL: Only store results if the calling statement owns the async query */
if (imp_sth && imp_sth == imp_dbh->async_sth) {
/* Called via $sth->pg_result AND this statement owns the async query */
imp_sth->cur_tuple = 0;
TRACE_PQNFIELDS;
DBIc_NUM_FIELDS(imp_sth) = PQnfields(result);
DBIc_ACTIVE_on(imp_sth);
}
else if (!imp_sth && imp_dbh->async_sth) {
/* Called via $dbh->pg_result - store in the async statement */
imp_dbh->async_sth->cur_tuple = 0;
TRACE_PQNFIELDS;
DBIc_NUM_FIELDS(imp_dbh->async_sth) = PQnfields(result);
DBIc_ACTIVE_on(imp_dbh->async_sth);
if (TRACE3_slow) {
TRC(DBILOGFP, "%sUsing imp_dbh->async_sth for $dbh->pg_result()\n", THEADER_slow);
}
}
else if (imp_sth && imp_sth != imp_dbh->async_sth) {
/* ERROR: Called from wrong statement handle */
if (TRACEWARN_slow) {
TRC(DBILOGFP, "%sWARNING: pg_result called from wrong statement handle!\n", THEADER_slow);
}
}

break;
Expand Down Expand Up @@ -5556,7 +5668,33 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)
break;
}

if (NULL != imp_dbh->async_sth) {
/* Store the result in the appropriate statement handle
Support both $sth->pg_result and $dbh->pg_result patterns
CRITICAL: Only store if the calling statement owns the async query */
if (imp_sth && imp_sth == imp_dbh->async_sth) {
/* Called via $sth->pg_result AND this statement owns the async query */
/* Free the last_result as needed */
if (imp_dbh->last_result && imp_dbh->result_clearable) {
TRACE_PQCLEAR;
#if DEBUG_LAST_RESULT
fprintf(stderr, "CLEAR last_result of %ld at line %d of %s\n", (long int)imp_dbh->last_result,__LINE__,__func__);
#endif
PQclear(imp_dbh->last_result);
imp_dbh->last_result = NULL;
}
if (imp_sth->result) {
TRACE_PQCLEAR;
#if DEBUG_LAST_RESULT
fprintf(stderr, "CLEAR imp_sth->result of %ld at line %d of %s\n", (long int)imp_sth->result,__LINE__,__func__);
#endif
PQclear(imp_sth->result);
imp_sth->result = NULL;
}
imp_dbh->last_result = imp_sth->result = result;
imp_dbh->result_clearable = DBDPG_FALSE;
}
else if (!imp_sth && imp_dbh->async_sth) {
/* $dbh->pg_result() case - store in the async statement */
/* Free the last_result as needed */
if (imp_dbh->last_result && imp_dbh->result_clearable) {
TRACE_PQCLEAR;
Expand All @@ -5578,14 +5716,32 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh)

imp_dbh->last_result = imp_dbh->async_sth->result = result;
imp_dbh->result_clearable = DBDPG_FALSE;

if (TRACE3_slow) {
TRC(DBILOGFP, "%sStored result in async_sth for $dbh->pg_result()\n", THEADER_slow);
}
}
else {
TRACE_PQCLEAR;
PQclear(result);
/* No statement handle - this is $dbh->do() or $dbh->pg_result() without a statement
Just store in last_result for error reporting and clear the result */
if (TRACE3_slow) {
TRC(DBILOGFP, "%sNo statement handle - storing result in last_result only\n", THEADER_slow);
}
if (imp_dbh->last_result && imp_dbh->result_clearable) {
TRACE_PQCLEAR;
PQclear(imp_dbh->last_result);
}
imp_dbh->last_result = result;
imp_dbh->result_clearable = DBDPG_TRUE;
}
}

if (NULL != imp_dbh->async_sth) {
/* Update row count and clear async status for the statement handle */
if (imp_sth && imp_sth == imp_dbh->async_sth) {
imp_sth->rows = rows;
imp_sth->async_status = 0;
}
else if (!imp_sth && imp_dbh->async_sth) {
imp_dbh->async_sth->rows = rows;
imp_dbh->async_sth->async_status = STH_NO_ASYNC;
}
Expand Down Expand Up @@ -5815,7 +5971,7 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
status = -1;
}

if (STH_ASYNC_PREPARE == async_sth->async_status
if (async_sth && STH_ASYNC_PREPARE == async_sth->async_status
&& -1 == status) {
Safefree(async_sth->prepare_name);
async_sth->prepare_name = NULL;
Expand All @@ -5834,12 +5990,83 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
/* Finish up the outstanding query and throw out the result, unless an error */
if (TRACE3_slow) { TRC(DBILOGFP, "%sWaiting for old async command to finish\n", THEADER_slow); }

/* PG_OLDQUERY_WAIT will auto-retrieve results instead of discarding */
if ((asyncflag & PG_OLDQUERY_WAIT) && imp_dbh->async_sth) {
if (TRACE3_slow) {
TRC(DBILOGFP, "%sPG_OLDQUERY_WAIT: Will auto-retrieve results for original statement\n", THEADER_slow);
}
/* Note: We'll mark as auto-retrieved (100) when we actually store the results below */
}

wait_for_result:
TRACE_PQGETRESULT;
while ((result = PQgetResult(imp_dbh->conn)) != NULL) {
status = _sqlstate(aTHX_ imp_dbh, result);
TRACE_PQCLEAR;
PQclear(result);

/* AUTO-RETRIEVE: Store results in the original statement instead of discarding */
if ((asyncflag & PG_OLDQUERY_WAIT) && imp_dbh->async_sth &&
(status == PGRES_TUPLES_OK || status == PGRES_COMMAND_OK)) {

imp_sth_t *orig_sth = imp_dbh->async_sth;

/* Free any old result */
if (orig_sth->result) {
TRACE_PQCLEAR;
PQclear(orig_sth->result);
}

/* Store this result for the original statement */
orig_sth->result = result;
orig_sth->rows = (status == PGRES_TUPLES_OK) ? PQntuples(result) : 0;

if (status == PGRES_TUPLES_OK) {
orig_sth->cur_tuple = 0;
DBIc_NUM_FIELDS(orig_sth) = PQnfields(result);
DBIc_ACTIVE_on(orig_sth);
}

/* Mark as auto-retrieved, not discarded */
orig_sth->async_status = STH_ASYNC_AUTORETRIEVED; /* Special value: auto-retrieved */

if (TRACE3_slow) {
TRC(DBILOGFP, "%sPG_OLDQUERY_WAIT: Auto-retrieved %ld rows for original statement\n",
THEADER_slow, orig_sth->rows);
}

/* Don't clear the result - we're keeping it */
result = NULL;
}
/* AUTO-RETRIEVE ERROR: Mark original statement as failed */
else if ((asyncflag & PG_OLDQUERY_WAIT) && imp_dbh->async_sth &&
status != PGRES_EMPTY_QUERY &&
status != PGRES_COMMAND_OK &&
status != PGRES_TUPLES_OK) {

imp_sth_t *orig_sth = imp_dbh->async_sth;

/* Store the error result for the original statement to discover */
if (orig_sth->result) {
TRACE_PQCLEAR;
PQclear(orig_sth->result);
}

/* Store error result - pg_result will handle the error */
orig_sth->result = result;
orig_sth->rows = -1; /* Indicate error */
orig_sth->async_status = STH_ASYNC_AUTOERROR; /* Mark as error */

if (TRACE3_slow) {
TRC(DBILOGFP, "%sPG_OLDQUERY_WAIT: Stored error result for original statement\n", THEADER_slow);
}

/* Don't clear the result - we're keeping it for error reporting */
result = NULL;
}
else {
/* Normal case - discard the result */
TRACE_PQCLEAR;
PQclear(result);
}
if (status == PGRES_COPY_IN) { /* In theory, this should be caught by copystate, but we'll be careful */
TRACE_PQPUTCOPYEND;
if (-1 == PQputCopyEnd(imp_dbh->conn, NULL)) {
Expand All @@ -5857,10 +6084,13 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as
else if (status != PGRES_EMPTY_QUERY
&& status != PGRES_COMMAND_OK
&& status != PGRES_TUPLES_OK) {
TRACE_PQERRORMESSAGE;
pg_error(aTHX_ handle, status, PQerrorMessage(imp_dbh->conn));
if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async (error: bad status)\n", THEADER_slow);
return -2;
/* Only report error to current handle if not PG_OLDQUERY_WAIT */
if (!(asyncflag & PG_OLDQUERY_WAIT)) {
TRACE_PQERRORMESSAGE;
pg_error(aTHX_ handle, status, PQerrorMessage(imp_dbh->conn));
if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async (error: bad status)\n", THEADER_slow);
return -2;
}
}
}

Expand Down Expand Up @@ -5890,8 +6120,14 @@ static int handle_old_async(pTHX_ SV * handle, imp_dbh_t * imp_dbh, const int as

/* If we made it this far, safe to assume there is no running query */
imp_dbh->async_status = 0;
if (async_sth)
async_sth->async_status = STH_NO_ASYNC;
if (async_sth) {
if (async_sth->async_status != STH_ASYNC_AUTORETRIEVED && async_sth->async_status != STH_ASYNC_AUTOERROR) {
/* Don't clear if it's auto-retrieved or error */
async_sth->async_status = STH_NO_ASYNC;
}
/* Clear the async_sth pointer since query is complete */
imp_dbh->async_sth = NULL;
}

if (TEND_slow) TRC(DBILOGFP, "%sEnd handle_old_async\n", THEADER_slow);
return 0;
Expand Down
Loading