Skip to content

Commit 4de5983

Browse files
zymapsijie
authored andcommitted
Issue #1987: Migrate command convert-to-interleaved-storage
Descriptions of the changes in this PR: - Migrate command `convert-to-interleaved-storage` ### Motivation - #1987 - Use `bkctl` to run command `convert-to-interleaved-storage` ### Changes - Add command in `bookiegroup` Reviewers: Jia Zhai <zhaijia@apache.org>, Sijie Guo <sijie@apache.org> This closes #1988 from zymap/command-ctis, closes #1987
1 parent 468743e commit 4de5983

File tree

4 files changed

+358
-89
lines changed

4 files changed

+358
-89
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java

Lines changed: 6 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.google.common.util.concurrent.UncheckedExecutionException;
3030
import io.netty.buffer.ByteBuf;
3131
import io.netty.buffer.ByteBufUtil;
32-
import io.netty.buffer.PooledByteBufAllocator;
3332
import io.netty.buffer.Unpooled;
3433
import io.netty.buffer.UnpooledByteBufAllocator;
3534
import io.netty.channel.EventLoopGroup;
@@ -74,10 +73,8 @@
7473
import java.util.stream.LongStream;
7574
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
7675
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
77-
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
7876
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
7977
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
80-
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
8178
import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
8279
import org.apache.bookkeeper.client.BKException;
8380
import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -108,6 +105,7 @@
108105
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
109106
import org.apache.bookkeeper.stats.NullStatsLogger;
110107
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
108+
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
111109
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
112110
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
113111
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -2070,12 +2068,12 @@ private int updateBookieIdInCookie(final String bookieId, final boolean useHostn
20702068
for (File dir : ledgerDirectories) {
20712069
newCookie.writeToDirectory(dir);
20722070
}
2073-
LOG.info("Updated cookie file present in ledgerDirectories {}", ledgerDirectories);
2071+
LOG.info("Updated cookie file present in ledgerDirectories {}", (Object) ledgerDirectories);
20742072
if (ledgerDirectories != indexDirectories) {
20752073
for (File dir : indexDirectories) {
20762074
newCookie.writeToDirectory(dir);
20772075
}
2078-
LOG.info("Updated cookie file present in indexDirectories {}", indexDirectories);
2076+
LOG.info("Updated cookie file present in indexDirectories {}", (Object) indexDirectories);
20792077
}
20802078
}
20812079
// writes newcookie to zookeeper
@@ -2584,89 +2582,9 @@ String getUsage() {
25842582

25852583
@Override
25862584
int runCmd(CommandLine cmdLine) throws Exception {
2587-
LOG.info("=== Converting DbLedgerStorage ===");
2588-
ServerConfiguration conf = new ServerConfiguration(bkConf);
2589-
LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
2590-
new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
2591-
LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
2592-
new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
2593-
2594-
DbLedgerStorage dbStorage = new DbLedgerStorage();
2595-
InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
2596-
2597-
CheckpointSource checkpointSource = new CheckpointSource() {
2598-
@Override
2599-
public Checkpoint newCheckpoint() {
2600-
return Checkpoint.MAX;
2601-
}
2602-
2603-
@Override
2604-
public void checkpointComplete(Checkpoint checkpoint, boolean compact)
2605-
throws IOException {
2606-
}
2607-
};
2608-
Checkpointer checkpointer = new Checkpointer() {
2609-
@Override
2610-
public void startCheckpoint(Checkpoint checkpoint) {
2611-
// No-op
2612-
}
2613-
2614-
@Override
2615-
public void start() {
2616-
// no-op
2617-
}
2618-
};
2619-
2620-
dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
2621-
checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
2622-
interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
2623-
null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
2624-
LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
2625-
2626-
int convertedLedgers = 0;
2627-
for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
2628-
if (LOG.isDebugEnabled()) {
2629-
LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId));
2630-
}
2631-
2632-
interleavedStorage.setMasterKey(ledgerId, dbStorage.readMasterKey(ledgerId));
2633-
if (dbStorage.isFenced(ledgerId)) {
2634-
interleavedStorage.setFenced(ledgerId);
2635-
}
2636-
2637-
long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
2638-
for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
2639-
try {
2640-
long location = dbStorage.getLocation(ledgerId, entryId);
2641-
if (location != 0L) {
2642-
interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
2643-
}
2644-
} catch (Bookie.NoEntryException e) {
2645-
// Ignore entry
2646-
}
2647-
}
2648-
2649-
if (++convertedLedgers % 1000 == 0) {
2650-
LOG.info("Converted {} ledgers", convertedLedgers);
2651-
}
2652-
}
2653-
2654-
dbStorage.shutdown();
2655-
2656-
interleavedLedgerCache.flushLedger(true);
2657-
interleavedStorage.flush();
2658-
interleavedStorage.shutdown();
2659-
2660-
String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
2661-
2662-
// Rename databases and keep backup
2663-
Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
2664-
FileSystems.getDefault().getPath(baseDir, "ledgers.backup"));
2665-
2666-
Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
2667-
FileSystems.getDefault().getPath(baseDir, "locations.backup"));
2668-
2669-
LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
2585+
ConvertToInterleavedStorageCommand cmd = new ConvertToInterleavedStorageCommand();
2586+
ConvertToInterleavedStorageCommand.CTISFlags flags = new ConvertToInterleavedStorageCommand.CTISFlags();
2587+
cmd.apply(bkConf, flags);
26702588
return 0;
26712589
}
26722590
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.tools.cli.commands.bookie;
20+
21+
import com.beust.jcommander.Parameter;
22+
import com.google.common.util.concurrent.UncheckedExecutionException;
23+
import io.netty.buffer.PooledByteBufAllocator;
24+
import java.nio.file.FileSystems;
25+
import java.nio.file.Files;
26+
import lombok.Setter;
27+
import lombok.experimental.Accessors;
28+
import org.apache.bookkeeper.bookie.Bookie;
29+
import org.apache.bookkeeper.bookie.CheckpointSource;
30+
import org.apache.bookkeeper.bookie.Checkpointer;
31+
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
32+
import org.apache.bookkeeper.bookie.LedgerCache;
33+
import org.apache.bookkeeper.bookie.LedgerDirsManager;
34+
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
35+
import org.apache.bookkeeper.conf.ServerConfiguration;
36+
import org.apache.bookkeeper.stats.NullStatsLogger;
37+
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
38+
import org.apache.bookkeeper.tools.framework.CliFlags;
39+
import org.apache.bookkeeper.tools.framework.CliSpec;
40+
import org.apache.bookkeeper.util.DiskChecker;
41+
import org.apache.bookkeeper.util.LedgerIdFormatter;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
44+
45+
46+
/**
47+
* A command to convert bookie indexes from DbLedgerStorage to InterleavedStorage format.
48+
*/
49+
public class ConvertToInterleavedStorageCommand extends BookieCommand<ConvertToInterleavedStorageCommand.CTISFlags> {
50+
51+
private static final Logger LOG = LoggerFactory.getLogger(ConvertToInterleavedStorageCommand.class);
52+
private static final String NAME = "converttointerleavedstorage";
53+
private static final String DESC = "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format";
54+
private static final String NOT_INIT = "default formatter";
55+
56+
@Setter
57+
private LedgerIdFormatter ledgerIdFormatter;
58+
59+
public ConvertToInterleavedStorageCommand() {
60+
this(new CTISFlags());
61+
}
62+
63+
public ConvertToInterleavedStorageCommand(CTISFlags flags) {
64+
super(CliSpec.<CTISFlags>newBuilder().withName(NAME).withDescription(DESC).withFlags(flags).build());
65+
}
66+
67+
/**
68+
* Flags for this command.
69+
*/
70+
@Accessors(fluent = true)
71+
public static class CTISFlags extends CliFlags{
72+
73+
@Parameter(names = { "-l", "--ledgeridformatter" }, description = "Set ledger id formatter")
74+
private String ledgerIdFormatter = NOT_INIT;
75+
76+
}
77+
78+
@Override
79+
public boolean apply(ServerConfiguration conf, CTISFlags cmdFlags) {
80+
initLedgerIdFormatter(conf, cmdFlags);
81+
try {
82+
return handle(conf);
83+
} catch (Exception e) {
84+
throw new UncheckedExecutionException(e.getMessage(), e);
85+
}
86+
}
87+
88+
private boolean handle(ServerConfiguration bkConf) throws Exception {
89+
LOG.info("=== Converting DbLedgerStorage ===");
90+
ServerConfiguration conf = new ServerConfiguration(bkConf);
91+
LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
92+
new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
93+
LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
94+
new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
95+
96+
DbLedgerStorage dbStorage = new DbLedgerStorage();
97+
InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
98+
99+
CheckpointSource checkpointSource = new CheckpointSource() {
100+
@Override
101+
public Checkpoint newCheckpoint() {
102+
return Checkpoint.MAX;
103+
}
104+
105+
@Override
106+
public void checkpointComplete(Checkpoint checkpoint, boolean compact) {}
107+
};
108+
Checkpointer checkpointer = new Checkpointer() {
109+
@Override
110+
public void startCheckpoint(CheckpointSource.Checkpoint checkpoint) {
111+
// No-op
112+
}
113+
114+
@Override
115+
public void start() {
116+
// no-op
117+
}
118+
};
119+
120+
dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
121+
checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
122+
interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
123+
null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
124+
LedgerCache interleavedLedgerCache = interleavedStorage.getLedgerCache();
125+
126+
int convertedLedgers = 0;
127+
for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
128+
if (LOG.isDebugEnabled()) {
129+
LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId));
130+
}
131+
132+
interleavedStorage.setMasterKey(ledgerId, dbStorage.readMasterKey(ledgerId));
133+
if (dbStorage.isFenced(ledgerId)) {
134+
interleavedStorage.setFenced(ledgerId);
135+
}
136+
137+
long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
138+
for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
139+
try {
140+
long location = dbStorage.getLocation(ledgerId, entryId);
141+
if (location != 0L) {
142+
interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
143+
}
144+
} catch (Bookie.NoEntryException e) {
145+
// Ignore entry
146+
}
147+
}
148+
149+
if (++convertedLedgers % 1000 == 0) {
150+
LOG.info("Converted {} ledgers", convertedLedgers);
151+
}
152+
}
153+
154+
dbStorage.shutdown();
155+
156+
interleavedLedgerCache.flushLedger(true);
157+
interleavedStorage.flush();
158+
interleavedStorage.shutdown();
159+
160+
String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
161+
162+
// Rename databases and keep backup
163+
Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
164+
FileSystems.getDefault().getPath(baseDir, "ledgers.backup"));
165+
166+
Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
167+
FileSystems.getDefault().getPath(baseDir, "locations.backup"));
168+
169+
LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
170+
return true;
171+
}
172+
173+
private void initLedgerIdFormatter(ServerConfiguration conf, CTISFlags flags) {
174+
if (this.ledgerIdFormatter != null) {
175+
return;
176+
}
177+
if (flags.ledgerIdFormatter.equals(NOT_INIT)) {
178+
this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf);
179+
} else {
180+
this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(flags.ledgerIdFormatter, conf);
181+
}
182+
}
183+
}

tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.bookkeeper.tools.cli.BKCtl;
2424
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
25+
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
2526
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
2627
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
2728
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -50,7 +51,7 @@ public class BookieCommandGroup extends CliCommandGroup<BKFlags> {
5051
.addCommand(new SanityTestCommand())
5152
.addCommand(new LedgerCommand())
5253
.addCommand(new ConvertToDBStorageCommand())
53-
.build();
54+
.addCommand(new ConvertToInterleavedStorageCommand()).build();
5455

5556
public BookieCommandGroup() {
5657
super(spec);

0 commit comments

Comments
 (0)