Skip to content

Commit b77133e

Browse files
committed
update DefaultReactiveListeners
1 parent b1f63b6 commit b77133e

17 files changed

+1182
-938
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveCollectionRemoveAction.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,23 @@ public ReactiveCollectionRemoveAction(
4949
this.affectedOwner = session.getPersistenceContextInternal().getLoadedCollectionOwnerOrNull( collection );
5050
}
5151

52+
/**
53+
* Removes a persistent collection for an unloaded proxy.
54+
*
55+
* Use this constructor when the owning entity is has not been loaded.
56+
* @param persister The collection's persister
57+
* @param id The collection key
58+
* @param session The session
59+
*/
60+
public ReactiveCollectionRemoveAction(
61+
final CollectionPersister persister,
62+
final Object id,
63+
final EventSource session) {
64+
super( persister, null, id, session );
65+
emptySnapshot = false;
66+
affectedOwner = null;
67+
}
68+
5269
@Override
5370
public CompletionStage<Void> reactiveExecute() {
5471
final Object key = getKey();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityDeleteAction.java

Lines changed: 88 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.hibernate.action.internal.EntityDeleteAction;
1414
import org.hibernate.cache.spi.access.EntityDataAccess;
1515
import org.hibernate.engine.spi.EntityEntry;
16+
import org.hibernate.engine.spi.EntityKey;
1617
import org.hibernate.engine.spi.PersistenceContext;
1718
import org.hibernate.engine.spi.SharedSessionContractImplementor;
1819
import org.hibernate.event.spi.EventSource;
@@ -43,71 +44,119 @@ public ReactiveEntityDeleteAction(
4344
super( id, state, version, instance, persister, isCascadeDeleteEnabled, session );
4445
}
4546

47+
public ReactiveEntityDeleteAction(Object id, EntityPersister persister, EventSource session) {
48+
super( id, persister, session );
49+
}
50+
4651
@Override
4752
public void execute() throws HibernateException {
4853
throw LOG.nonReactiveMethodCall( "reactiveExecute" );
4954
}
5055

56+
private boolean isInstanceLoaded() {
57+
// A null instance signals that we're deleting an unloaded proxy.
58+
return getInstance() != null;
59+
}
60+
5161
@Override
5262
public CompletionStage<Void> reactiveExecute() throws HibernateException {
5363
final Object id = getId();
64+
final Object version = getCurrentVersion();
5465
final EntityPersister persister = getPersister();
5566
final SharedSessionContractImplementor session = getSession();
5667
final Object instance = getInstance();
5768

58-
final boolean veto = preDelete();
69+
final boolean veto = isInstanceLoaded() && preDelete();
5970

60-
Object version = getVersion();
61-
if ( persister.isVersionPropertyGenerated() ) {
62-
// we need to grab the version value from the entity, otherwise
63-
// we have issues with generated-version entities that may have
64-
// multiple actions queued during the same flush
65-
version = persister.getVersion( instance );
66-
}
71+
final Object ck = lockCacheItem();
6772

68-
final Object ck;
69-
if ( persister.canWriteToCache() ) {
70-
final EntityDataAccess cache = persister.getCacheAccessStrategy();
71-
ck = cache.generateCacheKey( id, persister, session.getFactory(), session.getTenantIdentifier() );
72-
setLock( cache.lockItem( session, ck, version ) );
73-
}
74-
else {
75-
ck = null;
76-
}
77-
78-
CompletionStage<Void> deleteStep = !isCascadeDeleteEnabled() && !veto
73+
final CompletionStage<Void> deleteStep = !isCascadeDeleteEnabled() && !veto
7974
? ( (ReactiveEntityPersister) persister ).deleteReactive( id, version, instance, session )
8075
: voidFuture();
8176

8277
return deleteStep.thenAccept( v -> {
83-
//postDelete:
84-
// After actually deleting a row, record the fact that the instance no longer
85-
// exists on the database (needed for identity-column key generation), and
86-
// remove it from the session cache
87-
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
88-
final EntityEntry entry = persistenceContext.removeEntry( instance );
89-
if ( entry == null ) {
90-
throw new AssertionFailure( "possible non-threadsafe access to session" );
78+
if ( isInstanceLoaded() ) {
79+
postDeleteLoaded( id, persister, session, instance, ck );
9180
}
92-
entry.postDelete();
93-
94-
persistenceContext.removeEntity( entry.getEntityKey() );
95-
persistenceContext.removeProxy( entry.getEntityKey() );
96-
97-
if ( persister.canWriteToCache() ) {
98-
persister.getCacheAccessStrategy().remove( session, ck );
81+
else {
82+
// we're deleting an unloaded proxy
83+
postDeleteUnloaded( id, persister, session, ck );
9984
}
10085

101-
persistenceContext.getNaturalIdResolutions()
102-
.removeSharedResolution( id, getNaturalIdValues(), persister );
103-
104-
postDelete();
105-
10686
final StatisticsImplementor statistics = getSession().getFactory().getStatistics();
10787
if ( statistics.isStatisticsEnabled() && !veto ) {
10888
statistics.deleteEntity( getPersister().getEntityName() );
10989
}
11090
} );
11191
}
11292

93+
//TODO: copy/paste from superclass (make it protected!)
94+
private Object getCurrentVersion() {
95+
return getPersister().isVersionPropertyGenerated()
96+
// skip if we're deleting an unloaded proxy, no need for the version
97+
&& isInstanceLoaded()
98+
// we need to grab the version value from the entity, otherwise
99+
// we have issues with generated-version entities that may have
100+
// multiple actions queued during the same flush
101+
? getPersister().getVersion( getInstance() )
102+
: getVersion();
103+
}
104+
105+
//TODO: copy/paste of postDeleteLoaded() from superclass (make it protected!)
106+
private void postDeleteLoaded(
107+
Object id,
108+
EntityPersister persister,
109+
SharedSessionContractImplementor session,
110+
Object instance,
111+
Object ck) {
112+
// After actually deleting a row, record the fact that the instance no longer
113+
// exists on the database (needed for identity-column key generation), and
114+
// remove it from the session cache
115+
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
116+
final EntityEntry entry = persistenceContext.removeEntry(instance);
117+
if ( entry == null ) {
118+
throw new AssertionFailure( "possible non-threadsafe access to session" );
119+
}
120+
entry.postDelete();
121+
final EntityKey key = entry.getEntityKey();
122+
persistenceContext.removeEntity( key );
123+
persistenceContext.removeProxy( key );
124+
removeCacheItem( ck );
125+
persistenceContext.getNaturalIdResolutions().removeSharedResolution( id, getNaturalIdValues(), persister );
126+
postDelete();
127+
}
128+
129+
//TODO: copy/paste of postDeleteUnloaded() from superclass (make it protected!)
130+
private void postDeleteUnloaded(Object id, EntityPersister persister, SharedSessionContractImplementor session, Object ck) {
131+
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
132+
final EntityKey key = session.generateEntityKey( id, persister );
133+
if ( !persistenceContext.containsDeletedUnloadedEntityKey( key ) ) {
134+
throw new AssertionFailure( "deleted proxy should be for an unloaded entity: " + key );
135+
}
136+
persistenceContext.removeProxy( key );
137+
removeCacheItem( ck );
138+
}
139+
140+
//TODO: copy/paste from superclass (make it protected!)
141+
private Object lockCacheItem() {
142+
final EntityPersister persister = getPersister();
143+
if ( persister.canWriteToCache() ) {
144+
final EntityDataAccess cache = persister.getCacheAccessStrategy();
145+
final SharedSessionContractImplementor session = getSession();
146+
final Object ck = cache.generateCacheKey( getId(), persister, session.getFactory(), session.getTenantIdentifier() );
147+
setLock( cache.lockItem( session, ck, getCurrentVersion() ) );
148+
return ck;
149+
}
150+
else {
151+
return null;
152+
}
153+
}
154+
155+
//TODO: copy/paste from superclass (make it protected!)
156+
private void removeCacheItem(Object ck) {
157+
final EntityPersister persister = getPersister();
158+
if ( persister.canWriteToCache() ) {
159+
persister.getCacheAccessStrategy().remove( getSession(), ck);
160+
}
161+
}
113162
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/AbstractReactiveFlushingEventListener.java

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package org.hibernate.reactive.event.impl;
77

88
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
9-
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
109

1110
import java.lang.invoke.MethodHandles;
1211
import java.util.Map;
@@ -26,6 +25,7 @@
2625
import org.hibernate.event.spi.FlushEntityEventListener;
2726
import org.hibernate.event.spi.FlushEvent;
2827
import org.hibernate.event.spi.PersistContext;
28+
import org.hibernate.internal.util.EntityPrinter;
2929
import org.hibernate.persister.entity.EntityPersister;
3030
import org.hibernate.reactive.engine.ReactiveActionQueue;
3131
import org.hibernate.reactive.engine.impl.Cascade;
@@ -54,15 +54,13 @@ protected CompletionStage<Void> performExecutions(EventSource session) {
5454
// during-flush callbacks more leniency in regards to initializing proxies and
5555
// lazy collections during their processing.
5656
// For more information, see HHH-2763
57-
return voidFuture()
58-
.thenCompose( v -> {
59-
session.getJdbcCoordinator().flushBeginning();
60-
session.getPersistenceContext().setFlushing( true );
61-
// we need to lock the collection caches before executing entity inserts/updates in order to
62-
// account for bi-directional associations
63-
actionQueue( session ).prepareActions();
64-
return actionQueue( session ).executeActions();
65-
} )
57+
session.getJdbcCoordinator().flushBeginning();
58+
session.getPersistenceContext().setFlushing( true );
59+
// we need to lock the collection caches before executing entity inserts/updates
60+
// in order to account for bidirectional associations
61+
final ReactiveActionQueue actionQueue = actionQueue(session);
62+
actionQueue.prepareActions();
63+
return actionQueue.executeActions()
6664
.whenComplete( (v, x) -> {
6765
session.getPersistenceContext().setFlushing( false );
6866
session.getJdbcCoordinator().flushEnding();
@@ -85,12 +83,12 @@ protected CompletionStage<Void> flushEverythingToExecutions(FlushEvent event) th
8583

8684
LOG.trace( "Flushing session" );
8785

88-
EventSource session = event.getSession();
86+
final EventSource session = event.getSession();
8987

9088
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
9189
session.getInterceptor().preFlush( persistenceContext.managedEntitiesIterator() );
9290

93-
return prepareEntityFlushes(session, persistenceContext)
91+
return prepareEntityFlushes( session, persistenceContext )
9492
.thenAccept( v -> {
9593
// we could move this inside if we wanted to
9694
// tolerate collection initializations during
@@ -99,21 +97,46 @@ protected CompletionStage<Void> flushEverythingToExecutions(FlushEvent event) th
9997
// now, any collections that are initialized
10098
// inside this block do not get updated - they
10199
// are ignored until the next flush
102-
persistenceContext.setFlushing(true);
100+
persistenceContext.setFlushing( true );
103101
try {
104-
int entityCount = flushEntities(event, persistenceContext);
105-
int collectionCount = flushCollections(session, persistenceContext);
102+
int entityCount = flushEntities( event, persistenceContext );
103+
int collectionCount = flushCollections( session, persistenceContext );
106104

107105
event.setNumberOfEntitiesProcessed(entityCount);
108106
event.setNumberOfCollectionsProcessed(collectionCount);
109107
}
110108
finally {
111-
persistenceContext.setFlushing(false);
109+
persistenceContext.setFlushing( false );
112110
}
111+
112+
//some statistics
113+
logFlushResults( event );
113114
} );
115+
}
114116

115-
//some statistics
116-
// logFlushResults( event );
117+
protected void logFlushResults(FlushEvent event) {
118+
if ( !LOG.isDebugEnabled() ) {
119+
return;
120+
}
121+
final EventSource session = event.getSession();
122+
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
123+
LOG.debugf(
124+
"Flushed: %s insertions, %s updates, %s deletions to %s objects",
125+
session.getActionQueue().numberOfInsertions(),
126+
session.getActionQueue().numberOfUpdates(),
127+
session.getActionQueue().numberOfDeletions(),
128+
persistenceContext.getNumberOfManagedEntities()
129+
);
130+
LOG.debugf(
131+
"Flushed: %s (re)creations, %s updates, %s removals to %s collections",
132+
session.getActionQueue().numberOfCollectionCreations(),
133+
session.getActionQueue().numberOfCollectionUpdates(),
134+
session.getActionQueue().numberOfCollectionRemovals(),
135+
persistenceContext.getCollectionEntriesSize()
136+
);
137+
new EntityPrinter( session.getFactory() ).toString(
138+
persistenceContext.getEntitiesByKey().entrySet()
139+
);
117140
}
118141

119142
/**
@@ -125,17 +148,17 @@ private CompletionStage<Void> prepareEntityFlushes(EventSource session, Persiste
125148

126149
LOG.debug( "Processing flush-time cascades" );
127150

128-
PersistContext context = PersistContext.create();
151+
final PersistContext context = PersistContext.create();
129152
//safe from concurrent modification because of how concurrentEntries() is implemented on IdentityMap
130-
Map.Entry<Object, EntityEntry>[] entries = persistenceContext.reentrantSafeEntityEntries();
153+
final Map.Entry<Object, EntityEntry>[] entries = persistenceContext.reentrantSafeEntityEntries();
131154
return loop(
132155
entries,
133156
index -> flushable( entries[index].getValue() ),
134157
index -> cascadeOnFlush( session, entries[index].getValue().getPersister(), entries[index].getKey(), context ) );
135158
}
136159

137160
private static boolean flushable(EntityEntry entry) {
138-
Status status = entry.getStatus();
161+
final Status status = entry.getStatus();
139162
return status == Status.MANAGED
140163
|| status == Status.SAVING
141164
|| status == Status.READ_ONLY;
@@ -183,8 +206,8 @@ private int flushEntities(final FlushEvent event, final PersistenceContext persi
183206

184207
// Update the status of the object and if necessary, schedule an update
185208

186-
EntityEntry entry = me.getValue();
187-
Status status = entry.getStatus();
209+
final EntityEntry entry = me.getValue();
210+
final Status status = entry.getStatus();
188211

189212
if ( status != Status.LOADING && status != Status.GONE ) {
190213
final FlushEntityEvent entityEvent = new FlushEntityEvent( source, me.getKey(), entry );
@@ -316,7 +339,7 @@ protected void postFlush(SessionImplementor session) throws HibernateException {
316339
}
317340
else {
318341
//otherwise recreate the mapping between the collection and its key
319-
CollectionKey collectionKey = new CollectionKey(
342+
final CollectionKey collectionKey = new CollectionKey(
320343
collectionEntry.getLoadedPersister(),
321344
collectionEntry.getLoadedKey()
322345
);

0 commit comments

Comments
 (0)