Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum Setting {
Page.SERVICES,
"Set the service polling interval for remote services in seconds",
Integer.class,
20),
10),
DO_NOT_CREATE_A_DEFAULT_OBSERVER(
Page.GENERAL,
"Do not create a default observer for a connected digital twin",
Expand Down Expand Up @@ -298,6 +298,11 @@ public enum Setting {
"Launch a DevToolsFX debugging tool for the GUI when in graphical mode",
Map.class,
Map.of()),
LIST_LOCAL_COMMIT_OPERATIONS(
Page.DEBUGGING,
"List local commit/push operations in project team actions",
Boolean.class,
Boolean.FALSE),
CLEAR_WORKSPACE(
Page.RESOURCES,
"Execute to remove all workspaces from the service. This is a destructive operation.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ default boolean hasChangedComparedTo(ServiceStatus statusBeforeChecking) {
|| this.isBusy() != statusBeforeChecking.isBusy()
|| this.isConnected() != statusBeforeChecking.isConnected()
|| this.isOperational() != statusBeforeChecking.isOperational()
|| this.isShutdown() != statusBeforeChecking.isShutdown()
|| !this.getAdvisories().equals(statusBeforeChecking.getAdvisories());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.nio.file.Files;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecuteResultHandler;
Expand All @@ -31,6 +32,7 @@ public abstract class LocalInstanceImpl implements LocalInstance {
protected final Stack.Tag tag;

protected AtomicReference<Status> status = new AtomicReference<>(Status.UNKNOWN);
protected AtomicBoolean stopRequested = new AtomicBoolean(false);
protected DefaultExecutor executor;
protected ExecuteWatchdog watchdog;
protected ExecuteStreamHandler streamHandler;
Expand Down Expand Up @@ -126,8 +128,7 @@ private void monitorAlreadyRunningProcess(long pid) {
.thenAccept(
p -> {
if (this.pid != null && this.pid.equals(p.pid())) {
this.status.set(Status.STOPPED);
cleanupState();
markStopped();
}
});
});
Expand Down Expand Up @@ -161,6 +162,7 @@ public void setStreamHandler(ExecuteStreamHandler streamHandler) {
@Override
public boolean forceRestart(Option... options) {
stop();
waitForStop();
return start(options);
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

Expand All @@ -170,11 +172,15 @@ public synchronized boolean start(Option... options) {
if (status.get() == Status.RUNNING) {
return true;
}
if (status.get() == Status.WAITING) {
return false;
}

CommandLine commandLine = getCommandLine(product, settings);
if (commandLine == null) {
return false;
}
stopRequested.set(false);

EnumSet<Option> startOptions = EnumSet.noneOf(Option.class);
if (options != null) {
Expand Down Expand Up @@ -220,15 +226,17 @@ protected Process launch(
@Override
public void onProcessComplete(int exitValue) {
super.onProcessComplete(exitValue);
status.set(Status.STOPPED);
cleanupState();
markStopped();
}

@Override
public void onProcessFailed(ExecuteException e) {
super.onProcessFailed(e);
status.set(Status.ERROR);
cleanupState();
if (stopRequested.get()) {
markStopped();
} else {
markError();
}
}
};

Expand Down Expand Up @@ -267,6 +275,11 @@ private void cleanupState() {
@Override
public synchronized boolean stop() {
if (watchdog != null) {
stopRequested.set(true);
status.set(Status.WAITING);
if (pid != null) {
monitorAlreadyRunningProcess(pid);
}
watchdog.destroyProcess();
watchdog = null;
executor = null;
Expand All @@ -276,9 +289,18 @@ public synchronized boolean stop() {
return true;
}
if (pid != null) {
ProcessHandle.of(pid).ifPresent(ProcessHandle::destroy);
cleanupState();
status.set(Status.STOPPED);
var stoppedPid = pid;
var processHandle = ProcessHandle.of(stoppedPid);
if (processHandle.isPresent() && processHandle.get().isAlive()) {
stopRequested.set(true);
status.set(Status.WAITING);
monitorAlreadyRunningProcess(stoppedPid);
if (!processHandle.get().destroy()) {
processHandle.get().destroyForcibly();
}
} else {
markStopped();
}
process = null;
inputStream = null;
outputStream = null;
Expand All @@ -287,6 +309,36 @@ public synchronized boolean stop() {
return false;
}

private void waitForStop() {
long deadline = System.currentTimeMillis() + 10000;
while (status.get() == Status.WAITING && System.currentTimeMillis() < deadline) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}

private synchronized void markStopped() {
stopRequested.set(false);
status.set(Status.STOPPED);
cleanupState();
watchdog = null;
executor = null;
process = null;
}

private synchronized void markError() {
stopRequested.set(false);
status.set(Status.ERROR);
cleanupState();
watchdog = null;
executor = null;
process = null;
}

@Override
public OutputStream getOutputStream() {
return process != null ? outputStream : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,28 @@ public boolean isLocal() {
public boolean shutdown() {
int refCount = monitor.release(this);
if (refCount == 0 && monitor.isLocal()) {
// FIXME needs the admin user scope
return requestShutdown();
}
return false;
}

/**
* Ask the service process to shut down without unregistering this client from status monitoring.
*/
public boolean requestShutdown() {
if (monitor.isLocal()) {
// FIXME needs the admin user scope
return client.withScope(serviceScope).put(ServicesAPI.ADMIN.SHUTDOWN, null, Boolean.class);
}
return false;
}

/** Poll status immediately and return the monitor's current status without unregistering. */
public ServiceStatus refreshStatus() {
return monitor.refreshStatus();
}

@Override
public String declareSessionScope(
SessionScope sessionScope, UserScope userScope, KActorsBehavior behavior) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum ServiceClientCatalog {
private final long localPollCycleSeconds = (Integer) Setting.POLLING_INTERVAL_LOCAL.defaultValue;
private final long onlinePollCycleSeconds =
(Integer) Setting.POLLING_INTERVAL_REMOTE.defaultValue;
private static final int REMOTE_FAILURES_BEFORE_OFFLINE = 3;
private final ScheduledExecutorService pollingTasks = Executors.newScheduledThreadPool(10);

/**
Expand All @@ -45,9 +46,10 @@ public class ClientMonitor {
private final KlabService.Type type;
private final AtomicReference<KlabService.ServiceStatus> status;
private final boolean local;
private int consecutiveFailedPolls = 0;
private ScheduledFuture<?> schedule;

private Set<BaseServiceClient> registeredClients = new HashSet<>();
private Set<BaseServiceClient> registeredClients = ConcurrentHashMap.newKeySet();

public Utils.Http.Client getClient() {
return client;
Expand Down Expand Up @@ -112,19 +114,27 @@ void connect() {
}

void timedTasks() {
refreshStatus(true);
}

public KlabService.ServiceStatus refreshStatus() {
return refreshStatus(false);
}

synchronized KlabService.ServiceStatus refreshStatus(boolean notifyListeners) {

// if (settings != null && "off".equals(settings.get(Setting.POLLING, String.class))) {
// return;
// }

if (!client.isAlive()) {
this.status.set(KlabService.ServiceStatus.offline(type, serverId));
return;
}

var statusBeforeChecking = status.get();
try {
readStatus();
var refreshed = readStatus();
if (refreshed) {
consecutiveFailedPolls = 0;
} else if (local || ++consecutiveFailedPolls >= REMOTE_FAILURES_BEFORE_OFFLINE) {
this.status.set(KlabService.ServiceStatus.offline(type, serverId));
}
} finally {

boolean statusHasChanged =
Expand All @@ -140,27 +150,34 @@ void timedTasks() {
serviceClients.put(serverId, this);
}

for (var client : registeredClients) {
for (var listener : client.statusListeners) {
listener.accept(status.get(), statusHasChanged);
if (notifyListeners) {
for (var client : registeredClients) {
for (var listener : client.statusListeners) {
listener.accept(status.get(), statusHasChanged);
}
}
}
}
}

return status.get();
}

void readStatus() {
boolean readStatus() {
var status =
client.get(ServicesAPI.STATUS, ServiceStatusImpl.class, Notification.Mode.Silent);
if (status != null) {
this.status.set(status);
return true;
} else {
this.status.set(KlabService.ServiceStatus.offline(type, serverId));
return false;
}
}

private void close() {
this.schedule.cancel(true);
if (this.schedule != null) {
this.schedule.cancel(true);
}
serviceClients.remove(serverId);
}

Expand Down
Loading
Loading