Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public static void init() throws IOException {

Datastore mongoDatastore = DatastoreProvider.getDatastore("Mongo", config);
System.out.println(mongoDatastore.listCollections());

postgres =
new GenericContainer<>(DockerImageName.parse("postgres:13.1"))
.withEnv("POSTGRES_PASSWORD", "postgres")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,38 +828,61 @@ protected long countWithParser(
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) {
String subQuery = queryParser.parse();
String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(countWithParser)", subQuery);
try {
PreparedStatement preparedStatement =
queryExecutor.buildPreparedStatement(
sqlQuery, queryParser.getParamsBuilder().build(), client.getConnection());
ResultSet resultSet = preparedStatement.executeQuery();
resultSet.next();
return resultSet.getLong(1);
try (Connection connection = client.getPooledConnection()) {
connection.setAutoCommit(true);
try (PreparedStatement preparedStatement =
queryExecutor.buildPreparedStatement(
sqlQuery, queryParser.getParamsBuilder().build(), connection);
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
long count = resultSet.getLong(1);
// Reset autoCommit before returning connection to pool
connection.setAutoCommit(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why this is necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Methods like PostgresCollection#update, we commit manually as:

if (documentOptional.isEmpty()) {
  connection.commit();
  return empty();
  }

Setting auto-commit for such queries can be risky (haven't validate what the exact behaviour is in this case).

So, we turn this to off whenever we return a connection back to the pool. So when update() gets a pooled connection, its auto-commit is set to false.

return count;
}
} catch (SQLException e) {
LOGGER.error(
"SQLException querying documents. Original query: {}, sql query: {}", query, sqlQuery, e);
throw new RuntimeException(e);
}
}

// This overload obtains a pooled connection and passes it to the iterator
// The iterator is responsible for returning the connection to the pool when closed
protected CloseableIterator<Document> queryWithParser(
org.hypertrace.core.documentstore.query.Query query,
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser) {
Connection connection = null;
try {
ResultSet resultSet = queryExecutor.execute(client.getConnection(), queryParser);
connection = client.getPooledConnection();
connection.setAutoCommit(true);

ResultSet resultSet = queryExecutor.execute(connection, queryParser);

if (queryParser.getPgColTransformer().getDocumentType() == DocumentType.NESTED) {
return !query.getSelections().isEmpty()
? new PostgresResultIteratorWithMetaData(resultSet)
: new PostgresResultIterator(resultSet);
? new PostgresResultIteratorWithMetaData(resultSet, connection)
: new PostgresResultIterator(resultSet, connection);
} else {
return new PostgresResultIteratorWithBasicTypes(resultSet, DocumentType.FLAT);
return new PostgresResultIteratorWithBasicTypes(resultSet, connection, DocumentType.FLAT);
}
} catch (Exception e) {
if (connection != null) {
try {
// Reset autoCommit to pool default (false) before returning connection
connection.setAutoCommit(false);
connection.close();
LOGGER.debug("Returned connection to pool after exception: {}", connection);
} catch (SQLException ex) {
LOGGER.error("Failed to return connection to pool after exception", ex);
}
}
throw new UnsupportedOperationException(e);
}
}

// This overload accepts an externally-managed connection (e.g., for transactions)
// The connection is NOT passed to iterators since the caller manages its lifecycle
protected CloseableIterator<Document> queryWithParser(
Connection connection,
org.hypertrace.core.documentstore.query.Query query,
Expand Down Expand Up @@ -1292,6 +1315,12 @@ public PostgresResultIteratorWithBasicTypes(ResultSet resultSet, DocumentType do
super(resultSet, documentType);
}

// New constructors that accept connection
public PostgresResultIteratorWithBasicTypes(
ResultSet resultSet, Connection connection, DocumentType documentType) {
super(resultSet, connection, documentType);
}

@Override
public Document next() {
try {
Expand Down Expand Up @@ -1439,27 +1468,42 @@ static class PostgresResultIterator implements CloseableIterator<Document> {

protected final ObjectMapper MAPPER = new ObjectMapper();
protected ResultSet resultSet;
protected Connection connection; // Hold reference to connection for cleanup
protected boolean cursorMovedForward = false;
protected boolean hasNext = false;

private final boolean removeDocumentId;
protected DocumentType documentType;

public PostgresResultIterator(ResultSet resultSet) {
this(resultSet, true);
this(resultSet, null, true, DocumentType.NESTED);
}

PostgresResultIterator(ResultSet resultSet, boolean removeDocumentId) {
this(resultSet, removeDocumentId, DocumentType.NESTED);
this(resultSet, null, removeDocumentId, DocumentType.NESTED);
}

public PostgresResultIterator(ResultSet resultSet, DocumentType documentType) {
this(resultSet, true, documentType);
this(resultSet, null, true, documentType);
}

// New constructor that accepts connection
public PostgresResultIterator(ResultSet resultSet, Connection connection) {
this(resultSet, connection, true, DocumentType.NESTED);
}

public PostgresResultIterator(
ResultSet resultSet, Connection connection, DocumentType documentType) {
this(resultSet, connection, true, documentType);
}

PostgresResultIterator(
ResultSet resultSet, boolean removeDocumentId, DocumentType documentType) {
ResultSet resultSet,
Connection connection,
boolean removeDocumentId,
DocumentType documentType) {
this.resultSet = resultSet;
this.connection = connection;
this.removeDocumentId = removeDocumentId;
this.documentType = documentType;
}
Expand Down Expand Up @@ -1521,7 +1565,18 @@ protected void closeResultSet() {
resultSet.close();
}
} catch (SQLException ex) {
LOGGER.error("Unable to close connection", ex);
LOGGER.error("Unable to close resultSet", ex);
}
// Return pooled connection back to pool
if (connection != null) {
try {
// Reset autoCommit to pool default (false) before returning connection
connection.setAutoCommit(false);
connection.close(); // For pooled connections, close() returns them to the pool
LOGGER.debug("Returned connection to pool: {}", connection);
} catch (SQLException ex) {
LOGGER.error("Unable to close/return connection to pool", ex);
}
}
}

Expand All @@ -1538,11 +1593,15 @@ protected boolean shouldRemoveDocumentId() {
static class PostgresResultIteratorWithMetaData extends PostgresResultIterator {

public PostgresResultIteratorWithMetaData(ResultSet resultSet) {
super(resultSet, true);
super(resultSet, null, true, DocumentType.NESTED);
}

PostgresResultIteratorWithMetaData(ResultSet resultSet, boolean removeDocumentId) {
super(resultSet, removeDocumentId);
super(resultSet, null, removeDocumentId, DocumentType.NESTED);
}

public PostgresResultIteratorWithMetaData(ResultSet resultSet, Connection connection) {
super(resultSet, connection, true, DocumentType.NESTED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class PostgresCollectionTest {

@Mock private PostgresClient mockClient;
@Mock private Connection mockConnection;
@Mock private Connection mockPooledConnection;
@Mock private PreparedStatement mockSelectPreparedStatement;
@Mock private PreparedStatement mockUpdatePreparedStatement;
@Mock private ResultSet mockResultSet;
Expand Down Expand Up @@ -496,7 +497,9 @@ void testUpdateBulkWithFilter() throws IOException, SQLException {
COLLECTION_NAME);

when(mockClient.getConnection()).thenReturn(mockConnection);
when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement);
when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection);
when(mockPooledConnection.prepareStatement(selectQuery))
.thenReturn(mockSelectPreparedStatement);
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(true).thenReturn(false);
when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData);
Expand Down Expand Up @@ -551,8 +554,9 @@ void testUpdateBulkWithFilter() throws IOException, SQLException {
assertFalse(oldDocument.hasNext());

// Obtain 2 connections: One for update and one for selecting
verify(mockClient, times(2)).getConnection();
verify(mockConnection, times(1)).prepareStatement(selectQuery);
verify(mockClient, times(1)).getPooledConnection();
verify(mockClient, times(1)).getConnection();
verify(mockPooledConnection, times(1)).prepareStatement(selectQuery);
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");
verify(mockSelectPreparedStatement, times(1)).executeQuery();
Expand Down Expand Up @@ -666,7 +670,9 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException {
COLLECTION_NAME);

when(mockClient.getConnection()).thenReturn(mockConnection);
when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement);
when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection);
when(mockPooledConnection.prepareStatement(selectQuery))
.thenReturn(mockSelectPreparedStatement);
when(mockSelectPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(false);
when(mockResultSet.isClosed()).thenReturn(false, true);
Expand Down Expand Up @@ -713,9 +719,11 @@ void testUpdateBulkWithFilter_emptyResults() throws IOException, SQLException {

assertFalse(oldDocument.hasNext());

// Obtain 2 connections: One for update and one for selecting
verify(mockClient, times(2)).getConnection();
verify(mockConnection, times(1)).prepareStatement(selectQuery);
// Obtain 2 connections: One for update and one for selecting. SELECT obtains a pooled
// connection
verify(mockClient, times(1)).getPooledConnection();
verify(mockClient, times(1)).getConnection();
verify(mockPooledConnection, times(1)).prepareStatement(selectQuery);
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");
verify(mockSelectPreparedStatement, times(1)).executeQuery();
Expand Down Expand Up @@ -759,7 +767,7 @@ void testUpdateBulkWithFilter_throwsExceptionBeforeUpdate() throws IOException,
+ "document->'date' DESC NULLS LAST",
COLLECTION_NAME);

when(mockClient.getConnection()).thenReturn(mockConnection);
when(mockClient.getPooledConnection()).thenReturn(mockConnection);
when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement);
when(mockSelectPreparedStatement.executeQuery()).thenThrow(SQLException.class);

Expand All @@ -769,7 +777,7 @@ void testUpdateBulkWithFilter_throwsExceptionBeforeUpdate() throws IOException,
postgresCollection.bulkUpdate(
query, updates, UpdateOptions.builder().returnDocumentType(BEFORE_UPDATE).build()));

verify(mockClient, times(1)).getConnection();
verify(mockClient, times(1)).getPooledConnection();
verify(mockConnection, times(1)).prepareStatement(selectQuery);
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");
Expand Down Expand Up @@ -799,7 +807,9 @@ void testUpdateBulkWithFilter_throwsExceptionAfterUpdate() throws IOException, S
COLLECTION_NAME);

when(mockClient.getConnection()).thenReturn(mockConnection);
when(mockConnection.prepareStatement(selectQuery)).thenReturn(mockSelectPreparedStatement);
when(mockClient.getPooledConnection()).thenReturn(mockPooledConnection);
when(mockPooledConnection.prepareStatement(selectQuery))
.thenReturn(mockSelectPreparedStatement);
when(mockSelectPreparedStatement.executeQuery()).thenThrow(SQLException.class);

final String updateQuery =
Expand Down Expand Up @@ -845,8 +855,9 @@ void testUpdateBulkWithFilter_throwsExceptionAfterUpdate() throws IOException, S
query, updates, UpdateOptions.builder().returnDocumentType(AFTER_UPDATE).build()));

// Obtain 2 connections: One for update and one for selecting
verify(mockClient, times(2)).getConnection();
verify(mockConnection, times(1)).prepareStatement(selectQuery);
verify(mockClient, times(1)).getPooledConnection();
verify(mockClient, times(1)).getConnection();
verify(mockPooledConnection, times(1)).prepareStatement(selectQuery);
verify(mockSelectPreparedStatement, times(1)).setObject(1, "Soap");
verify(mockSelectPreparedStatement, times(1)).setObject(2, "2022-08-09T18:53:17Z");
verify(mockSelectPreparedStatement, times(1)).executeQuery();
Expand Down
Loading