Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -24,6 +26,7 @@

@CustomLog
@Tag(TestTag.V2)
@Execution(ExecutionMode.SAME_THREAD)
class TransactionTest extends IntegrationTest {

@BeforeAll
Expand All @@ -41,6 +44,7 @@ void shouldNotStartTransactionIfAutoCommitIsTrue() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
connection.setAutoCommit(true); //no-op, but ensures auto-commit is true
// In core this breaks Firebolt Core since a transaction is orphaned and Core cannot manage more than 1 transaction
statement.execute("BEGIN TRANSACTION;");
statement.execute("INSERT INTO transaction_test VALUES (2, 'test')");

Expand All @@ -58,6 +62,7 @@ void shouldNotStartTransactionIfAutoCommitIsTrue() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldCommitTransactionWhenSwitchingToAutoCommit() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -81,6 +86,7 @@ void shouldCommitTransactionWhenSwitchingToAutoCommit() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldHandleSequentialTransactions() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand Down Expand Up @@ -109,6 +115,7 @@ void shouldHandleSequentialTransactions() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldRollbackTransactionSuccessfully() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -134,6 +141,7 @@ void shouldRollbackTransactionSuccessfully() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldWorkWithPreparedStatements() throws SQLException {
try (Connection connection = createConnection();
PreparedStatement ps = connection.prepareStatement("INSERT INTO transaction_test VALUES (?, 'test')")) {
Expand All @@ -151,6 +159,7 @@ void shouldWorkWithPreparedStatements() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldNotCommitTransactionWhenCommitWasManuallyExecuted() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -174,7 +183,8 @@ void shouldNotCommitTransactionWhenCommitWasManuallyExecuted() throws SQLExcepti
}

@Test
void shouldNotCommitTransactionWhenConnectionClosesOnAutoCommitFalse() throws SQLException {
@Tag(TestTag.CORE)
void shouldRollbackTransactionWhenConnectionClosesOnAutoCommitFalse() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {

Expand All @@ -188,6 +198,7 @@ void shouldNotCommitTransactionWhenConnectionClosesOnAutoCommitFalse() throws SQ
}

@Test
@Tag(TestTag.CORE)
void shouldThrowExceptionWhenStartingTransactionDuringTransaction() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -202,6 +213,7 @@ void shouldThrowExceptionWhenStartingTransactionDuringTransaction() throws SQLEx
}

@Test
@Tag(TestTag.CORE)
void shouldNotRollbackTransactionWhenRollbackWasManuallyExecuted() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -225,6 +237,7 @@ void shouldNotRollbackTransactionWhenRollbackWasManuallyExecuted() throws SQLExc
}

@Test
@Tag(TestTag.CORE)
void shouldThrowExceptionWhenCommitingWithNoTransaction() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -237,6 +250,7 @@ void shouldThrowExceptionWhenCommitingWithNoTransaction() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldThrowExceptionWhenRollbackWithNoTransaction() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand All @@ -249,6 +263,7 @@ void shouldThrowExceptionWhenRollbackWithNoTransaction() throws SQLException {
}

@Test
@Tag(TestTag.CORE)
void shouldNotCommitOnSetStatement() throws SQLException {
try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public Map<String, String> getQueryParams(FireboltProperties fireboltProperties,
addQueryParameterIfNeeded(params, statementInfoWrapper.getPreparedStatementParameters());
addQueryTimeoutIfNeeded(params, queryTimeout);
addServerAsyncIfNeeded(params, isServerAsync);
addTransactionIdIfNeeded(params, fireboltProperties.getTransactionId());
addTransactionSequenceIdIfNeeded(params, fireboltProperties.getTransactionSequenceId());

return params;
}
Expand Down
137 changes: 100 additions & 37 deletions src/main/java/com/firebolt/jdbc/connection/FireboltConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public abstract class FireboltConnection extends JdbcBase implements Connection,
// Parameter parser is determined by the version we're running on
@Getter
public final ParserVersion parserVersion;
// Indicates if auto-commit is enabled
private boolean autoCommit = true;
// Indicates if the connection is currently in a transaction
private boolean inTransaction = false;
// Indicates if a transaction command (commit/rollback) is currently being executed
private boolean executingTransactionCommand = false;

protected FireboltConnection(@NonNull String url,
Properties connectionSettings,
Expand Down Expand Up @@ -220,16 +226,93 @@ private void addStatement(FireboltStatement statement) throws SQLException {
}
}

@Override
public boolean getAutoCommit() throws SQLException {
validateConnectionIsNotClose();
return true;
}

@Override
@ExcludeFromJacocoGeneratedReport
@NotImplemented
public void setAutoCommit(boolean autoCommit) throws SQLException {}
@Override
public boolean getAutoCommit() throws SQLException {
validateConnectionIsNotClose();
return autoCommit;
}

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
validateConnectionIsNotClose();

if (autoCommit && inTransaction) {
commit();
}
this.autoCommit = autoCommit;
}

@Override
public int getTransactionIsolation() throws SQLException {
validateConnectionIsNotClose();
return Connection.TRANSACTION_REPEATABLE_READ;
}

@Override
public void setTransactionIsolation(int level) throws SQLException {
validateConnectionIsNotClose();
if (level != Connection.TRANSACTION_REPEATABLE_READ) {
throw new FireboltSQLFeatureNotSupportedException("Only TRANSACTION_REPEATABLE_READ isolation level is supported");
}
}

@Override
public void commit() throws SQLException {
executeTransactionCommand("COMMIT");
}

@Override
public void rollback() throws SQLException {
executeTransactionCommand("ROLLBACK");
}

/**
* Ensures a transaction is started if auto-commit is disabled and no transaction is active.
* Called automatically before query execution.
*
* @throws SQLException if there's an error starting the transaction
*/
public void ensureTransactionForQueryExecution() throws SQLException {
validateConnectionIsNotClose();
if (sessionProperties.getTransactionId() == null) {
inTransaction = false;
}

if (executingTransactionCommand || autoCommit) {
return;
}

if (!inTransaction) {
executingTransactionCommand = true;
try (Statement statement = createStatement()) {
statement.execute("BEGIN TRANSACTION");
inTransaction = true;
Comment on lines +275 to +289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside of the scope of this PR, but is this thread-safe? Can there be a case where connection is shared across threads and one of them starts a transaction, but we check before the transaction id is updated and so set the inTransaction to false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good one, didn't think of it. To be safe I can move the rollback inside the synchronized block, but I don't think this should happen since only one thread should run the close and it should be after running all queries. Plus I don't think JDBC is assumed to be used with more than one thread and just one connection

} catch (SQLException ex) {
throw new FireboltException("Could not start transaction for query execution", ex);
} finally {
executingTransactionCommand = false;
}
}
}

private void executeTransactionCommand(String sql) throws SQLException {
validateConnectionIsNotClose();
if (autoCommit) {
throw new FireboltException(String.format("Cannot %s when auto-commit is enabled", sql.toLowerCase()));
}
if (!inTransaction) {
throw new FireboltException("No transaction is currently active");
}
executingTransactionCommand = true;
try (Statement statement = createStatement()) {
statement.execute(sql);
inTransaction = false;
} catch (SQLException ex) {
throw new FireboltException(String.format("Could not %s the transaction", sql.toLowerCase()), ex);
} finally {
executingTransactionCommand = false;
}
}

@Override
public boolean isClosed() {
Expand Down Expand Up @@ -266,19 +349,6 @@ public String getEngine() {
return getSessionProperties().getEngine();
}

@Override
public int getTransactionIsolation() throws SQLException {
validateConnectionIsNotClose();
return Connection.TRANSACTION_NONE;
}

@Override
public void setTransactionIsolation(int level) throws SQLException {
if (level != Connection.TRANSACTION_NONE) {
throw new FireboltSQLFeatureNotSupportedException();
}
}

@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
validateConnectionIsNotClose();
Expand Down Expand Up @@ -317,6 +387,13 @@ public void close() {
if (isClosed()) {
return;
} else {
if (inTransaction) {
try {
rollback();
} catch (SQLException e) {
log.error("Exception encountered while rolling back transaction on close");
}
}
closed = true;
}
}
Expand Down Expand Up @@ -525,20 +602,6 @@ public String getEndpoint() {
return httpConnectionUrl;
}

public void ensureTransactionForQueryExecution() throws SQLException {}

@Override
@NotImplemented
public void commit() throws SQLException {
// no-op as transactions are not supported
}

@Override
@NotImplemented
public void rollback() throws SQLException {
// no-op as transactions are not supported
}

@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
validateConnectionIsNotClose();
Expand Down
Loading
Loading