Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -842,24 +842,41 @@ protected long countWithParser(
}
}

// This overload obtains a pooled connection and passes it to the iterator
// The iterator is responsible for returning the connection to the pool when closed
protected CloseableIterator<Document> queryWithParser(
org.hypertrace.core.documentstore.query.Query query,
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) {
Connection connection = null;
try {
ResultSet resultSet = queryExecutor.execute(client.getConnection(), queryParser);
connection = client.getPooledConnection();
connection.setAutoCommit(true);

ResultSet resultSet = queryExecutor.execute(connection, queryParser);

if (queryParser.getPgColTransformer().getDocumentType() == DocumentType.NESTED) {
return !query.getSelections().isEmpty()
? new PostgresResultIteratorWithMetaData(resultSet)
: new PostgresResultIterator(resultSet);
? new PostgresResultIteratorWithMetaData(resultSet, connection)
: new PostgresResultIterator(resultSet, connection);
} else {
return new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT);
return new PostgresResultIteratorWithBasicTypes(resultSet, connection, DocumentType.FLAT);
}
} catch (Exception e) {
// If exception occurs before iterator is created, clean up connection immediately
if (connection != null) {
try {
connection.close();
LOGGER.debug("Returned connection to pool after exception: {}", connection);
} catch (SQLException ex) {
LOGGER.error("Failed to return connection to pool after exception", ex);
}
}
throw new UnsupportedOperationException(e);
}
}

// This overload accepts an externally-managed connection (e.g., for transactions)
// The connection is NOT passed to iterators since the caller manages its lifecycle
protected CloseableIterator<Document> queryWithParser(
Connection connection,
org.hypertrace.core.documentstore.query.Query query,
Expand Down Expand Up @@ -1292,6 +1309,12 @@ public PostgresResultIteratorWithBasicTypes(ResultSet resultSet, DocumentType do
super(resultSet, documentType);
}

// New constructors that accept connection
public PostgresResultIteratorWithBasicTypes(
ResultSet resultSet, Connection connection, DocumentType documentType) {
super(resultSet, connection, documentType);
}

@Override
public Document next() {
try {
Expand Down Expand Up @@ -1439,27 +1462,42 @@ static class PostgresResultIterator implements CloseableIterator<Document> {

protected final ObjectMapper MAPPER = new ObjectMapper();
protected ResultSet resultSet;
protected Connection connection; // Hold reference to connection for cleanup
protected boolean cursorMovedForward = false;
protected boolean hasNext = false;

private final boolean removeDocumentId;
protected DocumentType documentType;

public PostgresResultIterator(ResultSet resultSet) {
this(resultSet, true);
this(resultSet, null, true, DocumentType.NESTED);
}

PostgresResultIterator(ResultSet resultSet, boolean removeDocumentId) {
this(resultSet, removeDocumentId, DocumentType.NESTED);
this(resultSet, null, removeDocumentId, DocumentType.NESTED);
}

public PostgresResultIterator(ResultSet resultSet, DocumentType documentType) {
this(resultSet, true, documentType);
this(resultSet, null, true, documentType);
}

// New constructor that accepts connection
public PostgresResultIterator(ResultSet resultSet, Connection connection) {
this(resultSet, connection, true, DocumentType.NESTED);
}

public PostgresResultIterator(
ResultSet resultSet, Connection connection, DocumentType documentType) {
this(resultSet, connection, true, documentType);
}

PostgresResultIterator(
ResultSet resultSet, boolean removeDocumentId, DocumentType documentType) {
ResultSet resultSet,
Connection connection,
boolean removeDocumentId,
DocumentType documentType) {
this.resultSet = resultSet;
this.connection = connection;
this.removeDocumentId = removeDocumentId;
this.documentType = documentType;
}
Expand Down Expand Up @@ -1521,7 +1559,16 @@ protected void closeResultSet() {
resultSet.close();
}
} catch (SQLException ex) {
LOGGER.error("Unable to close connection", ex);
LOGGER.error("Unable to close resultSet", ex);
}
// Return pooled connection back to pool
if (connection != null) {
try {
connection.close(); // For pooled connections, close() returns them to the pool
LOGGER.debug("Returned connection to pool: {}", connection);
} catch (SQLException ex) {
LOGGER.error("Unable to close/return connection to pool", ex);
}
}
}

Expand All @@ -1538,11 +1585,15 @@ protected boolean shouldRemoveDocumentId() {
static class PostgresResultIteratorWithMetaData extends PostgresResultIterator {

public PostgresResultIteratorWithMetaData(ResultSet resultSet) {
super(resultSet, true);
super(resultSet, null, true, DocumentType.NESTED);
}

PostgresResultIteratorWithMetaData(ResultSet resultSet, boolean removeDocumentId) {
super(resultSet, removeDocumentId);
super(resultSet, null, removeDocumentId, DocumentType.NESTED);
}

public PostgresResultIteratorWithMetaData(ResultSet resultSet, Connection connection) {
super(resultSet, connection, true, DocumentType.NESTED);
}

@Override
Expand Down
Loading