Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ Apollo 2.5.0
* [Feature: Support exporting and importing configurations for specified applications and clusters](https://github.com/apolloconfig/apollo/pull/5517)
* [doc: Add rust apollo client link](https://github.com/apolloconfig/apollo/pull/5514)
* [Perf: optimize namespace-related interface](https://github.com/apolloconfig/apollo/pull/5518)
* [Perf: Replace synchronized multimap with concurrent hashmap in NotificationControllerV2 for better performance](https://github.com/apolloconfig/apollo/pull/5532)
------------------
All issues and pull requests are [here](https://github.com/apolloconfig/apollo/milestone/16?closed=1)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.configservice.wrapper.CaseInsensitiveMultimapWrapper;
import com.ctrip.framework.apollo.configservice.wrapper.DeferredResultWrapper;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
Expand All @@ -35,10 +36,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,9 +67,8 @@
@RequestMapping("/notifications/v2")
public class NotificationControllerV2 implements ReleaseMessageListener {
private static final Logger logger = LoggerFactory.getLogger(NotificationControllerV2.class);
private final Multimap<String, DeferredResultWrapper> deferredResults =
Multimaps.synchronizedSetMultimap(
TreeMultimap.create(String.CASE_INSENSITIVE_ORDER, Ordering.natural()));
private final CaseInsensitiveMultimapWrapper<DeferredResultWrapper> deferredResults =
new CaseInsensitiveMultimapWrapper<>(Maps.newConcurrentMap(), Sets::newConcurrentHashSet);

private static final Type notificationsTypeReference =
new TypeToken<List<ApolloConfigNotification>>() {}.getType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2025 Apollo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.ctrip.framework.apollo.configservice.wrapper;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

/**
* Multimap with case-insensitive keys.
* <p>
* This class wraps a {@code Map<String, Set<V>>} to provide multimap-like behavior with
* case-insensitive keys.
* </p>
*
* <p><b>Thread-safety</b>: This class is thread-safe if and only if the supplied delegate map and
* the sets produced by {@code setSupplier} are thread-safe.
* </p>
*
* <p><b>Views</b>: {@link #get(String)} returns an unmodifiable view of the underlying set.
* Mutations should be done via {@link #put(String, Object)} and {@link #remove(String, Object)} so
* that empty sets can be cleaned up.
* </p>
*/
public class CaseInsensitiveMultimapWrapper<V> {
private final Map<String, Set<V>> delegate;
private final Supplier<Set<V>> setSupplier;

public CaseInsensitiveMultimapWrapper(Map<String, Set<V>> delegate,
Supplier<Set<V>> setSupplier) {
this.delegate = delegate;
this.setSupplier = setSupplier;
}

private static String normalizeKey(String key) {
return Objects.requireNonNull(key, "key").toLowerCase(Locale.ROOT);
}

public boolean put(String key, V value) {
boolean[] added = {false};
delegate.compute(normalizeKey(key), (k, set) -> {
if (set == null) {
set = setSupplier.get();
}
added[0] = set.add(value);
return set;
});
return added[0];
}

public boolean remove(String key, V value) {
boolean[] removed = {false};
delegate.computeIfPresent(normalizeKey(key), (k, set) -> {
removed[0] = set.remove(value);
return set.isEmpty() ? null : set;
});
return removed[0];
}

public Set<V> get(String key) {
Set<V> set = delegate.get(normalizeKey(key));
return set != null ? Collections.unmodifiableSet(set) : Collections.emptySet();
}

public boolean containsKey(String key) {
Set<V> set = delegate.get(normalizeKey(key));
return set != null && !set.isEmpty();
}

/**
* Returns the total number of values across all keys.
* <p>
* Note: In concurrent scenarios, the returned value is a best-effort approximation.
* </p>
*/
public int size() {
int size = 0;
for (Set<V> set : delegate.values()) {
size += set.size();
}
return size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.configservice.wrapper.CaseInsensitiveMultimapWrapper;
import com.ctrip.framework.apollo.configservice.wrapper.DeferredResultWrapper;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
Expand Down Expand Up @@ -79,7 +80,7 @@ public class NotificationControllerV2Test {

private Gson gson;

private Multimap<String, DeferredResultWrapper> deferredResults;
private CaseInsensitiveMultimapWrapper<DeferredResultWrapper> deferredResults;

@Before
public void setUp() throws Exception {
Expand All @@ -106,7 +107,7 @@ public void setUp() throws Exception {
when(namespaceUtil.normalizeNamespace(someAppId, somePublicNamespace))
.thenReturn(somePublicNamespace);

deferredResults = (Multimap<String, DeferredResultWrapper>) ReflectionTestUtils
deferredResults = (CaseInsensitiveMultimapWrapper<DeferredResultWrapper>) ReflectionTestUtils
.getField(controller, "deferredResults");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2025 Apollo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.ctrip.framework.apollo.configservice.wrapper;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;

public class CaseInsensitiveMultimapWrapperTest {

private CaseInsensitiveMultimapWrapper<String> multimap;

@Before
public void setUp() throws Exception {
multimap =
new CaseInsensitiveMultimapWrapper<>(Maps.newConcurrentMap(), Sets::newConcurrentHashSet);
}

@Test
public void testPutAndGet() {
String key = "SomeKey";
String value1 = "value1";
String value2 = "value2";

assertTrue(multimap.put(key, value1));
assertTrue(multimap.put(key.toLowerCase(), value2));
assertFalse(multimap.put(key.toUpperCase(), value1)); // already exists

Set<String> values = multimap.get(key);
assertEquals(2, values.size());
assertTrue(values.contains(value1));
assertTrue(values.contains(value2));

Set<String> valuesFromLower = multimap.get(key.toLowerCase());
assertEquals(values, valuesFromLower);
}

@Test
public void testRemove() {
String key = "SomeKey";
String value = "someValue";

multimap.put(key, value);
assertTrue(multimap.containsKey(key));

assertTrue(multimap.remove(key.toUpperCase(), value));
assertFalse(multimap.containsKey(key));
assertTrue(multimap.get(key).isEmpty());
}

@Test
public void testContainsKey() {
String key = "SomeKey";
String value = "someValue";

assertFalse(multimap.containsKey(key));
multimap.put(key, value);
assertTrue(multimap.containsKey(key.toLowerCase()));
assertTrue(multimap.containsKey(key.toUpperCase()));
}

@Test
public void testSize() {
multimap.put("Key1", "v1");
multimap.put("key1", "v2");
multimap.put("Key2", "v3");

assertEquals(3, multimap.size());

multimap.remove("KEY1", "v1");
assertEquals(2, multimap.size());
}

@Test
public void testGetEmpty() {
assertTrue(multimap.get("nonExistent").isEmpty());
}

@Test
public void testConcurrencyRaceCondition() throws InterruptedException {
final int loopCount = 100000;
final String key = "RaceKey";
final String valA = "A";
final String valB = "B";
final CountDownLatch latch = new CountDownLatch(2);
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger failures = new AtomicInteger(0);
final AtomicReference<Throwable> threadException = new AtomicReference<>();

// Thread A: toggles valA
new Thread(() -> {
try {
while (running.get()) {
multimap.put(key, valA);
multimap.remove(key, valA);
}
} catch (Throwable e) {
threadException.set(e);
} finally {
latch.countDown();
}
}).start();

// Thread B: repeatedly adds and removes valB to test put atomicity
new Thread(() -> {
try {
for (int i = 0; i < loopCount; i++) {
multimap.put(key, valB);
if (!multimap.get(key).contains(valB)) {
failures.incrementAndGet();
}
// Remove B to allow the set to become empty again,
// giving Thread A a chance to trigger the map removal race condition again.
multimap.remove(key, valB);
}
} catch (Throwable e) {
threadException.set(e);
} finally {
running.set(false);
latch.countDown();
}
}).start();

latch.await();

if (threadException.get() != null) {
throw new RuntimeException("Exception in worker thread", threadException.get());
}

assertEquals("Value B should not be lost due to race condition", 0, failures.get());
}

@Test
public void testGetReturnsUnmodifiableView() {
assertTrue(multimap.put("Key", "Value"));

Set<String> set = multimap.get("key");
assertTrue(set.contains("Value"));

try {
set.add("Another");
org.junit.Assert.fail("get() should return an unmodifiable view");
} catch (UnsupportedOperationException expected) {
// expected
}
}
}