3232import org .bson .conversions .Bson ;
3333import org .junit .Test ;
3434import org .mockito .Mockito ;
35+ import org .reactivestreams .Publisher ;
3536
3637import org .springframework .beans .factory .BeanFactory ;
38+ import org .springframework .context .ConfigurableApplicationContext ;
39+ import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
40+ import org .springframework .context .annotation .Bean ;
41+ import org .springframework .context .annotation .Configuration ;
3742import org .springframework .data .mongodb .ReactiveMongoDatabaseFactory ;
3843import org .springframework .data .mongodb .core .ReactiveMongoTemplate ;
3944import org .springframework .data .mongodb .core .convert .MappingMongoConverter ;
4045import org .springframework .data .mongodb .core .mapping .MongoMappingContext ;
4146import org .springframework .expression .Expression ;
4247import org .springframework .expression .common .LiteralExpression ;
48+ import org .springframework .integration .channel .FluxMessageChannel ;
49+ import org .springframework .integration .config .EnableIntegration ;
50+ import org .springframework .integration .core .MessageSource ;
51+ import org .springframework .integration .dsl .IntegrationFlow ;
52+ import org .springframework .integration .dsl .IntegrationFlows ;
53+ import org .springframework .integration .dsl .Pollers ;
4354import org .springframework .integration .mongodb .rules .MongoDbAvailable ;
4455import org .springframework .integration .mongodb .rules .MongoDbAvailableTests ;
4556
5061
5162/**
5263 * @author David Turanski
64+ * @author Artem Bilan
5365 *
5466 * @since 5.3
5567 */
@@ -83,7 +95,7 @@ public void validateSuccessfulQueryWithSingleElementFluxOfDbObject() {
8395 ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory = this .prepareReactiveMongoFactory ();
8496
8597 ReactiveMongoTemplate template = new ReactiveMongoTemplate (reactiveMongoDatabaseFactory );
86- waitFor (template .save (this . createPerson (), "data" ));
98+ waitFor (template .save (createPerson (), "data" ));
8799
88100 Expression queryExpression = new LiteralExpression ("{'name' : 'Oleg'}" );
89101 ReactiveMongoDbMessageSource messageSource = new ReactiveMongoDbMessageSource (reactiveMongoDatabaseFactory ,
@@ -103,7 +115,7 @@ public void validateSuccessfulQueryWithSingleElementFluxOfPerson() {
103115 ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory = this .prepareReactiveMongoFactory ();
104116
105117 ReactiveMongoTemplate template = new ReactiveMongoTemplate (reactiveMongoDatabaseFactory );
106- waitFor (template .save (this . createPerson (), "data" ));
118+ waitFor (template .save (createPerson (), "data" ));
107119
108120 Expression queryExpression = new LiteralExpression ("{'name' : 'Oleg'}" );
109121 ReactiveMongoDbMessageSource messageSource = new ReactiveMongoDbMessageSource (reactiveMongoDatabaseFactory ,
@@ -148,7 +160,7 @@ public void validateSuccessfulQueryWithEmptyReturn() {
148160 @ MongoDbAvailable
149161 @ SuppressWarnings ("unchecked" )
150162 public void validateSuccessfulQueryWithCustomConverter () {
151- MappingMongoConverter converter = new ReactiveTestMongoConverter (this . prepareReactiveMongoFactory (),
163+ MappingMongoConverter converter = new ReactiveTestMongoConverter (prepareReactiveMongoFactory (),
152164 new MongoMappingContext ());
153165 converter .afterPropertiesSet ();
154166 converter = spy (converter );
@@ -161,11 +173,28 @@ public void validateSuccessfulQueryWithCustomConverter() {
161173
162174 @ Test
163175 @ MongoDbAvailable
164- @ SuppressWarnings ("unchecked" )
165- public void validatePipelineInModifyOut () {
176+ public void validateWithConfiguredPollerFlow () {
177+ ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory = prepareReactiveMongoFactory ();
178+ ReactiveMongoTemplate template = new ReactiveMongoTemplate (reactiveMongoDatabaseFactory );
166179
167- ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory = this .prepareReactiveMongoFactory ();
180+ waitFor (template .save (createPerson (), "data" ));
181+
182+ ConfigurableApplicationContext context = new AnnotationConfigApplicationContext (TestContext .class );
183+ FluxMessageChannel output = context .getBean (FluxMessageChannel .class );
184+ StepVerifier .create (output )
185+ .assertNext (
186+ message -> assertThat (((Person ) message .getPayload ()).getName ()).isEqualTo ("Oleg" ))
187+ .thenCancel ()
188+ .verify ();
189+
190+ context .close ();
191+ }
168192
193+ @ Test
194+ @ MongoDbAvailable
195+ @ SuppressWarnings ("unchecked" )
196+ public void validatePipelineInModifyOut () {
197+ ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory = prepareReactiveMongoFactory ();
169198 ReactiveMongoTemplate template = new ReactiveMongoTemplate (reactiveMongoDatabaseFactory );
170199
171200 waitFor (template .save (BasicDBObject .parse ("{'name' : 'Manny', 'id' : 1}" ), "data" ));
@@ -185,17 +214,17 @@ public void validatePipelineInModifyOut() {
185214 }
186215
187216 private Flux <Person > queryMultipleElements (Expression queryExpression ) {
188- return this . queryMultipleElements (queryExpression , Optional .empty ());
217+ return queryMultipleElements (queryExpression , Optional .empty ());
189218 }
190219
191220 @ SuppressWarnings ("unchecked" )
192221 private Flux <Person > queryMultipleElements (Expression queryExpression , Optional <MappingMongoConverter > converter ) {
193222 ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory = this .prepareReactiveMongoFactory ();
194223
195224 ReactiveMongoTemplate template = new ReactiveMongoTemplate (reactiveMongoDatabaseFactory );
196- waitFor (template .save (this . createPerson ("Manny" ), "data" ));
197- waitFor (template .save (this . createPerson ("Moe" ), "data" ));
198- waitFor (template .save (this . createPerson ("Jack" ), "data" ));
225+ waitFor (template .save (createPerson ("Manny" ), "data" ));
226+ waitFor (template .save (createPerson ("Moe" ), "data" ));
227+ waitFor (template .save (createPerson ("Jack" ), "data" ));
199228
200229 ReactiveMongoDbMessageSource messageSource = new ReactiveMongoDbMessageSource (reactiveMongoDatabaseFactory ,
201230 queryExpression );
@@ -211,4 +240,38 @@ private static <T> T waitFor(Mono<T> mono) {
211240 return mono .block (Duration .ofSeconds (10 ));
212241 }
213242
243+ @ Configuration
244+ @ EnableIntegration
245+ static class TestContext {
246+
247+ @ Bean
248+ FluxMessageChannel output () {
249+ return new FluxMessageChannel ();
250+ }
251+
252+ @ Bean
253+ public MessageSource <Publisher <?>> mongodbMessageSource (ReactiveMongoDatabaseFactory mongoDatabaseFactory ) {
254+ Expression queryExpression = new LiteralExpression ("{'name' : 'Oleg'}" );
255+ ReactiveMongoDbMessageSource reactiveMongoDbMessageSource =
256+ new ReactiveMongoDbMessageSource (mongoDatabaseFactory , queryExpression );
257+ reactiveMongoDbMessageSource .setEntityClass (Person .class );
258+ return reactiveMongoDbMessageSource ;
259+ }
260+
261+ @ Bean
262+ ReactiveMongoDatabaseFactory mongoDatabaseFactory () {
263+ return MongoDbAvailableTests .REACTIVE_MONGO_DATABASE_FACTORY ;
264+ }
265+
266+ @ Bean
267+ public IntegrationFlow pollingFlow (MessageSource <Publisher <?>> mongodbMessageSource ) {
268+ return IntegrationFlows
269+ .from (mongodbMessageSource , c -> c .poller (Pollers .fixedDelay (100 ).maxMessagesPerPoll (1 )))
270+ .split ()
271+ .channel (output ())
272+ .get ();
273+ }
274+
275+ }
276+
214277}
0 commit comments