22
33import com .google .common .hash .BloomFilter ;
44import com .google .common .hash .Funnels ;
5+ import com .google .common .hash .HashCode ;
6+ import com .google .common .hash .Hashing ;
7+ import com .google .common .io .ByteSource ;
58import com .google .common .primitives .Longs ;
69import java .io .BufferedInputStream ;
710import java .io .BufferedOutputStream ;
11+ import java .io .IOException ;
812import java .io .InputStream ;
913import java .io .InputStreamReader ;
1014import java .io .OutputStream ;
1822import java .util .Iterator ;
1923import java .util .Map ;
2024import java .util .Map .Entry ;
25+ import java .util .Objects ;
2126import java .util .Properties ;
2227import java .util .concurrent .CompletableFuture ;
2328import java .util .concurrent .atomic .AtomicBoolean ;
4045import org .tron .core .db .RecentTransactionItem ;
4146import org .tron .core .db .RecentTransactionStore ;
4247import org .tron .core .db .common .iterator .DBIterator ;
48+ import org .tron .core .store .DynamicPropertiesStore ;
4349
4450@ Slf4j (topic = "DB" )
4551public class TxCacheDB implements DB <byte [], byte []>, Flusher {
@@ -59,7 +65,6 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
5965 private BloomFilter <byte []>[] bloomFilters = new BloomFilter [2 ];
6066 // filterStartBlock record the start block of the active filter
6167 private volatile long filterStartBlock = INVALID_BLOCK ;
62- private volatile long currentBlockNum = INVALID_BLOCK ;
6368 // currentFilterIndex records the index of the active filter
6469 private volatile int currentFilterIndex = 0 ;
6570
@@ -75,21 +80,28 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
7580 // replace persistentStore and optimizes startup performance
7681 private RecentTransactionStore recentTransactionStore ;
7782
83+ private DynamicPropertiesStore dynamicPropertiesStore ;
84+
7885 private final Path cacheFile0 ;
7986 private final Path cacheFile1 ;
87+ private String crc32c0 ;
88+ private String crc32c1 ;
8089 private final Path cacheProperties ;
8190 private final Path cacheDir ;
8291 private AtomicBoolean isValid = new AtomicBoolean (false );
92+ private boolean txCacheInitOptimization ;
8393
8494 @ Getter
8595 @ Setter
8696 private volatile boolean alive ;
8797
88- public TxCacheDB (String name , RecentTransactionStore recentTransactionStore ) {
98+ public TxCacheDB (String name , RecentTransactionStore recentTransactionStore ,
99+ DynamicPropertiesStore dynamicPropertiesStore ) {
89100 this .name = name ;
90101 this .TRANSACTION_COUNT =
91102 CommonParameter .getInstance ().getStorage ().getEstimatedBlockTransactions ();
92103 this .recentTransactionStore = recentTransactionStore ;
104+ this .dynamicPropertiesStore = dynamicPropertiesStore ;
93105 String dbEngine = CommonParameter .getInstance ().getStorage ().getDbEngine ();
94106 if ("LEVELDB" .equals (dbEngine .toUpperCase ())) {
95107 this .persistentStore = new LevelDB (
@@ -117,6 +129,8 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
117129 this .cacheFile0 = Paths .get (cacheDir .toString (), "bloomFilters_0" );
118130 this .cacheFile1 = Paths .get (cacheDir .toString (), "bloomFilters_1" );
119131 this .cacheProperties = Paths .get (cacheDir .toString (), "txCache.properties" );
132+ this .txCacheInitOptimization = CommonParameter .getInstance ()
133+ .getStorage ().isTxCacheInitOptimization ();
120134
121135 }
122136
@@ -211,7 +225,6 @@ public void put(byte[] key, byte[] value) {
211225 MAX_BLOCK_SIZE * TRANSACTION_COUNT );
212226 }
213227 bloomFilters [currentFilterIndex ].put (key );
214- currentBlockNum = blockNum ;
215228 if (lastMetricBlock != blockNum ) {
216229 lastMetricBlock = blockNum ;
217230 Metrics .gaugeSet (MetricKeys .Gauge .TX_CACHE ,
@@ -270,6 +283,12 @@ public void reset() {
270283 }
271284
272285 private boolean recovery () {
286+ if (!txCacheInitOptimization ) {
287+ logger .info ("txCache init optimization is disabled, skip fast recovery mode." );
288+ logger .info ("If you want fast recovery mode,"
289+ + " please set `storage.txCache.initOptimization = true` in config.conf." );
290+ return false ;
291+ }
273292 FileUtil .createDirIfNotExists (this .cacheDir .toString ());
274293 logger .info ("recovery bloomFilters start." );
275294 CompletableFuture <Boolean > loadProperties = CompletableFuture .supplyAsync (this ::loadProperties );
@@ -278,13 +297,18 @@ private boolean recovery() {
278297 CompletableFuture <Boolean > tk1 = loadProperties .thenApplyAsync (
279298 v -> recovery (1 , this .cacheFile1 ));
280299
281- return CompletableFuture .allOf (tk0 , tk1 ).thenApply (v -> {
282- logger .info ("recovery bloomFilters success." );
283- return true ;
284- }).exceptionally (this ::handleException ).join ();
300+ try {
301+ return CompletableFuture .allOf (tk0 , tk1 ).thenApply (v -> {
302+ logger .info ("recovery bloomFilters success." );
303+ return true ;
304+ }).exceptionally (this ::handleException ).join ();
305+ } finally {
306+ clearCrc32c ();
307+ }
285308 }
286309
287310 private boolean recovery (int index , Path file ) {
311+ checkCrc32c (index , file );
288312 try (InputStream in = new BufferedInputStream (Files .newInputStream (file ,
289313 StandardOpenOption .READ , StandardOpenOption .DELETE_ON_CLOSE ))) {
290314 logger .info ("recovery bloomFilter[{}] from file." , index );
@@ -326,24 +350,38 @@ private void dump() {
326350 () -> dump (0 , this .cacheFile0 ));
327351 CompletableFuture <Void > task1 = CompletableFuture .runAsync (
328352 () -> dump (1 , this .cacheFile1 ));
329- CompletableFuture .allOf (task0 , task1 ).thenRun (() -> {
330- writeProperties ();
331- logger .info ("dump bloomFilters done." );
332-
333- }).exceptionally (e -> {
334- logger .info ("dump bloomFilters to file failed. {}" , e .getMessage ());
335- return null ;
336- }).join ();
353+ try {
354+ CompletableFuture .allOf (task0 , task1 ).thenRun (() -> {
355+ writeProperties ();
356+ logger .info ("dump bloomFilters done." );
357+ }).exceptionally (e -> {
358+ logger .info ("dump bloomFilters to file failed. {}" , e .getMessage ());
359+ return null ;
360+ }).join ();
361+ } finally {
362+ clearCrc32c ();
363+ }
337364 }
338365
339366 private void dump (int index , Path file ) {
367+ logger .info ("dump bloomFilters[{}] to file." , index );
368+ long start = System .currentTimeMillis ();
340369 try (OutputStream out = new BufferedOutputStream (Files .newOutputStream (file ))) {
341- logger .info ("dump bloomFilters[{}] to file." , index );
342- long start = System .currentTimeMillis ();
343370 bloomFilters [index ].writeTo (out );
344- logger .info ("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms." ,
371+ } catch (Exception e ) {
372+ throw new RuntimeException (e );
373+ }
374+ try {
375+ String crc32c = getCrc32c (file );
376+ if (index == 0 ) {
377+ this .crc32c0 = crc32c ;
378+ } else {
379+ this .crc32c1 = crc32c ;
380+ }
381+ logger .info ("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, "
382+ + "crc32c: {}, cost {} ms." ,
345383 index , bloomFilters [index ].approximateElementCount (), bloomFilters [index ].expectedFpp (),
346- System .currentTimeMillis () - start );
384+ crc32c , System .currentTimeMillis () - start );
347385 } catch (Exception e ) {
348386 throw new RuntimeException (e );
349387 }
@@ -356,8 +394,16 @@ private boolean loadProperties() {
356394 Properties properties = new Properties ();
357395 properties .load (r );
358396 filterStartBlock = Long .parseLong (properties .getProperty ("filterStartBlock" ));
359- currentBlockNum = Long .parseLong (properties .getProperty ("currentBlockNum" ));
397+ long currentBlockNum = Long .parseLong (properties .getProperty ("currentBlockNum" ));
398+ long currentBlockNumFromDB = dynamicPropertiesStore .getLatestBlockHeaderNumberFromDB ();
360399 currentFilterIndex = Integer .parseInt (properties .getProperty ("currentFilterIndex" ));
400+ if (currentBlockNum != currentBlockNumFromDB ) {
401+ throw new IllegalStateException (
402+ String .format ("currentBlockNum not match. filter: %d, db: %d" ,
403+ currentBlockNum , currentBlockNumFromDB ));
404+ }
405+ this .crc32c0 = properties .getProperty ("crc32c0" );
406+ this .crc32c1 = properties .getProperty ("crc32c1" );
361407 logger .info ("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done." ,
362408 filterStartBlock , currentBlockNum , currentFilterIndex );
363409 return true ;
@@ -369,9 +415,12 @@ private boolean loadProperties() {
369415 private void writeProperties () {
370416 try (Writer w = Files .newBufferedWriter (this .cacheProperties , StandardCharsets .UTF_8 )) {
371417 Properties properties = new Properties ();
418+ long currentBlockNum = dynamicPropertiesStore .getLatestBlockHeaderNumberFromDB ();
372419 properties .setProperty ("filterStartBlock" , String .valueOf (filterStartBlock ));
373420 properties .setProperty ("currentBlockNum" , String .valueOf (currentBlockNum ));
374421 properties .setProperty ("currentFilterIndex" , String .valueOf (currentFilterIndex ));
422+ properties .setProperty ("crc32c0" , this .crc32c0 );
423+ properties .setProperty ("crc32c1" , this .crc32c1 );
375424 properties .store (w , "Generated by the application. PLEASE DO NOT EDIT! " );
376425 logger .info ("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done." ,
377426 filterStartBlock , currentBlockNum , currentFilterIndex );
@@ -380,9 +429,33 @@ private void writeProperties() {
380429 }
381430 }
382431
432+ private String getCrc32c (Path file ) throws IOException {
433+ ByteSource byteSource = com .google .common .io .Files .asByteSource (file .toFile ());
434+ HashCode hc = byteSource .hash (Hashing .crc32c ());
435+ return hc .toString ();
436+ }
437+
438+ private void checkCrc32c (int index , Path file ) {
439+ try {
440+ String actual = getCrc32c (file );
441+ String expect = index == 0 ? this .crc32c0 : this .crc32c1 ;
442+ if (!Objects .equals (actual , expect )) {
443+ throw new IllegalStateException ("crc32c not match. index: " + index + ", expect: " + expect
444+ + ", actual: " + actual );
445+ }
446+ } catch (Exception e ) {
447+ throw new RuntimeException (e );
448+ }
449+ }
450+
451+ private void clearCrc32c () {
452+ this .crc32c0 = null ;
453+ this .crc32c1 = null ;
454+ }
455+
383456 @ Override
384457 public TxCacheDB newInstance () {
385- return new TxCacheDB (name , recentTransactionStore );
458+ return new TxCacheDB (name , recentTransactionStore , dynamicPropertiesStore );
386459 }
387460
388461 @ Override
0 commit comments