Skip to content

Commit 2eb2129

Browse files
authored
[ISSUE #9773] Implement Shared RocksDB Instance for Broker Configs (#9774)
1 parent 035c91a commit 2eb2129

File tree

17 files changed

+1220
-189
lines changed

17 files changed

+1220
-189
lines changed

auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,20 @@
3535
import org.apache.rocketmq.auth.config.AuthConfig;
3636
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
3737
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
38-
import org.rocksdb.RocksIterator;
38+
import org.rocksdb.RocksDB;
3939

4040
public class LocalAuthenticationMetadataProvider implements AuthenticationMetadataProvider {
4141

42+
private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
43+
StandardCharsets.UTF_8);
44+
4245
private ConfigRocksDBStorage storage;
4346

4447
private LoadingCache<String, User> userCache;
4548

4649
@Override
4750
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
48-
this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "users");
51+
this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false);
4952
if (!this.storage.start()) {
5053
throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied");
5154
}
@@ -72,7 +75,7 @@ public CompletableFuture<Void> createUser(User user) {
7275
try {
7376
byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
7477
byte[] valueBytes = JSON.toJSONBytes(user);
75-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
78+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
7679
this.storage.flushWAL();
7780
this.userCache.invalidate(user.getUsername());
7881
} catch (Exception e) {
@@ -84,7 +87,7 @@ public CompletableFuture<Void> createUser(User user) {
8487
@Override
8588
public CompletableFuture<Void> deleteUser(String username) {
8689
try {
87-
this.storage.delete(username.getBytes(StandardCharsets.UTF_8));
90+
this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, username.getBytes(StandardCharsets.UTF_8));
8891
this.storage.flushWAL();
8992
this.userCache.invalidate(username);
9093
} catch (Exception e) {
@@ -98,7 +101,7 @@ public CompletableFuture<Void> updateUser(User user) {
98101
try {
99102
byte[] keyBytes = user.getUsername().getBytes(StandardCharsets.UTF_8);
100103
byte[] valueBytes = JSON.toJSONBytes(user);
101-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
104+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
102105
this.storage.flushWAL();
103106
this.userCache.invalidate(user.getUsername());
104107
} catch (Exception e) {
@@ -119,20 +122,21 @@ public CompletableFuture<User> getUser(String username) {
119122
@Override
120123
public CompletableFuture<List<User>> listUser(String filter) {
121124
List<User> result = new ArrayList<>();
122-
try (RocksIterator iterator = this.storage.iterator()) {
123-
iterator.seekToFirst();
124-
while (iterator.isValid()) {
125-
String username = new String(iterator.key(), StandardCharsets.UTF_8);
125+
CompletableFuture<List<User>> future = new CompletableFuture<>();
126+
try {
127+
this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
128+
String username = new String(key, StandardCharsets.UTF_8);
126129
if (StringUtils.isNotBlank(filter) && !username.contains(filter)) {
127-
iterator.next();
128-
continue;
130+
return;
129131
}
130-
User user = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), User.class);
132+
User user = JSON.parseObject(new String(value, StandardCharsets.UTF_8), User.class);
131133
result.add(user);
132-
iterator.next();
133-
}
134+
});
135+
} catch (Exception e) {
136+
future.completeExceptionally(e);
134137
}
135-
return CompletableFuture.completedFuture(result);
138+
future.complete(result);
139+
return future;
136140
}
137141

138142
@Override
@@ -154,7 +158,7 @@ public UserCacheLoader(ConfigRocksDBStorage storage) {
154158
public User load(String username) {
155159
try {
156160
byte[] keyBytes = username.getBytes(StandardCharsets.UTF_8);
157-
byte[] valueBytes = storage.get(keyBytes);
161+
byte[] valueBytes = storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
158162
if (ArrayUtils.isEmpty(valueBytes)) {
159163
return EMPTY_USER;
160164
}

auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,20 @@
4040
import org.apache.rocketmq.auth.config.AuthConfig;
4141
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
4242
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
43-
import org.rocksdb.RocksIterator;
43+
import org.rocksdb.RocksDB;
4444

4545
public class LocalAuthorizationMetadataProvider implements AuthorizationMetadataProvider {
4646

47+
private final static String AUTH_METADATA_COLUMN_FAMILY = new String(RocksDB.DEFAULT_COLUMN_FAMILY,
48+
StandardCharsets.UTF_8);
49+
4750
private ConfigRocksDBStorage storage;
4851

4952
private LoadingCache<String, Acl> aclCache;
5053

5154
@Override
5255
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
53-
this.storage = new ConfigRocksDBStorage(authConfig.getAuthConfigPath() + File.separator + "acls");
56+
this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false);
5457
if (!this.storage.start()) {
5558
throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied.");
5659
}
@@ -77,7 +80,7 @@ public CompletableFuture<Void> createAcl(Acl acl) {
7780
Subject subject = acl.getSubject();
7881
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
7982
byte[] valueBytes = JSON.toJSONBytes(acl);
80-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
83+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
8184
this.storage.flushWAL();
8285
this.aclCache.invalidate(subject.getSubjectKey());
8386
} catch (Exception e) {
@@ -90,7 +93,7 @@ public CompletableFuture<Void> createAcl(Acl acl) {
9093
public CompletableFuture<Void> deleteAcl(Subject subject) {
9194
try {
9295
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
93-
this.storage.delete(keyBytes);
96+
this.storage.delete(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
9497
this.storage.flushWAL();
9598
this.aclCache.invalidate(subject.getSubjectKey());
9699
} catch (Exception e) {
@@ -105,7 +108,7 @@ public CompletableFuture<Void> updateAcl(Acl acl) {
105108
Subject subject = acl.getSubject();
106109
byte[] keyBytes = subject.getSubjectKey().getBytes(StandardCharsets.UTF_8);
107110
byte[] valueBytes = JSON.toJSONBytes(acl);
108-
this.storage.put(keyBytes, keyBytes.length, valueBytes);
111+
this.storage.put(AUTH_METADATA_COLUMN_FAMILY, keyBytes, keyBytes.length, valueBytes);
109112
this.storage.flushWAL();
110113
this.aclCache.invalidate(subject.getSubjectKey());
111114
} catch (Exception e) {
@@ -126,20 +129,18 @@ public CompletableFuture<Acl> getAcl(Subject subject) {
126129
@Override
127130
public CompletableFuture<List<Acl>> listAcl(String subjectFilter, String resourceFilter) {
128131
List<Acl> result = new ArrayList<>();
129-
try (RocksIterator iterator = this.storage.iterator()) {
130-
iterator.seekToFirst();
131-
while (iterator.isValid()) {
132-
String subjectKey = new String(iterator.key(), StandardCharsets.UTF_8);
132+
CompletableFuture<List<Acl>> future = new CompletableFuture<>();
133+
try {
134+
this.storage.iterate(AUTH_METADATA_COLUMN_FAMILY, (key, value) -> {
135+
String subjectKey = new String(key, StandardCharsets.UTF_8);
133136
if (StringUtils.isNotBlank(subjectFilter) && !subjectKey.contains(subjectFilter)) {
134-
iterator.next();
135-
continue;
137+
return;
136138
}
137139
Subject subject = Subject.of(subjectKey);
138-
Acl acl = JSON.parseObject(new String(iterator.value(), StandardCharsets.UTF_8), Acl.class);
140+
Acl acl = JSON.parseObject(new String(value, StandardCharsets.UTF_8), Acl.class);
139141
List<Policy> policies = acl.getPolicies();
140142
if (!CollectionUtils.isNotEmpty(policies)) {
141-
iterator.next();
142-
continue;
143+
return;
143144
}
144145
Iterator<Policy> policyIterator = policies.iterator();
145146
while (policyIterator.hasNext()) {
@@ -158,10 +159,12 @@ public CompletableFuture<List<Acl>> listAcl(String subjectFilter, String resourc
158159
if (CollectionUtils.isNotEmpty(policies)) {
159160
result.add(Acl.of(subject, policies));
160161
}
161-
iterator.next();
162-
}
162+
});
163+
} catch (Exception e) {
164+
future.completeExceptionally(e);
163165
}
164-
return CompletableFuture.completedFuture(result);
166+
future.complete(result);
167+
return future;
165168
}
166169

167170
@Override
@@ -185,7 +188,7 @@ public Acl load(String subjectKey) {
185188
byte[] keyBytes = subjectKey.getBytes(StandardCharsets.UTF_8);
186189
Subject subject = Subject.of(subjectKey);
187190

188-
byte[] valueBytes = this.storage.get(keyBytes);
191+
byte[] valueBytes = this.storage.get(AUTH_METADATA_COLUMN_FAMILY, keyBytes);
189192
if (ArrayUtils.isEmpty(valueBytes)) {
190193
return EMPTY_ACL;
191194
}

broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java renamed to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.rocketmq.broker;
17+
package org.apache.rocketmq.broker.config.v1;
1818

1919
import com.alibaba.fastjson.JSON;
20+
import java.nio.charset.Charset;
2021
import java.nio.charset.StandardCharsets;
2122
import java.util.function.BiConsumer;
2223
import org.apache.commons.lang3.StringUtils;
@@ -27,12 +28,16 @@
2728
import org.apache.rocketmq.remoting.protocol.DataVersion;
2829
import org.rocksdb.CompressionType;
2930
import org.rocksdb.FlushOptions;
30-
import org.rocksdb.RocksIterator;
31+
import org.rocksdb.RocksDB;
32+
import org.rocksdb.RocksDBException;
3133
import org.rocksdb.Statistics;
3234
import org.rocksdb.WriteBatch;
3335

3436
public class RocksDBConfigManager {
3537
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
38+
39+
public static final Charset CHARSET = StandardCharsets.UTF_8;
40+
3641
public volatile boolean isStop = false;
3742
public ConfigRocksDBStorage configRocksDBStorage = null;
3843
private FlushOptions flushOptions = null;
@@ -42,21 +47,44 @@ public class RocksDBConfigManager {
4247
private final CompressionType compressionType;
4348
private DataVersion kvDataVersion = new DataVersion();
4449

50+
public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(CHARSET);
51+
public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(CHARSET);
52+
53+
private final String defaultCF;
54+
private final String versionCF;
55+
56+
57+
public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType,
58+
String defaultCF, String versionCF) {
59+
this.filePath = filePath;
60+
this.memTableFlushInterval = memTableFlushInterval;
61+
this.compressionType = compressionType;
62+
this.defaultCF = defaultCF;
63+
this.versionCF = versionCF;
64+
}
65+
4566
public RocksDBConfigManager(String filePath, long memTableFlushInterval, CompressionType compressionType) {
4667
this.filePath = filePath;
4768
this.memTableFlushInterval = memTableFlushInterval;
4869
this.compressionType = compressionType;
70+
this.defaultCF = new String(RocksDB.DEFAULT_COLUMN_FAMILY, CHARSET);
71+
this.versionCF = new String(KV_DATA_VERSION_COLUMN_FAMILY_NAME, CHARSET);
4972
}
5073

51-
public boolean init() {
74+
public boolean init(boolean readOnly) {
5275
this.isStop = false;
53-
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath, compressionType);
76+
this.configRocksDBStorage = ConfigRocksDBStorage.getStore(filePath, readOnly, compressionType);
5477
return this.configRocksDBStorage.start();
5578
}
79+
80+
public boolean init() {
81+
return this.init(false);
82+
}
83+
5684
public boolean loadDataVersion() {
5785
String currDataVersionString = null;
5886
try {
59-
byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
87+
byte[] dataVersion = this.configRocksDBStorage.get(versionCF, KV_DATA_VERSION_KEY);
6088
if (dataVersion != null && dataVersion.length > 0) {
6189
currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
6290
}
@@ -68,12 +96,11 @@ public boolean loadDataVersion() {
6896
}
6997

7098
public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
71-
try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
72-
iterator.seekToFirst();
73-
while (iterator.isValid()) {
74-
biConsumer.accept(iterator.key(), iterator.value());
75-
iterator.next();
76-
}
99+
try {
100+
configRocksDBStorage.iterate(this.defaultCF, biConsumer);
101+
} catch (Exception e) {
102+
BROKER_LOG.error("RocksDBConfigManager loadData failed", e);
103+
return false;
77104
}
78105

79106
this.flushOptions = new FlushOptions();
@@ -87,9 +114,7 @@ public void start() {
87114

88115
public boolean stop() {
89116
this.isStop = true;
90-
if (this.configRocksDBStorage != null) {
91-
return this.configRocksDBStorage.shutdown();
92-
}
117+
ConfigRocksDBStorage.shutdown(filePath);
93118
if (this.flushOptions != null) {
94119
this.flushOptions.close();
95120
}
@@ -115,28 +140,38 @@ public void flushWAL() {
115140
}
116141
}
117142

118-
public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
119-
this.configRocksDBStorage.put(keyBytes, keyLen, valueBytes);
143+
public void put(final byte[] keyBytes, final byte[] valueBytes) throws Exception {
144+
this.configRocksDBStorage.put(defaultCF, keyBytes, keyBytes.length, valueBytes);
145+
}
146+
147+
public void put(String cf, String key, String value) throws Exception {
148+
byte[] keyBytes = key.getBytes(CHARSET);
149+
this.configRocksDBStorage.put(cf, keyBytes, keyBytes.length, value.getBytes(CHARSET));
150+
}
151+
152+
public void put(String cf, final byte[] keyBytes, final byte[] valueBytes) throws Exception {
153+
this.configRocksDBStorage.put(cf, keyBytes, keyBytes.length, valueBytes);
120154
}
121155

122156
public void delete(final byte[] keyBytes) throws Exception {
123-
this.configRocksDBStorage.delete(keyBytes);
157+
this.configRocksDBStorage.delete(defaultCF, keyBytes);
124158
}
125159

126160
public void updateKvDataVersion() throws Exception {
127161
kvDataVersion.nextVersion();
128-
this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
162+
this.configRocksDBStorage.put(versionCF, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length,
163+
JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
129164
}
130165

131166
public DataVersion getKvDataVersion() {
132167
return kvDataVersion;
133168
}
134169

135-
public void updateForbidden(String key, String value) throws Exception {
136-
this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
170+
// batch operations
171+
public void writeBatchPutOperation(WriteBatch writeBatch, final byte[] key, final byte[] value) throws RocksDBException {
172+
configRocksDBStorage.writeBatchPutOperation(defaultCF, writeBatch, key, value);
137173
}
138174

139-
140175
public void batchPutWithWal(final WriteBatch batch) throws Exception {
141176
this.configRocksDBStorage.batchPutWithWal(batch);
142177
}

0 commit comments

Comments
 (0)