Skip to content

Commit ae451fe

Browse files
committed
feat(db):reactor code for readOption
1 parent e2c3780 commit ae451fe

File tree

5 files changed

+86
-28
lines changed

5 files changed

+86
-28
lines changed

chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ private static void checkArgNotNull(Object value, String name) {
117117
@Override
118118
public Set<byte[]> allKeys() throws RuntimeException {
119119
resetDbLock.readLock().lock();
120-
try (final RocksIterator iter = getRocksIterator()) {
120+
try (final ReadOptions readOptions = getReadOptions();
121+
final RocksIterator iter = getRocksIterator(readOptions)) {
121122
Set<byte[]> result = Sets.newHashSet();
122123
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
123124
result.add(iter.key());
@@ -133,7 +134,8 @@ public Set<byte[]> allKeys() throws RuntimeException {
133134
@Override
134135
public Set<byte[]> allValues() throws RuntimeException {
135136
resetDbLock.readLock().lock();
136-
try (final RocksIterator iter = getRocksIterator()) {
137+
try (final ReadOptions readOptions = getReadOptions();
138+
final RocksIterator iter = getRocksIterator(readOptions)) {
137139
Set<byte[]> result = Sets.newHashSet();
138140
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
139141
result.add(iter.value());
@@ -149,7 +151,8 @@ public Set<byte[]> allValues() throws RuntimeException {
149151
@Override
150152
public long getTotal() throws RuntimeException {
151153
resetDbLock.readLock().lock();
152-
try (final RocksIterator iter = getRocksIterator()) {
154+
try (final ReadOptions readOptions = getReadOptions();
155+
final RocksIterator iter = getRocksIterator(readOptions)) {
153156
long total = 0;
154157
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
155158
total++;
@@ -282,7 +285,8 @@ public boolean flush() {
282285
*/
283286
@Override
284287
public org.tron.core.db.common.iterator.DBIterator iterator() {
285-
return new RockStoreIterator(getRocksIterator());
288+
ReadOptions readOptions = getReadOptions();
289+
return new RockStoreIterator(getRocksIterator(readOptions), readOptions);
286290
}
287291

288292
private void updateByBatchInner(Map<byte[], byte[]> rows, WriteOptions options)
@@ -333,7 +337,8 @@ public List<byte[]> getKeysNext(byte[] key, long limit) {
333337
return new ArrayList<>();
334338
}
335339
resetDbLock.readLock().lock();
336-
try (RocksIterator iter = getRocksIterator()) {
340+
try (final ReadOptions readOptions = getReadOptions();
341+
final RocksIterator iter = getRocksIterator(readOptions)) {
337342
List<byte[]> result = new ArrayList<>();
338343
long i = 0;
339344
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -350,7 +355,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
350355
return Collections.emptyMap();
351356
}
352357
resetDbLock.readLock().lock();
353-
try (RocksIterator iter = getRocksIterator()) {
358+
try (final ReadOptions readOptions = getReadOptions();
359+
final RocksIterator iter = getRocksIterator(readOptions)) {
354360
Map<byte[], byte[]> result = new HashMap<>();
355361
long i = 0;
356362
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -365,7 +371,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
365371
@Override
366372
public Map<WrappedByteArray, byte[]> prefixQuery(byte[] key) {
367373
resetDbLock.readLock().lock();
368-
try (RocksIterator iterator = getRocksIterator()) {
374+
try (final ReadOptions readOptions = getReadOptions();
375+
final RocksIterator iterator = getRocksIterator(readOptions)) {
369376
Map<WrappedByteArray, byte[]> result = new HashMap<>();
370377
for (iterator.seek(key); iterator.isValid(); iterator.next()) {
371378
if (Bytes.indexOf(iterator.key(), key) == 0) {
@@ -385,7 +392,8 @@ public Set<byte[]> getlatestValues(long limit) {
385392
return Sets.newHashSet();
386393
}
387394
resetDbLock.readLock().lock();
388-
try (RocksIterator iter = getRocksIterator()) {
395+
try (final ReadOptions readOptions = getReadOptions();
396+
final RocksIterator iter = getRocksIterator(readOptions)) {
389397
Set<byte[]> result = Sets.newHashSet();
390398
long i = 0;
391399
for (iter.seekToLast(); iter.isValid() && i < limit; iter.prev(), i++) {
@@ -402,7 +410,8 @@ public Set<byte[]> getValuesNext(byte[] key, long limit) {
402410
return Sets.newHashSet();
403411
}
404412
resetDbLock.readLock().lock();
405-
try (RocksIterator iter = getRocksIterator()) {
413+
try (final ReadOptions readOptions = getReadOptions();
414+
final RocksIterator iter = getRocksIterator(readOptions)) {
406415
Set<byte[]> result = Sets.newHashSet();
407416
long i = 0;
408417
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -430,18 +439,41 @@ public void backup(String dir) throws RocksDBException {
430439
*
431440
* <p>Example of correct usage:
432441
* <pre>{@code
433-
* try (RocksIterator iterator = getRocksIterator()) {
442+
* try ( ReadOptions readOptions = new ReadOptions().setFillCache(false);
443+
* RocksIterator iterator = getRocksIterator(readOptions)) {
444+
* iterator.seekToFirst();
434445
* // do something
435446
* }
436447
* }</pre>
437448
*
438449
* @return a new database iterator that must be closed.
439450
*/
440-
private RocksIterator getRocksIterator() {
441-
try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) {
442-
throwIfNotAlive();
443-
return database.newIterator(readOptions);
444-
}
451+
private RocksIterator getRocksIterator(ReadOptions readOptions) {
452+
throwIfNotAlive();
453+
return database.newIterator(readOptions);
454+
}
455+
456+
/**
457+
* Returns an ReadOptions.
458+
*
459+
* <p><b>CRITICAL:</b> The returned ReadOptions holds native resources and <b>MUST</b> be closed
460+
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
461+
* statement.
462+
*
463+
* <p>Example of correct usage:
464+
* <pre>{@code
465+
* try ( ReadOptions readOptions = getReadOptions();
466+
* RocksIterator iterator = getRocksIterator(readOptions)) {
467+
* iterator.seekToFirst();
468+
* // do something
469+
* }
470+
* }</pre>
471+
*
472+
* @return a new database iterator that must be closed.
473+
*/
474+
private ReadOptions getReadOptions() {
475+
throwIfNotAlive();
476+
return new ReadOptions().setFillCache(false);
445477
}
446478

447479
public boolean deleteDbBakPath(String dir) {

chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.NoSuchElementException;
66
import java.util.concurrent.atomic.AtomicBoolean;
77
import lombok.extern.slf4j.Slf4j;
8+
import org.rocksdb.ReadOptions;
89
import org.rocksdb.RocksIterator;
910

1011

@@ -15,14 +16,17 @@ public final class RockStoreIterator implements DBIterator {
1516
private boolean first = true;
1617

1718
private final AtomicBoolean close = new AtomicBoolean(false);
19+
private final ReadOptions readOptions;
1820

19-
public RockStoreIterator(RocksIterator dbIterator) {
21+
public RockStoreIterator(RocksIterator dbIterator, ReadOptions readOptions) {
22+
this.readOptions = readOptions;
2023
this.dbIterator = dbIterator;
2124
}
2225

2326
@Override
2427
public void close() throws IOException {
2528
if (close.compareAndSet(false, true)) {
29+
readOptions.close();
2630
dbIterator.close();
2731
}
2832
}

framework/src/test/java/org/tron/core/db/DBIteratorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.junit.Test;
1515
import org.junit.rules.ExpectedException;
1616
import org.junit.rules.TemporaryFolder;
17+
import org.rocksdb.ReadOptions;
1718
import org.rocksdb.RocksDB;
1819
import org.rocksdb.RocksDBException;
1920
import org.tron.core.db.common.iterator.RockStoreIterator;
@@ -83,7 +84,7 @@ public void testRocksDb() throws RocksDBException, IOException {
8384
RocksDB db = RocksDB.open(options, file.toString())) {
8485
db.put("1".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));
8586
db.put("2".getBytes(StandardCharsets.UTF_8), "2".getBytes(StandardCharsets.UTF_8));
86-
RockStoreIterator iterator = new RockStoreIterator(db.newIterator());
87+
RockStoreIterator iterator = new RockStoreIterator(db.newIterator(), new ReadOptions());
8788
iterator.seekToFirst();
8889
Assert.assertArrayEquals("1".getBytes(StandardCharsets.UTF_8), iterator.getKey());
8990
Assert.assertArrayEquals("1".getBytes(StandardCharsets.UTF_8), iterator.next().getValue());
@@ -99,7 +100,7 @@ public void testRocksDb() throws RocksDBException, IOException {
99100
Assert.assertTrue(e instanceof IllegalStateException);
100101
}
101102

102-
iterator = new RockStoreIterator(db.newIterator());
103+
iterator = new RockStoreIterator(db.newIterator(), new ReadOptions());
103104
iterator.seekToLast();
104105
Assert.assertArrayEquals("2".getBytes(StandardCharsets.UTF_8), iterator.getKey());
105106
Assert.assertArrayEquals("2".getBytes(StandardCharsets.UTF_8), iterator.getValue());

plugins/src/main/java/common/org/tron/plugins/utils/db/RockDBIterator.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22

33
import java.io.IOException;
44
import java.util.Map;
5+
import java.util.concurrent.atomic.AtomicBoolean;
6+
import org.rocksdb.ReadOptions;
57
import org.rocksdb.RocksIterator;
68

79
public class RockDBIterator implements DBIterator {
810

911
private final RocksIterator iterator;
12+
private final ReadOptions readOptions;
13+
private final AtomicBoolean closed = new AtomicBoolean(false);
1014

11-
public RockDBIterator(RocksIterator iterator) {
15+
public RockDBIterator(RocksIterator iterator, ReadOptions readOptions) {
1216
this.iterator = iterator;
17+
this.readOptions = readOptions;
1318
}
1419

1520
@Override
@@ -72,6 +77,9 @@ public byte[] setValue(byte[] value) {
7277

7378
@Override
7479
public void close() throws IOException {
75-
iterator.close();
80+
if (closed.compareAndSet(false, true)) {
81+
readOptions.close();
82+
iterator.close();
83+
}
7684
}
7785
}

plugins/src/main/java/common/org/tron/plugins/utils/db/RocksDBImpl.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.collect.Streams;
44
import java.io.IOException;
5+
import java.util.concurrent.atomic.AtomicBoolean;
56
import lombok.Getter;
67
import org.rocksdb.ReadOptions;
78
import org.rocksdb.RocksDBException;
@@ -12,6 +13,7 @@ public class RocksDBImpl implements DBInterface {
1213

1314
@Getter
1415
private final String name;
16+
private final AtomicBoolean closed = new AtomicBoolean(false);
1517

1618
public RocksDBImpl(org.rocksdb.RocksDB rocksDB, String name) {
1719
this.rocksDB = rocksDB;
@@ -20,41 +22,44 @@ public RocksDBImpl(org.rocksdb.RocksDB rocksDB, String name) {
2022

2123
@Override
2224
public byte[] get(byte[] key) {
25+
throwIfClosed();
2326
try {
2427
return rocksDB.get(key);
2528
} catch (RocksDBException e) {
26-
e.printStackTrace();
29+
throw new RuntimeException(name, e);
2730
}
28-
return null;
2931
}
3032

3133
@Override
3234
public void put(byte[] key, byte[] value) {
35+
throwIfClosed();
3336
try {
3437
rocksDB.put(key, value);
3538
} catch (RocksDBException e) {
36-
e.printStackTrace();
39+
throw new RuntimeException(name, e);
3740
}
3841
}
3942

4043
@Override
4144
public void delete(byte[] key) {
45+
throwIfClosed();
4246
try {
4347
rocksDB.delete(key);
4448
} catch (RocksDBException e) {
45-
e.printStackTrace();
49+
throw new RuntimeException(name, e);
4650
}
4751
}
4852

4953
@Override
5054
public DBIterator iterator() {
51-
try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) {
52-
return new RockDBIterator(rocksDB.newIterator(readOptions));
53-
}
55+
throwIfClosed();
56+
ReadOptions readOptions = new ReadOptions().setFillCache(false);
57+
return new RockDBIterator(rocksDB.newIterator(readOptions), readOptions);
5458
}
5559

5660
@Override
5761
public long size() throws IOException {
62+
throwIfClosed();
5863
try (DBIterator iterator = this.iterator()) {
5964
iterator.seekToFirst();
6065
return Streams.stream(iterator).count();
@@ -63,6 +68,14 @@ public long size() throws IOException {
6368

6469
@Override
6570
public void close() throws IOException {
66-
rocksDB.close();
71+
if (closed.compareAndSet(false, true)) {
72+
rocksDB.close();
73+
}
74+
}
75+
76+
private void throwIfClosed() {
77+
if (closed.get()) {
78+
throw new IllegalStateException("db " + name + " has been closed");
79+
}
6780
}
6881
}

0 commit comments

Comments
 (0)