Skip to content

Commit 2b85ced

Browse files
committed
[#2651] Implemented Reactive versions of CollectionLockingAction
1 parent f221bb3 commit 2b85ced

File tree

2 files changed

+159
-1
lines changed

2 files changed

+159
-1
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
package org.hibernate.reactive.sql.exec.internal;
77

88
import org.hibernate.LockOptions;
9+
import org.hibernate.Locking;
910
import org.hibernate.dialect.lock.spi.LockTimeoutType;
1011
import org.hibernate.dialect.lock.spi.LockingSupport;
1112
import org.hibernate.internal.util.collections.CollectionHelper;
1213
import org.hibernate.reactive.pool.ReactiveConnection;
14+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveCollectionLockingAction;
1315
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveConnectionLockTimeoutStrategyBuilder;
1416
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveFollowOnLockingAction;
1517
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveLockTimeoutHandler;
@@ -173,7 +175,9 @@ public JdbcSelect build() {
173175
if ( isFollonOnLockStrategy ) {
174176
ReactiveFollowOnLockingAction.apply( lockOptions, lockingTarget, lockingClauseStrategy, this );
175177
}
176-
178+
else if ( lockOptions.getScope() == Locking.Scope.INCLUDE_COLLECTIONS ) {
179+
ReactiveCollectionLockingAction.apply( lockOptions, lockingTarget, this );
180+
}
177181
if ( preActions == null && postActions == null ) {
178182
assert loadedValuesCollector == null;
179183
return primaryAction;
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.sql.exec.internal.lock;
7+
8+
import org.hibernate.LockMode;
9+
import org.hibernate.LockOptions;
10+
import org.hibernate.Locking;
11+
import org.hibernate.engine.spi.EffectiveEntityGraph;
12+
import org.hibernate.engine.spi.EntityKey;
13+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
14+
import org.hibernate.graph.GraphSemantic;
15+
import org.hibernate.metamodel.mapping.EntityMappingType;
16+
import org.hibernate.metamodel.mapping.PluralAttributeMapping;
17+
import org.hibernate.query.sqm.mutation.internal.SqmMutationStrategyHelper;
18+
import org.hibernate.reactive.pool.ReactiveConnection;
19+
import org.hibernate.reactive.sql.exec.spi.ReactivePostAction;
20+
import org.hibernate.reactive.util.impl.CompletionStages;
21+
import org.hibernate.sql.ast.tree.select.QuerySpec;
22+
import org.hibernate.sql.exec.internal.lock.CollectionLockingAction;
23+
import org.hibernate.sql.exec.internal.lock.EntityDetails;
24+
import org.hibernate.sql.exec.internal.lock.LockingHelper;
25+
import org.hibernate.sql.exec.spi.ExecutionContext;
26+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
27+
28+
import jakarta.persistence.Timeout;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.CompletionStage;
33+
import java.util.function.Supplier;
34+
35+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
36+
import static org.hibernate.sql.exec.SqlExecLogger.SQL_EXEC_LOGGER;
37+
38+
/**
39+
* Reactive version of {@link CollectionLockingAction}
40+
*/
41+
public class ReactiveCollectionLockingAction extends CollectionLockingAction implements ReactivePostAction {
42+
43+
protected ReactiveCollectionLockingAction(
44+
LoadedValuesCollectorImpl loadedValuesCollector,
45+
LockMode lockMode,
46+
Timeout lockTimeout) {
47+
super( loadedValuesCollector, lockMode, lockTimeout );
48+
}
49+
50+
51+
public static void apply(
52+
LockOptions lockOptions,
53+
QuerySpec lockingTarget,
54+
JdbcSelectWithActionsBuilder jdbcSelectBuilder) {
55+
assert lockOptions.getScope() == Locking.Scope.INCLUDE_COLLECTIONS;
56+
57+
final var loadedValuesCollector = resolveLoadedValuesCollector( lockingTarget.getFromClause() );
58+
59+
// NOTE: we need to set this separately so that it can get incorporated into
60+
// the JdbcValuesSourceProcessingState for proper callbacks
61+
jdbcSelectBuilder.setLoadedValuesCollector( loadedValuesCollector );
62+
63+
// additionally, add a post-action which uses the collected values.
64+
jdbcSelectBuilder.appendPostAction( new ReactiveCollectionLockingAction(
65+
loadedValuesCollector,
66+
lockOptions.getLockMode(),
67+
lockOptions.getTimeout()
68+
) );
69+
}
70+
71+
@Override
72+
public CompletionStage<Void> reactivePerformReactivePostAction(
73+
ReactiveConnection jdbcConnection,
74+
ExecutionContext executionContext) {
75+
LockingHelper.logLoadedValues( loadedValuesCollector );
76+
77+
final var session = executionContext.getSession();
78+
79+
// NOTE: we deal with effective graphs here to make sure embedded associations are treated as lazy
80+
final var effectiveEntityGraph = session.getLoadQueryInfluencers().getEffectiveEntityGraph();
81+
final var initialGraph = effectiveEntityGraph.getGraph();
82+
final var initialSemantic = effectiveEntityGraph.getSemantic();
83+
84+
// collect registrations by entity type
85+
final var entitySegments = segmentLoadedValues( loadedValuesCollector );
86+
87+
try {
88+
// for each entity-type, prepare a locking select statement per table.
89+
// this is based on the attributes for "state array" ordering purposes -
90+
// we match each attribute to the table it is mapped to and add it to
91+
// the select-list for that table-segment.
92+
CompletionStage<Void> loop = voidFuture();
93+
94+
for ( Map.Entry<EntityMappingType, List<EntityKey>> entry : entitySegments.entrySet() ) {
95+
loop = loop.thenCompose( v -> execute( executionContext, entry, session, effectiveEntityGraph ) );
96+
}
97+
return loop;
98+
}
99+
finally {
100+
// reset the effective graph to whatever it was when we started
101+
effectiveEntityGraph.clear();
102+
session.getLoadQueryInfluencers().applyEntityGraph( initialGraph, initialSemantic );
103+
}
104+
}
105+
106+
private CompletionStage<Void> execute(
107+
ExecutionContext executionContext,
108+
Map.Entry<EntityMappingType, List<EntityKey>> entry,
109+
SharedSessionContractImplementor session,
110+
EffectiveEntityGraph effectiveEntityGraph) {
111+
EntityMappingType entityMappingType = entry.getKey();
112+
List<EntityKey> entityKeys = entry.getValue();
113+
if ( SQL_EXEC_LOGGER.isDebugEnabled() ) {
114+
SQL_EXEC_LOGGER.startingIncludeCollectionsLockingProcess( entityMappingType.getEntityName() );
115+
}
116+
117+
// apply an empty "fetch graph" to make sure any embedded associations reachable from
118+
// any of the DomainResults we will create are treated as lazy
119+
final var graph = entityMappingType.createRootGraph( session );
120+
effectiveEntityGraph.clear();
121+
effectiveEntityGraph.applyGraph( graph, GraphSemantic.FETCH );
122+
123+
// create a cross-reference of information related to an entity based on its identifier.
124+
// we use this as the collection owners whose collections need to be locked
125+
final var entityDetailsMap = LockingHelper.resolveEntityKeys( entityKeys, executionContext );
126+
127+
final List<Supplier<CompletionStage<Void>>> suppliers = new ArrayList<>();
128+
SqmMutationStrategyHelper.visitCollectionTables(
129+
entityMappingType, (attribute) -> {
130+
// we may need to lock the "collection table".
131+
// the conditions are a bit unclear as to directionality, etc., so for now lock each.
132+
suppliers.add( () -> reactiveLockCollectionTable(
133+
attribute,
134+
lockMode,
135+
lockTimeout,
136+
entityDetailsMap,
137+
executionContext
138+
) );
139+
}
140+
);
141+
return CompletionStages.loop( suppliers, Supplier::get );
142+
}
143+
144+
public static CompletionStage<Void> reactiveLockCollectionTable(
145+
PluralAttributeMapping attributeMapping,
146+
LockMode lockMode,
147+
Timeout lockTimeout,
148+
Map<Object, EntityDetails> ownerDetailsMap,
149+
ExecutionContext executionContext) {
150+
/// execute query
151+
return voidFuture();
152+
}
153+
154+
}

0 commit comments

Comments
 (0)