Skip to content

Commit 42f3238

Browse files
committed
move DocileConnection to its own file
1 parent d65bdf4 commit 42f3238

File tree

2 files changed

+193
-182
lines changed

2 files changed

+193
-182
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package chargepoint.docile.dsl
2+
3+
import java.net.URI
4+
5+
import com.thenewmotion.ocpp.{Version, Version1X, VersionFamily}
6+
import com.thenewmotion.ocpp.json.api.{Ocpp1XJsonClient, Ocpp20JsonClient, OcppException, OcppJsonClient, RequestHandler}
7+
import com.thenewmotion.ocpp.messages.{ReqRes, Request, Response}
8+
import com.thenewmotion.ocpp.messages.v1x.{CentralSystemReq, CentralSystemReqRes, CentralSystemRes, ChargePointReq, ChargePointReqRes, ChargePointRes}
9+
import com.thenewmotion.ocpp.messages.v20.{CsReqRes, CsRequest, CsResponse, CsmsReqRes, CsmsRequest, CsmsResponse}
10+
import com.typesafe.scalalogging.Logger
11+
import javax.net.ssl.SSLContext
12+
import org.slf4j.LoggerFactory
13+
14+
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
15+
import scala.concurrent.duration._
16+
import scala.util.{Failure, Success}
17+
18+
trait DocileConnection[
19+
VFam <: VersionFamily,
20+
VersionBound <: Version, // shouldn't be necessary. we need some type level function from versionfamily to this
21+
OutgoingReqBound <: Request,
22+
IncomingResBound <: Response,
23+
OutgoingReqRes[_ <: OutgoingReqBound, _ <: IncomingResBound] <: ReqRes[_, _],
24+
IncomingReqBound <: Request,
25+
OutgoingResBound <: Response,
26+
IncomingReqRes[_ <: IncomingReqBound, _ <: OutgoingResBound] <: ReqRes[_, _]
27+
] extends MessageLogging {
28+
29+
val receivedMsgManager: ReceivedMsgManager[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes] =
30+
new ReceivedMsgManager[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes]()
31+
32+
var incomingMessageHandlerStack: List[GenericIncomingMessageProcessor[
33+
OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes, _
34+
]] = List()
35+
36+
var ocppClient: Option[OcppJsonClient[
37+
VFam,
38+
OutgoingReqBound,
39+
IncomingResBound,
40+
OutgoingReqRes,
41+
IncomingReqBound,
42+
OutgoingResBound,
43+
IncomingReqRes
44+
]] = None
45+
46+
// TODO handle this more gently. The identity should be known after the script is started, regardless of whether the connection was established or not.
47+
// that also has to do with being able to renew connections or disconnect and reconnect while executing a script
48+
def chargePointIdentity: String = connectedCpId.getOrElse(sys.error("Asked for charge point ID on not yet connected DocileConnection"))
49+
50+
var connectedCpId: Option[String] = None
51+
52+
private val connectionLogger = Logger(LoggerFactory.getLogger("connection"))
53+
54+
final def pushIncomingMessageHandler(handler: GenericIncomingMessageProcessor[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes, _]): Unit = {
55+
incomingMessageHandlerStack = handler :: incomingMessageHandlerStack
56+
}
57+
58+
final def popIncomingMessageHandler(): Unit = {
59+
incomingMessageHandlerStack = incomingMessageHandlerStack.tail
60+
}
61+
62+
def connect(chargePointId: String,
63+
endpoint: URI,
64+
version: VersionBound,
65+
authKey: Option[String]
66+
)(implicit executionContext: ExecutionContext, sslContext: SSLContext): Unit = {
67+
68+
connectedCpId = Some(chargePointId)
69+
70+
connectionLogger.info(s"Connecting to OCPP v${version.name} endpoint $endpoint")
71+
72+
val connection = createClient(chargePointId, endpoint, version, authKey)(executionContext, sslContext) {
73+
new RequestHandler[IncomingReqBound, OutgoingResBound, IncomingReqRes] {
74+
def apply[REQ <: IncomingReqBound, RES <: OutgoingResBound](req: REQ)(implicit reqRes: IncomingReqRes[REQ, RES], ec: ExecutionContext): Future[RES] = {
75+
76+
incomingLogger.info(s"$req")
77+
78+
val responsePromise = Promise[OutgoingResBound]()
79+
80+
def respond(res: OutgoingResBound): Unit = {
81+
outgoingLogger.info(s"$res")
82+
responsePromise.success(res)
83+
()
84+
}
85+
86+
receivedMsgManager.enqueue(
87+
GenericIncomingMessage[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes](req, respond _)
88+
)
89+
90+
// TODO nicer conversion?
91+
responsePromise.future.map(_.asInstanceOf[RES])(ec)
92+
}
93+
}
94+
}
95+
96+
connection.onClose.foreach { _ =>
97+
connectionLogger.info(s"Gracefully disconnected from endpoint $endpoint")
98+
ocppClient = None
99+
}(executionContext)
100+
101+
ocppClient = Some(connection)
102+
}
103+
104+
def sendRequestAndManageResponse[REQ <: OutgoingReqBound](req: REQ)(
105+
implicit reqRes: OutgoingReqRes[REQ, _ <: IncomingResBound],
106+
executionContext: ExecutionContext
107+
): Unit =
108+
ocppClient match {
109+
case None =>
110+
throw ExpectationFailed("Trying to send an OCPP message while not connected")
111+
case Some(client) =>
112+
outgoingLogger.info(s"$req")
113+
client.send(req)(reqRes) onComplete {
114+
case Success(res) =>
115+
incomingLogger.info(s"$res")
116+
receivedMsgManager.enqueue(
117+
GenericIncomingMessage[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes](res)
118+
)
119+
case Failure(OcppException(ocppError)) =>
120+
incomingLogger.info(s"$ocppError")
121+
receivedMsgManager.enqueue(
122+
GenericIncomingMessage[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes](ocppError)
123+
)
124+
case Failure(e) =>
125+
connectionLogger.error(s"Failed to get response to outgoing OCPP request $req: ${e.getMessage}\n\t${e.getStackTrace.mkString("\n\t")}")
126+
throw ExecutionError(e)
127+
}
128+
}
129+
130+
/** Template method to be implemented by version-specific extending classes to establish a connection for that
131+
* version of OCPP */
132+
protected def createClient(
133+
chargePointId: String,
134+
endpoint: URI,
135+
version: VersionBound,
136+
authKey: Option[String]
137+
)(implicit executionContext: ExecutionContext, sslContext: SSLContext): RequestHandler[IncomingReqBound, OutgoingResBound, IncomingReqRes] => OcppJsonClient[
138+
VFam,
139+
OutgoingReqBound,
140+
IncomingResBound,
141+
OutgoingReqRes,
142+
IncomingReqBound,
143+
OutgoingResBound,
144+
IncomingReqRes
145+
]
146+
147+
148+
def disconnect(): Unit = ocppClient.foreach { conn =>
149+
Await.result(conn.close(), 45.seconds)
150+
}
151+
}
152+
153+
object DocileConnection {
154+
def forVersion1x(): DocileConnection[
155+
VersionFamily.V1X.type,
156+
Version1X,
157+
CentralSystemReq,
158+
CentralSystemRes,
159+
CentralSystemReqRes,
160+
ChargePointReq,
161+
ChargePointRes,
162+
ChargePointReqRes
163+
] = {
164+
new DocileConnection[VersionFamily.V1X.type, Version1X, CentralSystemReq, CentralSystemRes, CentralSystemReqRes, ChargePointReq, ChargePointRes, ChargePointReqRes] {
165+
type VersionBound = Version1X
166+
167+
override protected def createClient(chargePointId: String, endpoint: URI, version: Version1X, authKey: Option[String])(implicit executionContext: ExecutionContext, sslContext: SSLContext): RequestHandler[ChargePointReq, ChargePointRes, ChargePointReqRes] => Ocpp1XJsonClient = { reqHandler =>
168+
OcppJsonClient.forVersion1x(chargePointId, endpoint, List(version), authKey)(reqHandler)(executionContext, sslContext)
169+
}
170+
}
171+
}
172+
173+
def forVersion20(): DocileConnection[
174+
VersionFamily.V20.type,
175+
Version.V20.type,
176+
CsmsRequest,
177+
CsmsResponse,
178+
CsmsReqRes,
179+
CsRequest,
180+
CsResponse,
181+
CsReqRes
182+
] = {
183+
new DocileConnection[VersionFamily.V20.type, Version.V20.type, CsmsRequest, CsmsResponse, CsmsReqRes, CsRequest, CsResponse, CsReqRes] {
184+
type VersionBound = Version.V20.type
185+
186+
override protected def createClient(chargePointId: String, endpoint: URI, version: Version.V20.type, authKey: Option[String])(implicit executionContext: ExecutionContext, sslContext: SSLContext): RequestHandler[CsRequest, CsResponse, CsReqRes] => Ocpp20JsonClient = { reqHandler =>
187+
OcppJsonClient.forVersion20(chargePointId, endpoint, authKey)(reqHandler)(executionContext, sslContext)
188+
}
189+
}
190+
}
191+
}

core/src/main/scala/chargepoint/docile/dsl/OcppTest.scala

Lines changed: 2 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,11 @@ import com.thenewmotion.ocpp.VersionFamily.{CsMessageTypesForVersionFamily, Csms
77
import javax.net.ssl.SSLContext
88

99
import scala.language.higherKinds
10-
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
11-
import scala.concurrent.duration.DurationInt
12-
import scala.util.{Failure, Success}
10+
import scala.concurrent.ExecutionContext
1311
import com.thenewmotion.ocpp.{Version, Version1X, VersionFamily}
14-
import com.thenewmotion.ocpp.json.api._
1512
import com.thenewmotion.ocpp.messages.{ReqRes, Request, Response}
1613
import com.thenewmotion.ocpp.messages.v1x.{CentralSystemReq, CentralSystemReqRes, CentralSystemRes, ChargePointReq, ChargePointReqRes, ChargePointRes}
17-
import com.thenewmotion.ocpp.messages.v20._
18-
import com.typesafe.scalalogging.Logger
19-
import org.slf4j.LoggerFactory
20-
14+
import com.thenewmotion.ocpp.messages.v20.{CsmsRequest, CsmsResponse, CsmsReqRes, CsRequest, CsResponse, CsReqRes}
2115

2216
trait OcppTest[VFam <: VersionFamily] {
2317

@@ -156,177 +150,3 @@ object Ocpp20Test {
156150
with ocpp20transactions.Ops
157151
}
158152

159-
trait DocileConnection[
160-
VFam <: VersionFamily,
161-
VersionBound <: Version, // shouldn't be necessary. we need some type level function from versionfamily to this
162-
OutgoingReqBound <: Request,
163-
IncomingResBound <: Response,
164-
OutgoingReqRes[_ <: OutgoingReqBound, _ <: IncomingResBound] <: ReqRes[_, _],
165-
IncomingReqBound <: Request,
166-
OutgoingResBound <: Response,
167-
IncomingReqRes[_ <: IncomingReqBound, _ <: OutgoingResBound] <: ReqRes[_, _]
168-
] extends MessageLogging {
169-
170-
val receivedMsgManager: ReceivedMsgManager[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes] =
171-
new ReceivedMsgManager[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes]()
172-
173-
var incomingMessageHandlerStack: List[GenericIncomingMessageProcessor[
174-
OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes, _
175-
]] = List()
176-
177-
var ocppClient: Option[OcppJsonClient[
178-
VFam,
179-
OutgoingReqBound,
180-
IncomingResBound,
181-
OutgoingReqRes,
182-
IncomingReqBound,
183-
OutgoingResBound,
184-
IncomingReqRes
185-
]] = None
186-
187-
// TODO handle this more gently. The identity should be known after the script is started, regardless of whether the connection was established or not.
188-
// that also has to do with being able to renew connections or disconnect and reconnect while executing a script
189-
def chargePointIdentity: String = connectedCpId.getOrElse(sys.error("Asked for charge point ID on not yet connected DocileConnection"))
190-
191-
var connectedCpId: Option[String] = None
192-
193-
private val connectionLogger = Logger(LoggerFactory.getLogger("connection"))
194-
195-
final def pushIncomingMessageHandler(handler: GenericIncomingMessageProcessor[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes, _]): Unit = {
196-
incomingMessageHandlerStack = handler :: incomingMessageHandlerStack
197-
}
198-
199-
final def popIncomingMessageHandler(): Unit = {
200-
incomingMessageHandlerStack = incomingMessageHandlerStack.tail
201-
}
202-
203-
def connect(chargePointId: String,
204-
endpoint: URI,
205-
version: VersionBound,
206-
authKey: Option[String]
207-
)(implicit executionContext: ExecutionContext, sslContext: SSLContext): Unit = {
208-
209-
connectedCpId = Some(chargePointId)
210-
211-
connectionLogger.info(s"Connecting to OCPP v${version.name} endpoint $endpoint")
212-
213-
val connection = createClient(chargePointId, endpoint, version, authKey)(executionContext, sslContext) {
214-
new RequestHandler[IncomingReqBound, OutgoingResBound, IncomingReqRes] {
215-
def apply[REQ <: IncomingReqBound, RES <: OutgoingResBound](req: REQ)(implicit reqRes: IncomingReqRes[REQ, RES], ec: ExecutionContext): Future[RES] = {
216-
217-
incomingLogger.info(s"$req")
218-
219-
val responsePromise = Promise[OutgoingResBound]()
220-
221-
def respond(res: OutgoingResBound): Unit = {
222-
outgoingLogger.info(s"$res")
223-
responsePromise.success(res)
224-
()
225-
}
226-
227-
receivedMsgManager.enqueue(
228-
GenericIncomingMessage[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes](req, respond _)
229-
)
230-
231-
// TODO nicer conversion?
232-
responsePromise.future.map(_.asInstanceOf[RES])(ec)
233-
}
234-
}
235-
}
236-
237-
connection.onClose.foreach { _ =>
238-
connectionLogger.info(s"Gracefully disconnected from endpoint $endpoint")
239-
ocppClient = None
240-
}(executionContext)
241-
242-
ocppClient = Some(connection)
243-
}
244-
245-
def sendRequestAndManageResponse[REQ <: OutgoingReqBound](req: REQ)(
246-
implicit reqRes: OutgoingReqRes[REQ, _ <: IncomingResBound],
247-
executionContext: ExecutionContext
248-
): Unit =
249-
ocppClient match {
250-
case None =>
251-
throw ExpectationFailed("Trying to send an OCPP message while not connected")
252-
case Some(client) =>
253-
outgoingLogger.info(s"$req")
254-
client.send(req)(reqRes) onComplete {
255-
case Success(res) =>
256-
incomingLogger.info(s"$res")
257-
receivedMsgManager.enqueue(
258-
GenericIncomingMessage[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes](res)
259-
)
260-
case Failure(OcppException(ocppError)) =>
261-
incomingLogger.info(s"$ocppError")
262-
receivedMsgManager.enqueue(
263-
GenericIncomingMessage[OutgoingReqBound, IncomingResBound, OutgoingReqRes, IncomingReqBound, OutgoingResBound, IncomingReqRes](ocppError)
264-
)
265-
case Failure(e) =>
266-
connectionLogger.error(s"Failed to get response to outgoing OCPP request $req: ${e.getMessage}\n\t${e.getStackTrace.mkString("\n\t")}")
267-
throw ExecutionError(e)
268-
}
269-
}
270-
271-
/** Template method to be implemented by version-specific extending classes to establish a connection for that
272-
* version of OCPP */
273-
protected def createClient(
274-
chargePointId: String,
275-
endpoint: URI,
276-
version: VersionBound,
277-
authKey: Option[String]
278-
)(implicit executionContext: ExecutionContext, sslContext: SSLContext): RequestHandler[IncomingReqBound, OutgoingResBound, IncomingReqRes] => OcppJsonClient[
279-
VFam,
280-
OutgoingReqBound,
281-
IncomingResBound,
282-
OutgoingReqRes,
283-
IncomingReqBound,
284-
OutgoingResBound,
285-
IncomingReqRes
286-
]
287-
288-
289-
def disconnect(): Unit = ocppClient.foreach { conn =>
290-
Await.result(conn.close(), 45.seconds)
291-
}
292-
}
293-
294-
object DocileConnection {
295-
def forVersion1x(): DocileConnection[
296-
VersionFamily.V1X.type,
297-
Version1X,
298-
CentralSystemReq,
299-
CentralSystemRes,
300-
CentralSystemReqRes,
301-
ChargePointReq,
302-
ChargePointRes,
303-
ChargePointReqRes
304-
] = {
305-
new DocileConnection[VersionFamily.V1X.type, Version1X, CentralSystemReq, CentralSystemRes, CentralSystemReqRes, ChargePointReq, ChargePointRes, ChargePointReqRes] {
306-
type VersionBound = Version1X
307-
308-
override protected def createClient(chargePointId: String, endpoint: URI, version: Version1X, authKey: Option[String])(implicit executionContext: ExecutionContext, sslContext: SSLContext): RequestHandler[ChargePointReq, ChargePointRes, ChargePointReqRes] => Ocpp1XJsonClient = { reqHandler =>
309-
OcppJsonClient.forVersion1x(chargePointId, endpoint, List(version), authKey)(reqHandler)(executionContext, sslContext)
310-
}
311-
}
312-
}
313-
314-
def forVersion20(): DocileConnection[
315-
VersionFamily.V20.type,
316-
Version.V20.type,
317-
CsmsRequest,
318-
CsmsResponse,
319-
CsmsReqRes,
320-
CsRequest,
321-
CsResponse,
322-
CsReqRes
323-
] = {
324-
new DocileConnection[VersionFamily.V20.type, Version.V20.type, CsmsRequest, CsmsResponse, CsmsReqRes, CsRequest, CsResponse, CsReqRes] {
325-
type VersionBound = Version.V20.type
326-
327-
override protected def createClient(chargePointId: String, endpoint: URI, version: Version.V20.type, authKey: Option[String])(implicit executionContext: ExecutionContext, sslContext: SSLContext): RequestHandler[CsRequest, CsResponse, CsReqRes] => Ocpp20JsonClient = { reqHandler =>
328-
OcppJsonClient.forVersion20(chargePointId, endpoint, authKey)(reqHandler)(executionContext, sslContext)
329-
}
330-
}
331-
}
332-
}

0 commit comments

Comments
 (0)