Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.config.CoreOptions;
Expand Down Expand Up @@ -70,12 +71,22 @@ public final class CachedGraphTransaction extends GraphTransaction {
private static final ConcurrentMap<String, CacheListenerHolder>
GRAPH_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();

/*
* Same ref-counted lifecycle for the store event listener registered
* on the BackendStoreProvider; see StoreListenerHolder.
*
* Replaces the removed protected static storeEventListenStatus field
* that previously tracked store-listen state on GraphTransaction.
*/
private static final ConcurrentMap<String, StoreListenerHolder>
STORE_EVENT_LISTENERS = new ConcurrentHashMap<>();

private final Cache<Id, Object> verticesCache;
private final Cache<Id, Object> edgesCache;

private EventListener storeEventListener;
private EventListener cacheEventListener;
private CacheListenerHolder holder;
private StoreListenerHolder storeHolder;

public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
Expand Down Expand Up @@ -135,7 +146,7 @@ private void listenChanges() {
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
this.storeEventListener = event -> {
EventListener storeListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear graph cache on event '{}'",
this.graph(), event.name());
Expand All @@ -144,9 +155,24 @@ private void listenChanges() {
}
return false;
};
if (storeEventListenStatus.putIfAbsent(this.params().spaceGraphName(), true) == null) {
this.store().provider().listen(this.storeEventListener);
}
BackendStoreProvider provider = this.store().provider();
String graphName = this.params().spaceGraphName();
StoreListenerHolder storeAcquired = STORE_EVENT_LISTENERS.compute(
graphName, (key, existing) -> {
if (existing == null || existing.provider != provider) {
// Graph close/reopen creates a new provider for the
// same graph name; replace the stale holder. Old
// transactions skip decrement via identity check.
if (existing != null) {
existing.provider.unlisten(existing.listener);
}
provider.listen(storeListener);
return new StoreListenerHolder(storeListener, provider);
}
existing.refCount++;
return existing;
});
this.storeHolder = storeAcquired;

// Listen cache event: "cache"(invalid cache item)
EventListener listener = event -> {
Expand Down Expand Up @@ -196,7 +222,6 @@ private void listenChanges() {
return false;
};
EventHub graphEventHub = this.params().graphEventHub();
String graphName = this.params().spaceGraphName();
CacheListenerHolder acquired = GRAPH_CACHE_EVENT_LISTENERS.compute(
graphName, (key, existing) -> {
if (existing == null || existing.hub != graphEventHub) {
Expand Down Expand Up @@ -235,14 +260,20 @@ private void unlistenChanges() {
this.holder = null;
this.cacheEventListener = null;
}
// TODO (follow-up): storeEventListenStatus has the same owner-first
// close bug this PR fixes for GRAPH_CACHE_EVENT_LISTENERS. A non-owner
// transaction can remove the tracking entry, unlisten its own
// never-registered storeEventListener as a no-op, and leave the
// original store listener registered but untracked. Apply the same
// ref-counted holder pattern in a follow-up PR.
if (storeEventListenStatus.remove(graphName) != null) {
this.store().provider().unlisten(this.storeEventListener);
StoreListenerHolder storeOurs = this.storeHolder;
if (storeOurs != null) {
STORE_EVENT_LISTENERS.compute(graphName, (key, existing) -> {
if (existing == null || existing != storeOurs) {
return existing;
}
existing.refCount--;
if (existing.refCount == 0) {
existing.provider.unlisten(existing.listener);
return null;
}
return existing;
});
this.storeHolder = null;
}
}

Expand Down Expand Up @@ -476,4 +507,27 @@ public void removeIndex(IndexLabel indexLabel) {
}
}
}

/*
* Listener lifetime must cover all active transactions for the graph.
* The holder is removed from the registry and unregistered from the
* BackendStoreProvider only when the last transaction releases it.
* Mirror of CacheListenerHolder for the store event path.
*/
private static final class StoreListenerHolder {

final EventListener listener;
final BackendStoreProvider provider;
// Must only be read or written inside ConcurrentMap.compute() for the
// enclosing registry; ConcurrentHashMap.compute() serialises per-key
// access.
int refCount;

StoreListenerHolder(EventListener listener,
BackendStoreProvider provider) {
this.listener = listener;
this.provider = provider;
this.refCount = 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -140,8 +139,6 @@ public class GraphTransaction extends IndexableTransaction {

private final int verticesCapacity;
private final int edgesCapacity;
Comment thread
dpol1 marked this conversation as resolved.
protected static final ConcurrentHashMap<String, Boolean> storeEventListenStatus =
new ConcurrentHashMap<>();

public GraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.hugegraph.backend.cache.CachedGraphTransaction;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeEdge;
Expand Down Expand Up @@ -93,20 +94,13 @@ private static ConcurrentMap<String, Object> graphCacheEventListeners()
}

@SuppressWarnings("unchecked")
private static ConcurrentMap<String, Boolean> storeEventListenStatus()
private static ConcurrentMap<String, Object> storeEventListeners()
throws Exception {
Field field = GraphTransaction.class
.getDeclaredField("storeEventListenStatus");
Field field = CachedGraphTransaction.class
.getDeclaredField(
"STORE_EVENT_LISTENERS");
field.setAccessible(true);
return (ConcurrentMap<String, Boolean>) field.get(null);
}

private static void restoreStoreListenerStatusForKnownTeardownBug(
ConcurrentMap<String, Boolean> storeListeners, String graphName) {
// Closing a secondary transaction can consume storeEventListenStatus due
// to the follow-up bug documented in CachedGraphTransaction.unlistenChanges().
// Restore it so teardown can still unregister the primary store listener.
storeListeners.putIfAbsent(graphName, true);
return (ConcurrentMap<String, Object>) field.get(null);
}

private static EventListener holderListener(Object holder) {
Expand All @@ -118,6 +112,10 @@ private static int holderRefCount(Object holder) {
return refCount;
}

private static BackendStoreProvider holderProvider(Object holder) {
return Whitebox.getInternalState(holder, "provider");
}

private HugeVertex newVertex(Id id) {
HugeGraph graph = this.cache().graph();
graph.schema().propertyKey("name").asText()
Expand Down Expand Up @@ -251,8 +249,6 @@ public void testClosingNonOwnerKeepsGraphCacheListenerRegistered()
throws Exception {
ConcurrentMap<String, Object> cacheListeners =
graphCacheEventListeners();
ConcurrentMap<String, Boolean> storeListeners =
storeEventListenStatus();

String graphName = this.params.spaceGraphName();
Object holder = cacheListeners.get(graphName);
Expand All @@ -265,18 +261,58 @@ public void testClosingNonOwnerKeepsGraphCacheListenerRegistered()
Assert.assertSame(holder, cacheListeners.get(graphName));
Assert.assertEquals(refCount + 1, holderRefCount(holder));

try {
second.close();
second.close();

Assert.assertSame(holder, cacheListeners.get(graphName));
Assert.assertEquals(refCount, holderRefCount(holder));
Assert.assertTrue(this.params.graphEventHub()
.listeners(Events.CACHE)
.contains(registered));
} finally {
restoreStoreListenerStatusForKnownTeardownBug(storeListeners,
graphName);
}
Assert.assertSame(holder, cacheListeners.get(graphName));
Assert.assertEquals(refCount, holderRefCount(holder));
Assert.assertTrue(this.params.graphEventHub()
.listeners(Events.CACHE)
.contains(registered));
}

@Test
public void testClosingNonOwnerKeepsStoreListenerRegistered()
throws Exception {
ConcurrentMap<String, Object> storeListeners = storeEventListeners();

String graphName = this.params.spaceGraphName();
Object holder = storeListeners.get(graphName);
Assert.assertNotNull(holder);
EventListener registered = holderListener(holder);
int refCount = holderRefCount(holder);

CachedGraphTransaction second = new CachedGraphTransaction(
this.params, this.params.loadGraphStore());
Assert.assertSame(holder, storeListeners.get(graphName));
Assert.assertEquals(refCount + 1, holderRefCount(holder));

second.close();

// Non-owner close must decrement the refcount, not drop the entry
Assert.assertSame(holder, storeListeners.get(graphName));
Assert.assertEquals(refCount, holderRefCount(holder));
Assert.assertSame(registered, holderListener(holder));
}

@Test
public void testLastCloseRemovesStoreListener() throws Exception {
Comment thread
dpol1 marked this conversation as resolved.
ConcurrentMap<String, Object> storeListeners = storeEventListeners();

String graphName = this.params.spaceGraphName();
CachedGraphTransaction owner = this.cache();
CachedGraphTransaction second = new CachedGraphTransaction(
this.params, this.params.loadGraphStore());

Object holder = storeListeners.get(graphName);
Assert.assertNotNull(holder);
Assert.assertTrue(holderRefCount(holder) >= 2);

owner.close();
second.close();
this.cache = null;
this.params.graphTransaction().close();

Assert.assertFalse(storeListeners.containsKey(graphName));
}

@Test
Expand Down Expand Up @@ -319,6 +355,93 @@ public void testCacheListenerSurvivesOwnerClose() throws Exception {
Whitebox.invoke(second, "verticesCache", "size"));
}

@Test
public void testStoreListenerSurvivesOwnerClose() throws Exception {
ConcurrentMap<String, Object> storeListeners = storeEventListeners();
String graphName = this.params.spaceGraphName();
CachedGraphTransaction owner = this.cache();
CachedGraphTransaction second = new CachedGraphTransaction(
this.params, this.params.loadGraphStore());

Object holder = storeListeners.get(graphName);
Assert.assertNotNull(holder);
EventListener registered = holderListener(holder);
BackendStoreProvider provider = holderProvider(holder);
int refCount = holderRefCount(holder);
Assert.assertTrue(refCount >= 2);

owner.close();
this.cache = second;

Assert.assertSame(holder, storeListeners.get(graphName));
Assert.assertEquals(refCount - 1, holderRefCount(holder));
Assert.assertTrue(provider.storeEventHub()
.listeners(EventHub.ANY_EVENT)
.contains(registered));

second.addVertex(this.newVertex(IdGenerator.of(1)));
second.addVertex(this.newVertex(IdGenerator.of(2)));
second.commit();
Assert.assertTrue(second.queryVertices(IdGenerator.of(1)).hasNext());
Assert.assertTrue(second.queryVertices(IdGenerator.of(2)).hasNext());
Assert.assertEquals(2L,
Whitebox.invoke(second, "verticesCache", "size"));

// Owner is closed first; the surviving transaction must still observe
// the store event through the ref-counted provider listener and clear
// its cache.
provider.storeEventHub().notify(Events.STORE_CLEAR, provider).get();

Assert.assertEquals(0L,
Whitebox.invoke(second, "verticesCache", "size"));
}

@Test
public void testReopenGraphReRegistersStoreListener() throws Exception {
ConcurrentMap<String, Object> storeListeners = storeEventListeners();
String graphName = this.params.spaceGraphName();
CachedGraphTransaction owner = this.cache();

Object holder = storeListeners.get(graphName);
Assert.assertNotNull(holder);
EventListener registered = holderListener(holder);
BackendStoreProvider provider = holderProvider(holder);

owner.close();
this.cache = null;
this.params.graphTransaction().close();

// Last close drops the registry entry and unregisters the listener.
Assert.assertFalse(storeListeners.containsKey(graphName));
Assert.assertFalse(provider.storeEventHub()
.listeners(EventHub.ANY_EVENT)
.contains(registered));

this.graph.clearBackend();
this.graph.close();
this.graph = null;

HugeGraph reopened = HugeFactory.open(FakeObjects.newConfig());
this.graph = reopened;
this.params = Whitebox.getInternalState(reopened, "params");
CachedGraphTransaction third = new CachedGraphTransaction(
this.params, this.params.loadGraphStore());
this.cache = third;

// Reopen registers a fresh holder for the same graph name, and its
// listener is wired to the store provider again (no stale leftover).
// The provider instance is pooled and reused across reopen, so the
// provider-replacement branch is not exercised here.
Object reopenedHolder = storeListeners.get(graphName);
Assert.assertNotNull(reopenedHolder);
Assert.assertNotSame(holder, reopenedHolder);
EventListener reopenedListener = holderListener(reopenedHolder);
Assert.assertNotSame(registered, reopenedListener);
Assert.assertTrue(holderProvider(reopenedHolder).storeEventHub()
.listeners(EventHub.ANY_EVENT)
.contains(reopenedListener));
}

@Test
public void testLastCloseRemovesGraphCacheListener() throws Exception {
ConcurrentMap<String, Object> cacheListeners =
Expand Down
Loading