Skip to content

Commit 677ccec

Browse files
hangc0276Ivan Kelly
andauthored
BP-47 (task6): Direct I/O entrylogger support (#3263)
* Direct I/O entry log support The implementation uses JNI to do direct I/O to files via posix syscalls. Fallocate is used if running on linux, otherwise this is skipped (at the cost of more filesystem operates during writing). There are two calls to write, writeAt and writeDelimited. I expect writeAt to be used for the entrylog headers, which entries will go through writeDelimited. In both cases, the calls may return before the syscalls occur. #flush() needs to be called to ensure things are actually written. The entry log format isn't much changed from what is used by the existing entrylogger. The biggest difference is the padding. Direct I/O must write in aligned blocked. The size of the alignment varies by machine configuration, but 4K is a safe bet on most. As it is unlikely that entry data will land exactly on the alignment boundary, we need to add padding to writes. The existing entry logger has been changed to take this padding into account. When read as a signed int/long/byte the padding will aways parse to a negative value, which distinguishes it from valid entry data (the entry size will always be positive) and also from preallocated space (which is always 0). Another difference in the format is that the header is now 4K rather than 1K. Again, this is to allow aligned rights. No changes are necessary to allow the existing entry logger to deal with the header change, as we create a dummy entry in the extra header space that the existing entry logger already knows to ignore. To enable, set dbStorage_directIOEntryLogger=true in the configuration. (cherry picked from commit 09a1c81) * format code * format code * fix spotbugs check * fix jnilib not found * fix jnilib not found * address comments * fix so lib not found * address comments * format code * add compat test * add compat test * address comments * fix findbugs failed * format code Co-authored-by: Ivan Kelly <ikelly@splunk.com>
1 parent 8d4b9e2 commit 677ccec

31 files changed

+5566
-20
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public abstract class AbstractLogCompactor {
3333
protected final ServerConfiguration conf;
3434
protected final Throttler throttler;
3535

36-
interface LogRemovalListener {
36+
/**
37+
* LogRemovalListener.
38+
*/
39+
public interface LogRemovalListener {
3740
void removeEntryLog(long logToRemove);
3841
}
3942

@@ -71,7 +74,7 @@ public static class Throttler {
7174
}
7275

7376
// acquire. if bybytes: bytes of this entry; if byentries: 1.
74-
void acquire(int permits) {
77+
public void acquire(int permits) {
7578
rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
7679
}
7780
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,23 +1005,24 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce
10051005
return;
10061006
}
10071007
long offset = pos;
1008-
pos += 4;
1008+
10091009
int entrySize = headerBuffer.readInt();
1010+
if (entrySize <= 0) { // hitting padding
1011+
pos++;
1012+
headerBuffer.clear();
1013+
continue;
1014+
}
10101015
long ledgerId = headerBuffer.readLong();
10111016
headerBuffer.clear();
10121017

1018+
pos += 4;
10131019
if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
10141020
// skip this entry
10151021
pos += entrySize;
10161022
continue;
10171023
}
10181024
// read the entry
10191025
data.clear();
1020-
if (entrySize <= 0) {
1021-
LOG.warn("bad read for ledger entry from entryLog {}@{} (entry size {})",
1022-
entryLogId, pos, entrySize);
1023-
return;
1024-
}
10251026
data.capacity(entrySize);
10261027
int rc = readFromLogChannel(entryLogId, bc, data, pos);
10271028
if (rc != entrySize) {
@@ -1086,7 +1087,9 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce
10861087
bc.read(sizeBuffer.get(), offset);
10871088

10881089
int ledgersMapSize = sizeBuffer.get().readInt();
1089-
1090+
if (ledgersMapSize <= 0) {
1091+
break;
1092+
}
10901093
// Read the index into a buffer
10911094
ledgersMap.clear();
10921095
bc.read(ledgersMap, offset + 4, ledgersMapSize);

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLocation.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,14 @@ public long getEntry() {
4646
public long getLocation() {
4747
return location;
4848
}
49+
50+
@Override
51+
public String toString() {
52+
return new StringBuilder().append("EntryLocation{")
53+
.append("ledger=").append(ledger)
54+
.append(",entry=").append(entry)
55+
.append(",locationLog=").append(location >> 32 & 0xFFFFFFFF)
56+
.append(",locationOffset=").append((int) (location & 0xFFFFFFFF))
57+
.append("}").toString();
58+
}
4959
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.bookkeeper.bookie.storage;
22+
23+
import java.io.IOException;
24+
25+
/**
26+
* Generate unique entry log ids.
27+
*/
28+
public interface EntryLogIds {
29+
/**
30+
* Get the next available entry log ID.
31+
*/
32+
int nextId() throws IOException;
33+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.bookkeeper.bookie.storage;
22+
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.util.ArrayList;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.regex.Matcher;
30+
import java.util.regex.Pattern;
31+
32+
import org.apache.bookkeeper.bookie.LedgerDirsManager;
33+
import org.apache.bookkeeper.bookie.storage.directentrylogger.Events;
34+
import org.apache.bookkeeper.slogger.Slogger;
35+
import org.apache.commons.lang3.tuple.Pair;
36+
37+
/**
38+
* EntryLogIdsImpl.
39+
*/
40+
public class EntryLogIdsImpl implements EntryLogIds {
41+
public static final Pattern FILE_PATTERN = Pattern.compile("^([0-9a-fA-F]+)\\.log$");
42+
public static final Pattern COMPACTED_FILE_PATTERN =
43+
Pattern.compile("^([0-9a-fA-F]+)\\.log\\.([0-9a-fA-F]+)\\.compacted$");
44+
45+
private final LedgerDirsManager ledgerDirsManager;
46+
private final Slogger slog;
47+
private int nextId;
48+
private int maxId;
49+
50+
public EntryLogIdsImpl(LedgerDirsManager ledgerDirsManager,
51+
Slogger slog) throws IOException {
52+
this.ledgerDirsManager = ledgerDirsManager;
53+
this.slog = slog;
54+
findLargestGap();
55+
}
56+
57+
@Override
58+
public int nextId() throws IOException {
59+
while (true) {
60+
synchronized (this) {
61+
int current = nextId;
62+
nextId++;
63+
if (nextId == maxId) {
64+
findLargestGap();
65+
} else {
66+
return current;
67+
}
68+
}
69+
}
70+
}
71+
72+
private void findLargestGap() throws IOException {
73+
long start = System.nanoTime();
74+
List<Integer> currentIds = new ArrayList<Integer>();
75+
76+
for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
77+
currentIds.addAll(logIdsInDirectory(ledgerDir));
78+
currentIds.addAll(compactedLogIdsInDirectory(ledgerDir));
79+
}
80+
81+
Pair<Integer, Integer> gap = findLargestGap(currentIds);
82+
nextId = gap.getLeft();
83+
maxId = gap.getRight();
84+
slog.kv("dirs", ledgerDirsManager.getAllLedgerDirs())
85+
.kv("nextId", nextId)
86+
.kv("maxId", maxId)
87+
.kv("durationMs", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
88+
.info(Events.ENTRYLOG_IDS_CANDIDATES_SELECTED);
89+
}
90+
91+
/**
92+
* O(nlogn) algorithm to find largest contiguous gap between
93+
* integers in a passed list. n should be relatively small.
94+
* Entry logs should be about 1GB in size, so even if the node
95+
* stores a PB, there should be only 1000000 entry logs.
96+
*/
97+
static Pair<Integer, Integer> findLargestGap(List<Integer> currentIds) {
98+
if (currentIds.isEmpty()) {
99+
return Pair.of(0, Integer.MAX_VALUE);
100+
}
101+
102+
Collections.sort(currentIds);
103+
104+
int nextIdCandidate = 0;
105+
int maxIdCandidate = currentIds.get(0);
106+
int maxGap = maxIdCandidate - nextIdCandidate;
107+
for (int i = 0; i < currentIds.size(); i++) {
108+
int gapStart = currentIds.get(i) + 1;
109+
int j = i + 1;
110+
int gapEnd = Integer.MAX_VALUE;
111+
if (j < currentIds.size()) {
112+
gapEnd = currentIds.get(j);
113+
}
114+
int gapSize = gapEnd - gapStart;
115+
if (gapSize > maxGap) {
116+
maxGap = gapSize;
117+
nextIdCandidate = gapStart;
118+
maxIdCandidate = gapEnd;
119+
}
120+
}
121+
return Pair.of(nextIdCandidate, maxIdCandidate);
122+
}
123+
124+
public static List<Integer> logIdsInDirectory(File directory) {
125+
List<Integer> ids = new ArrayList<>();
126+
if (directory.exists() && directory.isDirectory()) {
127+
File[] files = directory.listFiles();
128+
if (files != null && files.length > 0) {
129+
for (File f : files) {
130+
Matcher m = FILE_PATTERN.matcher(f.getName());
131+
if (m.matches()) {
132+
int logId = Integer.parseUnsignedInt(m.group(1), 16);
133+
ids.add(logId);
134+
}
135+
}
136+
}
137+
}
138+
return ids;
139+
}
140+
141+
private static List<Integer> compactedLogIdsInDirectory(File directory) {
142+
List<Integer> ids = new ArrayList<>();
143+
if (directory.exists() && directory.isDirectory()) {
144+
File[] files = directory.listFiles();
145+
if (files != null && files.length > 0) {
146+
for (File f : files) {
147+
Matcher m = COMPACTED_FILE_PATTERN.matcher(f.getName());
148+
if (m.matches()) {
149+
int logId = Integer.parseUnsignedInt(m.group(1), 16);
150+
ids.add(logId);
151+
}
152+
}
153+
}
154+
}
155+
return ids;
156+
157+
}
158+
}

0 commit comments

Comments
 (0)