7
7
package org .hibernate .cache .infinispan .query ;
8
8
9
9
import javax .transaction .Transaction ;
10
+ import javax .transaction .TransactionManager ;
11
+ import javax .transaction .Status ;
12
+ import javax .transaction .Synchronization ;
10
13
14
+ import org .hibernate .HibernateException ;
11
15
import org .hibernate .cache .CacheException ;
12
16
import org .hibernate .cache .infinispan .impl .BaseTransactionalDataRegion ;
13
17
import org .hibernate .cache .infinispan .util .Caches ;
14
18
import org .hibernate .cache .spi .QueryResultsRegion ;
15
19
import org .hibernate .cache .spi .RegionFactory ;
16
20
import org .hibernate .engine .spi .SessionImplementor ;
21
+ import org .hibernate .jdbc .WorkExecutor ;
22
+ import org .hibernate .jdbc .WorkExecutorVisitable ;
23
+ import org .hibernate .resource .transaction .TransactionCoordinator ;
17
24
import org .infinispan .AdvancedCache ;
25
+ import org .infinispan .configuration .cache .TransactionConfiguration ;
18
26
import org .infinispan .context .Flag ;
27
+ import org .infinispan .transaction .TransactionMode ;
28
+ import org .infinispan .util .logging .Log ;
29
+ import org .infinispan .util .logging .LogFactory ;
30
+
31
+ import java .sql .Connection ;
32
+ import java .sql .SQLException ;
33
+ import java .util .HashMap ;
34
+ import java .util .Map ;
35
+ import java .util .concurrent .ConcurrentHashMap ;
36
+ import java .util .concurrent .ConcurrentMap ;
19
37
20
38
/**
21
39
* Region for caching query results.
25
43
* @since 3.5
26
44
*/
27
45
public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implements QueryResultsRegion {
46
+ private static final Log log = LogFactory .getLog ( QueryResultsRegionImpl .class );
28
47
29
48
private final AdvancedCache evictCache ;
30
49
private final AdvancedCache putCache ;
31
50
private final AdvancedCache getCache ;
51
+ private final ConcurrentMap <SessionImplementor , Map > transactionContext = new ConcurrentHashMap <SessionImplementor , Map >();
52
+ private final boolean putCacheRequiresTransaction ;
32
53
33
54
/**
34
55
* Query region constructor
@@ -50,15 +71,28 @@ public QueryResultsRegionImpl(AdvancedCache cache, String name, RegionFactory fa
50
71
Caches .failSilentWriteCache ( cache );
51
72
52
73
this .getCache = Caches .failSilentReadCache ( cache );
74
+
75
+ TransactionConfiguration transactionConfiguration = putCache .getCacheConfiguration ().transaction ();
76
+ boolean transactional = transactionConfiguration .transactionMode () != TransactionMode .NON_TRANSACTIONAL ;
77
+ this .putCacheRequiresTransaction = transactional && !transactionConfiguration .autoCommit ();
78
+ // Since we execute the query update explicitly form transaction synchronization, the putCache does not need
79
+ // to be transactional anymore (it had to be in the past to prevent revealing uncommitted changes).
80
+ if (transactional ) {
81
+ log .warn ("Use non-transactional query caches for best performance!" );
82
+ }
53
83
}
54
84
55
85
@ Override
56
86
public void evict (Object key ) throws CacheException {
87
+ for (Map map : transactionContext .values ()) {
88
+ map .remove (key );
89
+ }
57
90
evictCache .remove ( key );
58
91
}
59
92
60
93
@ Override
61
94
public void evictAll () throws CacheException {
95
+ transactionContext .clear ();
62
96
final Transaction tx = suspend ();
63
97
try {
64
98
// Invalidate the local region and then go remote
@@ -89,18 +123,42 @@ public Object get(SessionImplementor session, Object key) throws CacheException
89
123
// to avoid holding locks that would prevent updates.
90
124
// Add a zero (or low) timeout option so we don't block
91
125
// waiting for tx's that did a put to commit
126
+ Object result ;
92
127
if ( skipCacheStore ) {
93
- return getCache .withFlags ( Flag .SKIP_CACHE_STORE ).get ( key );
128
+ result = getCache .withFlags ( Flag .SKIP_CACHE_STORE ).get ( key );
94
129
}
95
130
else {
96
- return getCache .get ( key );
131
+ result = getCache .get ( key );
132
+ }
133
+ if (result == null ) {
134
+ Map map = transactionContext .get (session );
135
+ if (map != null ) {
136
+ result = map .get (key );
137
+ }
97
138
}
139
+ return result ;
98
140
}
99
141
100
142
@ Override
101
143
@ SuppressWarnings ("unchecked" )
102
144
public void put (SessionImplementor session , Object key , Object value ) throws CacheException {
103
145
if ( checkValid () ) {
146
+ // See HHH-7898: Even with FAIL_SILENTLY flag, failure to write in transaction
147
+ // fails the whole transaction. It is an Infinispan quirk that cannot be fixed
148
+ // ISPN-5356 tracks that. This is because if the transaction continued the
149
+ // value could be committed on backup owners, including the failed operation,
150
+ // and the result would not be consistent.
151
+ TransactionCoordinator tc = session .getTransactionCoordinator ();
152
+ if (tc != null && tc .isJoined ()) {
153
+ tc .getLocalSynchronizations ().registerSynchronization (new PostTransactionQueryUpdate (tc , session , key , value ));
154
+ // no need to synchronize as the transaction will be accessed by only one thread
155
+ Map map = transactionContext .get (session );
156
+ if (map == null ) {
157
+ transactionContext .put (session , map = new HashMap ());
158
+ }
159
+ map .put (key , value );
160
+ return ;
161
+ }
104
162
// Here we don't want to suspend the tx. If we do:
105
163
// 1) We might be caching query results that reflect uncommitted
106
164
// changes. No tx == no WL on cache node, so other threads
@@ -120,4 +178,52 @@ public void put(SessionImplementor session, Object key, Object value) throws Cac
120
178
}
121
179
}
122
180
181
+ private class PostTransactionQueryUpdate implements Synchronization {
182
+ private final TransactionCoordinator tc ;
183
+ private final SessionImplementor session ;
184
+ private final Object key ;
185
+ private final Object value ;
186
+
187
+ public PostTransactionQueryUpdate (TransactionCoordinator tc , SessionImplementor session , Object key , Object value ) {
188
+ this .tc = tc ;
189
+ this .session = session ;
190
+ this .key = key ;
191
+ this .value = value ;
192
+ }
193
+
194
+ @ Override
195
+ public void beforeCompletion () {
196
+ }
197
+
198
+ @ Override
199
+ public void afterCompletion (int status ) {
200
+ transactionContext .remove (session );
201
+ switch (status ) {
202
+ case Status .STATUS_COMMITTING :
203
+ case Status .STATUS_COMMITTED :
204
+ try {
205
+ // TODO: isolation without obtaining Connection
206
+ tc .createIsolationDelegate ().delegateWork (new WorkExecutorVisitable <Void >() {
207
+ @ Override
208
+ public Void accept (WorkExecutor <Void > executor , Connection connection ) throws SQLException {
209
+ putCache .put (key , value );
210
+ return null ;
211
+ }
212
+ }
213
+ , putCacheRequiresTransaction );
214
+ }
215
+ catch (HibernateException e ) {
216
+ // silently fail any exceptions
217
+ if (log .isTraceEnabled ()) {
218
+ log .trace ("Exception during query cache update" , e );
219
+ }
220
+ }
221
+ break ;
222
+ default :
223
+ // it would be nicer to react only on ROLLING_BACK and ROLLED_BACK statuses
224
+ // but TransactionCoordinator gives us UNKNOWN on rollback
225
+ break ;
226
+ }
227
+ }
228
+ }
123
229
}
0 commit comments