Skip to content

Commit b9a2128

Browse files
rvansagalderz
authored andcommitted
HHH-10083 Support replicated and distributed caches
1 parent 5c36abf commit b9a2128

File tree

59 files changed

+2378
-693
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+2378
-693
lines changed

hibernate-infinispan/hibernate-infinispan.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ test {
5959
systemProperties['hibernate.cache.infinispan.jgroups_cfg'] = '2lc-test-tcp.xml'
6060
// systemProperties['log4j.configuration'] = 'file:/log4j/log4j-infinispan.xml'
6161
enabled = true
62+
// Without this I have trouble running specific test using --tests switch
63+
doFirst {
64+
filter.includePatterns.each {
65+
include "${it.replaceAll('\\.', "\\${File.separator}")}.class"
66+
}
67+
filter.setIncludePatterns('*')
68+
}
6269
}
6370

6471
task packageTests(type: Jar) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.cache.infinispan.access;
8+
9+
import org.hibernate.cache.infinispan.util.FutureUpdate;
10+
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
11+
import org.hibernate.resource.transaction.TransactionCoordinator;
12+
import org.infinispan.AdvancedCache;
13+
import org.infinispan.util.logging.Log;
14+
import org.infinispan.util.logging.LogFactory;
15+
16+
import java.util.UUID;
17+
18+
/**
19+
* @author Radim Vansa &lt;[email protected]&gt;
20+
*/
21+
public class FutureUpdateSynchronization extends InvocationAfterCompletion {
22+
private static final Log log = LogFactory.getLog( FutureUpdateSynchronization.class );
23+
24+
private final UUID uuid = UUID.randomUUID();
25+
private final Object key;
26+
private final Object value;
27+
28+
public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, Object key, Object value) {
29+
super(tc, cache, requiresTransaction);
30+
this.key = key;
31+
this.value = value;
32+
}
33+
34+
public UUID getUuid() {
35+
return uuid;
36+
}
37+
38+
@Override
39+
protected void invoke(boolean success, AdvancedCache cache) {
40+
// Exceptions in #afterCompletion() are silently ignored, since the transaction
41+
// is already committed in DB. However we must not return until we update the cache.
42+
for (;;) {
43+
try {
44+
cache.put(key, new FutureUpdate(uuid, success ? value : null));
45+
return;
46+
}
47+
catch (Exception e) {
48+
log.error("Failure updating cache in afterCompletion, will retry", e);
49+
}
50+
}
51+
}
52+
}

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationCacheAccessDelegate.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,6 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
2828
protected final PutFromLoadValidator putValidator;
2929
protected final AdvancedCache<Object, Object> writeCache;
3030

31-
public static InvalidationCacheAccessDelegate create(BaseRegion region, PutFromLoadValidator validator) {
32-
if (region.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional()) {
33-
return new TxInvalidationCacheAccessDelegate(region, validator);
34-
}
35-
else {
36-
return new NonTxInvalidationCacheAccessDelegate(region, validator);
37-
}
38-
}
39-
4031
/**
4132
* Create a new transactional access delegate instance.
4233
*

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/Synchronization.java renamed to hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationSynchronization.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
*
1414
* @author Radim Vansa &lt;[email protected]&gt;
1515
*/
16-
public class Synchronization implements javax.transaction.Synchronization {
16+
public class InvalidationSynchronization implements javax.transaction.Synchronization {
1717
public final UUID uuid = UUID.randomUUID();
1818
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
1919
private final Object[] keys;
2020

21-
public Synchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
21+
public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
2222
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
2323
this.keys = keys;
2424
}

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* Non-transactional counterpart of {@link TxPutFromLoadInterceptor}.
2121
* Invokes {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} for each invalidation from
2222
* remote node ({@link BeginInvalidationCommand} and sends {@link EndInvalidationCommand} after the transaction
23-
* is complete, with help of {@link Synchronization};
23+
* is complete, with help of {@link InvalidationSynchronization};
2424
*
2525
* @author Radim Vansa &lt;[email protected]&gt;
2626
*/

hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ public Object registerRemoteInvalidations(Object[] keys) {
617617
if (trace) {
618618
log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
619619
}
620-
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
620+
InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, keys);
621621
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
622622
return sync.uuid;
623623
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.cache.infinispan.access;
8+
9+
import org.hibernate.cache.CacheException;
10+
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
11+
import org.hibernate.cache.infinispan.util.Caches;
12+
import org.hibernate.cache.infinispan.util.FutureUpdate;
13+
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
14+
import org.hibernate.cache.infinispan.util.Tombstone;
15+
import org.hibernate.cache.spi.access.SoftLock;
16+
import org.hibernate.engine.spi.SessionImplementor;
17+
import org.hibernate.resource.transaction.TransactionCoordinator;
18+
import org.infinispan.AdvancedCache;
19+
import org.infinispan.configuration.cache.Configuration;
20+
import org.infinispan.context.Flag;
21+
import org.infinispan.util.logging.Log;
22+
import org.infinispan.util.logging.LogFactory;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
/**
27+
* @author Radim Vansa &lt;[email protected]&gt;
28+
*/
29+
public class TombstoneAccessDelegate implements AccessDelegate {
30+
private static final Log log = LogFactory.getLog( TombstoneAccessDelegate.class );
31+
32+
protected final BaseTransactionalDataRegion region;
33+
protected final AdvancedCache cache;
34+
protected final AdvancedCache writeCache;
35+
protected final AdvancedCache asyncWriteCache;
36+
protected final AdvancedCache putFromLoadCache;
37+
protected final boolean requiresTransaction;
38+
39+
public TombstoneAccessDelegate(BaseTransactionalDataRegion region) {
40+
this.region = region;
41+
this.cache = region.getCache();
42+
this.writeCache = Caches.ignoreReturnValuesCache(cache);
43+
this.asyncWriteCache = Caches.asyncWriteCache(cache, Flag.IGNORE_RETURN_VALUES);
44+
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
45+
Configuration configuration = cache.getCacheConfiguration();
46+
if (configuration.clustering().cacheMode().isInvalidation()) {
47+
throw new IllegalArgumentException("For tombstone-based caching, invalidation cache is not allowed.");
48+
}
49+
if (configuration.transaction().transactionMode().isTransactional()) {
50+
throw new IllegalArgumentException("Currently transactional caches are not supported.");
51+
}
52+
requiresTransaction = configuration.transaction().transactionMode().isTransactional()
53+
&& !configuration.transaction().autoCommit();
54+
}
55+
56+
@Override
57+
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
58+
if (txTimestamp < region.getLastRegionInvalidation() ) {
59+
return null;
60+
}
61+
Object value = cache.get(key);
62+
if (value instanceof Tombstone) {
63+
return null;
64+
}
65+
else if (value instanceof FutureUpdate) {
66+
return ((FutureUpdate) value).getValue();
67+
}
68+
else {
69+
return value;
70+
}
71+
}
72+
73+
@Override
74+
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) {
75+
return putFromLoad(session, key, value, txTimestamp, version, false);
76+
}
77+
78+
@Override
79+
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException {
80+
long lastRegionInvalidation = region.getLastRegionInvalidation();
81+
if (txTimestamp < lastRegionInvalidation) {
82+
log.tracef("putFromLoad not executed since tx started at %d, before last region invalidation finished = %d", txTimestamp, lastRegionInvalidation);
83+
return false;
84+
}
85+
if (minimalPutOverride) {
86+
Object prev = cache.get(key);
87+
if (prev instanceof Tombstone) {
88+
Tombstone tombstone = (Tombstone) prev;
89+
long lastTimestamp = tombstone.getLastTimestamp();
90+
if (txTimestamp <= lastTimestamp) {
91+
log.tracef("putFromLoad not executed since tx started at %d, before last invalidation finished = %d", txTimestamp, lastTimestamp);
92+
return false;
93+
}
94+
}
95+
else if (prev != null) {
96+
log.tracef("putFromLoad not executed since cache contains %s", prev);
97+
return false;
98+
}
99+
}
100+
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
101+
// when it is present in the container. TombstoneCallInterceptor will deal with this.
102+
putFromLoadCache.put(key, new TombstoneUpdate(session.getTimestamp(), value));
103+
return true;
104+
}
105+
106+
@Override
107+
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
108+
write(session, key, value);
109+
return true;
110+
}
111+
112+
@Override
113+
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException {
114+
write(session, key, value);
115+
return true;
116+
}
117+
118+
protected void write(SessionImplementor session, Object key, Object value) {
119+
TransactionCoordinator tc = session.getTransactionCoordinator();
120+
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, writeCache, requiresTransaction, key, value);
121+
// FutureUpdate is handled in TombstoneCallInterceptor
122+
writeCache.put(key, new FutureUpdate(sync.getUuid(), null), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
123+
tc.getLocalSynchronizations().registerSynchronization(sync);
124+
}
125+
126+
@Override
127+
public void remove(SessionImplementor session, Object key) throws CacheException {
128+
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
129+
TombstoneSynchronization sync = new TombstoneSynchronization(transactionCoordinator, asyncWriteCache, requiresTransaction, region, key);
130+
Tombstone tombstone = new Tombstone(sync.getUuid(), session.getTimestamp() + region.getTombstoneExpiration(), false);
131+
writeCache.put(key, tombstone, region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
132+
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
133+
}
134+
135+
@Override
136+
public void removeAll() throws CacheException {
137+
region.beginInvalidation();
138+
try {
139+
Caches.broadcastEvictAll(cache);
140+
}
141+
finally {
142+
region.endInvalidation();
143+
}
144+
}
145+
146+
@Override
147+
public void evict(Object key) throws CacheException {
148+
writeCache.put(key, TombstoneUpdate.EVICT);
149+
}
150+
151+
@Override
152+
public void evictAll() throws CacheException {
153+
region.beginInvalidation();
154+
try {
155+
Caches.broadcastEvictAll(cache);
156+
}
157+
finally {
158+
region.endInvalidation();
159+
}
160+
}
161+
162+
@Override
163+
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
164+
}
165+
166+
@Override
167+
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
168+
return false;
169+
}
170+
171+
@Override
172+
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
173+
return false;
174+
}
175+
}

0 commit comments

Comments
 (0)