Skip to content

Commit d49cc20

Browse files
committed
Rework CouchbaseReactiveHealthIndicator to use DiagnosticsReport
Closes gh-14799
1 parent 80fdd47 commit d49cc20

File tree

4 files changed

+129
-119
lines changed

4 files changed

+129
-119
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.couchbase;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import java.util.stream.Collectors;
22+
23+
import com.couchbase.client.core.message.internal.DiagnosticsReport;
24+
import com.couchbase.client.core.message.internal.EndpointHealth;
25+
import com.couchbase.client.core.state.LifecycleState;
26+
27+
import org.springframework.boot.actuate.health.Health.Builder;
28+
29+
/**
30+
* Details of Couchbase's health.
31+
*
32+
* @author Andy Wilkinson
33+
*/
34+
class CouchbaseHealth {
35+
36+
private final DiagnosticsReport diagnostics;
37+
38+
CouchbaseHealth(DiagnosticsReport diagnostics) {
39+
this.diagnostics = diagnostics;
40+
}
41+
42+
void applyTo(Builder builder) {
43+
builder = isCouchbaseUp(this.diagnostics) ? builder.up() : builder.down();
44+
builder.withDetail("sdk", this.diagnostics.sdk());
45+
builder.withDetail("endpoints", this.diagnostics.endpoints().stream()
46+
.map(this::describe).collect(Collectors.toList()));
47+
}
48+
49+
private boolean isCouchbaseUp(DiagnosticsReport diagnostics) {
50+
for (EndpointHealth health : diagnostics.endpoints()) {
51+
LifecycleState state = health.state();
52+
if (state != LifecycleState.CONNECTED && state != LifecycleState.IDLE) {
53+
return false;
54+
}
55+
}
56+
return true;
57+
}
58+
59+
private Map<String, Object> describe(EndpointHealth endpointHealth) {
60+
Map<String, Object> map = new HashMap<>();
61+
map.put("id", endpointHealth.id());
62+
map.put("lastActivity", endpointHealth.lastActivity());
63+
map.put("local", endpointHealth.local().toString());
64+
map.put("remote", endpointHealth.remote().toString());
65+
map.put("state", endpointHealth.state());
66+
map.put("type", endpointHealth.type());
67+
return map;
68+
}
69+
70+
}

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/couchbase/CouchbaseHealthIndicator.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@
1616

1717
package org.springframework.boot.actuate.couchbase;
1818

19-
import java.util.HashMap;
20-
import java.util.Map;
21-
import java.util.stream.Collectors;
22-
2319
import com.couchbase.client.core.message.internal.DiagnosticsReport;
24-
import com.couchbase.client.core.message.internal.EndpointHealth;
25-
import com.couchbase.client.core.state.LifecycleState;
2620
import com.couchbase.client.java.Cluster;
2721

2822
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
@@ -55,31 +49,7 @@ public CouchbaseHealthIndicator(Cluster cluster) {
5549
@Override
5650
protected void doHealthCheck(Health.Builder builder) throws Exception {
5751
DiagnosticsReport diagnostics = this.cluster.diagnostics();
58-
builder = isCouchbaseUp(diagnostics) ? builder.up() : builder.down();
59-
builder.withDetail("sdk", diagnostics.sdk());
60-
builder.withDetail("endpoints", diagnostics.endpoints().stream()
61-
.map(this::describe).collect(Collectors.toList()));
62-
}
63-
64-
private boolean isCouchbaseUp(DiagnosticsReport diagnostics) {
65-
for (EndpointHealth health : diagnostics.endpoints()) {
66-
LifecycleState state = health.state();
67-
if (state != LifecycleState.CONNECTED && state != LifecycleState.IDLE) {
68-
return false;
69-
}
70-
}
71-
return true;
72-
}
73-
74-
private Map<String, Object> describe(EndpointHealth endpointHealth) {
75-
Map<String, Object> map = new HashMap<>();
76-
map.put("id", endpointHealth.id());
77-
map.put("lastActivity", endpointHealth.lastActivity());
78-
map.put("local", endpointHealth.local().toString());
79-
map.put("remote", endpointHealth.remote().toString());
80-
map.put("state", endpointHealth.state());
81-
map.put("type", endpointHealth.type());
82-
return map;
52+
new CouchbaseHealth(diagnostics).applyTo(builder);
8353
}
8454

8555
}

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/couchbase/CouchbaseReactiveHealthIndicator.java

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,13 @@
1515
*/
1616
package org.springframework.boot.actuate.couchbase;
1717

18-
import com.couchbase.client.java.bucket.BucketInfo;
19-
import com.couchbase.client.java.cluster.ClusterInfo;
18+
import com.couchbase.client.core.message.internal.DiagnosticsReport;
19+
import com.couchbase.client.java.Cluster;
2020
import reactor.core.publisher.Mono;
21-
import rx.Observable;
22-
import rx.RxReactiveStreams;
23-
import rx.Single;
2421

2522
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
2623
import org.springframework.boot.actuate.health.Health;
2724
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
28-
import org.springframework.data.couchbase.core.RxJavaCouchbaseOperations;
29-
import org.springframework.util.StringUtils;
3025

3126
/**
3227
* A {@link ReactiveHealthIndicator} for Couchbase.
@@ -37,30 +32,21 @@
3732
*/
3833
public class CouchbaseReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
3934

40-
private final RxJavaCouchbaseOperations couchbaseOperations;
35+
private final Cluster cluster;
4136

4237
/**
4338
* Create a new {@link CouchbaseReactiveHealthIndicator} instance.
44-
* @param couchbaseOperations the reactive couchbase operations
39+
* @param cluster the Couchbase cluster
4540
*/
46-
public CouchbaseReactiveHealthIndicator(
47-
RxJavaCouchbaseOperations couchbaseOperations) {
48-
this.couchbaseOperations = couchbaseOperations;
41+
public CouchbaseReactiveHealthIndicator(Cluster cluster) {
42+
this.cluster = cluster;
4943
}
5044

5145
@Override
5246
protected Mono<Health> doHealthCheck(Health.Builder builder) {
53-
ClusterInfo cluster = this.couchbaseOperations.getCouchbaseClusterInfo();
54-
String versions = StringUtils
55-
.collectionToCommaDelimitedString(cluster.getAllVersions());
56-
Observable<BucketInfo> bucket = this.couchbaseOperations.getCouchbaseBucket()
57-
.bucketManager().async().info();
58-
Single<Health> health = bucket.map(BucketInfo::nodeList)
59-
.map(StringUtils::collectionToCommaDelimitedString)
60-
.map((nodes) -> builder.up().withDetail("versions", versions)
61-
.withDetail("nodes", nodes).build())
62-
.toSingle();
63-
return Mono.from(RxReactiveStreams.toPublisher(health));
47+
DiagnosticsReport diagnostics = this.cluster.diagnostics();
48+
new CouchbaseHealth(diagnostics).applyTo(builder);
49+
return Mono.just(builder.build());
6450
}
6551

6652
}

spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/couchbase/CouchbaseReactiveHealthIndicatorTests.java

Lines changed: 49 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,91 +15,75 @@
1515
*/
1616
package org.springframework.boot.actuate.couchbase;
1717

18-
import java.net.InetAddress;
18+
import java.net.InetSocketAddress;
1919
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
2022

21-
import com.couchbase.client.java.Bucket;
22-
import com.couchbase.client.java.bucket.AsyncBucketManager;
23-
import com.couchbase.client.java.bucket.BucketInfo;
24-
import com.couchbase.client.java.bucket.BucketManager;
25-
import com.couchbase.client.java.cluster.ClusterInfo;
26-
import com.couchbase.client.java.error.TranscodingException;
27-
import com.couchbase.client.java.util.features.Version;
23+
import com.couchbase.client.core.message.internal.DiagnosticsReport;
24+
import com.couchbase.client.core.message.internal.EndpointHealth;
25+
import com.couchbase.client.core.service.ServiceType;
26+
import com.couchbase.client.core.state.LifecycleState;
27+
import com.couchbase.client.java.Cluster;
2828
import org.junit.Test;
29-
import reactor.core.publisher.Mono;
30-
import reactor.test.StepVerifier;
31-
import rx.Observable;
3229

3330
import org.springframework.boot.actuate.health.Health;
3431
import org.springframework.boot.actuate.health.Status;
35-
import org.springframework.data.couchbase.core.RxJavaCouchbaseOperations;
3632

3733
import static org.assertj.core.api.Assertions.assertThat;
3834
import static org.mockito.BDDMockito.given;
3935
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.verify;
4037

4138
/**
4239
* Tests for {@link CouchbaseReactiveHealthIndicator}.
4340
*/
4441
public class CouchbaseReactiveHealthIndicatorTests {
4542

4643
@Test
47-
public void couchbaseIsUp() {
48-
RxJavaCouchbaseOperations rxJavaCouchbaseOperations = mock(
49-
RxJavaCouchbaseOperations.class);
50-
AsyncBucketManager asyncBucketManager = mockAsyncBucketManager(
51-
rxJavaCouchbaseOperations);
52-
BucketInfo info = mock(BucketInfo.class);
53-
InetAddress node1Address = mock(InetAddress.class);
54-
InetAddress node2Address = mock(InetAddress.class);
55-
given(info.nodeList()).willReturn(Arrays.asList(node1Address, node2Address));
56-
given(node1Address.toString()).willReturn("127.0.0.1");
57-
given(node2Address.toString()).willReturn("127.0.0.2");
58-
given(asyncBucketManager.info()).willReturn(Observable.just(info));
59-
CouchbaseReactiveHealthIndicator couchbaseReactiveHealthIndicator = new CouchbaseReactiveHealthIndicator(
60-
rxJavaCouchbaseOperations);
61-
Mono<Health> health = couchbaseReactiveHealthIndicator.health();
62-
StepVerifier.create(health).consumeNextWith((h) -> {
63-
assertThat(h.getStatus()).isEqualTo(Status.UP);
64-
assertThat(h.getDetails()).containsKeys("versions", "nodes");
65-
assertThat(h.getDetails().get("versions")).isEqualTo("5.5.0,6.0.0");
66-
assertThat(h.getDetails().get("nodes")).isEqualTo("127.0.0.1,127.0.0.2");
67-
}).verifyComplete();
44+
@SuppressWarnings("unchecked")
45+
public void couchbaseClusterIsUp() {
46+
Cluster cluster = mock(Cluster.class);
47+
CouchbaseReactiveHealthIndicator healthIndicator = new CouchbaseReactiveHealthIndicator(
48+
cluster);
49+
List<EndpointHealth> endpoints = Arrays.asList(new EndpointHealth(
50+
ServiceType.BINARY, LifecycleState.CONNECTED, new InetSocketAddress(0),
51+
new InetSocketAddress(0), 1234, "endpoint-1"));
52+
DiagnosticsReport diagnostics = new DiagnosticsReport(endpoints, "test-sdk",
53+
"test-id", null);
54+
given(cluster.diagnostics()).willReturn(diagnostics);
55+
Health health = healthIndicator.health().block();
56+
assertThat(health.getStatus()).isEqualTo(Status.UP);
57+
assertThat(health.getDetails()).containsEntry("sdk", "test-sdk");
58+
assertThat(health.getDetails()).containsKey("endpoints");
59+
assertThat((List<Map<String, Object>>) health.getDetails().get("endpoints"))
60+
.hasSize(1);
61+
verify(cluster).diagnostics();
6862
}
6963

7064
@Test
71-
public void couchbaseIsDown() {
72-
RxJavaCouchbaseOperations rxJavaCouchbaseOperations = mock(
73-
RxJavaCouchbaseOperations.class);
74-
AsyncBucketManager asyncBucketManager = mockAsyncBucketManager(
75-
rxJavaCouchbaseOperations);
76-
given(asyncBucketManager.info())
77-
.willReturn(Observable.error(new TranscodingException("Failure")));
78-
CouchbaseReactiveHealthIndicator couchbaseReactiveHealthIndicator = new CouchbaseReactiveHealthIndicator(
79-
rxJavaCouchbaseOperations);
80-
Mono<Health> health = couchbaseReactiveHealthIndicator.health();
81-
StepVerifier.create(health).consumeNextWith((h) -> {
82-
assertThat(h.getStatus()).isEqualTo(Status.DOWN);
83-
assertThat(h.getDetails()).containsOnlyKeys("error");
84-
assertThat(h.getDetails().get("error"))
85-
.isEqualTo(TranscodingException.class.getName() + ": Failure");
86-
}).verifyComplete();
87-
}
88-
89-
private AsyncBucketManager mockAsyncBucketManager(
90-
RxJavaCouchbaseOperations rxJavaCouchbaseOperations) {
91-
ClusterInfo clusterInfo = mock(ClusterInfo.class);
92-
given(rxJavaCouchbaseOperations.getCouchbaseClusterInfo())
93-
.willReturn(clusterInfo);
94-
given(clusterInfo.getAllVersions())
95-
.willReturn(Arrays.asList(new Version(5, 5, 0), new Version(6, 0, 0)));
96-
Bucket bucket = mock(Bucket.class);
97-
BucketManager bucketManager = mock(BucketManager.class);
98-
AsyncBucketManager asyncBucketManager = mock(AsyncBucketManager.class);
99-
given(rxJavaCouchbaseOperations.getCouchbaseBucket()).willReturn(bucket);
100-
given(bucket.bucketManager()).willReturn(bucketManager);
101-
given(bucketManager.async()).willReturn(asyncBucketManager);
102-
return asyncBucketManager;
65+
@SuppressWarnings("unchecked")
66+
public void couchbaseClusterIsDown() {
67+
Cluster cluster = mock(Cluster.class);
68+
CouchbaseReactiveHealthIndicator healthIndicator = new CouchbaseReactiveHealthIndicator(
69+
cluster);
70+
List<EndpointHealth> endpoints = Arrays.asList(
71+
new EndpointHealth(ServiceType.BINARY, LifecycleState.CONNECTED,
72+
new InetSocketAddress(0), new InetSocketAddress(0), 1234,
73+
"endpoint-1"),
74+
new EndpointHealth(ServiceType.BINARY, LifecycleState.CONNECTING,
75+
new InetSocketAddress(0), new InetSocketAddress(0), 1234,
76+
"endpoint-2"));
77+
DiagnosticsReport diagnostics = new DiagnosticsReport(endpoints, "test-sdk",
78+
"test-id", null);
79+
given(cluster.diagnostics()).willReturn(diagnostics);
80+
Health health = healthIndicator.health().block();
81+
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
82+
assertThat(health.getDetails()).containsEntry("sdk", "test-sdk");
83+
assertThat(health.getDetails()).containsKey("endpoints");
84+
assertThat((List<Map<String, Object>>) health.getDetails().get("endpoints"))
85+
.hasSize(2);
86+
verify(cluster).diagnostics();
10387
}
10488

10589
}

0 commit comments

Comments
 (0)