Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import static org.apache.unomi.healthcheck.HealthCheckConfig.CONFIG_AUTH_REALM;

Expand All @@ -42,8 +43,11 @@ public class HealthCheckService {
private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckService.class.getName());

private final List<HealthCheckProvider> providers = new ArrayList<>();
private ExecutorService executor;
private boolean busy = false;
private final Object cacheLock = new Object();
private volatile long cacheTimestamp = 0L;
private volatile List<HealthCheckResponse> healthCache = Collections.emptyList();
private volatile boolean initialized = false;
private volatile boolean busy = false;
private boolean registered = false;

@Reference
Expand All @@ -58,7 +62,6 @@ public HealthCheckService() {
@Activate
public void activate() throws ServletException, NamespaceException {
LOGGER.info("Activating healthcheck service...");
executor = Executors.newSingleThreadExecutor();
if (!registered) {
setConfig(config);
}
Expand Down Expand Up @@ -98,9 +101,6 @@ public void deactivate() {
httpService.unregister("/health/check");
registered = false;
}
if (executor != null) {
executor.shutdown();
}
}

@Reference(service = HealthCheckProvider.class, cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC, unbind = "unbind")
Expand All @@ -114,37 +114,57 @@ protected void unbind(HealthCheckProvider provider) {
providers.remove(provider);
}

public List<HealthCheckResponse> check() throws RejectedExecutionException {
if (config !=null && config.isEnabled()) {
LOGGER.debug("Health check called");
if (busy) {
throw new RejectedExecutionException("Health check already in progress");
} else {
try {
public List<HealthCheckResponse> check() {
if (config == null || !config.isEnabled()) {
LOGGER.warn("Healthcheck service is disabled");
return Collections.emptyList();
}
if (!initialized) {
synchronized (cacheLock) {
if (!initialized) {
refreshCache();
initialized = true;
}
}
} else if (shouldRefreshCache()) {
synchronized (cacheLock) {
if (!busy) {
busy = true;
List<HealthCheckResponse> health = new ArrayList<>();
health.add(HealthCheckResponse.live("karaf"));
for (HealthCheckProvider provider : providers.stream().filter(p -> config.getEnabledProviders().contains(p.name())).collect(Collectors.toList())) {
Future<HealthCheckResponse> future = executor.submit(provider::execute);
try {
HealthCheckResponse response = future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
health.add(response);
} catch (TimeoutException e) {
future.cancel(true);
health.add(provider.timeout());
} catch (Exception e) {
LOGGER.error("Error while executing health check", e);
}
try {
refreshCache();
} finally {
busy = false;
}
return health;
} finally {
busy = false;
}
}
} else {
LOGGER.info("Healthcheck service is disabled");
return Collections.emptyList();
}
return healthCache;
}

private boolean shouldRefreshCache() {
return !busy && (System.currentTimeMillis() - cacheTimestamp) > 1000;
}

private void refreshCache() {
try {
List<HealthCheckResponse> health = new ArrayList<>();
health.add(HealthCheckResponse.live("karaf"));
for (HealthCheckProvider provider : providers.stream()
.filter(p -> config.getEnabledProviders().contains(p.name()))
.toList()) {
try {
HealthCheckResponse response = provider.execute();
health.add(response);
} catch (Exception e) {
LOGGER.error("Error while executing health check", e);
health.add(provider.timeout());
}
}
health.sort(Comparator.comparing(HealthCheckResponse::getName));
healthCache = List.copyOf(health);
cacheTimestamp = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.error("Error refreshing health cache", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;

/**
Expand Down Expand Up @@ -66,7 +65,6 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
return;
}
List<HealthCheckResponse> checks = service.check();
checks.sort(Comparator.comparing(HealthCheckResponse::getName));
response.getWriter().println(mapper.writeValueAsString(checks));
response.setContentType("application/json");
response.setHeader("Cache-Control", "no-cache");
Expand Down
38 changes: 38 additions & 0 deletions itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -68,6 +70,42 @@ public void testHealthCheck() {
}
}

@Test
public void testConcurrentHealthCheck() {
final int NB_THREADS = 10;
final int NB_ITERATIONS = 20;

ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(NB_THREADS);
List<Future<List<HealthCheckResponse>>> futures = new ArrayList<>();
for (int i = 0; i < NB_ITERATIONS; i++) {
for (int j = 0; j < NB_THREADS; j++) {
Future<List<HealthCheckResponse>> future = executorService.submit(() -> get(HEALTHCHECK_ENDPOINT, new TypeReference<>() {}));
futures.add(future);
}
for (Future<List<HealthCheckResponse>> future : futures) {
List<HealthCheckResponse> health = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(4, health.size());
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("karaf") && r.getStatus() == HealthCheckResponse.Status.LIVE));
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals(searchEngine) && r.getStatus() == HealthCheckResponse.Status.LIVE));
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("unomi") && r.getStatus() == HealthCheckResponse.Status.LIVE));
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("cluster") && r.getStatus() == HealthCheckResponse.Status.LIVE));
}
Thread.sleep(10);
}
executorService.shutdown();
Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
} catch (Exception e) {
LOGGER.error("Error while executing concurrent health check", e);
fail("Error while executing concurrent health check: " + e.getMessage());
} finally {
if ( executorService != null ) {
executorService.shutdownNow();
}
}
}

protected <T> T get(final String url, TypeReference<T> typeReference) {
CloseableHttpResponse response = null;
try {
Expand Down
Loading