2020import java .util .concurrent .CompletionStage ;
2121import java .util .function .Consumer ;
2222import java .util .function .Function ;
23+ import java .util .function .Supplier ;
2324import java .util .stream .Collectors ;
2425import java .util .stream .StreamSupport ;
2526
@@ -124,7 +125,7 @@ public class AsyncCassandraTemplate
124125
125126 private final StatementFactory statementFactory ;
126127
127- private @ Nullable ApplicationEventPublisher eventPublisher ;
128+ private final EntityLifecycleEventDelegate eventDelegate ;
128129
129130 private @ Nullable EntityCallbacks entityCallbacks ;
130131
@@ -190,11 +191,12 @@ public AsyncCassandraTemplate(AsyncCqlTemplate asyncCqlTemplate, CassandraConver
190191 this .entityOperations = new EntityOperations (converter );
191192 this .exceptionTranslator = asyncCqlTemplate .getExceptionTranslator ();
192193 this .statementFactory = new StatementFactory (converter );
194+ this .eventDelegate = new EntityLifecycleEventDelegate ();
193195 }
194196
195197 @ Override
196198 public void setApplicationEventPublisher (ApplicationEventPublisher applicationEventPublisher ) {
197- this .eventPublisher = applicationEventPublisher ;
199+ this .eventDelegate . setPublisher ( applicationEventPublisher ) ;
198200 }
199201
200202 @ Override
@@ -214,6 +216,18 @@ public void setEntityCallbacks(@Nullable EntityCallbacks entityCallbacks) {
214216 this .entityCallbacks = entityCallbacks ;
215217 }
216218
219+ /**
220+ * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
221+ * published or whether emission should be suppressed. Enabled by default.
222+ *
223+ * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events.
224+ * @since 4.0
225+ * @see CassandraMappingEvent
226+ */
227+ public void setEntityLifecycleEventsEnabled (boolean enabled ) {
228+ this .eventDelegate .setEventsEnabled (enabled );
229+ }
230+
217231 @ Override
218232 public AsyncCqlOperations getAsyncCqlOperations () {
219233 return this .cqlOperations ;
@@ -456,11 +470,12 @@ private ListenableFuture<Boolean> doDelete(Query query, Class<?> entityClass, Cq
456470 tableName );
457471 SimpleStatement delete = builder .build ();
458472
459- maybeEmitEvent (new BeforeDeleteEvent <>(delete , entityClass , tableName ));
473+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(delete , entityClass , tableName ));
460474
461475 ListenableFuture <Boolean > future = doExecute (delete , AsyncResultSet ::wasApplied );
462476
463- future .addCallback (success -> maybeEmitEvent (new AfterDeleteEvent <>(delete , entityClass , tableName )), e -> {});
477+ future .addCallback (success -> maybeEmitEvent (() -> new AfterDeleteEvent <>(delete , entityClass , tableName )),
478+ e -> {});
464479
465480 return future ;
466481 }
@@ -677,8 +692,8 @@ private ListenableFuture<WriteResult> doDeleteVersioned(Object entity, QueryOpti
677692
678693 if (!result .wasApplied ()) {
679694 throw new OptimisticLockingFailureException (
680- String .format ("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile" ,
681- entity , source .getVersion (), tableName ));
695+ String .format ("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile" , entity ,
696+ source .getVersion (), tableName ));
682697 }
683698 });
684699 }
@@ -702,10 +717,11 @@ public ListenableFuture<Boolean> deleteById(Object id, Class<?> entityClass) {
702717 StatementBuilder <Delete > builder = getStatementFactory ().deleteById (id , entity , tableName );
703718 SimpleStatement delete = builder .build ();
704719
705- maybeEmitEvent (new BeforeDeleteEvent <>(delete , entityClass , tableName ));
720+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(delete , entityClass , tableName ));
706721
707722 ListenableFuture <Boolean > future = doExecute (delete , AsyncResultSet ::wasApplied );
708- future .addCallback (success -> maybeEmitEvent (new AfterDeleteEvent <>(delete , entityClass , tableName )), e -> {});
723+ future .addCallback (success -> maybeEmitEvent (() -> new AfterDeleteEvent <>(delete , entityClass , tableName )),
724+ e -> {});
709725
710726 return future ;
711727 }
@@ -719,10 +735,11 @@ public ListenableFuture<Void> truncate(Class<?> entityClass) {
719735 Truncate truncate = QueryBuilder .truncate (tableName );
720736 SimpleStatement statement = truncate .build ();
721737
722- maybeEmitEvent (new BeforeDeleteEvent <>(statement , entityClass , tableName ));
738+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(statement , entityClass , tableName ));
723739
724740 ListenableFuture <Boolean > future = doExecute (statement , AsyncResultSet ::wasApplied );
725- future .addCallback (success -> maybeEmitEvent (new AfterDeleteEvent <>(statement , entityClass , tableName )), e -> {});
741+ future .addCallback (success -> maybeEmitEvent (() -> new AfterDeleteEvent <>(statement , entityClass , tableName )),
742+ e -> {});
726743
727744 return new MappingListenableFutureAdapter <>(future , aBoolean -> null );
728745 }
@@ -753,7 +770,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
753770 private <T > ListenableFuture <EntityWriteResult <T >> executeSave (T entity , CqlIdentifier tableName ,
754771 SimpleStatement statement , Consumer <WriteResult > beforeAfterSaveEvent ) {
755772
756- maybeEmitEvent (new BeforeSaveEvent <>(entity , tableName , statement ));
773+ maybeEmitEvent (() -> new BeforeSaveEvent <>(entity , tableName , statement ));
757774 T entityToSave = maybeCallBeforeSave (entity , tableName , statement );
758775
759776 ListenableFuture <AsyncResultSet > result = doQueryForResultSet (statement );
@@ -766,7 +783,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
766783
767784 beforeAfterSaveEvent .accept (writeResult );
768785
769- maybeEmitEvent (new AfterSaveEvent <>(entityToSave , tableName ));
786+ maybeEmitEvent (() -> new AfterSaveEvent <>(entityToSave , tableName ));
770787
771788 return writeResult ;
772789 });
@@ -775,7 +792,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
775792 private ListenableFuture <WriteResult > executeDelete (Object entity , CqlIdentifier tableName , SimpleStatement statement ,
776793 Consumer <WriteResult > resultConsumer ) {
777794
778- maybeEmitEvent (new BeforeDeleteEvent <>(statement , entity .getClass (), tableName ));
795+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(statement , entity .getClass (), tableName ));
779796
780797 ListenableFuture <AsyncResultSet > result = doQueryForResultSet (statement );
781798
@@ -786,7 +803,7 @@ private ListenableFuture<WriteResult> executeDelete(Object entity, CqlIdentifier
786803
787804 resultConsumer .accept (writeResult );
788805
789- maybeEmitEvent (new AfterDeleteEvent <>(statement , entity .getClass (), tableName ));
806+ maybeEmitEvent (() -> new AfterDeleteEvent <>(statement , entity .getClass (), tableName ));
790807
791808 return writeResult ;
792809 });
@@ -864,9 +881,7 @@ public String getCql() {
864881 }
865882 }
866883
867- return getAsyncCqlOperations ()
868- .execute (new GetConfiguredPageSize ())
869- .completable ().join ();
884+ return getAsyncCqlOperations ().execute (new GetConfiguredPageSize ()).completable ().join ();
870885 }
871886
872887 @ SuppressWarnings ("unchecked" )
@@ -876,12 +891,12 @@ private <T> Function<Row, T> getMapper(Class<?> entityType, Class<T> targetType,
876891
877892 return row -> {
878893
879- maybeEmitEvent (new AfterLoadEvent <>(row , targetType , tableName ));
894+ maybeEmitEvent (() -> new AfterLoadEvent <>(row , targetType , tableName ));
880895
881896 T result = getConverter ().project (projection , row );
882897
883898 if (result != null ) {
884- maybeEmitEvent (new AfterConvertEvent <>(row , result , tableName ));
899+ maybeEmitEvent (() -> new AfterConvertEvent <>(row , result , tableName ));
885900 }
886901
887902 return result ;
@@ -899,11 +914,8 @@ private static MappingCassandraConverter newConverter(CqlSession session) {
899914 return converter ;
900915 }
901916
902- protected <E extends CassandraMappingEvent <T >, T > void maybeEmitEvent (E event ) {
903-
904- if (this .eventPublisher != null ) {
905- this .eventPublisher .publishEvent (event );
906- }
917+ protected <E extends CassandraMappingEvent <T >, T > void maybeEmitEvent (Supplier <E > event ) {
918+ this .eventDelegate .publishEvent (event );
907919 }
908920
909921 protected <T > T maybeCallBeforeConvert (T object , CqlIdentifier tableName ) {
0 commit comments