Skip to content

Commit a322354

Browse files
author
wangwei
committed
test case
1 parent b9bb870 commit a322354

File tree

1 file changed

+276
-0
lines changed

1 file changed

+276
-0
lines changed
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.cluster.directory;
18+
19+
import org.apache.dubbo.common.URL;
20+
import org.apache.dubbo.common.utils.NetUtils;
21+
import org.apache.dubbo.rpc.Invocation;
22+
import org.apache.dubbo.rpc.Invoker;
23+
import org.apache.dubbo.rpc.RpcException;
24+
import org.apache.dubbo.rpc.cluster.RouterChain;
25+
import org.apache.dubbo.rpc.cluster.SingleRouterChain;
26+
import org.apache.dubbo.rpc.cluster.router.state.BitList;
27+
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.Future;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicReference;
37+
38+
import org.junit.jupiter.api.AfterEach;
39+
import org.junit.jupiter.api.Assertions;
40+
import org.junit.jupiter.api.BeforeEach;
41+
import org.junit.jupiter.api.Test;
42+
43+
import static org.mockito.Mockito.mock;
44+
45+
class AbstractDirectoryConcurrencyTest {
46+
47+
private TestDirectory directory;
48+
private URL url;
49+
private ExecutorService executor;
50+
51+
@BeforeEach
52+
void setUp() {
53+
url = URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880/com.foo.BarService");
54+
directory = new TestDirectory(url);
55+
executor = Executors.newFixedThreadPool(10);
56+
}
57+
58+
@AfterEach
59+
void tearDown() {
60+
if (directory != null) {
61+
directory.destroy();
62+
}
63+
if (executor != null) {
64+
executor.shutdownNow();
65+
}
66+
}
67+
68+
@Test
69+
void testMultipleReadLocks() throws InterruptedException {
70+
int threadCount = 5;
71+
CountDownLatch latch = new CountDownLatch(1);
72+
CountDownLatch doneLatch = new CountDownLatch(threadCount);
73+
AtomicBoolean failed = new AtomicBoolean(false);
74+
75+
// Setup the directory with a slow list implementation to simulate work holding the read lock
76+
directory.setListAction(() -> {
77+
try {
78+
// Wait for the latch to ensure all threads are in doList
79+
latch.await(5, TimeUnit.SECONDS);
80+
} catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
82+
}
83+
});
84+
85+
for (int i = 0; i < threadCount; i++) {
86+
executor.submit(() -> {
87+
try {
88+
directory.list(mock(Invocation.class));
89+
} catch (Exception e) {
90+
e.printStackTrace();
91+
failed.set(true);
92+
} finally {
93+
doneLatch.countDown();
94+
}
95+
});
96+
}
97+
98+
// Give threads time to start and acquire read lock
99+
Thread.sleep(100);
100+
// Release the latch, letting them proceed
101+
latch.countDown();
102+
103+
Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "All list calls should complete");
104+
Assertions.assertFalse(failed.get(), "No exceptions should occur during concurrent reads");
105+
}
106+
107+
@Test
108+
void testWriteBlocksRead() throws InterruptedException {
109+
CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
110+
CountDownLatch releaseWriteLockLatch = new CountDownLatch(1);
111+
AtomicReference<Boolean> readBlocked = new AtomicReference<>(false);
112+
113+
// Thread to hold write lock
114+
executor.submit(() -> {
115+
directory.simulateWriteLock(writeLockAcquiredLatch, releaseWriteLockLatch);
116+
});
117+
118+
// Wait for write lock to be acquired
119+
Assertions.assertTrue(writeLockAcquiredLatch.await(5, TimeUnit.SECONDS));
120+
121+
// Try to read in another thread
122+
Future<?> readFuture = executor.submit(() -> {
123+
long start = System.currentTimeMillis();
124+
directory.list(mock(Invocation.class));
125+
long duration = System.currentTimeMillis() - start;
126+
// If duration is > 100ms, we assume it was blocked
127+
readBlocked.set(duration >= 100);
128+
});
129+
130+
// Sleep to ensure read thread tries to acquire lock and blocks
131+
Thread.sleep(200);
132+
133+
// Release write lock
134+
releaseWriteLockLatch.countDown();
135+
136+
try {
137+
readFuture.get(5, TimeUnit.SECONDS);
138+
} catch (Exception e) {
139+
Assertions.fail("Read execution failed");
140+
}
141+
142+
Assertions.assertTrue(readBlocked.get(), "Read operation should be blocked by write lock");
143+
}
144+
145+
@Test
146+
void testConcurrentReadAndWrite() throws InterruptedException {
147+
int readThreads = 10;
148+
int writeThreads = 2;
149+
int iterations = 100;
150+
CountDownLatch doneLatch = new CountDownLatch(readThreads + writeThreads);
151+
AtomicBoolean failed = new AtomicBoolean(false);
152+
153+
directory.setListAction(() -> {
154+
// Simulate some work
155+
try {
156+
Thread.sleep(1);
157+
} catch (InterruptedException e) {
158+
}
159+
});
160+
161+
// Start read threads
162+
for (int i = 0; i < readThreads; i++) {
163+
executor.submit(() -> {
164+
try {
165+
for (int j = 0; j < iterations; j++) {
166+
directory.list(mock(Invocation.class));
167+
}
168+
} catch (Exception e) {
169+
e.printStackTrace();
170+
failed.set(true);
171+
} finally {
172+
doneLatch.countDown();
173+
}
174+
});
175+
}
176+
177+
// Start write threads
178+
for (int i = 0; i < writeThreads; i++) {
179+
executor.submit(() -> {
180+
try {
181+
for (int j = 0; j < iterations; j++) {
182+
// Use setInvokers to trigger write lock
183+
directory.setInvokers(new BitList<>(Collections.emptyList()));
184+
Thread.sleep(2);
185+
}
186+
} catch (Exception e) {
187+
e.printStackTrace();
188+
failed.set(true);
189+
} finally {
190+
doneLatch.countDown();
191+
}
192+
});
193+
}
194+
195+
Assertions.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "All operations should complete");
196+
Assertions.assertFalse(failed.get(), "No exceptions should occur during concurrent read/write");
197+
}
198+
199+
// Helper class to expose protected methods and hook into list()
200+
static class TestDirectory extends AbstractDirectory<Object> {
201+
private Runnable listAction = () -> {};
202+
203+
public TestDirectory(URL url) {
204+
super(url);
205+
// Initialize with empty router chain to avoid NPE
206+
setRouterChain(RouterChain.buildChain(Object.class, url));
207+
}
208+
209+
public void setListAction(Runnable listAction) {
210+
this.listAction = listAction;
211+
}
212+
213+
@Override
214+
public Class<Object> getInterface() {
215+
return Object.class;
216+
}
217+
218+
@Override
219+
public List<Invoker<Object>> getAllInvokers() {
220+
return Collections.emptyList();
221+
}
222+
223+
@Override
224+
public boolean isAvailable() {
225+
return true;
226+
}
227+
228+
@Override
229+
protected List<Invoker<Object>> doList(
230+
SingleRouterChain<Object> singleRouterChain, BitList<Invoker<Object>> invokers, Invocation invocation)
231+
throws RpcException {
232+
listAction.run();
233+
return Collections.emptyList();
234+
}
235+
236+
// Helper to simulate holding write lock
237+
public void simulateWriteLock(CountDownLatch acquired, CountDownLatch release) {
238+
// We use refreshInvoker to acquire write lock, but we need to inject our blocking logic
239+
// Since we can't easily inject into refreshInvoker without complex mocking,
240+
// we'll use a trick: override setInvokers logic? No, setInvokers uses lock internally.
241+
// But we can use the fact that addRouters/etc might not use the same lock? No.
242+
// We can't access the lock directly.
243+
// However, we can use 'addInvalidateInvoker' or similar if we can hook into it.
244+
245+
// Actually, we can use a method that holds the lock and calls something we can override?
246+
// AbstractDirectory doesn't call many overridable methods inside the lock.
247+
// refreshInvoker calls refreshInvokerInternal (private).
248+
249+
// Wait, we can use reflection to get the lock and lock it manually for this test helper.
250+
try {
251+
java.lang.reflect.Field lockField = AbstractDirectory.class.getDeclaredField("invokerRefreshLock");
252+
lockField.setAccessible(true);
253+
java.util.concurrent.locks.ReadWriteLock lock =
254+
(java.util.concurrent.locks.ReadWriteLock) lockField.get(this);
255+
256+
lock.writeLock().lock();
257+
try {
258+
acquired.countDown();
259+
release.await();
260+
} catch (InterruptedException e) {
261+
Thread.currentThread().interrupt();
262+
} finally {
263+
lock.writeLock().unlock();
264+
}
265+
} catch (Exception e) {
266+
e.printStackTrace();
267+
}
268+
}
269+
270+
// Expose setInvokers for test
271+
@Override
272+
public void setInvokers(BitList<Invoker<Object>> invokers) {
273+
super.setInvokers(invokers);
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)