|
1 | 1 | /*
|
2 |
| - * Copyright 2012-2019 the original author or authors. |
| 2 | + * Copyright 2012-2020 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
18 | 18 |
|
19 | 19 | import java.security.Principal;
|
20 | 20 | import java.time.Duration;
|
21 |
| -import java.util.Arrays; |
22 | 21 | import java.util.Collections;
|
23 | 22 | import java.util.HashMap;
|
24 | 23 | import java.util.Map;
|
| 24 | +import java.util.concurrent.atomic.AtomicInteger; |
25 | 25 |
|
26 | 26 | import org.junit.Test;
|
27 | 27 | import reactor.core.publisher.Flux;
|
@@ -73,25 +73,25 @@ public void cacheInTtlWithNullParameters() {
|
73 | 73 |
|
74 | 74 | @Test
|
75 | 75 | public void cacheInTtlWithMonoResponse() {
|
76 |
| - MonoOperationInvoker.invocations = 0; |
| 76 | + MonoOperationInvoker.invocations = new AtomicInteger(); |
77 | 77 | MonoOperationInvoker target = new MonoOperationInvoker();
|
78 | 78 | InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap());
|
79 | 79 | CachingOperationInvoker invoker = new CachingOperationInvoker(target, CACHE_TTL);
|
80 | 80 | Object response = ((Mono<?>) invoker.invoke(context)).block();
|
81 | 81 | Object cachedResponse = ((Mono<?>) invoker.invoke(context)).block();
|
82 |
| - assertThat(MonoOperationInvoker.invocations).isEqualTo(1); |
| 82 | + assertThat(MonoOperationInvoker.invocations).hasValue(1); |
83 | 83 | assertThat(response).isSameAs(cachedResponse);
|
84 | 84 | }
|
85 | 85 |
|
86 | 86 | @Test
|
87 | 87 | public void cacheInTtlWithFluxResponse() {
|
88 |
| - FluxOperationInvoker.invocations = 0; |
| 88 | + FluxOperationInvoker.invocations = new AtomicInteger(); |
89 | 89 | FluxOperationInvoker target = new FluxOperationInvoker();
|
90 | 90 | InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap());
|
91 | 91 | CachingOperationInvoker invoker = new CachingOperationInvoker(target, CACHE_TTL);
|
92 | 92 | Object response = ((Flux<?>) invoker.invoke(context)).blockLast();
|
93 | 93 | Object cachedResponse = ((Flux<?>) invoker.invoke(context)).blockLast();
|
94 |
| - assertThat(FluxOperationInvoker.invocations).isEqualTo(1); |
| 94 | + assertThat(FluxOperationInvoker.invocations).hasValue(1); |
95 | 95 | assertThat(response).isSameAs(cachedResponse);
|
96 | 96 | }
|
97 | 97 |
|
@@ -154,28 +154,25 @@ public void targetInvokedWhenCacheExpires() throws InterruptedException {
|
154 | 154 |
|
155 | 155 | private static class MonoOperationInvoker implements OperationInvoker {
|
156 | 156 |
|
157 |
| - static int invocations; |
| 157 | + static AtomicInteger invocations = new AtomicInteger(); |
158 | 158 |
|
159 | 159 | @Override
|
160 |
| - public Object invoke(InvocationContext context) throws MissingParametersException { |
| 160 | + public Mono<String> invoke(InvocationContext context) throws MissingParametersException { |
161 | 161 | return Mono.fromCallable(() -> {
|
162 |
| - invocations++; |
163 |
| - return Mono.just("test"); |
| 162 | + invocations.incrementAndGet(); |
| 163 | + return "test"; |
164 | 164 | });
|
165 | 165 | }
|
166 | 166 |
|
167 | 167 | }
|
168 | 168 |
|
169 | 169 | private static class FluxOperationInvoker implements OperationInvoker {
|
170 | 170 |
|
171 |
| - static int invocations; |
| 171 | + static AtomicInteger invocations = new AtomicInteger(); |
172 | 172 |
|
173 | 173 | @Override
|
174 |
| - public Object invoke(InvocationContext context) throws MissingParametersException { |
175 |
| - return Flux.fromIterable(() -> { |
176 |
| - invocations++; |
177 |
| - return Arrays.asList("spring", "boot").iterator(); |
178 |
| - }); |
| 174 | + public Flux<String> invoke(InvocationContext context) throws MissingParametersException { |
| 175 | + return Flux.just("spring", "boot").hide().doFirst(invocations::incrementAndGet); |
179 | 176 | }
|
180 | 177 |
|
181 | 178 | }
|
|
0 commit comments