Skip to content

Commit c8a4314

Browse files
committed
feat(db/rocksdb):reactor code for ReadOptions and Options
1 parent 8fe01ad commit c8a4314

File tree

8 files changed

+130
-70
lines changed

8 files changed

+130
-70
lines changed

chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class RocksDbDataSourceImpl extends DbStat implements DbSourceInter<byte[
5252
private volatile boolean alive;
5353
private String parentPath;
5454
private ReadWriteLock resetDbLock = new ReentrantReadWriteLock();
55+
private Options options;
5556

5657
public RocksDbDataSourceImpl(String parentPath, String name) {
5758
this.dataBaseName = name;
@@ -78,6 +79,9 @@ public void closeDB() {
7879
if (!isAlive()) {
7980
return;
8081
}
82+
if (this.options != null) {
83+
this.options.close();
84+
}
8185
database.close();
8286
alive = false;
8387
} catch (Exception e) {
@@ -117,7 +121,8 @@ private static void checkArgNotNull(Object value, String name) {
117121
@Override
118122
public Set<byte[]> allKeys() throws RuntimeException {
119123
resetDbLock.readLock().lock();
120-
try (final RocksIterator iter = getRocksIterator()) {
124+
try (final ReadOptions readOptions = getReadOptions();
125+
final RocksIterator iter = getRocksIterator(readOptions)) {
121126
Set<byte[]> result = Sets.newHashSet();
122127
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
123128
result.add(iter.key());
@@ -133,7 +138,8 @@ public Set<byte[]> allKeys() throws RuntimeException {
133138
@Override
134139
public Set<byte[]> allValues() throws RuntimeException {
135140
resetDbLock.readLock().lock();
136-
try (final RocksIterator iter = getRocksIterator()) {
141+
try (final ReadOptions readOptions = getReadOptions();
142+
final RocksIterator iter = getRocksIterator(readOptions)) {
137143
Set<byte[]> result = Sets.newHashSet();
138144
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
139145
result.add(iter.value());
@@ -149,7 +155,8 @@ public Set<byte[]> allValues() throws RuntimeException {
149155
@Override
150156
public long getTotal() throws RuntimeException {
151157
resetDbLock.readLock().lock();
152-
try (final RocksIterator iter = getRocksIterator()) {
158+
try (final ReadOptions readOptions = getReadOptions();
159+
final RocksIterator iter = getRocksIterator(readOptions)) {
153160
long total = 0;
154161
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
155162
total++;
@@ -180,7 +187,7 @@ private void initDB() {
180187
throw new IllegalArgumentException("No name set to the dbStore");
181188
}
182189

183-
try (Options options = RocksDbSettings.getOptionsByDbName(dataBaseName)) {
190+
try {
184191
logger.debug("Opening database {}.", dataBaseName);
185192
final Path dbPath = getDbPath();
186193

@@ -191,14 +198,19 @@ private void initDB() {
191198
try {
192199
DbSourceInter.checkOrInitEngine(getEngine(), dbPath.toString(),
193200
TronError.ErrCode.ROCKSDB_INIT);
194-
database = RocksDB.open(options, dbPath.toString());
201+
this.options = RocksDbSettings.getOptionsByDbName(dataBaseName);
202+
database = RocksDB.open(this.options, dbPath.toString());
195203
} catch (RocksDBException e) {
196204
if (Objects.equals(e.getStatus().getCode(), Status.Code.Corruption)) {
197205
logger.error("Database {} corrupted, please delete database directory({}) "
198206
+ "and restart.", dataBaseName, parentPath, e);
199207
} else {
200208
logger.error("Open Database {} failed", dataBaseName, e);
201209
}
210+
211+
if (this.options != null) {
212+
this.options.close();
213+
}
202214
throw new TronError(e, TronError.ErrCode.ROCKSDB_INIT);
203215
}
204216

@@ -282,7 +294,8 @@ public boolean flush() {
282294
*/
283295
@Override
284296
public org.tron.core.db.common.iterator.DBIterator iterator() {
285-
return new RockStoreIterator(getRocksIterator());
297+
ReadOptions readOptions = getReadOptions();
298+
return new RockStoreIterator(getRocksIterator(readOptions), readOptions);
286299
}
287300

288301
private void updateByBatchInner(Map<byte[], byte[]> rows, WriteOptions options)
@@ -333,7 +346,8 @@ public List<byte[]> getKeysNext(byte[] key, long limit) {
333346
return new ArrayList<>();
334347
}
335348
resetDbLock.readLock().lock();
336-
try (RocksIterator iter = getRocksIterator()) {
349+
try (final ReadOptions readOptions = getReadOptions();
350+
final RocksIterator iter = getRocksIterator(readOptions)) {
337351
List<byte[]> result = new ArrayList<>();
338352
long i = 0;
339353
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -350,7 +364,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
350364
return Collections.emptyMap();
351365
}
352366
resetDbLock.readLock().lock();
353-
try (RocksIterator iter = getRocksIterator()) {
367+
try (final ReadOptions readOptions = getReadOptions();
368+
final RocksIterator iter = getRocksIterator(readOptions)) {
354369
Map<byte[], byte[]> result = new HashMap<>();
355370
long i = 0;
356371
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -365,7 +380,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
365380
@Override
366381
public Map<WrappedByteArray, byte[]> prefixQuery(byte[] key) {
367382
resetDbLock.readLock().lock();
368-
try (RocksIterator iterator = getRocksIterator()) {
383+
try (final ReadOptions readOptions = getReadOptions();
384+
final RocksIterator iterator = getRocksIterator(readOptions)) {
369385
Map<WrappedByteArray, byte[]> result = new HashMap<>();
370386
for (iterator.seek(key); iterator.isValid(); iterator.next()) {
371387
if (Bytes.indexOf(iterator.key(), key) == 0) {
@@ -385,7 +401,8 @@ public Set<byte[]> getlatestValues(long limit) {
385401
return Sets.newHashSet();
386402
}
387403
resetDbLock.readLock().lock();
388-
try (RocksIterator iter = getRocksIterator()) {
404+
try (final ReadOptions readOptions = getReadOptions();
405+
final RocksIterator iter = getRocksIterator(readOptions)) {
389406
Set<byte[]> result = Sets.newHashSet();
390407
long i = 0;
391408
for (iter.seekToLast(); iter.isValid() && i < limit; iter.prev(), i++) {
@@ -402,7 +419,8 @@ public Set<byte[]> getValuesNext(byte[] key, long limit) {
402419
return Sets.newHashSet();
403420
}
404421
resetDbLock.readLock().lock();
405-
try (RocksIterator iter = getRocksIterator()) {
422+
try (final ReadOptions readOptions = getReadOptions();
423+
final RocksIterator iter = getRocksIterator(readOptions)) {
406424
Set<byte[]> result = Sets.newHashSet();
407425
long i = 0;
408426
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -430,18 +448,41 @@ public void backup(String dir) throws RocksDBException {
430448
*
431449
* <p>Example of correct usage:
432450
* <pre>{@code
433-
* try (RocksIterator iterator = getRocksIterator()) {
451+
* try ( ReadOptions readOptions = new ReadOptions().setFillCache(false);
452+
* RocksIterator iterator = getRocksIterator(readOptions)) {
453+
* iterator.seekToFirst();
434454
* // do something
435455
* }
436456
* }</pre>
437457
*
438458
* @return a new database iterator that must be closed.
439459
*/
440-
private RocksIterator getRocksIterator() {
441-
try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) {
442-
throwIfNotAlive();
443-
return database.newIterator(readOptions);
444-
}
460+
private RocksIterator getRocksIterator(ReadOptions readOptions) {
461+
throwIfNotAlive();
462+
return database.newIterator(readOptions);
463+
}
464+
465+
/**
466+
* Returns an ReadOptions.
467+
*
468+
* <p><b>CRITICAL:</b> The returned ReadOptions holds native resources and <b>MUST</b> be closed
469+
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
470+
* statement.
471+
*
472+
* <p>Example of correct usage:
473+
* <pre>{@code
474+
* try (ReadOptions readOptions = getReadOptions();
475+
* RocksIterator iterator = getRocksIterator(readOptions)) {
476+
* iterator.seekToFirst();
477+
* // do something
478+
* }
479+
* }</pre>
480+
*
481+
* @return a new database iterator that must be closed.
482+
*/
483+
private ReadOptions getReadOptions() {
484+
throwIfNotAlive();
485+
return new ReadOptions().setFillCache(false);
445486
}
446487

447488
public boolean deleteDbBakPath(String dir) {

chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.NoSuchElementException;
66
import java.util.concurrent.atomic.AtomicBoolean;
77
import lombok.extern.slf4j.Slf4j;
8+
import org.rocksdb.ReadOptions;
89
import org.rocksdb.RocksIterator;
910

1011

@@ -15,14 +16,17 @@ public final class RockStoreIterator implements DBIterator {
1516
private boolean first = true;
1617

1718
private final AtomicBoolean close = new AtomicBoolean(false);
19+
private final ReadOptions readOptions;
1820

19-
public RockStoreIterator(RocksIterator dbIterator) {
21+
public RockStoreIterator(RocksIterator dbIterator, ReadOptions readOptions) {
22+
this.readOptions = readOptions;
2023
this.dbIterator = dbIterator;
2124
}
2225

2326
@Override
2427
public void close() throws IOException {
2528
if (close.compareAndSet(false, true)) {
29+
readOptions.close();
2630
dbIterator.close();
2731
}
2832
}

framework/src/test/java/org/tron/core/db/DBIteratorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.junit.Test;
1515
import org.junit.rules.ExpectedException;
1616
import org.junit.rules.TemporaryFolder;
17+
import org.rocksdb.ReadOptions;
1718
import org.rocksdb.RocksDB;
1819
import org.rocksdb.RocksDBException;
1920
import org.tron.core.db.common.iterator.RockStoreIterator;
@@ -83,7 +84,7 @@ public void testRocksDb() throws RocksDBException, IOException {
8384
RocksDB db = RocksDB.open(options, file.toString())) {
8485
db.put("1".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));
8586
db.put("2".getBytes(StandardCharsets.UTF_8), "2".getBytes(StandardCharsets.UTF_8));
86-
RockStoreIterator iterator = new RockStoreIterator(db.newIterator());
87+
RockStoreIterator iterator = new RockStoreIterator(db.newIterator(), new ReadOptions());
8788
iterator.seekToFirst();
8889
Assert.assertArrayEquals("1".getBytes(StandardCharsets.UTF_8), iterator.getKey());
8990
Assert.assertArrayEquals("1".getBytes(StandardCharsets.UTF_8), iterator.next().getValue());
@@ -99,7 +100,7 @@ public void testRocksDb() throws RocksDBException, IOException {
99100
Assert.assertTrue(e instanceof IllegalStateException);
100101
}
101102

102-
iterator = new RockStoreIterator(db.newIterator());
103+
iterator = new RockStoreIterator(db.newIterator(), new ReadOptions());
103104
iterator.seekToLast();
104105
Assert.assertArrayEquals("2".getBytes(StandardCharsets.UTF_8), iterator.getKey());
105106
Assert.assertArrayEquals("2".getBytes(StandardCharsets.UTF_8), iterator.getValue());

plugins/src/main/java/common/org/tron/plugins/DbConvert.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ public void convertLevelToRocks() throws Exception {
259259
JniDBFactory.pushMemoryPool(1024 * 1024);
260260
try (
261261
DB level = DBUtils.newLevelDb(srcDbPath);
262-
RocksDB rocks = DBUtils.newRocksDbForBulkLoad(dstDbPath);
262+
org.rocksdb.Options options = DBUtils.newDefaultRocksDbOptions(true, dbName);
263+
RocksDB rocks = RocksDB.open(options, this.dstDbPath.toString());
263264
DBIterator levelIterator = level.iterator(
264265
new org.iq80.leveldb.ReadOptions().fillCache(false))) {
265266

@@ -291,7 +292,8 @@ private void compact() throws RocksDBException {
291292
if (DBUtils.MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(this.dbName)) {
292293
return;
293294
}
294-
try (RocksDB rocks = DBUtils.newRocksDb(this.dstDbPath)) {
295+
try (org.rocksdb.Options options = DBUtils.newDefaultRocksDbOptions(false, dbName);
296+
RocksDB rocks = RocksDB.open(options, this.dstDbPath.toString())) {
295297
logger.info("compact database {} start", this.dbName);
296298
rocks.compactRange();
297299
logger.info("compact database {} end", this.dbName);
@@ -300,7 +302,8 @@ private void compact() throws RocksDBException {
300302

301303
private boolean check() throws RocksDBException {
302304
try (
303-
RocksDB rocks = DBUtils.newRocksDbReadOnly(this.dstDbPath);
305+
org.rocksdb.Options options = DBUtils.newDefaultRocksDbOptions(false, dbName);
306+
RocksDB rocks = RocksDB.openReadOnly(options, this.dstDbPath.toString());
304307
org.rocksdb.ReadOptions r = new org.rocksdb.ReadOptions().setFillCache(false);
305308
RocksIterator rocksIterator = rocks.newIterator(r)) {
306309

plugins/src/main/java/common/org/tron/plugins/utils/DBUtils.java

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
import org.rocksdb.BloomFilter;
1515
import org.rocksdb.ComparatorOptions;
1616
import org.rocksdb.Options;
17-
import org.rocksdb.RocksDB;
18-
import org.rocksdb.RocksDBException;
1917
import org.tron.common.arch.Arch;
2018
import org.tron.common.utils.MarketOrderPriceComparatorForLevelDB;
2119
import org.tron.common.utils.MarketOrderPriceComparatorForRocksDB;
@@ -95,15 +93,16 @@ public static org.iq80.leveldb.Options newDefaultLevelDbOptions() {
9593
* Use try-with-resources.
9694
*
9795
* <pre>{@code
98-
* try (Options options = newDefaultRocksDbOptions(false)) {
96+
* try (Options options = newDefaultRocksDbOptions(false, name)) {
9997
* // do something
10098
* }
10199
* }</pre>
102100
*
103101
* @param forBulkLoad if true, optimizes for bulk loading
102+
* @param name db name
104103
* @return a new Options instance that must be closed
105104
*/
106-
private static Options newDefaultRocksDbOptions(boolean forBulkLoad) {
105+
public static Options newDefaultRocksDbOptions(boolean forBulkLoad, String name) {
107106
Options options = new Options();
108107
options.setCreateIfMissing(true);
109108
options.setIncreaseParallelism(1);
@@ -126,35 +125,10 @@ private static Options newDefaultRocksDbOptions(boolean forBulkLoad) {
126125
if (forBulkLoad) {
127126
options.prepareForBulkLoad();
128127
}
129-
return options;
130-
}
131-
132-
public static RocksDB newRocksDb(Path db) throws RocksDBException {
133-
try (Options options = newDefaultRocksDbOptions(false)) {
134-
if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(db.getFileName().toString())) {
135-
options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions()));
136-
}
137-
return RocksDB.open(options, db.toString());
138-
}
139-
}
140-
141-
public static RocksDB newRocksDbForBulkLoad(Path db) throws RocksDBException {
142-
try (Options options = newDefaultRocksDbOptions(true)) {
143-
if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(db.getFileName().toString())) {
144-
options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions()));
145-
}
146-
return RocksDB.open(options, db.toString());
147-
}
148-
}
149-
150-
151-
public static RocksDB newRocksDbReadOnly(Path db) throws RocksDBException {
152-
try (Options options = newDefaultRocksDbOptions(false)) {
153-
if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(db.getFileName().toString())) {
154-
options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions()));
155-
}
156-
return RocksDB.openReadOnly(options, db.toString());
128+
if (MARKET_PAIR_PRICE_TO_ORDER.equalsIgnoreCase(name)) {
129+
options.setComparator(new MarketOrderPriceComparatorForRocksDB(new ComparatorOptions()));
157130
}
131+
return options;
158132
}
159133

160134
public static String simpleDecode(byte[] bytes) {

plugins/src/main/java/common/org/tron/plugins/utils/db/DbTool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public static LevelDBImpl openLevelDb(Path db, String name) throws IOException {
181181
}
182182

183183
public static RocksDBImpl openRocksDb(Path db, String name) throws RocksDBException {
184-
RocksDBImpl rocksdb = new RocksDBImpl(DBUtils.newRocksDb(db), name);
184+
RocksDBImpl rocksdb = new RocksDBImpl(db, name);
185185
tryInitEngineFile(db, ROCKSDB);
186186
return rocksdb;
187187
}

plugins/src/main/java/common/org/tron/plugins/utils/db/RockDBIterator.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22

33
import java.io.IOException;
44
import java.util.Map;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import org.rocksdb.ReadOptions;
57
import org.rocksdb.RocksIterator;
68

79
public class RockDBIterator implements DBIterator {
810

911
private final RocksIterator iterator;
12+
private final ReadOptions readOptions;
13+
private final AtomicBoolean closed = new AtomicBoolean(false);
1014

11-
public RockDBIterator(RocksIterator iterator) {
15+
public RockDBIterator(RocksIterator iterator, ReadOptions readOptions) {
1216
this.iterator = iterator;
17+
this.readOptions = readOptions;
1318
}
1419

1520
@Override
@@ -72,6 +77,9 @@ public byte[] setValue(byte[] value) {
7277

7378
@Override
7479
public void close() throws IOException {
75-
iterator.close();
80+
if (closed.compareAndSet(false, true)) {
81+
readOptions.close();
82+
iterator.close();
83+
}
7684
}
7785
}

0 commit comments

Comments
 (0)