Skip to content

Commit a95545c

Browse files
committed
Resolve transactional pooling in java backend
1 parent 3257a15 commit a95545c

File tree

9 files changed

+71
-108
lines changed

9 files changed

+71
-108
lines changed

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/commandhandler/CommandController.java

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public void processCommand(
3131
commandHandler.handleCommand(command);
3232
postgresTransactionalEventStore.commitTransaction();
3333
mongoTransactionalProjectionOperator.commitTransaction();
34+
35+
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
36+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
3437
} catch (Exception e) {
3538
log.error("Failed to process reaction command.");
3639
log.error(e);
@@ -41,25 +44,8 @@ public void processCommand(
4144
.collect(Collectors.joining("\n"));
4245
log.error(stackTraceString);
4346

44-
try {
45-
if (postgresTransactionalEventStore.isTransactionActive()) {
46-
postgresTransactionalEventStore.abortTransaction();
47-
}
48-
} catch (Exception postgresException) {
49-
log.error("Failed to abort postgres transaction.");
50-
log.error(postgresException);
51-
log.error(postgresException.getMessage());
52-
}
53-
54-
try {
55-
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
56-
mongoTransactionalProjectionOperator.abortTransaction();
57-
}
58-
} catch (Exception mongoException) {
59-
log.error("Failed to abort mongo transaction.");
60-
log.error(mongoException);
61-
log.error(mongoException.getMessage());
62-
}
47+
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
48+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
6349

6450
throw new RuntimeException("Failed to process command with exception: " + e);
6551
}

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/eventstore/PostgresTransactionalEventStore.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import cloud.ambar.common.event.CreationEvent;
55
import cloud.ambar.common.event.Event;
66
import cloud.ambar.common.event.TransformationEvent;
7-
import cloud.ambar.common.queryhandler.QueryController;
87
import cloud.ambar.common.serializedevent.Deserializer;
98
import cloud.ambar.common.serializedevent.SerializedEvent;
109
import cloud.ambar.common.serializedevent.Serializer;
@@ -18,7 +17,7 @@
1817
import java.util.Optional;
1918

2019
@RequiredArgsConstructor
21-
public class PostgresTransactionalEventStore implements AutoCloseable {
20+
public class PostgresTransactionalEventStore {
2221
private static final Logger log = LogManager.getLogger(PostgresTransactionalEventStore.class);
2322

2423
private final Connection connection;
@@ -108,32 +107,21 @@ public void commitTransaction() {
108107
}
109108
}
110109

111-
public void abortTransaction() {
112-
if (!isTransactionActive) {
113-
throw new RuntimeException("Transaction must be active to abort!");
114-
}
110+
public void abortDanglingTransactionsAndReturnConnectionToPool() {
111+
log.info("PostgresTransactionalEventStore: Aborting dangling transactions and returning connection to pool.");
115112
try {
116113
connection.rollback();
117114
isTransactionActive = false;
118115
} catch (SQLException e) {
119-
throw new RuntimeException("Failed to abort transaction", e);
116+
log.error("Failed to abort transaction", e);
120117
}
121-
}
122-
123-
public boolean isTransactionActive() {
124-
return isTransactionActive;
125-
}
126118

127-
// IMPLEMENTATION OF AutoCloseable INTERFACE - cleanly close dangling transactions
128-
// when the transaction event store gets garbage collected.
129-
// I.e., it will return the event store's connection back to the connection pool.
130-
// Note: There is need to close the connection, because that would mess with the library's connection pool.
131-
// The transactional event store is meant to be used in @RequestScope, so the connection will be cleaned up
132-
// by the library when the transactional event store and its connection are garbage collected.
133-
public void close() {
134-
if (isTransactionActive) {
135-
abortTransaction();
119+
try {
120+
connection.close();
121+
} catch (SQLException e) {
122+
log.error("Failed to close connection", e);
136123
}
124+
log.info("Aborted dangling transactions and returning connection to pool");
137125
}
138126

139127
private List<SerializedEvent> findAllSerializedEventsByAggregateId(String aggregateId) {

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/MongoTransactionalProjectionOperator.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,16 @@
66
import com.mongodb.WriteConcern;
77
import com.mongodb.client.ClientSession;
88
import lombok.RequiredArgsConstructor;
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
911
import org.springframework.data.mongodb.core.MongoTemplate;
1012

13+
import java.sql.SQLException;
14+
1115
@RequiredArgsConstructor
12-
public class MongoTransactionalProjectionOperator implements AutoCloseable {
16+
public class MongoTransactionalProjectionOperator {
17+
private static final Logger log = LogManager.getLogger(MongoTransactionalProjectionOperator.class);
18+
1319
private final MongoTemplate mongoTemplate;
1420

1521
private final ClientSession session;
@@ -41,26 +47,11 @@ public void commitTransaction() {
4147
session.commitTransaction();
4248
}
4349

44-
public boolean isTransactionActive() {
45-
return session.hasActiveTransaction();
46-
}
47-
48-
public void abortTransaction() {
49-
if (!session.hasActiveTransaction()) {
50-
throw new RuntimeException("Transaction must be active to abort transaction for MongoDB!");
51-
}
52-
session.abortTransaction();
53-
}
54-
55-
// IMPLEMENTATION OF AutoCloseable INTERFACE - cleanly close dangling transactions
56-
// when the transactional projection operator gets garbage collected.
57-
// I.e., it will return the projection operator's session back to the connection pool.
58-
// Note: There is no need to close the session, because that would mess with the library's session pool.
59-
// The transactional projection operator is meant to be used in @RequestScope, so the session will be cleaned up
60-
// by the library when the transactional projection operator and its session are garbage collected.
61-
public void close() {
50+
public void abortDanglingTransactionsAndReturnSessionToPool() {
51+
log.info("MongoTransactionalProjectionOperator: Aborting dangling transactions and returning connection to pool.");
6252
if (session.hasActiveTransaction()) {
6353
session.abortTransaction();
6454
}
55+
session.close();
6556
}
6657
}

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/projection/ProjectionController.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,16 @@ protected String processProjectionHttpRequest(
5555
projectionHandler.project(event);
5656

5757
mongoTransactionalProjectionOperator.commitTransaction();
58+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
59+
5860
return AmbarResponseFactory.successResponse();
5961
} catch (Exception e) {
6062
if (e.getMessage() != null && e.getMessage().startsWith("Unknown event type")) {
6163
log.warn("Unknown event type. Skipping projection.");
6264
log.warn(e);
65+
66+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
67+
6368
return AmbarResponseFactory.successResponse();
6469
}
6570

@@ -70,9 +75,9 @@ protected String processProjectionHttpRequest(
7075
.map(StackTraceElement::toString)
7176
.collect(Collectors.joining("\n"));
7277
log.error(stackTraceString);
73-
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
74-
mongoTransactionalProjectionOperator.abortTransaction();
75-
}
78+
79+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
80+
7681
return AmbarResponseFactory.retryResponse(e);
7782
}
7883
}

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/queryhandler/QueryController.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public Object processQuery(final Query query, final QueryHandler queryHandler) {
2020
mongoTransactionalProjectionOperator.startTransaction();
2121
Object object = queryHandler.handleQuery(query);
2222
mongoTransactionalProjectionOperator.commitTransaction();
23+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
2324

2425
return object;
2526
} catch (Exception e) {
@@ -32,15 +33,7 @@ public Object processQuery(final Query query, final QueryHandler queryHandler) {
3233
.collect(Collectors.joining("\n"));
3334
log.error(stackTraceString);
3435

35-
try {
36-
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
37-
mongoTransactionalProjectionOperator.abortTransaction();
38-
}
39-
} catch (Exception mongoException) {
40-
log.error("Failed to abort mongo transaction.");
41-
log.error(mongoException);
42-
log.error(mongoException.getMessage());
43-
}
36+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
4437

4538
throw new RuntimeException("Failed to process query with exception: " + e);
4639
}

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/reaction/ReactionController.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,16 @@ public String processReactionHttpRequest(final AmbarHttpRequest ambarHttpRequest
3333
postgresTransactionalEventStore.commitTransaction();
3434
mongoTransactionalProjectionOperator.commitTransaction();
3535

36+
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
37+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
38+
3639
return AmbarResponseFactory.successResponse();
3740
} catch (Exception e) {
3841
if (e.getMessage() != null && e.getMessage().startsWith("Unknown event type")) {
3942
log.warn("Unknown event type. Skipping reaction.");
4043
log.warn(e);
44+
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
45+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
4146
return AmbarResponseFactory.successResponse();
4247
}
4348

@@ -49,25 +54,8 @@ public String processReactionHttpRequest(final AmbarHttpRequest ambarHttpRequest
4954
.collect(Collectors.joining("\n"));
5055
log.error(stackTraceString);
5156

52-
try {
53-
if (postgresTransactionalEventStore.isTransactionActive()) {
54-
postgresTransactionalEventStore.abortTransaction();
55-
}
56-
} catch (Exception postgresException) {
57-
log.error("Failed to abort postgres transaction.");
58-
log.error(postgresException);
59-
log.error(postgresException.getMessage());
60-
}
61-
62-
try {
63-
if (mongoTransactionalProjectionOperator.isTransactionActive()) {
64-
mongoTransactionalProjectionOperator.abortTransaction();
65-
}
66-
} catch (Exception mongoException) {
67-
log.error("Failed to abort mongo transaction.");
68-
log.error(mongoException);
69-
log.error(mongoException.getMessage());
70-
}
57+
postgresTransactionalEventStore.abortDanglingTransactionsAndReturnConnectionToPool();
58+
mongoTransactionalProjectionOperator.abortDanglingTransactionsAndReturnSessionToPool();
7159

7260
return AmbarResponseFactory.retryResponse(e);
7361
}

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/MongoConfig.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
import com.mongodb.client.ClientSession;
77
import com.mongodb.client.MongoClient;
88
import com.mongodb.client.MongoClients;
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
911
import org.springframework.beans.factory.annotation.Qualifier;
1012
import org.springframework.beans.factory.annotation.Value;
1113
import org.springframework.context.annotation.Bean;
1214
import org.springframework.context.annotation.Configuration;
15+
import org.springframework.context.annotation.Lazy;
1316
import org.springframework.data.mongodb.core.MongoTemplate;
1417
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
1518
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -25,6 +28,8 @@ public class MongoConfig {
2528
@Value("${app.mongodb.database}")
2629
private String mongoDatabaseName;
2730

31+
private static final Logger log = LogManager.getLogger(MongoConfig.class);
32+
2833
@Bean("MongoClientForTransactionalSupport")
2934
public MongoClient mongoClientForTransactionalSupport() {
3035
ConnectionString connectionString = new ConnectionString(mongodbUri);
@@ -47,23 +52,23 @@ public MongoClient mongoClientForNonTransactionalOperations() {
4752
ConnectionString connectionString = new ConnectionString(mongodbUri);
4853
MongoClientSettings settings = MongoClientSettings.builder()
4954
.applyConnectionString(connectionString)
50-
.applyToConnectionPoolSettings(builder ->
51-
builder.maxSize(20)
52-
.minSize(5)
53-
.maxWaitTime(2000, TimeUnit.MILLISECONDS)
54-
.maxConnectionLifeTime(30, TimeUnit.MINUTES)
55-
.maxConnectionIdleTime(10, TimeUnit.MINUTES)
56-
)
5755
.build();
5856

5957
return MongoClients.create(settings);
6058
}
6159

60+
// It's extremely important to lazily initialize this bean. Why?
61+
// Because the session must be closed each time, so anyone who asks for this bean must either close
62+
// it explicitly or rely on something else closing it explicitly (such as the controller).
63+
// Why must it be closed? Because we might run out of slots in the pool.
64+
// If we didn't initalize it lazily, requests that don't need this bean would still create a session.
6265
@Bean
66+
@Lazy
6367
@RequestScope
64-
public MongoTransactionalProjectionOperator mongoTransactionalAPI(
68+
public MongoTransactionalProjectionOperator mongoTransactionalProjectionOperator(
6569
@Qualifier("MongoClientForNonTransactionalOperations") MongoClient mongoClient
6670
) {
71+
log.info("MongoClientForNonTransactionalOperations: Creating new session.");
6772
ClientSession session = mongoClient.startSession();
6873
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName).withSession(session);
6974

@@ -75,7 +80,7 @@ public MongoTransactionalProjectionOperator mongoTransactionalAPI(
7580
}
7681

7782
@Bean
78-
public MongoInitializerApi mongoInitalizerApi(
83+
public MongoInitializerApi mongoInitializerApi(
7984
@Qualifier("MongoClientForNonTransactionalOperations") MongoClient mongoClient
8085
) {
8186
MongoTemplate mongoTemplate = new MongoTemplate(mongoClient, mongoDatabaseName);

application/backend-credit-card-enrollment/backend-java/src/main/java/cloud/ambar/common/util/PostgresConfig.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import cloud.ambar.common.eventstore.PostgresTransactionalEventStore;
44
import cloud.ambar.common.serializedevent.Deserializer;
55
import cloud.ambar.common.serializedevent.Serializer;
6+
import org.apache.logging.log4j.LogManager;
7+
import org.apache.logging.log4j.Logger;
68
import org.springframework.beans.factory.annotation.Qualifier;
79
import org.springframework.beans.factory.annotation.Value;
810
import org.springframework.context.annotation.Bean;
911
import org.springframework.context.annotation.Configuration;
12+
import org.springframework.context.annotation.Lazy;
1013
import org.springframework.web.context.annotation.RequestScope;
1114

1215
import javax.sql.DataSource;
@@ -35,6 +38,8 @@ public class PostgresConfig {
3538
@Value("${app.postgresql.eventStoreCreateReplicationPublication}")
3639
private String postgresReplicationPublicationName;
3740

41+
private static final Logger log = LogManager.getLogger(PostgresConfig.class);
42+
3843
@Bean
3944
@Qualifier("DataSourceForTransactionalSupport")
4045
public DataSource dataSourceForTransactionalSupport() {
@@ -44,8 +49,8 @@ public DataSource dataSourceForTransactionalSupport() {
4449

4550
config.setMaximumPoolSize(10);
4651
config.setMinimumIdle(5);
47-
config.setIdleTimeout(300000); // 5 minutes
48-
config.setConnectionTimeout(20000); // 20 seconds
52+
config.setIdleTimeout(300000);
53+
config.setConnectionTimeout(20000);
4954

5055
return new HikariDataSource(config);
5156
}
@@ -56,22 +61,24 @@ public DataSource dataSourceNonTransactionalOperations() {
5661
HikariConfig config = new HikariConfig();
5762
config.setJdbcUrl(postgresUri);
5863

59-
config.setMaximumPoolSize(10);
60-
config.setMinimumIdle(5);
61-
config.setIdleTimeout(300000);
62-
config.setConnectionTimeout(20000);
63-
6464
return new HikariDataSource(config);
6565
}
6666

67+
// It's extremely important to lazily initialize this bean. Why?
68+
// Because the connection must be closed each time, so anyone who asks for this bean must either close
69+
// it explicitly or rely on something else closing it explicitly (such as the controller).
70+
// Why must it be closed? Because we might run out of slots in the pool.
71+
// If we didn't initalize it lazily, requests that don't need this bean would still create a connection.
6772
@Bean
73+
@Lazy
6874
@RequestScope
69-
public PostgresTransactionalEventStore postgresTransactionalAPI(
75+
public PostgresTransactionalEventStore postgresTransactionalEventStore(
7076
@Qualifier("DataSourceForTransactionalSupport") DataSource dataSource,
7177
Serializer serializer,
7278
Deserializer deserializer
7379
) {
7480
try {
81+
log.info("PostgresTransactionalEventStore: Creating new connection.");
7582
Connection connection = dataSource.getConnection();
7683
return new PostgresTransactionalEventStore(connection, serializer, deserializer, postgresTable);
7784
} catch (SQLException e) {

local-development/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ services:
191191
ipv4_address: 172.30.0.108
192192

193193
ambar-emulator:
194-
image: ambarltd/emulator:v1.5
194+
image: ambarltd/emulator:v1.6
195195
container_name: ambar-emulator
196196
restart: always
197197
healthcheck:

0 commit comments

Comments
 (0)