3636import java .util .Arrays ;
3737import java .util .List ;
3838import java .util .OptionalLong ;
39+ import java .util .Set ;
3940import java .util .concurrent .CompletableFuture ;
4041import java .util .concurrent .CopyOnWriteArrayList ;
4142import java .util .concurrent .ExecutorService ;
4748import java .util .concurrent .atomic .AtomicBoolean ;
4849import java .util .function .Consumer ;
4950import java .util .function .Supplier ;
51+ import java .util .stream .Collectors ;
5052import org .apache .ignite .internal .close .ManuallyCloseable ;
5153import org .apache .ignite .internal .failure .FailureContext ;
5254import org .apache .ignite .internal .failure .FailureProcessor ;
@@ -115,6 +117,10 @@ public interface EntryReader {
115117
116118 private volatile WatchEventHandlingCallback watchEventHandlingCallback ;
117119
120+ // This field is used in assertions only. It was added in order to ease the debug of a tricky problem that is nearly impossible to
121+ // reproduce.
122+ private volatile long revision = -1 ;
123+
118124 /** Executor for processing watch events. */
119125 private final ExecutorService watchExecutor ;
120126
@@ -252,6 +258,9 @@ CompletableFuture<Void> enqueue(
252258 private CompletableFuture <Void > notifyWatchesInternal (long newRevision , List <Entry > updatedEntries , HybridTimestamp time ) {
253259 assert time != null ;
254260
261+ Set <Long > revisionsSet = updatedEntries .stream ().map (Entry ::revision ).collect (Collectors .toUnmodifiableSet ());
262+ assert revisionsSet .size () <= 1 : "Update entries are associated with different revisions, revisions=" + revisionsSet ;
263+
255264 List <Entry > filteredUpdatedEntries = updatedEntries .isEmpty () ? emptyList () : updatedEntries .stream ()
256265 .filter (WatchProcessor ::isNotIdempotentCacheCommand )
257266 .collect (toList ());
@@ -274,7 +283,7 @@ private CompletableFuture<Void> notifyWatchesInternal(long newRevision, List<Ent
274283 return newNotificationFuture ;
275284 }, newNotificationFuture -> {
276285 invokeNotificationFutureListeners (newNotificationFuture , filteredUpdatedEntries , time );
277- }, updatedEntriesKeysInfo (updatedEntries ));
286+ }, updatedEntriesKeysInfo (newRevision , updatedEntries ));
278287 }
279288
280289 private void invokeNotificationFutureListeners (
@@ -287,10 +296,11 @@ private void invokeNotificationFutureListeners(
287296 }
288297 }
289298
290- private static Supplier <String > updatedEntriesKeysInfo (List <Entry > updatedEntries ) {
299+ private Supplier <String > updatedEntriesKeysInfo (long revision , List <Entry > updatedEntries ) {
291300 return () -> updatedEntries .stream ()
292301 .map (entry -> new String (entry .key (), UTF_8 ))
293- .collect (joining ("," , "Keys of updated entries: " , "" ));
302+ .collect (joining (", " , "Keys of revision: " + revision + " and previous revision: " + this .revision
303+ + "with updated entries: " , "" ));
294304 }
295305
296306 private static CompletableFuture <Void > performWatchesNotifications (
@@ -403,6 +413,8 @@ private void invokeOnRevisionCallback(long revision, HybridTimestamp time) {
403413 watchEventHandlingCallback .onSafeTimeAdvanced (time );
404414
405415 watchEventHandlingCallback .onRevisionApplied (revision );
416+
417+ this .revision = revision ;
406418 }
407419
408420 /**
0 commit comments