Skip to content

Commit 2828936

Browse files
rvansagalderz
authored andcommitted
HHH-10101 Implement nonstrict-read-write mode in Infinispan 2LC
* requires non-transactional cache in repl/dist/local mode and versioned entities
1 parent b2c9724 commit 2828936

File tree

49 files changed

+1548
-764
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1548
-764
lines changed
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.cache.infinispan.access;
8+
9+
import org.hibernate.cache.CacheException;
10+
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
11+
import org.hibernate.cache.infinispan.util.Caches;
12+
import org.hibernate.cache.infinispan.util.VersionedEntry;
13+
import org.hibernate.cache.spi.access.SoftLock;
14+
import org.hibernate.cache.spi.entry.CacheEntry;
15+
import org.hibernate.engine.spi.SessionImplementor;
16+
import org.hibernate.resource.transaction.TransactionCoordinator;
17+
import org.infinispan.AdvancedCache;
18+
import org.infinispan.configuration.cache.Configuration;
19+
import org.infinispan.context.Flag;
20+
import org.infinispan.util.logging.Log;
21+
import org.infinispan.util.logging.LogFactory;
22+
23+
import java.util.Comparator;
24+
import java.util.concurrent.TimeUnit;
25+
26+
/**
27+
* Access delegate that relaxes the consistency a bit: stale reads are prohibited only after the transaction
28+
* commits. This should also be able to work with async caches, and that would allow the replication delay
29+
* even after the commit.
30+
*
31+
* @author Radim Vansa &lt;[email protected]&gt;
32+
*/
33+
public class NonStrictAccessDelegate implements AccessDelegate {
34+
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class );
35+
36+
private final BaseTransactionalDataRegion region;
37+
private final AdvancedCache cache;
38+
private final AdvancedCache writeCache;
39+
private final AdvancedCache putFromLoadCache;
40+
private final Comparator versionComparator;
41+
42+
43+
public NonStrictAccessDelegate(BaseTransactionalDataRegion region) {
44+
this.region = region;
45+
this.cache = region.getCache();
46+
this.writeCache = Caches.ignoreReturnValuesCache(cache);
47+
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
48+
Configuration configuration = cache.getCacheConfiguration();
49+
if (configuration.clustering().cacheMode().isInvalidation()) {
50+
throw new IllegalArgumentException("Nonstrict-read-write mode cannot use invalidation.");
51+
}
52+
if (configuration.transaction().transactionMode().isTransactional()) {
53+
throw new IllegalArgumentException("Currently transactional caches are not supported.");
54+
}
55+
this.versionComparator = region.getCacheDataDescription().getVersionComparator();
56+
if (versionComparator == null) {
57+
throw new IllegalArgumentException("This strategy requires versioned entities/collections but region " + region.getName() + " contains non-versioned data!");
58+
}
59+
}
60+
61+
@Override
62+
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
63+
if (txTimestamp < region.getLastRegionInvalidation() ) {
64+
return null;
65+
}
66+
Object value = cache.get(key);
67+
if (value instanceof VersionedEntry) {
68+
return ((VersionedEntry) value).getValue();
69+
}
70+
return value;
71+
}
72+
73+
@Override
74+
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) {
75+
return putFromLoad(session, key, value, txTimestamp, version, false);
76+
}
77+
78+
@Override
79+
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException {
80+
long lastRegionInvalidation = region.getLastRegionInvalidation();
81+
if (txTimestamp < lastRegionInvalidation) {
82+
log.tracef("putFromLoad not executed since tx started at %d, before last region invalidation finished = %d", txTimestamp, lastRegionInvalidation);
83+
return false;
84+
}
85+
assert version != null;
86+
87+
if (minimalPutOverride) {
88+
Object prev = cache.get(key);
89+
if (prev != null) {
90+
Object oldVersion = getVersion(prev);
91+
if (oldVersion != null) {
92+
if (versionComparator.compare(version, oldVersion) <= 0) {
93+
return false;
94+
}
95+
}
96+
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) {
97+
return false;
98+
}
99+
}
100+
}
101+
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
102+
// when it is present in the container. TombstoneCallInterceptor will deal with this.
103+
if (!(value instanceof CacheEntry)) {
104+
value = new VersionedEntry(value, version, txTimestamp);
105+
}
106+
putFromLoadCache.put(key, value);
107+
return true;
108+
}
109+
110+
@Override
111+
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
112+
return false;
113+
}
114+
115+
@Override
116+
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException {
117+
return false;
118+
}
119+
120+
@Override
121+
public void remove(SessionImplementor session, Object key) throws CacheException {
122+
Object value = cache.get(key);
123+
Object version = getVersion(value);
124+
// there's no 'afterRemove', so we have to use our own synchronization
125+
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
126+
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version);
127+
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
128+
}
129+
130+
@Override
131+
public void removeAll() throws CacheException {
132+
region.beginInvalidation();
133+
try {
134+
Caches.broadcastEvictAll(cache);
135+
}
136+
finally {
137+
region.endInvalidation();
138+
}
139+
}
140+
141+
@Override
142+
public void evict(Object key) throws CacheException {
143+
writeCache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
144+
}
145+
146+
@Override
147+
public void evictAll() throws CacheException {
148+
region.beginInvalidation();
149+
try {
150+
Caches.broadcastEvictAll(cache);
151+
}
152+
finally {
153+
region.endInvalidation();
154+
}
155+
}
156+
157+
@Override
158+
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
159+
}
160+
161+
@Override
162+
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
163+
writeCache.put(key, getVersioned(value, version, session.getTimestamp()));
164+
return true;
165+
}
166+
167+
@Override
168+
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
169+
writeCache.put(key, getVersioned(value, currentVersion, session.getTimestamp()));
170+
return true;
171+
}
172+
173+
protected Object getVersion(Object value) {
174+
if (value instanceof CacheEntry) {
175+
return ((CacheEntry) value).getVersion();
176+
}
177+
else if (value instanceof VersionedEntry) {
178+
return ((VersionedEntry) value).getVersion();
179+
}
180+
return null;
181+
}
182+
183+
protected Object getVersioned(Object value, Object version, long timestamp) {
184+
assert value != null;
185+
assert version != null;
186+
return new VersionedEntry(value, version, timestamp);
187+
}
188+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.cache.infinispan.access;
8+
9+
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
10+
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
11+
import org.hibernate.cache.infinispan.util.VersionedEntry;
12+
import org.hibernate.resource.transaction.TransactionCoordinator;
13+
import org.infinispan.AdvancedCache;
14+
15+
import java.util.concurrent.TimeUnit;
16+
17+
/**
18+
* @author Radim Vansa &lt;[email protected]&gt;
19+
*/
20+
public class RemovalSynchronization extends InvocationAfterCompletion {
21+
private final BaseTransactionalDataRegion region;
22+
private final Object key;
23+
private final Object version;
24+
25+
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) {
26+
super(tc, cache, requiresTransaction);
27+
this.region = region;
28+
this.key = key;
29+
this.version = version;
30+
}
31+
32+
@Override
33+
protected void invoke(boolean success, AdvancedCache cache) {
34+
if (success) {
35+
if (version == null) {
36+
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
37+
}
38+
else {
39+
cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
40+
}
41+
}
42+
}
43+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.cache.infinispan.access;
8+
9+
import org.hibernate.cache.infinispan.util.VersionedEntry;
10+
import org.infinispan.AdvancedCache;
11+
import org.infinispan.commands.read.SizeCommand;
12+
import org.infinispan.commands.write.PutKeyValueCommand;
13+
import org.infinispan.commons.util.CloseableIterable;
14+
import org.infinispan.container.entries.CacheEntry;
15+
import org.infinispan.container.entries.MVCCEntry;
16+
import org.infinispan.context.Flag;
17+
import org.infinispan.context.InvocationContext;
18+
import org.infinispan.factories.annotations.Inject;
19+
import org.infinispan.filter.NullValueConverter;
20+
import org.infinispan.interceptors.CallInterceptor;
21+
import org.infinispan.util.logging.Log;
22+
import org.infinispan.util.logging.LogFactory;
23+
24+
import java.util.Comparator;
25+
import java.util.Set;
26+
import java.util.UUID;
27+
28+
/**
29+
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}
30+
* and {@link org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion}
31+
*
32+
* The behaviour here also breaks notifications, which are not used for 2LC caches.
33+
*
34+
* @author Radim Vansa &lt;[email protected]&gt;
35+
*/
36+
public class VersionedCallInterceptor extends CallInterceptor {
37+
private final Comparator<Object> versionComparator;
38+
private AdvancedCache cache;
39+
40+
public VersionedCallInterceptor(Comparator<Object> versionComparator) {
41+
this.versionComparator = versionComparator;
42+
}
43+
44+
@Inject
45+
public void injectDependencies(AdvancedCache cache) {
46+
this.cache = cache;
47+
}
48+
49+
@Override
50+
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
51+
MVCCEntry e = (MVCCEntry) ctx.lookupEntry(command.getKey());
52+
if (e == null) {
53+
return null;
54+
}
55+
56+
Object oldValue = e.getValue();
57+
Object oldVersion = null;
58+
long oldTimestamp = Long.MIN_VALUE;
59+
if (oldValue instanceof VersionedEntry) {
60+
oldVersion = ((VersionedEntry) oldValue).getVersion();
61+
oldTimestamp = ((VersionedEntry) oldValue).getTimestamp();
62+
oldValue = ((VersionedEntry) oldValue).getValue();
63+
}
64+
else if (oldValue instanceof org.hibernate.cache.spi.entry.CacheEntry) {
65+
oldVersion = ((org.hibernate.cache.spi.entry.CacheEntry) oldValue).getVersion();
66+
}
67+
68+
Object newValue = command.getValue();
69+
Object newVersion = null;
70+
long newTimestamp = Long.MIN_VALUE;
71+
Object actualNewValue = newValue;
72+
boolean isRemoval = false;
73+
if (newValue instanceof VersionedEntry) {
74+
VersionedEntry ve = (VersionedEntry) newValue;
75+
newVersion = ve.getVersion();
76+
newTimestamp = ve.getTimestamp();
77+
if (ve.getValue() == null) {
78+
isRemoval = true;
79+
}
80+
else if (ve.getValue() instanceof org.hibernate.cache.spi.entry.CacheEntry) {
81+
actualNewValue = ve.getValue();
82+
}
83+
}
84+
else if (newValue instanceof org.hibernate.cache.spi.entry.CacheEntry) {
85+
newVersion = ((org.hibernate.cache.spi.entry.CacheEntry) newValue).getVersion();
86+
}
87+
88+
if (newVersion == null) {
89+
// eviction or post-commit removal: we'll store it with given timestamp
90+
setValue(e, newValue);
91+
return null;
92+
}
93+
if (oldVersion == null) {
94+
assert oldValue == null || oldTimestamp != Long.MIN_VALUE;
95+
if (newTimestamp == Long.MIN_VALUE) {
96+
// remove, knowing the version
97+
setValue(e, newValue);
98+
}
99+
else if (newTimestamp <= oldTimestamp) {
100+
// either putFromLoad or regular update/insert - in either case this update might come
101+
// when it was evicted/region-invalidated. In both cases, with old timestamp we'll leave
102+
// the invalid value
103+
assert oldValue == null;
104+
}
105+
else {
106+
setValue(e, newValue);
107+
}
108+
return null;
109+
}
110+
int compareResult = versionComparator.compare(newVersion, oldVersion);
111+
if (isRemoval && compareResult >= 0) {
112+
setValue(e, newValue);
113+
}
114+
else if (compareResult > 0) {
115+
setValue(e, actualNewValue);
116+
}
117+
return null;
118+
}
119+
120+
private Object setValue(MVCCEntry e, Object value) {
121+
if (e.isRemoved()) {
122+
e.setRemoved(false);
123+
e.setCreated(true);
124+
e.setValid(true);
125+
}
126+
else {
127+
e.setChanged(true);
128+
}
129+
return e.setValue(value);
130+
}
131+
132+
private void removeValue(MVCCEntry e) {
133+
e.setRemoved(true);
134+
e.setChanged(true);
135+
e.setCreated(false);
136+
e.setValid(false);
137+
e.setValue(null);
138+
}
139+
140+
@Override
141+
public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throws Throwable {
142+
Set<Flag> flags = command.getFlags();
143+
int size = 0;
144+
AdvancedCache decoratedCache = cache.getAdvancedCache().withFlags(flags != null ? flags.toArray(new Flag[flags.size()]) : null);
145+
// In non-transactional caches we don't care about context
146+
CloseableIterable<CacheEntry<Object, Void>> iterable = decoratedCache
147+
.filterEntries(VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE).converter(NullValueConverter.getInstance());
148+
try {
149+
for (CacheEntry<Object, Void> entry : iterable) {
150+
if (size++ == Integer.MAX_VALUE) {
151+
return Integer.MAX_VALUE;
152+
}
153+
}
154+
}
155+
finally {
156+
iterable.close();
157+
}
158+
return size;
159+
}
160+
}

0 commit comments

Comments
 (0)