4141import java .util .Objects ;
4242import java .util .concurrent .ConcurrentHashMap ;
4343import java .util .concurrent .Semaphore ;
44+ import java .util .concurrent .atomic .AtomicBoolean ;
45+ import java .util .concurrent .atomic .AtomicLong ;
4446import java .util .concurrent .locks .ReentrantReadWriteLock ;
4547import java .util .function .Function ;
4648import java .util .stream .Collectors ;
@@ -53,6 +55,8 @@ public class DataNodeTableCache implements ITableCache {
5355
5456 private static final Logger LOGGER = LoggerFactory .getLogger (DataNodeTableCache .class );
5557
58+ private final AtomicLong version = new AtomicLong (0 );
59+
5660 // The database is without "root"
5761 private final Map <String , Map <String , TsTable >> databaseTableMap = new ConcurrentHashMap <>();
5862
@@ -175,6 +179,7 @@ public void commitUpdateTable(String database, final String tableName) {
175179 .computeIfAbsent (database , k -> new ConcurrentHashMap <>())
176180 .put (tableName , preUpdateTableMap .get (database ).get (tableName ).getLeft ());
177181 removeTableFromPreUpdateMap (database , tableName );
182+ version .incrementAndGet ();
178183 LOGGER .info ("Commit-update table {}.{} successfully" , database , tableName );
179184 } finally {
180185 readWriteLock .writeLock ().unlock ();
@@ -188,6 +193,7 @@ public void invalid(String database) {
188193 try {
189194 databaseTableMap .remove (database );
190195 preUpdateTableMap .remove (database );
196+ version .incrementAndGet ();
191197 } finally {
192198 readWriteLock .writeLock ().unlock ();
193199 }
@@ -205,6 +211,7 @@ public void invalid(String database, final String tableName) {
205211 if (preUpdateTableMap .containsKey (database )) {
206212 preUpdateTableMap .get (database ).remove (tableName );
207213 }
214+ version .incrementAndGet ();
208215 } finally {
209216 readWriteLock .writeLock ().unlock ();
210217 }
@@ -228,11 +235,16 @@ public void invalid(String database, final String tableName, final String column
228235 }
229236 tableVersionPair .setRight (tableVersionPair .getRight () + 1 );
230237 }
238+ version .incrementAndGet ();
231239 } finally {
232240 readWriteLock .writeLock ().unlock ();
233241 }
234242 }
235243
244+ public long getVersion () {
245+ return version .get ();
246+ }
247+
236248 public TsTable getTableInWrite (final String database , final String tableName ) {
237249 final TsTable result = getTableInCache (database , tableName );
238250 return Objects .nonNull (result ) ? result : getTable (database , tableName );
@@ -316,6 +328,7 @@ private void updateTable(
316328 final Map <String , Map <String , Long >> previousVersions ) {
317329 readWriteLock .writeLock ().lock ();
318330 try {
331+ final AtomicBoolean isUpdated = new AtomicBoolean (false );
319332 fetchedTables .forEach (
320333 (database , tableInfoMap ) -> {
321334 if (preUpdateTableMap .containsKey (database )) {
@@ -329,6 +342,7 @@ private void updateTable(
329342 previousVersions .get (database ).get (tableName ))) {
330343 return ;
331344 }
345+ isUpdated .set (true );
332346 LOGGER .info (
333347 "Update table {}.{} by table fetch, table in preUpdateMap: {}, new table: {}" ,
334348 database ,
@@ -346,6 +360,9 @@ private void updateTable(
346360 });
347361 }
348362 });
363+ if (isUpdated .get ()) {
364+ version .incrementAndGet ();
365+ }
349366 } finally {
350367 readWriteLock .writeLock ().unlock ();
351368 }
0 commit comments