@@ -157,17 +157,21 @@ public String configFilePath() {
157157
158158 @ Override
159159 public synchronized void persist () {
160- try (WriteBatch writeBatch = new WriteBatch ()) {
161- for (Entry <String , ConcurrentMap <Integer , Long >> entry : this .offsetTable .entrySet ()) {
162- putWriteBatch (writeBatch , entry .getKey (), entry .getValue ());
163- if (writeBatch .getDataSize () >= 4 * 1024 ) {
164- this .rocksDBConfigManager .batchPutWithWal (writeBatch );
160+ if (!rocksDBConfigManager .isStop ) {
161+ try (WriteBatch writeBatch = new WriteBatch ()) {
162+ for (Entry <String , ConcurrentMap <Integer , Long >> entry : this .offsetTable .entrySet ()) {
163+ putWriteBatch (writeBatch , entry .getKey (), entry .getValue ());
164+ if (writeBatch .getDataSize () >= 4 * 1024 ) {
165+ this .rocksDBConfigManager .batchPutWithWal (writeBatch );
166+ }
165167 }
168+ this .rocksDBConfigManager .batchPutWithWal (writeBatch );
169+ this .rocksDBConfigManager .flushWAL ();
170+ } catch (Exception e ) {
171+ log .error ("consumer offset persist Failed" , e );
166172 }
167- this .rocksDBConfigManager .batchPutWithWal (writeBatch );
168- this .rocksDBConfigManager .flushWAL ();
169- } catch (Exception e ) {
170- log .error ("consumer offset persist Failed" , e );
173+ } else {
174+ log .warn ("RocksDBConsumerOffsetManager has been stopped, persist fail" );
171175 }
172176 }
173177
0 commit comments