Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 the Eclipse Milo Authors
* Copyright (c) 2025 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -11,9 +11,17 @@
package org.eclipse.milo.opcua.sdk.client.session;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.OpcUaSession;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
import org.eclipse.milo.opcua.sdk.test.TestClient;
import org.eclipse.milo.opcua.sdk.test.TestServer;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.transport.TransportProfile;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
Expand Down Expand Up @@ -47,4 +55,58 @@ public void testCloseSessionWhileInactive() throws Exception {

assertNotNull(sessionFsm.closeSession().get());
}

/**
* Verify that SessionFuture instances are properly completed with an exception when
* closeSession() is called in the CreatingWait state.
*/
@Test
public void testCloseSessionCompletesSessionFutureInCreatingWait() throws Exception {
OpcUaServer server = TestServer.create().getServer();
server.startup().get();

OpcUaClient client = TestClient.create(server, cfg -> {});

server.shutdown().get();
client.connectAsync();

SessionFsm sessionFsm = client.getSessionFsm();
while (sessionFsm.getState() != State.CreatingWait) {
//noinspection BusyWait
Thread.sleep(100);
}

CompletableFuture<OpcUaSession> sessionFuture = sessionFsm.getSession();
sessionFsm.closeSession();

assertThrows(ExecutionException.class, () -> sessionFuture.get(5, TimeUnit.SECONDS));
}

/**
* Verify that SessionFuture instances are properly completed with an exception when
* closeSession() is called in the ReactivatingWait state.
*/
@Test
public void testCloseSessionCompletesSessionFutureInReactivatingWait() throws Exception {
OpcUaServer server = TestServer.create().getServer();
server.startup().get();

OpcUaClient client = TestClient.create(server, cfg -> {});
client.connect();

Thread.sleep(1000);

server.shutdown().get();

SessionFsm sessionFsm = client.getSessionFsm();
while (sessionFsm.getState() != State.ReactivatingWait) {
//noinspection BusyWait
Thread.sleep(100);
}

CompletableFuture<OpcUaSession> sessionFuture = sessionFsm.getSession();
sessionFsm.closeSession();

assertThrows(ExecutionException.class, () -> sessionFuture.get(5, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,15 @@ public CompletableFuture<OpcUaSession> getSessionAsync() {
return sessionFsm.getSession();
}

/**
* Get the {@link SessionFsm} managing this client's session lifecycle.
*
* @return the {@link SessionFsm} for this client.
*/
public SessionFsm getSessionFsm() {
return sessionFsm;
}

public OpcUaClientConfig getConfig() {
return config;
}
Expand Down

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the copyright in this file be updated?

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.digitalpetri.fsm.Fsm;
import com.digitalpetri.fsm.FsmContext;
import com.digitalpetri.netty.fsm.ChannelFsm;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -93,6 +94,15 @@ public CompletableFuture<OpcUaSession> getSession() {
}
}

/**
* Get the current state of the SessionFsm.
*
* @return the current {@link State} of the SessionFsm.
*/
public State getState() {
return fsm.getState();
}

public void addInitializer(SessionInitializer initializer) {
sessionInitializers.add(initializer);
}
Expand Down Expand Up @@ -129,6 +139,9 @@ public void removeActivityListener(SessionActivityListener listener) {
static final FsmContext.Key<ScheduledFuture> KEY_KEEP_ALIVE_SCHEDULED_FUTURE =
new FsmContext.Key<>("keepAliveScheduledFuture", ScheduledFuture.class);

static final FsmContext.Key<ChannelFsm.TransitionListener> KEY_CHANNEL_FSM_TRANSITION_LISTENER =
new FsmContext.Key<>("channelFsmTransitionListener", ChannelFsm.TransitionListener.class);

static final FsmContext.Key<SessionInitializers> KEY_SESSION_INITIALIZERS =
new FsmContext.Key<>("sessionInitializers", SessionInitializers.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.eclipse.milo.opcua.sdk.client.session.SessionFsm.KEY_CHANNEL_FSM_TRANSITION_LISTENER;
import static org.eclipse.milo.opcua.sdk.client.session.SessionFsm.KEY_CLOSE_FUTURE;
import static org.eclipse.milo.opcua.sdk.client.session.SessionFsm.KEY_KEEP_ALIVE_FAILURE_COUNT;
import static org.eclipse.milo.opcua.sdk.client.session.SessionFsm.KEY_KEEP_ALIVE_SCHEDULED_FUTURE;
Expand Down Expand Up @@ -240,6 +241,9 @@ private static void configureCreatingWaitState(

KEY_WAIT_TIME.remove(ctx);

handleFailureToOpenSession(
client, ctx, new UaException(StatusCodes.Bad_SessionClosed));

Event.CloseSession event = (Event.CloseSession) ctx.event();

client
Expand Down Expand Up @@ -630,7 +634,7 @@ private static void configureActiveState(FsmBuilder<State, Event> fb, OpcUaClien
if (transport instanceof OpcTcpClientTransport) {
ChannelFsm channelFsm = ((OpcTcpClientTransport) transport).getChannelFsm();

channelFsm.addTransitionListener(
ChannelFsm.TransitionListener listener =
new ChannelFsm.TransitionListener() {
@Override
public void onStateTransition(
Expand All @@ -641,8 +645,6 @@ public void onStateTransition(
if (from == com.digitalpetri.netty.fsm.State.Connected
&& to != com.digitalpetri.netty.fsm.State.Connected) {

channelFsm.removeTransitionListener(this);

try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

Expand All @@ -653,7 +655,10 @@ public void onStateTransition(
ctx.fireEvent(new Event.ConnectionLost());
}
}
});
};

channelFsm.addTransitionListener(listener);
KEY_CHANNEL_FSM_TRANSITION_LISTENER.set(ctx, listener);
}

client
Expand All @@ -669,7 +674,7 @@ public void onStateTransition(
.execute(FsmContext::processShelvedEvents);

fb.onTransitionFrom(State.Active)
.to(s -> s == State.Closing || s == State.ReactivatingWait)
.to(s -> s != State.Active)
.viaAny()
.execute(
ctx -> {
Expand All @@ -678,6 +683,16 @@ public void onStateTransition(
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}

ChannelFsm.TransitionListener listener =
KEY_CHANNEL_FSM_TRANSITION_LISTENER.remove(ctx);

if (listener != null) {
OpcClientTransport clientTransport = client.getTransport();
if (clientTransport instanceof OpcTcpClientTransport tcpClientTransport) {
tcpClientTransport.getChannelFsm().removeTransitionListener(listener);
}
}
});

// onSessionActive() callbacks
Expand Down Expand Up @@ -936,6 +951,9 @@ private static void configureReactivatingWaitState(
}

KEY_WAIT_TIME.remove(ctx);

handleFailureToOpenSession(
client, ctx, new UaException(StatusCodes.Bad_SessionClosed));
});

/* Internal Transition Actions */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

package org.eclipse.milo.opcua.sdk.client.session;

enum State {
public enum State {
Inactive,
CreatingWait,
Creating,
Expand Down
Loading