Skip to content

Commit 87b9e11

Browse files
authored
Fix closing connection and synchronizations (#3661)
1 parent b4fffa6 commit 87b9e11

File tree

13 files changed

+796
-7
lines changed

13 files changed

+796
-7
lines changed

.github/workflows/gradle.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ jobs:
3838
run: |
3939
sudo rm -rf "/usr/local/share/boost"
4040
sudo rm -rf "$AGENT_TOOLSDIRECTORY"
41+
sudo rm -rf "/opt/ghc"
42+
sudo rm -rf "/usr/share/dotnet"
43+
sudo rm -rf "/usr/local/lib/android"
4144
sudo apt-get clean
4245
df -h
4346
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.micronaut.data.connection.reactive
2+
3+
import io.micronaut.data.connection.ConnectionDefinition
4+
import io.micronaut.data.connection.ConnectionSynchronization
5+
import org.reactivestreams.Publisher
6+
import reactor.core.publisher.Mono
7+
import spock.lang.Specification
8+
9+
import java.util.concurrent.atomic.AtomicInteger
10+
11+
class ReactiveConnectionSynchronizationSpec extends Specification {
12+
13+
static final class Tracker {
14+
final AtomicInteger executionComplete = new AtomicInteger()
15+
final AtomicInteger beforeClosed = new AtomicInteger()
16+
final AtomicInteger afterClosed = new AtomicInteger()
17+
}
18+
19+
def "reactive: all registered synchronizations execute even if one throws"() {
20+
given:
21+
def tracker = new Tracker()
22+
// new reactive connection status with a dummy connection
23+
def status = new DefaultReactiveConnectionStatus<>(new Object(), ConnectionDefinition.DEFAULT, true)
24+
25+
and: "a normal synchronization that tracks all callbacks"
26+
status.registerSynchronization(new ConnectionSynchronization() {
27+
@Override
28+
void executionComplete() {
29+
tracker.executionComplete.incrementAndGet()
30+
}
31+
32+
@Override
33+
void beforeClosed() {
34+
tracker.beforeClosed.incrementAndGet()
35+
}
36+
37+
@Override
38+
void afterClosed() {
39+
tracker.afterClosed.incrementAndGet()
40+
}
41+
})
42+
43+
and: "a synchronization that throws during executionComplete"
44+
status.registerReactiveSynchronization(new ReactiveConnectionSynchronization() {
45+
@Override
46+
Publisher<Void> onComplete() {
47+
return Mono.defer {
48+
throw new RuntimeException("simulated onComplete failure")
49+
}
50+
}
51+
52+
@Override
53+
Publisher<Void> onClose() {
54+
return Mono.empty()
55+
}
56+
57+
@Override
58+
Publisher<Void> afterClose() {
59+
return Mono.empty()
60+
}
61+
})
62+
63+
and: "a close supplier that succeeds"
64+
def closeSupplier = { Mono.empty() } as java.util.function.Supplier<Publisher<Void>>
65+
66+
when: "we signal completion which triggers executionComplete and finally chain"
67+
try {
68+
Mono.from(status.onComplete(closeSupplier)).block()
69+
} catch (Throwable ignored) {
70+
// expected: we only care callbacks were invoked despite failure
71+
}
72+
73+
then: "all normal synchronization callbacks should have been invoked at least once"
74+
tracker.executionComplete.get() >= 1
75+
tracker.beforeClosed.get() >= 1
76+
tracker.afterClosed.get() >= 1
77+
}
78+
}

data-connection/src/main/java/io/micronaut/data/connection/reactive/DefaultReactiveConnectionStatus.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,29 @@ private Publisher<Void> forEachSynchronizations(Function<ReactiveConnectionSynch
112112
return Mono.empty();
113113
}
114114
Mono<Void> chain = Mono.empty();
115+
List<Throwable> exceptions = new ArrayList<>(connectionSynchronizations.size());
115116
ListIterator<ReactiveConnectionSynchronization> listIterator = connectionSynchronizations.listIterator(connectionSynchronizations.size());
116117
while (listIterator.hasPrevious()) {
117-
Publisher<Void> next = consumer.apply(listIterator.previous());
118-
if (next != EMPTY) {
119-
chain = chain.then(Mono.from(next));
118+
try {
119+
Publisher<Void> next = consumer.apply(listIterator.previous());
120+
Mono<Void> wrapped = next == EMPTY ? Mono.empty() : Mono.from(next).onErrorResume(e -> {
121+
exceptions.add(e);
122+
return Mono.empty();
123+
});
124+
chain = chain.then(wrapped);
125+
} catch (Exception e) {
126+
exceptions.add(e);
127+
}
128+
}
129+
if (!exceptions.isEmpty()) {
130+
if (exceptions.size() == 1) {
131+
return chain.then(Mono.error(exceptions.get(0)));
132+
}
133+
IllegalStateException e = new IllegalStateException("Error executing connection synchronizations", exceptions.get(0));
134+
for (int i = 1; i < exceptions.size(); i++) {
135+
e.addSuppressed(exceptions.get(i));
120136
}
137+
return chain.then(Mono.error(e));
121138
}
122139
return chain;
123140
}

data-connection/src/main/java/io/micronaut/data/connection/support/AbstractConnectionOperations.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,13 @@ public void complete(@NonNull ConnectionStatus<C> status) {
207207
try {
208208
connectionStatus.beforeClosed();
209209
} finally {
210-
if (connectionStatus.isNew()) {
211-
closeConnection(status);
210+
try {
211+
if (connectionStatus.isNew()) {
212+
closeConnection(status);
213+
}
214+
} finally {
215+
connectionStatus.afterClosed();
212216
}
213-
connectionStatus.afterClosed();
214217
}
215218
}
216219
}

data-connection/src/main/java/io/micronaut/data/connection/support/DefaultConnectionStatus.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,34 @@ public void registerSynchronization(ConnectionSynchronization synchronization) {
7474

7575
private void forEachSynchronizations(Consumer<ConnectionSynchronization> consumer) {
7676
if (connectionSynchronizations != null) {
77+
List<Exception> exceptions = new ArrayList<>(connectionSynchronizations.size());
7778
ListIterator<ConnectionSynchronization> listIterator = connectionSynchronizations.listIterator(connectionSynchronizations.size());
7879
while (listIterator.hasPrevious()) {
79-
consumer.accept(listIterator.previous());
80+
try {
81+
consumer.accept(listIterator.previous());
82+
} catch (Exception e) {
83+
exceptions.add(e);
84+
}
85+
}
86+
if (!exceptions.isEmpty()) {
87+
if (exceptions.size() == 1) {
88+
sneakyThrow(exceptions.get(0));
89+
} else {
90+
IllegalStateException e = new IllegalStateException("Error executing connection synchronizations", exceptions.get(0));
91+
for (int i = 1; i < exceptions.size(); i++) {
92+
e.addSuppressed(exceptions.get(i));
93+
}
94+
throw e;
95+
}
8096
}
8197
}
8298
}
8399

100+
@SuppressWarnings("unchecked")
101+
private static <T extends Throwable, R> R sneakyThrow(Throwable t) throws T {
102+
throw (T) t;
103+
}
104+
84105
public void complete() {
85106
forEachSynchronizations(ConnectionSynchronization::executionComplete);
86107
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.micronaut.transaction.jdbc
2+
3+
import io.micronaut.test.extensions.spock.annotation.MicronautTest
4+
import io.micronaut.transaction.SynchronousTransactionManager
5+
import io.micronaut.transaction.TransactionDefinition
6+
import io.micronaut.transaction.exceptions.TransactionSystemException
7+
import io.micronaut.transaction.jdbc.mock.MockConnection
8+
import io.micronaut.transaction.jdbc.mock.MockDataSource
9+
import io.micronaut.transaction.jdbc.mock.TestSyncTracker
10+
import io.micronaut.transaction.jdbc.mock.SyncTrackerSynchronization
11+
import io.micronaut.transaction.jdbc.mock.ThrowingExecutionCompleteSynchronization
12+
import jakarta.inject.Inject
13+
14+
import java.sql.Connection
15+
import java.sql.SQLException
16+
17+
@MicronautTest(transactional = false, environments = "broken-conn")
18+
class BrokenConnectionPropagationSpec extends spock.lang.Specification {
19+
20+
@Inject
21+
SynchronousTransactionManager<Connection> txManager
22+
23+
@Inject
24+
MockDataSource mockDataSource
25+
26+
@Inject
27+
TestSyncTracker tracker
28+
29+
def "reproducer: closed connection remains cached in PropagatedContext after rollback/reset throws"() {
30+
given: "a new transaction is opened using the synchronous transaction API (context-bound connection)"
31+
tracker.reset()
32+
def status = txManager.getTransaction(TransactionDefinition.DEFAULT)
33+
def firstConn = status.getConnection() as MockConnection
34+
int firstId = firstConn.id()
35+
36+
and: "register connection synchronizations including one that throws during executionComplete"
37+
status.getConnectionStatus().registerSynchronization(new SyncTrackerSynchronization(tracker, 0))
38+
status.getConnectionStatus().registerSynchronization(new ThrowingExecutionCompleteSynchronization(1))
39+
40+
when: "the underlying connection becomes broken/closed before rollback (e.g. socket timeout)"
41+
mockDataSource.getLastConnection().breakAndClose()
42+
txManager.rollback(status)
43+
44+
then: "rollback fails (simulated driver throws due to closed connection or sync failure)"
45+
def ex = thrown(Throwable)
46+
assert ex instanceof io.micronaut.data.connection.exceptions.ConnectionException || ex instanceof IllegalStateException
47+
and: "all connection synchronizations executed even when one throws"
48+
assert tracker.executionComplete.get() >= 1
49+
assert tracker.beforeClosed.get() >= 1
50+
assert tracker.afterClosed.get() >= 1
51+
52+
when: "on the same thread, a subsequent transaction is started"
53+
def status2 = txManager.getTransaction(TransactionDefinition.DEFAULT)
54+
def newConn = status2.getConnection() as MockConnection
55+
int secondId = newConn.id()
56+
// any simple operation should now succeed on a fresh, open connection
57+
newConn.getAutoCommit()
58+
59+
then: "a fresh connection is obtained and is not the previously closed one"
60+
secondId != firstId
61+
and: "a new connection was created"
62+
mockDataSource.getCreatedCount() == 2
63+
64+
when: "the new transaction is committed to complete the cleanup path"
65+
txManager.commit(status2)
66+
67+
then:
68+
noExceptionThrown()
69+
}
70+
}

0 commit comments

Comments
 (0)