Skip to content

Commit 4ab4f85

Browse files
committed
test: add test for relocate placement feature
1 parent 151171b commit 4ab4f85

File tree

2 files changed

+379
-0
lines changed

2 files changed

+379
-0
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.bookkeeper.client;
19+
20+
import static junit.framework.TestCase.assertEquals;
21+
import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
22+
import static org.mockito.ArgumentMatchers.eq;
23+
import java.lang.reflect.Field;
24+
import java.util.Arrays;
25+
import java.util.Collections;
26+
import java.util.HashMap;
27+
import java.util.HashSet;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.NavigableMap;
31+
import java.util.Set;
32+
import java.util.TreeMap;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicInteger;
36+
import lombok.Cleanup;
37+
import org.apache.bookkeeper.bookie.BookieShell;
38+
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
39+
import org.apache.bookkeeper.client.api.LedgerMetadata;
40+
import org.apache.bookkeeper.net.BookieId;
41+
import org.apache.bookkeeper.net.BookieSocketAddress;
42+
import org.apache.bookkeeper.net.NetworkTopology;
43+
import org.apache.bookkeeper.proto.BookieAddressResolver;
44+
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
45+
import org.apache.bookkeeper.tools.cli.commands.bookies.CorrectEnsemblePlacementCommand;
46+
import org.apache.bookkeeper.util.EntryFormatter;
47+
import org.apache.bookkeeper.util.LedgerIdFormatter;
48+
import org.apache.bookkeeper.util.StaticDNSResolver;
49+
import org.junit.Test;
50+
import org.mockito.Mockito;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
54+
/**
55+
* Tests of correct-ensemble-placement command.
56+
*/
57+
public class CorrectEnsemblePlacementCmdTest extends BookKeeperClusterTestCase {
58+
59+
private static final Logger LOG = LoggerFactory.getLogger(CorrectEnsemblePlacementCmdTest.class);
60+
private BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
61+
private static final String PASSWORD = "testPasswd";
62+
63+
public CorrectEnsemblePlacementCmdTest() throws Exception {
64+
super(0);
65+
baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
66+
baseConf.setGcWaitTime(60000);
67+
baseConf.setFlushInterval(1);
68+
}
69+
70+
/**
71+
* list of entry logger files that contains given ledgerId.
72+
*/
73+
@Test
74+
public void testArgument() throws Exception {
75+
startNewBookie();
76+
77+
@Cleanup
78+
final BookKeeper bk = new BookKeeper(baseClientConf, zkc);
79+
@Cleanup
80+
final LedgerHandle lh = createLedgerWithEntries(bk, 10, 1, 1);
81+
82+
final String[] argv1 = { "correct-ensemble-placement", "--ledgerids", String.valueOf(lh.getId()),
83+
"--skipOpenLedgers", "--force"};
84+
final String[] argv2 = { "correct-ensemble-placement", "--ledgerids", String.valueOf(lh.getId()),
85+
"--skipOpenLedgers", "--force", "--dryrun"};
86+
final BookieShell bkShell =
87+
new BookieShell(LedgerIdFormatter.LONG_LEDGERID_FORMATTER, EntryFormatter.STRING_FORMATTER);
88+
bkShell.setConf(baseClientConf);
89+
90+
assertEquals("Failed to return exit code!", 0, bkShell.run(argv1));
91+
assertEquals("Failed to return exit code!", 0, bkShell.run(argv2));
92+
}
93+
94+
@Test
95+
public void testCorrectEnsemblePlacementByRackaware() throws Exception {
96+
final int ensembleSize = 7;
97+
final int quorumSize = 2;
98+
99+
final BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
100+
final BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
101+
final BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
102+
final BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
103+
final BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
104+
final BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
105+
final BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
106+
final BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
107+
final BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
108+
109+
final Set<BookieId> writableBookies = new HashSet<>();
110+
writableBookies.add(addr1.toBookieId());
111+
writableBookies.add(addr2.toBookieId());
112+
writableBookies.add(addr3.toBookieId());
113+
writableBookies.add(addr4.toBookieId());
114+
writableBookies.add(addr5.toBookieId());
115+
writableBookies.add(addr6.toBookieId());
116+
writableBookies.add(addr7.toBookieId());
117+
writableBookies.add(addr8.toBookieId());
118+
writableBookies.add(addr9.toBookieId());
119+
120+
// add bookie node to resolver
121+
StaticDNSResolver.reset();
122+
123+
final String rackName1 = NetworkTopology.DEFAULT_REGION + "/r1";
124+
final String rackName2 = NetworkTopology.DEFAULT_REGION + "/r2";
125+
final String rackName3 = NetworkTopology.DEFAULT_REGION + "/r3";
126+
127+
// update dns mapping
128+
// add port for testing
129+
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(), rackName1);
130+
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(), rackName1);
131+
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(), rackName1);
132+
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(), rackName2);
133+
StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(), rackName2);
134+
StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(), rackName2);
135+
StaticDNSResolver.addNodeToRack(addr7.getSocketAddress().getAddress().getHostAddress(), rackName3);
136+
StaticDNSResolver.addNodeToRack(addr8.getSocketAddress().getAddress().getHostAddress(), rackName3);
137+
StaticDNSResolver.addNodeToRack(addr9.getSocketAddress().getAddress().getHostAddress(), rackName3);
138+
LOG.info("Set up static DNS Resolver.");
139+
baseClientConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
140+
baseClientConf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName());
141+
142+
startNewBookie();
143+
144+
final NavigableMap<Long, List<BookieId>> ensemble = new TreeMap<>();
145+
// create failed ensemble
146+
// expect that the ensemble will be replaced to
147+
// [addr1, addr4, addr7, addr2, addr5, addr8, *addr6 (in /default-region/r2 bookies)*]
148+
ensemble.put(0L, Arrays.asList(
149+
addr1.toBookieId(), addr4.toBookieId(),
150+
addr7.toBookieId(), addr2.toBookieId(),
151+
addr5.toBookieId(), addr8.toBookieId(),
152+
addr3.toBookieId()));
153+
final LedgerMetadata lm1 = Mockito.mock(LedgerMetadata.class);
154+
Mockito.doReturn(ensemble).when(lm1).getAllEnsembles();
155+
Mockito.doReturn(ensembleSize).when(lm1).getEnsembleSize();
156+
Mockito.doReturn(quorumSize).when(lm1).getWriteQuorumSize();
157+
Mockito.doReturn(quorumSize).when(lm1).getAckQuorumSize();
158+
final LedgerHandle lh1 = Mockito.mock(LedgerHandle.class);
159+
Mockito.when(lh1.getLedgerMetadata()).thenReturn(lm1);
160+
161+
@Cleanup
162+
final BookKeeper bookKeeper = Mockito.spy(new BookKeeper(baseClientConf));
163+
Mockito.doReturn(true).when(bookKeeper).isClosed(Mockito.anyLong());
164+
165+
@Cleanup
166+
final BookKeeperAdmin admin = Mockito.spy(new BookKeeperAdmin(baseClientConf));
167+
Mockito.doAnswer(invocationOnMock -> {
168+
final AsyncCallback.OpenCallback op = invocationOnMock.getArgument(1);
169+
final CountDownLatch ctx = invocationOnMock.getArgument(2);
170+
op.openComplete(BKException.Code.OK, lh1, ctx);
171+
return null;
172+
}).when(admin).asyncOpenLedger(Mockito.anyLong(), Mockito.any(), Mockito.any());
173+
// expected return
174+
final Map<Integer, BookieId> expectedMap = new HashMap<>();
175+
expectedMap.put(6, addr6.toBookieId());
176+
Mockito.doNothing().when(admin)
177+
.replicateLedgerFragment(Mockito.any(), Mockito.any(), eq(expectedMap), Mockito.any());
178+
179+
final EnsemblePlacementPolicy policy = bookKeeper.getPlacementPolicy();
180+
final Field topologyField = TopologyAwareEnsemblePlacementPolicy.class
181+
.getDeclaredField("topology");
182+
topologyField.setAccessible(true);
183+
final NetworkTopology topology = Mockito.spy((NetworkTopology) topologyField.get(policy));
184+
Mockito.doReturn(2).when(topology).getNumOfRacks();
185+
topologyField.set(policy, topology);
186+
final Field bookieAddressResolverField = TopologyAwareEnsemblePlacementPolicy.class
187+
.getDeclaredField("bookieAddressResolver");
188+
bookieAddressResolverField.setAccessible(true);
189+
final BookieAddressResolver bookieAddressResolver =
190+
Mockito.spy((BookieAddressResolver) bookieAddressResolverField.get(policy));
191+
Mockito.doAnswer(invocationOnMock -> {
192+
final BookieId bookieId = invocationOnMock.getArgument(0);
193+
return new BookieSocketAddress(bookieId.getId());
194+
}).when(bookieAddressResolver).resolve(Mockito.any());
195+
bookieAddressResolverField.set(policy, bookieAddressResolver);
196+
197+
// add mock bookies to knownBookies
198+
policy.onClusterChanged(writableBookies, Collections.emptySet());
199+
200+
// make sure that the ensemble is FAIL state
201+
assertEquals(EnsemblePlacementPolicy.PlacementPolicyAdherence.FAIL,
202+
policy.isEnsembleAdheringToPlacementPolicy(ensemble.get(0L), quorumSize, quorumSize));
203+
204+
final CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags flag =
205+
new CorrectEnsemblePlacementCommand.CorrectEnsemblePlacementFlags();
206+
flag.ledgerIds(Collections.singletonList(1L));
207+
final CorrectEnsemblePlacementCommand cmd = new CorrectEnsemblePlacementCommand();
208+
cmd.relocate(baseConf, flag, bookKeeper, admin);
209+
210+
Mockito.verify(admin, Mockito.times(1))
211+
.replicateLedgerFragment(Mockito.any(), Mockito.any(), eq(expectedMap), Mockito.any());
212+
}
213+
214+
private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries,
215+
int ensembleSize, int quorumSize) throws Exception {
216+
LedgerHandle lh = bk.createLedger(ensembleSize, quorumSize, digestType, PASSWORD.getBytes());
217+
final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
218+
final CountDownLatch latch = new CountDownLatch(numOfEntries);
219+
220+
final AsyncCallback.AddCallback cb = (rccb, lh1, entryId, ctx) -> {
221+
rc.compareAndSet(BKException.Code.OK, rccb);
222+
latch.countDown();
223+
};
224+
for (int i = 0; i < numOfEntries; i++) {
225+
lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
226+
}
227+
if (!latch.await(30, TimeUnit.SECONDS)) {
228+
throw new Exception("Entries took too long to add");
229+
}
230+
if (rc.get() != BKException.Code.OK) {
231+
throw BKException.create(rc.get());
232+
}
233+
return lh;
234+
}
235+
}

0 commit comments

Comments
 (0)