Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import static org.apache.unomi.healthcheck.HealthCheckConfig.CONFIG_AUTH_REALM;

Expand All @@ -42,8 +43,11 @@ public class HealthCheckService {
private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckService.class.getName());

private final List<HealthCheckProvider> providers = new ArrayList<>();
private ExecutorService executor;
private boolean busy = false;
private final Object cacheLock = new Object();
private volatile long cacheTimestamp = 0L;
private volatile List<HealthCheckResponse> healthCache = Collections.emptyList();
private volatile boolean initialized = false;
private volatile boolean busy = false;
private boolean registered = false;

@Reference
Expand All @@ -58,7 +62,6 @@ public HealthCheckService() {
@Activate
public void activate() throws ServletException, NamespaceException {
LOGGER.info("Activating healthcheck service...");
executor = Executors.newSingleThreadExecutor();
if (!registered) {
setConfig(config);
}
Expand Down Expand Up @@ -98,9 +101,6 @@ public void deactivate() {
httpService.unregister("/health/check");
registered = false;
}
if (executor != null) {
executor.shutdown();
}
}

@Reference(service = HealthCheckProvider.class, cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC, unbind = "unbind")
Expand All @@ -114,37 +114,57 @@ protected void unbind(HealthCheckProvider provider) {
providers.remove(provider);
}

public List<HealthCheckResponse> check() throws RejectedExecutionException {
if (config !=null && config.isEnabled()) {
LOGGER.debug("Health check called");
if (busy) {
throw new RejectedExecutionException("Health check already in progress");
} else {
try {
public List<HealthCheckResponse> check() {
if (config == null || !config.isEnabled()) {
LOGGER.warn("Healthcheck service is disabled");
return Collections.emptyList();
}
if (!initialized) {
synchronized (cacheLock) {
if (!initialized) {
refreshCache();
initialized = true;
}
}
} else if (shouldRefreshCache()) {
synchronized (cacheLock) {
if (!busy) {
busy = true;
List<HealthCheckResponse> health = new ArrayList<>();
health.add(HealthCheckResponse.live("karaf"));
for (HealthCheckProvider provider : providers.stream().filter(p -> config.getEnabledProviders().contains(p.name())).collect(Collectors.toList())) {
Future<HealthCheckResponse> future = executor.submit(provider::execute);
try {
HealthCheckResponse response = future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
health.add(response);
} catch (TimeoutException e) {
future.cancel(true);
health.add(provider.timeout());
} catch (Exception e) {
LOGGER.error("Error while executing health check", e);
}
try {
refreshCache();
} finally {
busy = false;
}
return health;
} finally {
busy = false;
}
}
} else {
LOGGER.info("Healthcheck service is disabled");
return Collections.emptyList();
}
return healthCache;
}

private boolean shouldRefreshCache() {
return !busy && (System.currentTimeMillis() - cacheTimestamp) > 1000;
}

private void refreshCache() {
try {
List<HealthCheckResponse> health = new ArrayList<>();
health.add(HealthCheckResponse.live("karaf"));
for (HealthCheckProvider provider : providers.stream()
.filter(p -> config.getEnabledProviders().contains(p.name()))
.toList()) {
try {
HealthCheckResponse response = provider.execute();
health.add(response);
} catch (Exception e) {
LOGGER.error("Error while executing health check", e);
health.add(provider.timeout());
}
}
health.sort(Comparator.comparing(HealthCheckResponse::getName));
healthCache = List.copyOf(health);
cacheTimestamp = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.error("Error refreshing health cache", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;

/**
Expand Down Expand Up @@ -66,7 +65,6 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
return;
}
List<HealthCheckResponse> checks = service.check();
checks.sort(Comparator.comparing(HealthCheckResponse::getName));
response.getWriter().println(mapper.writeValueAsString(checks));
response.setContentType("application/json");
response.setHeader("Cache-Control", "no-cache");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.PropertyType;
import org.apache.unomi.api.rules.Rule;
import org.apache.unomi.api.services.EventService;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -166,10 +167,13 @@ private Event sendCopyPropertyEvent(Map<String, Object> properties, String profi

Event event = new Event("copyProperties", null, profile, null, null, profile, new Date());
event.setPersistent(false);

event.setProperty("urlParameters", properties);

eventService.send(event);
int result = eventService.send(event);
LOGGER.info("Event processing result: {}", result);
if (result == EventService.ERROR) {
LOGGER.error("Event processing resulted in ERROR. Event details: {}", event);
}
Assert.assertNotEquals(EventService.ERROR, result);
return event;
}

Expand Down
38 changes: 38 additions & 0 deletions itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -68,6 +70,42 @@ public void testHealthCheck() {
}
}

@Test
public void testConcurrentHealthCheck() {
final int NB_THREADS = 10;
final int NB_ITERATIONS = 20;

ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(NB_THREADS);
List<Future<List<HealthCheckResponse>>> futures = new ArrayList<>();
for (int i = 0; i < NB_ITERATIONS; i++) {
for (int j = 0; j < NB_THREADS; j++) {
Future<List<HealthCheckResponse>> future = executorService.submit(() -> get(HEALTHCHECK_ENDPOINT, new TypeReference<>() {}));
futures.add(future);
}
for (Future<List<HealthCheckResponse>> future : futures) {
List<HealthCheckResponse> health = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(4, health.size());
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("karaf") && r.getStatus() == HealthCheckResponse.Status.LIVE));
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals(searchEngine) && r.getStatus() == HealthCheckResponse.Status.LIVE));
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("unomi") && r.getStatus() == HealthCheckResponse.Status.LIVE));
Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("cluster") && r.getStatus() == HealthCheckResponse.Status.LIVE));
}
Thread.sleep(10);
}
executorService.shutdown();
Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
} catch (Exception e) {
LOGGER.error("Error while executing concurrent health check", e);
fail("Error while executing concurrent health check: " + e.getMessage());
} finally {
if ( executorService != null ) {
executorService.shutdownNow();
}
}
}

protected <T> T get(final String url, TypeReference<T> typeReference) {
CloseableHttpResponse response = null;
try {
Expand Down
11 changes: 10 additions & 1 deletion itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
import org.apache.unomi.api.services.EventService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

Expand All @@ -36,6 +39,8 @@
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerSuite.class)
public class ProfileMergeIT extends BaseIT {
private final static Logger LOGGER = LoggerFactory.getLogger(ProfileMergeIT.class);

public static final String PERSONALIZATION_STRATEGY_STATUS = "personalizationStrategyStatus";
public static final String PERSONALIZATION_STRATEGY_STATUS_ID = "personalizationId";
public static final String PERSONALIZATION_STRATEGY_STATUS_IN_CTRL_GROUP = "inControlGroup";
Expand Down Expand Up @@ -468,7 +473,11 @@ private Event sendEvent() {
profile.setProperty("j:nodename", "michel");
profile.getSystemProperties().put("mergeIdentifier", "jose");
Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date());
eventService.send(testEvent);
int result = eventService.send(testEvent);
LOGGER.info("Event processing result: {}", result);
if (result == EventService.ERROR) {
LOGGER.error("Event processing resulted in ERROR. Event details: {}", testEvent);
}
return testEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
import org.apache.unomi.api.services.EventService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Date;
Expand All @@ -38,6 +41,7 @@
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerSuite.class)
public class SendEventActionIT extends BaseIT {
private final static Logger LOGGER = LoggerFactory.getLogger(SendEventActionIT.class);

private final static String TEST_RULE_ID = "sendEventTest";
private final static String EVENT_ID = "sendEventTestId";
Expand Down Expand Up @@ -80,7 +84,11 @@ private Event sendEvent() {
profile.setProperty("j:nodename", "michel");
Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date());
testEvent.setItemId(EVENT_ID);
eventService.send(testEvent);
int result = eventService.send(testEvent);
LOGGER.info("Event processing result: {}", result);
if (result == EventService.ERROR) {
LOGGER.error("Event processing resulted in ERROR. Event details: {}", testEvent);
}
return testEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@
* to dynamically adjust its behavior based on external configurations. It leverages the {@link FeaturesService} to
* manage Karaf features dynamically.</p>
*
* <h3>Configuration</h3>
* <p><b>Configuration</b></p>
* <p>The service reads its configuration from the OSGi Configuration Admin under the PID <code>org.apache.unomi.start</code>.
* The configuration includes:</p>
* <ul>
* <li><b>startFeatures</b>: A semicolon-separated list of features mapped to persistence implementations
* in the format <code>persistenceImplementation:feature1,feature2</code>.</li>
* </ul>
*
* <h3>Usage</h3>
* <p><b>Usage</b></p>
* <p>This service can be controlled programmatically through its methods:</p>
* <ul>
* <li>{@link #startUnomi(String, boolean)}: Installs and starts features for the specified start features configuration.</li>
* <li>{@link #stopUnomi()}: Stops and uninstalls the previously started features.</li>
* </ul>
*
* <h3>Dependencies</h3>
* <p><b>Dependencies</b></p>
* <p>The following dependencies are required for this service:</p>
* <ul>
* <li>{@link MigrationService}: Handles migration tasks during startup.</li>
Expand Down
Loading