diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java index dadfd7ec77..cf4384d815 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java @@ -35,6 +35,7 @@ import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.store.BackendMutation; import org.apache.hugegraph.backend.store.BackendStore; +import org.apache.hugegraph.backend.store.BackendStoreProvider; import org.apache.hugegraph.backend.store.ram.RamTable; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.config.CoreOptions; @@ -70,12 +71,22 @@ public final class CachedGraphTransaction extends GraphTransaction { private static final ConcurrentMap GRAPH_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>(); + /* + * Same ref-counted lifecycle for the store event listener registered + * on the BackendStoreProvider; see StoreListenerHolder. + * + * Replaces the removed protected static storeEventListenStatus field + * that previously tracked store-listen state on GraphTransaction. + */ + private static final ConcurrentMap + STORE_EVENT_LISTENERS = new ConcurrentHashMap<>(); + private final Cache verticesCache; private final Cache edgesCache; - private EventListener storeEventListener; private EventListener cacheEventListener; private CacheListenerHolder holder; + private StoreListenerHolder storeHolder; public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) { super(graph, store); @@ -135,7 +146,7 @@ private void listenChanges() { Set storeEvents = ImmutableSet.of(Events.STORE_INIT, Events.STORE_CLEAR, Events.STORE_TRUNCATE); - this.storeEventListener = event -> { + EventListener storeListener = event -> { if (storeEvents.contains(event.name())) { LOG.debug("Graph {} clear graph cache on event '{}'", this.graph(), event.name()); @@ -144,9 +155,24 @@ private void listenChanges() { } return false; }; - if (storeEventListenStatus.putIfAbsent(this.params().spaceGraphName(), true) == null) { - this.store().provider().listen(this.storeEventListener); - } + BackendStoreProvider provider = this.store().provider(); + String graphName = this.params().spaceGraphName(); + StoreListenerHolder storeAcquired = STORE_EVENT_LISTENERS.compute( + graphName, (key, existing) -> { + if (existing == null || existing.provider != provider) { + // Graph close/reopen creates a new provider for the + // same graph name; replace the stale holder. Old + // transactions skip decrement via identity check. + if (existing != null) { + existing.provider.unlisten(existing.listener); + } + provider.listen(storeListener); + return new StoreListenerHolder(storeListener, provider); + } + existing.refCount++; + return existing; + }); + this.storeHolder = storeAcquired; // Listen cache event: "cache"(invalid cache item) EventListener listener = event -> { @@ -196,7 +222,6 @@ private void listenChanges() { return false; }; EventHub graphEventHub = this.params().graphEventHub(); - String graphName = this.params().spaceGraphName(); CacheListenerHolder acquired = GRAPH_CACHE_EVENT_LISTENERS.compute( graphName, (key, existing) -> { if (existing == null || existing.hub != graphEventHub) { @@ -235,14 +260,20 @@ private void unlistenChanges() { this.holder = null; this.cacheEventListener = null; } - // TODO (follow-up): storeEventListenStatus has the same owner-first - // close bug this PR fixes for GRAPH_CACHE_EVENT_LISTENERS. A non-owner - // transaction can remove the tracking entry, unlisten its own - // never-registered storeEventListener as a no-op, and leave the - // original store listener registered but untracked. Apply the same - // ref-counted holder pattern in a follow-up PR. - if (storeEventListenStatus.remove(graphName) != null) { - this.store().provider().unlisten(this.storeEventListener); + StoreListenerHolder storeOurs = this.storeHolder; + if (storeOurs != null) { + STORE_EVENT_LISTENERS.compute(graphName, (key, existing) -> { + if (existing == null || existing != storeOurs) { + return existing; + } + existing.refCount--; + if (existing.refCount == 0) { + existing.provider.unlisten(existing.listener); + return null; + } + return existing; + }); + this.storeHolder = null; } } @@ -476,4 +507,27 @@ public void removeIndex(IndexLabel indexLabel) { } } } + + /* + * Listener lifetime must cover all active transactions for the graph. + * The holder is removed from the registry and unregistered from the + * BackendStoreProvider only when the last transaction releases it. + * Mirror of CacheListenerHolder for the store event path. + */ + private static final class StoreListenerHolder { + + final EventListener listener; + final BackendStoreProvider provider; + // Must only be read or written inside ConcurrentMap.compute() for the + // enclosing registry; ConcurrentHashMap.compute() serialises per-key + // access. + int refCount; + + StoreListenerHolder(EventListener listener, + BackendStoreProvider provider) { + this.listener = listener; + this.provider = provider; + this.refCount = 1; + } + } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java index 5e33e0b3fc..a0792ee99e 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -140,8 +139,6 @@ public class GraphTransaction extends IndexableTransaction { private final int verticesCapacity; private final int edgesCapacity; - protected static final ConcurrentHashMap storeEventListenStatus = - new ConcurrentHashMap<>(); public GraphTransaction(HugeGraphParams graph, BackendStore store) { super(graph, store); diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java index 7bcc1a7fef..b128f277ca 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedGraphTransactionTest.java @@ -30,7 +30,8 @@ import org.apache.hugegraph.backend.cache.CachedGraphTransaction; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; -import org.apache.hugegraph.backend.tx.GraphTransaction; +import org.apache.hugegraph.backend.store.BackendStoreProvider; +import org.apache.hugegraph.event.EventHub; import org.apache.hugegraph.event.EventListener; import org.apache.hugegraph.schema.VertexLabel; import org.apache.hugegraph.structure.HugeEdge; @@ -93,20 +94,13 @@ private static ConcurrentMap graphCacheEventListeners() } @SuppressWarnings("unchecked") - private static ConcurrentMap storeEventListenStatus() + private static ConcurrentMap storeEventListeners() throws Exception { - Field field = GraphTransaction.class - .getDeclaredField("storeEventListenStatus"); + Field field = CachedGraphTransaction.class + .getDeclaredField( + "STORE_EVENT_LISTENERS"); field.setAccessible(true); - return (ConcurrentMap) field.get(null); - } - - private static void restoreStoreListenerStatusForKnownTeardownBug( - ConcurrentMap storeListeners, String graphName) { - // Closing a secondary transaction can consume storeEventListenStatus due - // to the follow-up bug documented in CachedGraphTransaction.unlistenChanges(). - // Restore it so teardown can still unregister the primary store listener. - storeListeners.putIfAbsent(graphName, true); + return (ConcurrentMap) field.get(null); } private static EventListener holderListener(Object holder) { @@ -118,6 +112,10 @@ private static int holderRefCount(Object holder) { return refCount; } + private static BackendStoreProvider holderProvider(Object holder) { + return Whitebox.getInternalState(holder, "provider"); + } + private HugeVertex newVertex(Id id) { HugeGraph graph = this.cache().graph(); graph.schema().propertyKey("name").asText() @@ -251,8 +249,6 @@ public void testClosingNonOwnerKeepsGraphCacheListenerRegistered() throws Exception { ConcurrentMap cacheListeners = graphCacheEventListeners(); - ConcurrentMap storeListeners = - storeEventListenStatus(); String graphName = this.params.spaceGraphName(); Object holder = cacheListeners.get(graphName); @@ -265,18 +261,58 @@ public void testClosingNonOwnerKeepsGraphCacheListenerRegistered() Assert.assertSame(holder, cacheListeners.get(graphName)); Assert.assertEquals(refCount + 1, holderRefCount(holder)); - try { - second.close(); + second.close(); - Assert.assertSame(holder, cacheListeners.get(graphName)); - Assert.assertEquals(refCount, holderRefCount(holder)); - Assert.assertTrue(this.params.graphEventHub() - .listeners(Events.CACHE) - .contains(registered)); - } finally { - restoreStoreListenerStatusForKnownTeardownBug(storeListeners, - graphName); - } + Assert.assertSame(holder, cacheListeners.get(graphName)); + Assert.assertEquals(refCount, holderRefCount(holder)); + Assert.assertTrue(this.params.graphEventHub() + .listeners(Events.CACHE) + .contains(registered)); + } + + @Test + public void testClosingNonOwnerKeepsStoreListenerRegistered() + throws Exception { + ConcurrentMap storeListeners = storeEventListeners(); + + String graphName = this.params.spaceGraphName(); + Object holder = storeListeners.get(graphName); + Assert.assertNotNull(holder); + EventListener registered = holderListener(holder); + int refCount = holderRefCount(holder); + + CachedGraphTransaction second = new CachedGraphTransaction( + this.params, this.params.loadGraphStore()); + Assert.assertSame(holder, storeListeners.get(graphName)); + Assert.assertEquals(refCount + 1, holderRefCount(holder)); + + second.close(); + + // Non-owner close must decrement the refcount, not drop the entry + Assert.assertSame(holder, storeListeners.get(graphName)); + Assert.assertEquals(refCount, holderRefCount(holder)); + Assert.assertSame(registered, holderListener(holder)); + } + + @Test + public void testLastCloseRemovesStoreListener() throws Exception { + ConcurrentMap storeListeners = storeEventListeners(); + + String graphName = this.params.spaceGraphName(); + CachedGraphTransaction owner = this.cache(); + CachedGraphTransaction second = new CachedGraphTransaction( + this.params, this.params.loadGraphStore()); + + Object holder = storeListeners.get(graphName); + Assert.assertNotNull(holder); + Assert.assertTrue(holderRefCount(holder) >= 2); + + owner.close(); + second.close(); + this.cache = null; + this.params.graphTransaction().close(); + + Assert.assertFalse(storeListeners.containsKey(graphName)); } @Test @@ -319,6 +355,93 @@ public void testCacheListenerSurvivesOwnerClose() throws Exception { Whitebox.invoke(second, "verticesCache", "size")); } + @Test + public void testStoreListenerSurvivesOwnerClose() throws Exception { + ConcurrentMap storeListeners = storeEventListeners(); + String graphName = this.params.spaceGraphName(); + CachedGraphTransaction owner = this.cache(); + CachedGraphTransaction second = new CachedGraphTransaction( + this.params, this.params.loadGraphStore()); + + Object holder = storeListeners.get(graphName); + Assert.assertNotNull(holder); + EventListener registered = holderListener(holder); + BackendStoreProvider provider = holderProvider(holder); + int refCount = holderRefCount(holder); + Assert.assertTrue(refCount >= 2); + + owner.close(); + this.cache = second; + + Assert.assertSame(holder, storeListeners.get(graphName)); + Assert.assertEquals(refCount - 1, holderRefCount(holder)); + Assert.assertTrue(provider.storeEventHub() + .listeners(EventHub.ANY_EVENT) + .contains(registered)); + + second.addVertex(this.newVertex(IdGenerator.of(1))); + second.addVertex(this.newVertex(IdGenerator.of(2))); + second.commit(); + Assert.assertTrue(second.queryVertices(IdGenerator.of(1)).hasNext()); + Assert.assertTrue(second.queryVertices(IdGenerator.of(2)).hasNext()); + Assert.assertEquals(2L, + Whitebox.invoke(second, "verticesCache", "size")); + + // Owner is closed first; the surviving transaction must still observe + // the store event through the ref-counted provider listener and clear + // its cache. + provider.storeEventHub().notify(Events.STORE_CLEAR, provider).get(); + + Assert.assertEquals(0L, + Whitebox.invoke(second, "verticesCache", "size")); + } + + @Test + public void testReopenGraphReRegistersStoreListener() throws Exception { + ConcurrentMap storeListeners = storeEventListeners(); + String graphName = this.params.spaceGraphName(); + CachedGraphTransaction owner = this.cache(); + + Object holder = storeListeners.get(graphName); + Assert.assertNotNull(holder); + EventListener registered = holderListener(holder); + BackendStoreProvider provider = holderProvider(holder); + + owner.close(); + this.cache = null; + this.params.graphTransaction().close(); + + // Last close drops the registry entry and unregisters the listener. + Assert.assertFalse(storeListeners.containsKey(graphName)); + Assert.assertFalse(provider.storeEventHub() + .listeners(EventHub.ANY_EVENT) + .contains(registered)); + + this.graph.clearBackend(); + this.graph.close(); + this.graph = null; + + HugeGraph reopened = HugeFactory.open(FakeObjects.newConfig()); + this.graph = reopened; + this.params = Whitebox.getInternalState(reopened, "params"); + CachedGraphTransaction third = new CachedGraphTransaction( + this.params, this.params.loadGraphStore()); + this.cache = third; + + // Reopen registers a fresh holder for the same graph name, and its + // listener is wired to the store provider again (no stale leftover). + // The provider instance is pooled and reused across reopen, so the + // provider-replacement branch is not exercised here. + Object reopenedHolder = storeListeners.get(graphName); + Assert.assertNotNull(reopenedHolder); + Assert.assertNotSame(holder, reopenedHolder); + EventListener reopenedListener = holderListener(reopenedHolder); + Assert.assertNotSame(registered, reopenedListener); + Assert.assertTrue(holderProvider(reopenedHolder).storeEventHub() + .listeners(EventHub.ANY_EVENT) + .contains(reopenedListener)); + } + @Test public void testLastCloseRemovesGraphCacheListener() throws Exception { ConcurrentMap cacheListeners =