6
6
*/
7
7
package org .hibernate .cache .infinispan .access ;
8
8
9
- import javax .transaction .RollbackException ;
10
9
import javax .transaction .Status ;
11
10
import javax .transaction .SystemException ;
12
11
import javax .transaction .Transaction ;
24
23
import java .util .concurrent .TimeUnit ;
25
24
import java .util .concurrent .locks .ReentrantLock ;
26
25
27
- import org .hibernate .cache .CacheException ;
28
26
import org .hibernate .cache .infinispan .InfinispanRegionFactory ;
29
27
import org .hibernate .cache .infinispan .util .CacheCommandInitializer ;
30
28
import org .hibernate .cache .spi .RegionFactory ;
53
51
* not find data is:
54
52
* <p/>
55
53
* <ol>
56
- * <li> Call {@link #registerPendingPut(Object, long)}</li>
54
+ * <li> Call {@link #registerPendingPut(SessionImplementor, Object, long)}</li>
57
55
* <li> Read the database</li>
58
- * <li> Call {@link #acquirePutFromLoadLock(Object, long)}
56
+ * <li> Call {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
59
57
* <li> if above returns <code>null</code>, the thread should not cache the data;
60
58
* only if above returns instance of <code>AcquiredLock</code>, put data in the cache and...</li>
61
59
* <li> then call {@link #releasePutFromLoadLock(Object, Lock)}</li>
68
66
* call
69
67
* <p/>
70
68
* <ul>
71
- * <li> {@link #beginInvalidatingKey(Object)} (for a single key invalidation)</li>
69
+ * <li> {@link #beginInvalidatingKey(SessionImplementor, Object)} (for a single key invalidation)</li>
72
70
* <li>or {@link #beginInvalidatingRegion()} followed by {@link #endInvalidatingRegion()}
73
71
* (for a general invalidation all pending puts)</li>
74
72
* </ul>
75
- * After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object)} should
73
+ * After transaction commit (when the DB is updated) {@link #endInvalidatingKey(SessionImplementor, Object)} should
76
74
* be called in order to allow further attempts to cache entry.
77
75
* </p>
78
76
* <p/>
79
77
* <p>
80
78
* This class also supports the concept of "naked puts", which are calls to
81
- * {@link #acquirePutFromLoadLock(Object, long)} without a preceding {@link #registerPendingPut(Object, long)}.
82
- * Besides not acquiring lock in {@link #registerPendingPut(Object, long)} this can happen when collection
79
+ * {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)} without a preceding {@link #registerPendingPut(SessionImplementor, Object, long)}.
80
+ * Besides not acquiring lock in {@link #registerPendingPut(SessionImplementor, Object, long)} this can happen when collection
83
81
* elements are loaded after the collection has not been found in the cache, where the elements
84
82
* don't have their own table but can be listed as 'select ... from Element where collection_id = ...'.
85
83
* Naked puts are handled according to txTimestamp obtained by calling {@link RegionFactory#nextTimestamp()}
@@ -141,8 +139,8 @@ public class PutFromLoadValidator {
141
139
142
140
/**
143
141
* Creates a new put from load validator instance.
144
- *
145
- * @param cache Cache instance on which to store pending put information.
142
+ *
143
+ * @param cache Cache instance on which to store pending put information.
146
144
* @param transactionManager Transaction manager
147
145
*/
148
146
public PutFromLoadValidator (AdvancedCache cache , TransactionManager transactionManager ) {
@@ -247,20 +245,15 @@ public static void removeFromCache(AdvancedCache cache) {
247
245
}
248
246
249
247
public void setCurrentSession (SessionImplementor session ) {
250
- // we register synchronizations directly on JTA transactions, let's make this noop with TM
251
- if (transactionManager == null ) {
252
- currentSession .set (session );
253
- }
248
+ currentSession .set (session );
254
249
}
255
250
256
251
public void resetCurrentSession () {
257
- if (transactionManager == null ) {
258
- currentSession .remove ();
259
- }
252
+ currentSession .remove ();
260
253
}
261
254
262
255
/**
263
- * Marker for lock acquired in {@link #acquirePutFromLoadLock(Object, long)}
256
+ * Marker for lock acquired in {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
264
257
*/
265
258
public static abstract class Lock {
266
259
private Lock () {}
@@ -274,13 +267,14 @@ private Lock() {}
274
267
* should always be matched with a call to {@link #releasePutFromLoadLock(Object, Lock)}.
275
268
* </p>
276
269
*
270
+ * @param session
277
271
* @param key the key
278
272
*
279
273
* @param txTimestamp
280
274
* @return <code>AcquiredLock</code> if the lock is acquired and the cache put
281
275
* can proceed; <code>null</code> if the data should not be cached
282
276
*/
283
- public Lock acquirePutFromLoadLock (Object key , long txTimestamp ) {
277
+ public Lock acquirePutFromLoadLock (SessionImplementor session , Object key , long txTimestamp ) {
284
278
if (trace ) {
285
279
log .tracef ("acquirePutFromLoadLock(%s#%s, %d)" , cache .getName (), key , txTimestamp );
286
280
}
@@ -305,7 +299,7 @@ public Lock acquirePutFromLoadLock(Object key, long txTimestamp) {
305
299
}
306
300
continue ;
307
301
}
308
- final PendingPut toCancel = pending .remove (getLocalLockOwner () );
302
+ final PendingPut toCancel = pending .remove (session );
309
303
if (toCancel != null ) {
310
304
valid = !toCancel .completed ;
311
305
toCancel .completed = true ;
@@ -359,7 +353,7 @@ else if (pending.lastInvalidationEnd != Long.MIN_VALUE) {
359
353
}
360
354
}
361
355
362
- PendingPut pendingPut = new PendingPut (getLocalLockOwner () );
356
+ PendingPut pendingPut = new PendingPut (session );
363
357
pending = new PendingPutMap (pendingPut );
364
358
PendingPutMap existing = pendingPuts .putIfAbsent (key , pending );
365
359
if (existing != null ) {
@@ -388,7 +382,7 @@ else if (t instanceof Error) {
388
382
389
383
/**
390
384
* Releases the lock previously obtained by a call to
391
- * {@link #acquirePutFromLoadLock(Object, long)}.
385
+ * {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}.
392
386
*
393
387
* @param key the key
394
388
*/
@@ -407,9 +401,9 @@ public void releasePutFromLoadLock(Object key, Lock lock) {
407
401
}
408
402
409
403
/**
410
- * Invalidates all {@link #registerPendingPut(Object, long) previously registered pending puts} ensuring a subsequent call to
411
- * {@link #acquirePutFromLoadLock(Object, long)} will return <code>false</code>. <p> This method will block until any
412
- * concurrent thread that has {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the any key has
404
+ * Invalidates all {@link #registerPendingPut(SessionImplementor, Object, long) previously registered pending puts} ensuring a subsequent call to
405
+ * {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)} will return <code>false</code>. <p> This method will block until any
406
+ * concurrent thread that has {@link #acquirePutFromLoadLock(SessionImplementor, Object, long) acquired the putFromLoad lock} for the any key has
413
407
* released the lock. This allows the caller to be certain the putFromLoad will not execute after this method returns,
414
408
* possibly caching stale data. </p>
415
409
*
@@ -506,16 +500,17 @@ public void endInvalidatingRegion() {
506
500
507
501
/**
508
502
* Notifies this validator that it is expected that a database read followed by a subsequent {@link
509
- * #acquirePutFromLoadLock(Object, long)} call will occur. The intent is this method would be called following a cache miss
503
+ * #acquirePutFromLoadLock(SessionImplementor, Object, long)} call will occur. The intent is this method would be called following a cache miss
510
504
* wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
511
505
* treat the subsequent <code>acquirePutFromLoadLock</code> as if the database read occurred when this method was
512
506
* invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
513
507
* notifications.
514
508
*
509
+ * @param session
515
510
* @param key key that will be used for subsequent cache put
516
511
* @param txTimestamp
517
512
*/
518
- public void registerPendingPut (Object key , long txTimestamp ) {
513
+ public void registerPendingPut (SessionImplementor session , Object key , long txTimestamp ) {
519
514
long invalidationTimestamp = this .regionInvalidationTimestamp ;
520
515
if (txTimestamp <= invalidationTimestamp ) {
521
516
boolean skip ;
@@ -543,7 +538,7 @@ public void registerPendingPut(Object key, long txTimestamp) {
543
538
}
544
539
}
545
540
546
- final PendingPut pendingPut = new PendingPut ( getLocalLockOwner () );
541
+ final PendingPut pendingPut = new PendingPut ( session );
547
542
final PendingPutMap pendingForKey = new PendingPutMap ( pendingPut );
548
543
549
544
for (;;) {
@@ -586,21 +581,23 @@ public void registerPendingPut(Object key, long txTimestamp) {
586
581
587
582
/**
588
583
* Calls {@link #beginInvalidatingKey(Object, Object)} with current transaction or thread.
584
+ *
585
+ * @param session
589
586
* @param key
590
587
* @return
591
588
*/
592
- public boolean beginInvalidatingKey (Object key ) {
593
- return beginInvalidatingKey (key , getLocalLockOwner () );
589
+ public boolean beginInvalidatingKey (SessionImplementor session , Object key ) {
590
+ return beginInvalidatingKey (key , session );
594
591
}
595
592
596
593
/**
597
- * Invalidates any {@link #registerPendingPut(Object, long) previously registered pending puts}
598
- * and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object, long)}
594
+ * Invalidates any {@link #registerPendingPut(SessionImplementor, Object, long) previously registered pending puts}
595
+ * and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
599
596
* will return <code>false</code>. <p> This method will block until any concurrent thread that has
600
- * {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the given key
597
+ * {@link #acquirePutFromLoadLock(SessionImplementor, Object, long) acquired the putFromLoad lock} for the given key
601
598
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
602
599
* returns, possibly caching stale data. </p>
603
- * After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called }
600
+ * After this transaction completes, {@link #endInvalidatingKey(SessionImplementor, Object)} needs to be called }
604
601
*
605
602
* @param key key identifying data whose pending puts should be invalidated
606
603
*
@@ -643,16 +640,18 @@ public boolean beginInvalidatingKey(Object key, Object lockOwner) {
643
640
644
641
/**
645
642
* Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread.
643
+ *
644
+ * @param session
646
645
* @param key
647
646
* @return
648
647
*/
649
- public boolean endInvalidatingKey (Object key ) {
650
- return endInvalidatingKey (key , getLocalLockOwner () );
648
+ public boolean endInvalidatingKey (SessionImplementor session , Object key ) {
649
+ return endInvalidatingKey (key , session );
651
650
}
652
651
653
652
/**
654
653
* Called after the transaction completes, allowing caching of entries. It is possible that this method
655
- * is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be a no-op.
654
+ * is called without previous invocation of {@link #beginInvalidatingKey(SessionImplementor, Object)}, then it should be a no-op.
656
655
*
657
656
* @param key
658
657
* @param lockOwner owner of the invalidation - transaction or thread
@@ -690,31 +689,6 @@ public boolean endInvalidatingKey(Object key, Object lockOwner) {
690
689
}
691
690
692
691
public Object registerRemoteInvalidations (Object [] keys ) {
693
- Transaction tx = null ;
694
- try {
695
- if ( transactionManager != null ) {
696
- tx = transactionManager .getTransaction ();
697
- }
698
- }
699
- catch (SystemException se ) {
700
- throw new CacheException ( "Could not obtain transaction" , se );
701
- }
702
- if (tx != null ) {
703
- if (trace ) {
704
- log .tracef ("Registering lock owner %s for %s: %s" , tx , cache .getName (), Arrays .toString (keys ));
705
- }
706
- try {
707
- Synchronization sync = new Synchronization (nonTxPutFromLoadInterceptor , keys );
708
- tx .registerSynchronization (sync );
709
- return sync .uuid ;
710
- }
711
- catch (SystemException se ) {
712
- throw new CacheException ("Cannot register synchronization" , se );
713
- }
714
- catch (RollbackException e ) {
715
- return null ;
716
- }
717
- }
718
692
SessionImplementor session = currentSession .get ();
719
693
TransactionCoordinator transactionCoordinator = session == null ? null : session .getTransactionCoordinator ();
720
694
if (transactionCoordinator != null ) {
@@ -731,20 +705,6 @@ public Object registerRemoteInvalidations(Object[] keys) {
731
705
732
706
// ---------------------------------------------------------------- Private
733
707
734
- private Object getLocalLockOwner () {
735
- Transaction tx = null ;
736
- try {
737
- if ( transactionManager != null ) {
738
- tx = transactionManager .getTransaction ();
739
- }
740
- }
741
- catch (SystemException se ) {
742
- throw new CacheException ( "Could not obtain transaction" , se );
743
- }
744
- return tx == null ? Thread .currentThread () : tx ;
745
-
746
- }
747
-
748
708
/**
749
709
* Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a
750
710
* single put is pending for a given key.
@@ -787,7 +747,7 @@ public String toString() {
787
747
sb .append ("[]" );
788
748
}
789
749
else {
790
- sb .append (invalidators );
750
+ sb .append (invalidators . values () );
791
751
}
792
752
}
793
753
else {
@@ -968,7 +928,8 @@ private PendingPut(Object owner) {
968
928
}
969
929
970
930
public String toString () {
971
- return (completed ? "C@" : "R@" ) + owner ;
931
+ // we can't use SessionImpl.toString() concurrently
932
+ return (completed ? "C@" : "R@" ) + (owner instanceof SessionImplementor ? "Session#" + owner .hashCode () : owner .toString ());
972
933
}
973
934
974
935
public boolean invalidate (long now , long expirationPeriod ) {
@@ -995,7 +956,8 @@ private Invalidator(Object owner, long registeredTimestamp) {
995
956
@ Override
996
957
public String toString () {
997
958
final StringBuilder sb = new StringBuilder ("{" );
998
- sb .append ("Owner=" ).append (owner );
959
+ // we can't use SessionImpl.toString() concurrently
960
+ sb .append ("Owner=" ).append (owner instanceof SessionImplementor ? "Session#" + owner .hashCode () : owner .toString ());
999
961
sb .append (", Timestamp=" ).append (registeredTimestamp );
1000
962
sb .append ('}' );
1001
963
return sb .toString ();
0 commit comments