Skip to content

Commit 61a5e42

Browse files
committed
Added event handling support for the server.
1 parent a8e0b9f commit 61a5e42

File tree

7 files changed

+156
-23
lines changed

7 files changed

+156
-23
lines changed

ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -114,29 +114,16 @@ public Communicator(Transmitter transmitter) {
114114
* @param events handler for call back events.
115115
*/
116116
public void connect(String uri, CommunicatorEvents events) {
117-
transmitter.connect(uri, new TransmitterEvents() {
118-
@Override
119-
public void connected() {
120-
events.onConnected();
121-
}
122-
123-
@Override
124-
public void receivedMessage(String input) {
125-
System.out.println(input);
126-
Message message = parse(input);
127-
if(message instanceof CallResultMessage) {
128-
events.onCallResult(message.getId(), message.getPayload());
129-
} else if (message instanceof CallMessage) {
130-
CallMessage call = (CallMessage)message;
131-
events.onCall(call.getId(), call.getAction(), call.getPayload());
132-
}
133-
}
117+
transmitter.connect(uri, new EventHandler(events));
118+
}
134119

135-
@Override
136-
public void disconnected() {
137-
events.onDisconnected();
138-
}
139-
});
120+
/**
121+
* Use the injected {@link Transmitter} to accept a client.
122+
*
123+
* @param events handler for call back events.
124+
*/
125+
public void accept(CommunicatorEvents events) {
126+
transmitter.accept(new EventHandler(events));
140127
}
141128

142129
/**
@@ -177,4 +164,34 @@ public void sendCallError(String uniqueId, String errorCode, String errorDescrip
177164
public void disconnect() {
178165
transmitter.disconnect();
179166
}
167+
168+
private class EventHandler implements TransmitterEvents {
169+
private final CommunicatorEvents events;
170+
171+
public EventHandler(CommunicatorEvents events) {
172+
this.events = events;
173+
}
174+
175+
@Override
176+
public void connected() {
177+
events.onConnected();
178+
}
179+
180+
@Override
181+
public void receivedMessage(String input) {
182+
System.out.println(input);
183+
Message message = parse(input);
184+
if (message instanceof CallResultMessage) {
185+
events.onCallResult(message.getId(), message.getPayload());
186+
} else if (message instanceof CallMessage) {
187+
CallMessage call = (CallMessage) message;
188+
events.onCall(call.getId(), call.getAction(), call.getPayload());
189+
}
190+
}
191+
192+
@Override
193+
public void disconnected() {
194+
events.onDisconnected();
195+
}
196+
}
180197
}

ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,71 @@ public abstract class Server extends FeatureHandler {
4242

4343
private ArrayList<Session> sessions;
4444

45+
/**
46+
*
47+
*/
4548
public Server() {
4649
this.sessions = new ArrayList<>();
4750
}
4851

52+
/**
53+
* Start listening for clients.
54+
*
55+
* @param listener Inject the listener.
56+
* @param serverEvents Callback handler for server specific events.
57+
*/
4958
public void open(Listener listener, ServerEvents serverEvents) {
5059
listener.open(session -> {
60+
session.accept(new SessionEvents() {
61+
@Override
62+
public Feature findFeatureByAction(String action) {
63+
return findFeature(action);
64+
}
65+
66+
@Override
67+
public Feature findFeatureByRequest(Request request) {
68+
return findFeature(request);
69+
}
70+
71+
@Override
72+
public void handleConfirmation(String uniqueId, Confirmation confirmation) {
73+
getPromise(uniqueId).complete(confirmation);
74+
}
75+
76+
@Override
77+
public Confirmation handleRequest(Request request) {
78+
Feature feature = findFeatureByRequest(request);
79+
return feature.handleRequest(sessions.indexOf(session), request);
80+
}
81+
82+
@Override
83+
public void handleError(String uniqueId) {
84+
85+
}
86+
87+
@Override
88+
public void handleConnectionClosed() {
89+
90+
}
91+
92+
@Override
93+
public void handleConnectionOpened() {
94+
95+
}
96+
});
5197
sessions.add(session);
5298
serverEvents.newSession(sessions.indexOf(session));
5399
});
54100
}
55101

102+
/**
103+
* Send a message to a client.
104+
*
105+
* @param sessionIndex Session index of the client.
106+
* @param request Request for the client.
107+
* @return Callback handler for when the client responds.
108+
* @throws UnsupportedFeatureException Thrown if the feature isn't amoung the list of supported featured.
109+
*/
56110
public CompletableFuture<Confirmation> send(int sessionIndex, Request request) throws UnsupportedFeatureException {
57111
Feature feature = findFeature(request);
58112
if (feature == null)

ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ public void close() {
103103
communicator.disconnect();
104104
}
105105

106+
public void accept(SessionEvents eventHandler) {
107+
this.events = eventHandler;
108+
communicator.accept(new CommunicatorEventHandler());
109+
}
110+
106111
private class CommunicatorEventHandler implements CommunicatorEvents {
107112
@Override
108113
public void onCallResult(String id, String payload) {

ocpp-common/src/main/java/eu/chargetime/ocpp/Transmitter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,11 @@ public interface Transmitter
5151
* @param message test message to send.
5252
*/
5353
void send(String message);
54+
55+
/**
56+
* Accept an incoming connection request.
57+
*
58+
* @param events connection related events.
59+
*/
60+
void accept(TransmitterEvents events);
5461
}

ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ public void setup() {
6161
request = () -> false;
6262
doReturn(request.getClass()).when(feature).getRequestType();
6363
doReturn(TestConfirmation.class).when(feature).getConfirmationType();
64+
when(feature.getAction()).thenReturn(null);
6465
doAnswer(invocation -> listenerEvents = invocation.getArgumentAt(0, ListenerEvents.class)).when(listener).open(any());
66+
doAnswer(invocation -> sessionEvents = invocation.getArgumentAt(0, SessionEvents.class)).when(session).accept(any());
6567

6668
server = new Server() {
6769
};
@@ -70,6 +72,18 @@ public void setup() {
7072
server.addFeatureProfile(profile);
7173
}
7274

75+
@Test
76+
public void newSession_serverIsListening_sessionIsAccepted() {
77+
// Given
78+
server.open(listener, serverEvents);
79+
80+
// When
81+
listenerEvents.newSession(session);
82+
83+
// Then
84+
verify(session, times(1)).accept(any());
85+
}
86+
7387
@Test
7488
public void newSession_serverIsListening_callbackWithIndex0() {
7589
// Given
@@ -99,4 +113,17 @@ public void send_aMessage_isCommunicated() throws Exception {
99113
verify(session, times(1)).sendRequest(anyString(), eq(request));
100114
}
101115

116+
@Test
117+
public void handleRequest_callsFeatureHandleRequest() {
118+
// Given
119+
server.open(listener, serverEvents);
120+
listenerEvents.newSession(session);
121+
122+
// When
123+
sessionEvents.handleRequest(request);
124+
125+
// Then
126+
verify(feature, times(1)).handleRequest(eq(0), eq(request));
127+
}
128+
102129
}

ocpp-common/src/test/java/eu/chargetime/ocpp/test/SessionTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,25 @@ public void onCall_unknownAction_callSendCallError() {
204204
// When
205205
eventHandler.onCall(someId, null, null);
206206

207-
// then
207+
// Then
208208
verify(communicator, times(1)).sendCallError(eq(someId), anyString(), anyString());
209209
}
210+
211+
@Test
212+
public void onCall_sessionAccepted_callIsForwarded() throws Exception {
213+
// Given
214+
String someId = "Some id";
215+
session.accept(sessionEvents);
216+
when(communicator.unpackPayload(any(), any())).thenReturn(new TestRequest());
217+
218+
// When
219+
eventHandler.onCall(someId, null, null);
220+
try {
221+
Thread.sleep(10);
222+
} catch (Exception ex) {
223+
} // TODO make async invoker injectable
224+
225+
// Then
226+
verify(sessionEvents, times(1)).handleRequest(any());
227+
}
210228
}

ocpp-v1_6/src/main/java/eu/chargetime/ocpp/WebSocketTransmitter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,9 @@ public void disconnect()
8888
public void send(String request) {
8989
client.send(request);
9090
}
91+
92+
@Override
93+
public void accept(TransmitterEvents events) {
94+
95+
}
9196
}

0 commit comments

Comments
 (0)