Skip to content

Commit 31aea97

Browse files
committed
Implement OCPP 2.1 RPC Layer Extensions
Implement the CALLRESULTERROR RPC as a response to a CALLRESULT, in case that failed internal validation, or if the completion action last added to the CompletionStage returned by send() using .whenComplete[Async]() throws any exception. Add an optional confirmationError() callback method to the ClientEvents and ServerEvents interfaces to allow the application to get notified about an incoming CALLRESULTERROR. The application may use the uniqueId parameter to match it to the Request#getOcppMessageId() of one of the requests it last responded to, provided it keeps track of them. Implement the SEND RPC as requests which do not have a confirmation type. These are sent through the existing send() method, returning a CompletableFuture which is already completed when the method returns, to which a completion action can be added to check whether a local exception prevented the request from being sent. Fix the only OCPP message using the SEND RPC, NotifyPeriodicEventStream, and add its missing Feature and Function handlers. The handler for this message is a void method, as it has no response. Remove sending CALLERROR in response to anything other than a CALL, except for the RpcFrameworkError when the message could not be parsed. Simplify the pendingPromises synchronization in Session by using the ConcurrentHashMap class rather than HashMap. Fix a few typos encountered along the way.
1 parent febd153 commit 31aea97

36 files changed

+721
-102
lines changed

ocpp-common/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44

55
dependencies {
6+
compile 'com.google.code.findbugs:jsr305:3.0.2'
67
compile 'org.slf4j:slf4j-api:2.0.17'
78
compile 'ch.qos.logback:logback-classic:1.3.16'
89
compile group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.1'

ocpp-common/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
</scm>
3939

4040
<dependencies>
41+
<dependency>
42+
<groupId>com.google.code.findbugs</groupId>
43+
<artifactId>jsr305</artifactId>
44+
<version>3.0.2</version>
45+
</dependency>
4146
<dependency>
4247
<groupId>javax.xml.bind</groupId>
4348
<artifactId>jaxb-api</artifactId>

ocpp-common/src/main/java/eu/chargetime/ocpp/AsyncPromiseFulfillerDecorator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ of this software and associated documentation files (the "Software"), to deal
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.ExecutorService;
3232
import java.util.concurrent.Executors;
33+
import javax.annotation.Nullable;
3334

3435
public class AsyncPromiseFulfillerDecorator implements PromiseFulfiller {
3536

@@ -43,7 +44,9 @@ public static void setExecutor(ExecutorService newExecutor) {
4344

4445
@Override
4546
public void fulfill(
46-
CompletableFuture<Confirmation> promise, SessionEvents eventHandler, Request request) {
47+
@Nullable CompletableFuture<Confirmation> promise,
48+
SessionEvents eventHandler,
49+
Request request) {
4750
executor.submit(() -> promiseFulfiller.fulfill(promise, eventHandler, request));
4851
}
4952

ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ of this software and associated documentation files (the "Software"), to deal
3232
import java.util.Optional;
3333
import java.util.UUID;
3434
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionException;
36+
import javax.annotation.Nullable;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739

@@ -71,11 +73,23 @@ public void connect(String uri, ClientEvents events) {
7173
new SessionEvents() {
7274

7375
@Override
74-
public void handleConfirmation(String uniqueId, Confirmation confirmation) {
76+
public void handleConfirmation(String uniqueId, @Nullable Confirmation confirmation) {
7577
Optional<CompletableFuture<Confirmation>> promiseOptional =
7678
promiseRepository.getPromise(uniqueId);
7779
if (promiseOptional.isPresent()) {
7880
promiseOptional.get().complete(confirmation);
81+
// join completion to catch and rethrow any exceptions thrown in the last added
82+
// completion action, so that a CALLRESULTERROR may be produced from it.
83+
try {
84+
promiseOptional.get().join();
85+
} catch (CompletionException e) {
86+
Throwable cause = e.getCause() != null ? e.getCause() : e;
87+
if (cause instanceof RuntimeException) {
88+
throw (RuntimeException) cause;
89+
} else {
90+
throw new RuntimeException(cause);
91+
}
92+
}
7993
} else {
8094
logger.debug("Promise not found for confirmation {}", confirmation);
8195
}
@@ -112,6 +126,18 @@ public void handleError(
112126
}
113127
}
114128

129+
@Override
130+
public void handleConfirmationError(
131+
String uniqueId, String errorCode, String errorDescription, Object payload) {
132+
logger.error(
133+
"Received an error which occurred while processing a call result: "
134+
+ "uniqueId {}: errorCode: {}, errorDescription: {}",
135+
uniqueId,
136+
errorCode,
137+
errorDescription);
138+
events.confirmationError(uniqueId, errorCode, errorDescription, payload);
139+
}
140+
115141
@Override
116142
public void handleConnectionClosed() {
117143
if (events != null) events.connectionClosed();
@@ -137,7 +163,8 @@ public void disconnect() {
137163
* Send a {@link Request} to the server. Can only send {@link Request} that the client supports.
138164
*
139165
* @param request outgoing request
140-
* @return call back object, will be fulfilled with confirmation when received
166+
* @return call back object, will be fulfilled with confirmation when received or {@code null} if
167+
* the request has no confirmation, or exceptionally if a local or remote error occurred.
141168
* @throws UnsupportedFeatureException trying to send a request from an unsupported feature
142169
* @throws OccurenceConstraintException Thrown if the request isn't valid.
143170
* @see CompletableFuture
@@ -166,7 +193,11 @@ public CompletableFuture<Confirmation> send(Request request)
166193
promiseRepository.removePromise(requestUuid);
167194
});
168195

169-
session.sendRequest(featureOptional.get().getAction(), request, requestUuid);
196+
if (featureOptional.get().getConfirmationType() != null) {
197+
session.sendRequest(featureOptional.get().getAction(), request, requestUuid);
198+
} else {
199+
session.sendMessage(featureOptional.get().getAction(), request, requestUuid);
200+
}
170201
return promise;
171202
}
172203

ocpp-common/src/main/java/eu/chargetime/ocpp/ClientEvents.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,7 @@ public interface ClientEvents {
3131
void connectionOpened();
3232

3333
void connectionClosed();
34+
35+
default void confirmationError(
36+
String uniqueId, String errorCode, String errorDescription, Object payload) {};
3437
}

ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ of this software and associated documentation files (the "Software"), to deal
4141
public abstract class Communicator {
4242
private static final Logger logger = LoggerFactory.getLogger(Communicator.class);
4343

44+
private final ArrayDeque<Object> transactionQueue;
4445
private RetryRunner retryRunner;
4546
protected Radio radio;
46-
private ArrayDeque<Object> transactionQueue;
4747
private CommunicatorEvents events;
4848
private boolean failedFlag;
4949

@@ -97,6 +97,27 @@ public abstract class Communicator {
9797
protected abstract Object makeCallError(
9898
String uniqueId, String action, String errorCode, String errorDescription);
9999

100+
/**
101+
* Create a call result error envelope to transmit.
102+
*
103+
* @param uniqueId the id the receiver expects.
104+
* @param errorCode an OCPP error code.
105+
* @param errorDescription an associated error description.
106+
* @return a fully packed message ready to send.
107+
*/
108+
protected abstract Object makeCallResultError(
109+
String uniqueId, String action, String errorCode, String errorDescription);
110+
111+
/**
112+
* Create a send envelope to transmit to the server.
113+
*
114+
* @param uniqueId the id of the message.
115+
* @param action action name of the feature.
116+
* @param payload packed payload.
117+
* @return a fully packed message ready to send.
118+
*/
119+
protected abstract Object makeSend(String uniqueId, String action, Object payload);
120+
100121
/**
101122
* Identify an incoming call and parse it into one of the following: {@link CallMessage} a
102123
* request. {@link CallResultMessage} a response.
@@ -177,7 +198,7 @@ public synchronized void sendCall(String uniqueId, String action, Request reques
177198
}
178199
} else if (request.transactionRelated()
179200
&& transactionQueue != null
180-
&& transactionQueue.size() > 0) {
201+
&& !transactionQueue.isEmpty()) {
181202
transactionQueue.add(call);
182203
processTransactionQueue();
183204
} else {
@@ -216,7 +237,7 @@ public void sendCallResult(String uniqueId, String action, Confirmation confirma
216237
events.onError(
217238
uniqueId,
218239
"ConfirmationCompletedHandlerFailed",
219-
"The confirmation completed callback handler failed with exception " + e.toString(),
240+
"The confirmation completed callback handler failed with exception " + e,
220241
confirmation);
221242
}
222243
}
@@ -240,7 +261,7 @@ public void sendCallResult(String uniqueId, String action, Confirmation confirma
240261
public void sendCallError(
241262
String uniqueId, String action, String errorCode, String errorDescription) {
242263
logger.error(
243-
"An error occurred. Sending this information: uniqueId {}: action: {}, errorCore: {}, errorDescription: {}",
264+
"An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}, errorDescription: {}",
244265
uniqueId,
245266
action,
246267
errorCode,
@@ -257,6 +278,65 @@ public void sendCallError(
257278
}
258279
}
259280

281+
/**
282+
* Send a call result error. If offline, the message is thrown away.
283+
*
284+
* @param uniqueId the id the receiver expects a response to.
285+
* @param errorCode an OCPP error Code
286+
* @param errorDescription a associated error description.
287+
*/
288+
public void sendCallResultError(
289+
String uniqueId, String action, String errorCode, String errorDescription) {
290+
logger.error(
291+
"An error occurred while processing a call result. Sending this information: "
292+
+ "uniqueId {}: action: {}, errorCode: {}, errorDescription: {}",
293+
uniqueId,
294+
action,
295+
errorCode,
296+
errorDescription);
297+
try {
298+
radio.send(makeCallResultError(uniqueId, action, errorCode, errorDescription));
299+
} catch (NotConnectedException ex) {
300+
logger.warn("sendCallResultError() failed", ex);
301+
events.onError(
302+
uniqueId,
303+
"Not connected",
304+
"The call result error couldn't be sent due to the lack of connection",
305+
errorCode);
306+
}
307+
}
308+
309+
/**
310+
* Send a {@link Request} which has no confirmation.
311+
*
312+
* @param uniqueId the id of the {@link Request}.
313+
* @param action action name of the {@link eu.chargetime.ocpp.feature.Feature}.
314+
* @param request the outgoing {@link Request}
315+
*/
316+
public synchronized void send(String uniqueId, String action, Request request) {
317+
Object call = makeSend(uniqueId, action, packPayload(request));
318+
319+
try {
320+
if (radio.isClosed()) {
321+
logger.warn("Not connected: can't send request: {}", request);
322+
events.onError(
323+
uniqueId,
324+
"Not connected",
325+
"The request can't be sent due to the lack of connection",
326+
request);
327+
} else {
328+
radio.send(call);
329+
}
330+
} catch (NotConnectedException ex) {
331+
logger.warn("sendCall() failed: not connected");
332+
events.onError(
333+
uniqueId,
334+
"Not connected",
335+
"The request can't be sent due to the lack of connection",
336+
request);
337+
}
338+
}
339+
260340
/** Close down the connection. Uses the {@link Transmitter}. */
261341
public void disconnect() {
262342
radio.disconnect();
@@ -289,6 +369,10 @@ public void receivedMessage(Object input) {
289369
Message message = parse(input);
290370
if (message instanceof CallResultMessage) {
291371
events.onCallResult(message.getId(), message.getAction(), message.getPayload());
372+
} else if (message instanceof CallResultErrorMessage) {
373+
CallResultErrorMessage call = (CallResultErrorMessage) message;
374+
events.onCallResultError(
375+
call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload());
292376
} else if (message instanceof CallErrorMessage) {
293377
failedFlag = true;
294378
CallErrorMessage call = (CallErrorMessage) message;
@@ -297,6 +381,9 @@ public void receivedMessage(Object input) {
297381
} else if (message instanceof CallMessage) {
298382
CallMessage call = (CallMessage) message;
299383
events.onCall(call.getId(), call.getAction(), call.getPayload());
384+
} else if (message instanceof SendMessage) {
385+
SendMessage send = (SendMessage) message;
386+
events.onSend(send.getId(), send.getAction(), send.getPayload());
300387
}
301388
}
302389

@@ -318,7 +405,7 @@ private Object getRetryMessage() {
318405
}
319406

320407
/**
321-
* Check if a error message was received.
408+
* Check if an error message was received.
322409
*
323410
* @return whether a fail flag has been raised.
324411
*/

ocpp-common/src/main/java/eu/chargetime/ocpp/CommunicatorEvents.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,31 @@ public interface CommunicatorEvents {
6666
*/
6767
void onError(String id, String errorCode, String errorDescription, Object payload);
6868

69+
/**
70+
* Handle call result error.
71+
*
72+
* <p>Hint: Use the id to identify the original call result. You can use {@link Communicator}s
73+
* unpackPayload method.
74+
*
75+
* @param id unique id used to identify the original call result.
76+
* @param errorCode short text to categorize the error.
77+
* @param errorDescription a longer text to describe the error.
78+
* @param payload Object payload attached to the error.
79+
*/
80+
void onCallResultError(String id, String errorCode, String errorDescription, Object payload);
81+
82+
/**
83+
* Handle send.
84+
*
85+
* <p>Hint: Use the action name to identify the feature, you can then choose to use {@link
86+
* Communicator}s unpackPayload method.
87+
*
88+
* @param id unique id.
89+
* @param action action name used to identify the feature.
90+
* @param payload raw payload.
91+
*/
92+
void onSend(String id, String action, Object payload);
93+
6994
/** The connection was disconnected. */
7095
void onDisconnected();
7196

ocpp-common/src/main/java/eu/chargetime/ocpp/FeatureRepository.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ public void addFeatureProfile(Profile profile) {
7373
public void addFeature(Feature feature) {
7474
actionMap.put(feature.getAction(), feature);
7575
classMap.put(feature.getRequestType(), feature);
76-
classMap.put(feature.getConfirmationType(), feature);
76+
if (feature.getConfirmationType() != null) {
77+
classMap.put(feature.getConfirmationType(), feature);
78+
}
7779
}
7880

7981
/**

ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,7 @@ public interface ISession {
4747
boolean completePendingPromise(String id, Confirmation confirmation)
4848
throws UnsupportedFeatureException, OccurenceConstraintException;
4949

50+
void sendMessage(String action, Request payload, String uuid);
51+
5052
void close();
5153
}

0 commit comments

Comments
 (0)