Skip to content

Commit ac35188

Browse files
authored
RATIS-2258. Caching TermIndex objects (#1239)
1 parent f00d764 commit ac35188

File tree

4 files changed

+326
-36
lines changed

4 files changed

+326
-36
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.util;
19+
20+
import org.apache.ratis.thirdparty.com.google.common.collect.MapMaker;
21+
22+
import java.util.Map;
23+
import java.util.Objects;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ConcurrentMap;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.function.BiFunction;
28+
import java.util.function.Consumer;
29+
30+
/**
31+
* Weak Value Cache: ({@link OUTER}, {@link INNER}) -> {@link T}.
32+
* <p>
33+
* Note that the cached values are weakly referenced.
34+
* A cached value could be garage-collected (i.e. evicted from the cache)
35+
* when there are no external (strong) references.
36+
*
37+
* @param <OUTER> the type of the outer keys.
38+
* @param <INNER> the type of the inner keys.
39+
* @param <T> the type to be cached.
40+
*/
41+
public final class BiWeakValueCache<OUTER, INNER, T> {
42+
private static <K, V> ConcurrentMap<K, V> newMap() {
43+
return new MapMaker().weakValues().makeMap();
44+
}
45+
46+
private final String outerName;
47+
private final String innerName;
48+
private final String name;
49+
50+
/** For constructing {@link T} values from ({@link OUTER}, {@link INNER}) keys. */
51+
private final BiFunction<OUTER, INNER, T> constructor;
52+
/** Count the number of {@link T} values constructed. */
53+
private final AtomicInteger valueCount = new AtomicInteger(0);
54+
55+
/**
56+
* Actual map {@link OUTER} -> ({@link INNER} -> {@link T})
57+
* for the logical view ({@link OUTER}, {@link INNER}) -> {@link T}.
58+
*/
59+
private final ConcurrentMap<OUTER, ConcurrentMap<INNER, T>> map = new ConcurrentHashMap<>();
60+
61+
/**
62+
* Create a cache for mapping ({@link OUTER}, {@link INNER}) keys to {@link T} values.
63+
*
64+
* @param outerName the name of the outer long.
65+
* @param innerName the name of the inner long.
66+
* @param constructor for constructing {@link T} values.
67+
*/
68+
public BiWeakValueCache(String outerName, String innerName, BiFunction<OUTER, INNER, T> constructor) {
69+
this.outerName = outerName;
70+
this.innerName = innerName;
71+
this.name = "(" + outerName + ", " + innerName + ")-cache";
72+
this.constructor = constructor;
73+
}
74+
75+
private T construct(OUTER outer, INNER inner) {
76+
final T constructed = constructor.apply(outer, inner);
77+
Objects.requireNonNull(constructed, "constructed == null");
78+
valueCount.incrementAndGet();
79+
return constructed;
80+
}
81+
82+
/**
83+
* If the key ({@link OUTER}, {@link INNER}) is in the cache, return the cached values.
84+
* Otherwise, create a new value and then return it.
85+
*/
86+
public T getOrCreate(OUTER outer, INNER inner) {
87+
Objects.requireNonNull(outer, () -> outerName + " (outer) == null");
88+
Objects.requireNonNull(inner, () -> innerName + " (inner) == null");
89+
final ConcurrentMap<INNER, T> innerMap = map.computeIfAbsent(outer, k -> newMap());
90+
final T computed = innerMap.computeIfAbsent(inner, i -> construct(outer, i));
91+
if ((valueCount.get() & 0xFFF) == 0) {
92+
cleanupEmptyInnerMaps(); // cleanup empty maps once in a while
93+
}
94+
return computed;
95+
}
96+
97+
/** @return the value count for the given outer key. */
98+
int count(OUTER outer) {
99+
final ConcurrentMap<INNER, T> innerMap = map.get(outer);
100+
if (innerMap == null) {
101+
return 0;
102+
}
103+
104+
// size() may return incorrect result; see Guava MapMaker javadoc
105+
int n = 0;
106+
for (INNER ignored : innerMap.keySet()) {
107+
n++;
108+
}
109+
return n;
110+
}
111+
112+
void cleanupEmptyInnerMaps() {
113+
// isEmpty() may return incorrect result; see Guava MapMaker javadoc
114+
map.values().removeIf(e -> !e.entrySet().iterator().hasNext());
115+
}
116+
117+
@Override
118+
public String toString() {
119+
return name;
120+
}
121+
122+
/** The cache content for debugging. */
123+
int dump(Consumer<String> out) {
124+
out.accept(name + ":\n");
125+
int emptyCount = 0;
126+
for (Map.Entry<OUTER, ConcurrentMap<INNER, T>> entry : map.entrySet()) {
127+
final OUTER outer = entry.getKey();
128+
final ConcurrentMap<INNER, T> innerMap = entry.getValue();
129+
final int count = count(outer);
130+
if (count == 0) {
131+
emptyCount++;
132+
}
133+
134+
out.accept(" " + outerName + ":" + outer);
135+
out.accept(", " + innerName + ":" + innerMap.keySet());
136+
out.accept(", count=" + count);
137+
out.accept(", size=" + innerMap.size());
138+
out.accept("\n");
139+
}
140+
out.accept(" emptyCount=" + emptyCount);
141+
out.accept("\n");
142+
return emptyCount;
143+
}
144+
}

ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
2121
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
2222
import org.apache.ratis.server.raftlog.RaftLog;
23+
import org.apache.ratis.util.BiWeakValueCache;
2324

2425
import java.util.Comparator;
2526
import java.util.Optional;
@@ -73,43 +74,62 @@ static TermIndex valueOf(LogEntryProto proto) {
7374

7475
/** @return a {@link TermIndex} object. */
7576
static TermIndex valueOf(long term, long index) {
76-
return new TermIndex() {
77-
@Override
78-
public long getTerm() {
79-
return term;
80-
}
81-
82-
@Override
83-
public long getIndex() {
84-
return index;
85-
}
86-
87-
@Override
88-
public boolean equals(Object obj) {
89-
if (obj == this) {
90-
return true;
91-
} else if (!(obj instanceof TermIndex)) {
92-
return false;
77+
return Impl.getCache().getOrCreate(term, index);
78+
}
79+
80+
/**
81+
* An implementation for private use.
82+
* Note that this is not a public API, although this is public class.
83+
*/
84+
final class Impl {
85+
private Impl() { }
86+
87+
private static final BiWeakValueCache<Long, Long, TermIndex> CACHE
88+
= new BiWeakValueCache<>("term", "index", Impl::newTermIndex);
89+
90+
static BiWeakValueCache<Long, Long, TermIndex> getCache() {
91+
return CACHE;
92+
}
93+
94+
private static TermIndex newTermIndex(long term, long index) {
95+
return new TermIndex() {
96+
@Override
97+
public long getTerm() {
98+
return term;
99+
}
100+
101+
@Override
102+
public long getIndex() {
103+
return index;
104+
}
105+
106+
@Override
107+
public boolean equals(Object obj) {
108+
if (obj == this) {
109+
return true;
110+
} else if (!(obj instanceof TermIndex)) {
111+
return false;
112+
}
113+
114+
final TermIndex that = (TermIndex) obj;
115+
return this.getTerm() == that.getTerm()
116+
&& this.getIndex() == that.getIndex();
117+
}
118+
119+
@Override
120+
public int hashCode() {
121+
return Long.hashCode(term) ^ Long.hashCode(index);
122+
}
123+
124+
private String longToString(long n) {
125+
return n >= 0L ? String.valueOf(n) : "~";
93126
}
94127

95-
final TermIndex that = (TermIndex) obj;
96-
return this.getTerm() == that.getTerm()
97-
&& this.getIndex() == that.getIndex();
98-
}
99-
100-
@Override
101-
public int hashCode() {
102-
return Long.hashCode(term) ^ Long.hashCode(index);
103-
}
104-
105-
private String longToString(long n) {
106-
return n >= 0L? String.valueOf(n) : "~";
107-
}
108-
109-
@Override
110-
public String toString() {
111-
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
112-
}
113-
};
128+
@Override
129+
public String toString() {
130+
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
131+
}
132+
};
133+
}
114134
}
115135
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.server.protocol;
19+
20+
import org.apache.ratis.util.BiWeakValueCache;
21+
22+
public interface ProtocolTestUtils {
23+
static BiWeakValueCache<Long, Long, TermIndex> getTermIndexCache() {
24+
return TermIndex.Impl.getCache();
25+
}
26+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.util;
19+
20+
import org.apache.ratis.BaseTest;
21+
import org.apache.ratis.RaftTestUtil;
22+
import org.apache.ratis.server.protocol.ProtocolTestUtils;
23+
import org.apache.ratis.server.protocol.TermIndex;
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.junit.jupiter.api.Assertions.*;
27+
28+
/** Testing {@link BiWeakValueCache}. */
29+
public class TestTermIndex extends BaseTest {
30+
static BiWeakValueCache<Long, Long, TermIndex> CACHE = ProtocolTestUtils.getTermIndexCache();
31+
32+
static void dumpCache(Integer expectedEmptyCount) {
33+
final int computed = CACHE.dump(System.out::print);
34+
if (expectedEmptyCount != null) {
35+
assertEquals(expectedEmptyCount, computed);
36+
}
37+
System.out.flush();
38+
}
39+
40+
static void assertCacheSize(int expectedSize, long term) {
41+
final int computed = CACHE.count(term);
42+
if (computed != expectedSize) {
43+
dumpCache(null);
44+
}
45+
assertEquals(expectedSize, computed);
46+
}
47+
48+
void assertCacheSizeWithGC(int expectedSize, long term) throws Exception{
49+
JavaUtils.attempt(() -> {
50+
RaftTestUtil.gc();
51+
assertCacheSize(expectedSize, term);
52+
}, 5, HUNDRED_MILLIS, "assertCacheSizeWithGC", LOG);
53+
}
54+
55+
static void initTermIndex(TermIndex[][] ti, int term, int index) {
56+
ti[term][index] = TermIndex.valueOf(term, index);
57+
}
58+
59+
@Test
60+
public void testCaching() throws Exception {
61+
final int n = 9;
62+
final TermIndex[][] ti = new TermIndex[n][n];
63+
final long[] terms = new long[n];
64+
final long[] indices = new long[n];
65+
for(int j = 0; j < n; j++) {
66+
terms[j] = j;
67+
indices[j] = j;
68+
}
69+
70+
assertCacheSize(0, terms[1]);
71+
initTermIndex(ti, 1, 1);
72+
assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
73+
assertCacheSize(1, terms[1]);
74+
75+
initTermIndex(ti, 1, 2);
76+
assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
77+
assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
78+
assertCacheSize(2, terms[1]);
79+
dumpCache(0);
80+
81+
initTermIndex(ti, 2, 2);
82+
assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
83+
assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
84+
assertSame(ti[2][2], TermIndex.valueOf(terms[2], indices[2]));
85+
assertCacheSize(2, terms[1]);
86+
assertCacheSize(1, terms[2]);
87+
dumpCache(0);
88+
89+
ti[1][1] = null; // release ti[1][1];
90+
assertCacheSizeWithGC(1, terms[1]);
91+
dumpCache(0);
92+
93+
ti[1][2] = null; // release ti[1][2];
94+
assertCacheSizeWithGC(0, terms[1]);
95+
dumpCache(1);
96+
97+
CACHE.cleanupEmptyInnerMaps();
98+
dumpCache(0);
99+
}
100+
}

0 commit comments

Comments
 (0)