Skip to content

Commit eee2e9f

Browse files
committed
Replace synchronized multimap with concurrent hash map in NotificationControllerV2 for better performance
1 parent 55e09fe commit eee2e9f

File tree

5 files changed

+279
-8
lines changed

5 files changed

+279
-8
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,6 @@ Apollo 2.5.0
2727
* [Feature: Support exporting and importing configurations for specified applications and clusters](https://github.com/apolloconfig/apollo/pull/5517)
2828
* [doc: Add rust apollo client link](https://github.com/apolloconfig/apollo/pull/5514)
2929
* [Perf: optimize namespace-related interface](https://github.com/apolloconfig/apollo/pull/5518)
30+
* [Perf: Replace synchronized multimap with concurrent hashmap in NotificationControllerV2 for better performance](https://github.com/apolloconfig/apollo/pull/5532)
3031
------------------
3132
All issues and pull requests are [here](https://github.com/apolloconfig/apollo/milestone/16?closed=1)

apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
2727
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
2828
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
29+
import com.ctrip.framework.apollo.configservice.wrapper.CaseInsensitiveMultimapWrapper;
2930
import com.ctrip.framework.apollo.configservice.wrapper.DeferredResultWrapper;
3031
import com.ctrip.framework.apollo.core.ConfigConsts;
3132
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
@@ -35,10 +36,7 @@
3536
import com.google.common.collect.Lists;
3637
import com.google.common.collect.Maps;
3738
import com.google.common.collect.Multimap;
38-
import com.google.common.collect.Multimaps;
39-
import com.google.common.collect.Ordering;
4039
import com.google.common.collect.Sets;
41-
import com.google.common.collect.TreeMultimap;
4240
import com.google.gson.Gson;
4341
import com.google.gson.reflect.TypeToken;
4442
import org.slf4j.Logger;
@@ -69,9 +67,8 @@
6967
@RequestMapping("/notifications/v2")
7068
public class NotificationControllerV2 implements ReleaseMessageListener {
7169
private static final Logger logger = LoggerFactory.getLogger(NotificationControllerV2.class);
72-
private final Multimap<String, DeferredResultWrapper> deferredResults =
73-
Multimaps.synchronizedSetMultimap(
74-
TreeMultimap.create(String.CASE_INSENSITIVE_ORDER, Ordering.natural()));
70+
private final CaseInsensitiveMultimapWrapper<DeferredResultWrapper> deferredResults =
71+
new CaseInsensitiveMultimapWrapper<>(Maps.newConcurrentMap(), Sets::newConcurrentHashSet);
7572

7673
private static final Type notificationsTypeReference =
7774
new TypeToken<List<ApolloConfigNotification>>() {}.getType();
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2025 Apollo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package com.ctrip.framework.apollo.configservice.wrapper;
18+
19+
import java.util.Collections;
20+
import java.util.Locale;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
import java.util.Set;
24+
import java.util.function.Supplier;
25+
26+
/**
27+
* Multimap with case-insensitive keys.
28+
* <p>
29+
* This class wraps a {@code Map<String, Set<V>>} to provide multimap-like behavior with
30+
* case-insensitive keys.
31+
* </p>
32+
*
33+
* <p><b>Thread-safety</b>: This class is thread-safe if and only if the supplied delegate map and
34+
* the sets produced by {@code setSupplier} are thread-safe.
35+
* </p>
36+
*
37+
* <p><b>Views</b>: {@link #get(String)} returns an unmodifiable view of the underlying set.
38+
* Mutations should be done via {@link #put(String, Object)} and {@link #remove(String, Object)} so
39+
* that empty sets can be cleaned up.
40+
* </p>
41+
*/
42+
public class CaseInsensitiveMultimapWrapper<V> {
43+
private final Map<String, Set<V>> delegate;
44+
private final Supplier<Set<V>> setSupplier;
45+
46+
public CaseInsensitiveMultimapWrapper(Map<String, Set<V>> delegate,
47+
Supplier<Set<V>> setSupplier) {
48+
this.delegate = delegate;
49+
this.setSupplier = setSupplier;
50+
}
51+
52+
private static String normalizeKey(String key) {
53+
return Objects.requireNonNull(key, "key").toLowerCase(Locale.ROOT);
54+
}
55+
56+
public boolean put(String key, V value) {
57+
boolean[] added = {false};
58+
delegate.compute(normalizeKey(key), (k, set) -> {
59+
if (set == null) {
60+
set = setSupplier.get();
61+
}
62+
added[0] = set.add(value);
63+
return set;
64+
});
65+
return added[0];
66+
}
67+
68+
public boolean remove(String key, V value) {
69+
boolean[] removed = {false};
70+
delegate.computeIfPresent(normalizeKey(key), (k, set) -> {
71+
removed[0] = set.remove(value);
72+
return set.isEmpty() ? null : set;
73+
});
74+
return removed[0];
75+
}
76+
77+
public Set<V> get(String key) {
78+
Set<V> set = delegate.get(normalizeKey(key));
79+
return set != null ? Collections.unmodifiableSet(set) : Collections.emptySet();
80+
}
81+
82+
public boolean containsKey(String key) {
83+
Set<V> set = delegate.get(normalizeKey(key));
84+
return set != null && !set.isEmpty();
85+
}
86+
87+
/**
88+
* Returns the total number of values across all keys.
89+
* <p>
90+
* Note: In concurrent scenarios, the returned value is a best-effort approximation.
91+
* </p>
92+
*/
93+
public int size() {
94+
int size = 0;
95+
for (Set<V> set : delegate.values()) {
96+
size += set.size();
97+
}
98+
return size;
99+
}
100+
}

apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
2424
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
2525
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
26+
import com.ctrip.framework.apollo.configservice.wrapper.CaseInsensitiveMultimapWrapper;
2627
import com.ctrip.framework.apollo.configservice.wrapper.DeferredResultWrapper;
2728
import com.ctrip.framework.apollo.core.ConfigConsts;
2829
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
@@ -79,7 +80,7 @@ public class NotificationControllerV2Test {
7980

8081
private Gson gson;
8182

82-
private Multimap<String, DeferredResultWrapper> deferredResults;
83+
private CaseInsensitiveMultimapWrapper<DeferredResultWrapper> deferredResults;
8384

8485
@Before
8586
public void setUp() throws Exception {
@@ -106,7 +107,7 @@ public void setUp() throws Exception {
106107
when(namespaceUtil.normalizeNamespace(someAppId, somePublicNamespace))
107108
.thenReturn(somePublicNamespace);
108109

109-
deferredResults = (Multimap<String, DeferredResultWrapper>) ReflectionTestUtils
110+
deferredResults = (CaseInsensitiveMultimapWrapper<DeferredResultWrapper>) ReflectionTestUtils
110111
.getField(controller, "deferredResults");
111112
}
112113

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2025 Apollo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package com.ctrip.framework.apollo.configservice.wrapper;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.google.common.collect.Maps;
24+
import com.google.common.collect.Sets;
25+
import java.util.HashSet;
26+
import java.util.Set;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
34+
public class CaseInsensitiveMultimapWrapperTest {
35+
36+
private CaseInsensitiveMultimapWrapper<String> multimap;
37+
38+
@Before
39+
public void setUp() throws Exception {
40+
multimap =
41+
new CaseInsensitiveMultimapWrapper<>(Maps.newConcurrentMap(), Sets::newConcurrentHashSet);
42+
}
43+
44+
@Test
45+
public void testPutAndGet() {
46+
String key = "SomeKey";
47+
String value1 = "value1";
48+
String value2 = "value2";
49+
50+
assertTrue(multimap.put(key, value1));
51+
assertTrue(multimap.put(key.toLowerCase(), value2));
52+
assertFalse(multimap.put(key.toUpperCase(), value1)); // already exists
53+
54+
Set<String> values = multimap.get(key);
55+
assertEquals(2, values.size());
56+
assertTrue(values.contains(value1));
57+
assertTrue(values.contains(value2));
58+
59+
Set<String> valuesFromLower = multimap.get(key.toLowerCase());
60+
assertEquals(values, valuesFromLower);
61+
}
62+
63+
@Test
64+
public void testRemove() {
65+
String key = "SomeKey";
66+
String value = "someValue";
67+
68+
multimap.put(key, value);
69+
assertTrue(multimap.containsKey(key));
70+
71+
assertTrue(multimap.remove(key.toUpperCase(), value));
72+
assertFalse(multimap.containsKey(key));
73+
assertTrue(multimap.get(key).isEmpty());
74+
}
75+
76+
@Test
77+
public void testContainsKey() {
78+
String key = "SomeKey";
79+
String value = "someValue";
80+
81+
assertFalse(multimap.containsKey(key));
82+
multimap.put(key, value);
83+
assertTrue(multimap.containsKey(key.toLowerCase()));
84+
assertTrue(multimap.containsKey(key.toUpperCase()));
85+
}
86+
87+
@Test
88+
public void testSize() {
89+
multimap.put("Key1", "v1");
90+
multimap.put("key1", "v2");
91+
multimap.put("Key2", "v3");
92+
93+
assertEquals(3, multimap.size());
94+
95+
multimap.remove("KEY1", "v1");
96+
assertEquals(2, multimap.size());
97+
}
98+
99+
@Test
100+
public void testGetEmpty() {
101+
assertTrue(multimap.get("nonExistent").isEmpty());
102+
}
103+
104+
@Test
105+
public void testConcurrencyRaceCondition() throws InterruptedException {
106+
final int loopCount = 100000;
107+
final String key = "RaceKey";
108+
final String valA = "A";
109+
final String valB = "B";
110+
final CountDownLatch latch = new CountDownLatch(2);
111+
final AtomicBoolean running = new AtomicBoolean(true);
112+
final AtomicInteger failures = new AtomicInteger(0);
113+
final AtomicReference<Throwable> threadException = new AtomicReference<>();
114+
115+
// Thread A: toggles valA
116+
new Thread(() -> {
117+
try {
118+
while (running.get()) {
119+
multimap.put(key, valA);
120+
multimap.remove(key, valA);
121+
}
122+
} catch (Throwable e) {
123+
threadException.set(e);
124+
} finally {
125+
latch.countDown();
126+
}
127+
}).start();
128+
129+
// Thread B: repeatedly adds and removes valB to test put atomicity
130+
new Thread(() -> {
131+
try {
132+
for (int i = 0; i < loopCount; i++) {
133+
multimap.put(key, valB);
134+
if (!multimap.get(key).contains(valB)) {
135+
failures.incrementAndGet();
136+
}
137+
// Remove B to allow the set to become empty again,
138+
// giving Thread A a chance to trigger the map removal race condition again.
139+
multimap.remove(key, valB);
140+
}
141+
} catch (Throwable e) {
142+
threadException.set(e);
143+
} finally {
144+
running.set(false);
145+
latch.countDown();
146+
}
147+
}).start();
148+
149+
latch.await();
150+
151+
if (threadException.get() != null) {
152+
throw new RuntimeException("Exception in worker thread", threadException.get());
153+
}
154+
155+
assertEquals("Value B should not be lost due to race condition", 0, failures.get());
156+
}
157+
158+
@Test
159+
public void testGetReturnsUnmodifiableView() {
160+
assertTrue(multimap.put("Key", "Value"));
161+
162+
Set<String> set = multimap.get("key");
163+
assertTrue(set.contains("Value"));
164+
165+
try {
166+
set.add("Another");
167+
org.junit.Assert.fail("get() should return an unmodifiable view");
168+
} catch (UnsupportedOperationException expected) {
169+
// expected
170+
}
171+
}
172+
}

0 commit comments

Comments
 (0)