Skip to content

Commit b975dae

Browse files
authored
Merge pull request #216 from CDOT-CV/Feature/rsm-pipeline
New RSM Pipeline
2 parents 5fa7dfb + 39e1d96 commit b975dae

File tree

34 files changed

+1491
-7
lines changed

34 files changed

+1491
-7
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ Supported message types:
9494
- PSM
9595
- SDSM
9696
- RTCM
97+
- RSM
9798

9899
1. Navigate to the [UDP sender Python scripts](<./scripts/tests/>) in the project.
99100
2. Ensure the environment variable "DOCKER_HOST_IP" has been set in the shell that will be running the script. This must be set to the same IP that the ODE deployments are using.

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ services:
3030
- "44990:44990/udp"
3131
- "44950:44950/udp"
3232
- "44960:44960/udp"
33+
- "44970:44970/udp"
3334
- "5555:5555/udp"
3435
- "6666:6666/udp"
3536
environment:

docs/data-flow-diagrams/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,9 @@ see [Overview Data Flow 1 (Tim Depositor Controller)](#overview-data-flow-1-tim-
154154
2. The [RawEncodedRTCMJsonRouter](/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/kafka/listeners/json/RawEncodedRTCMJsonRouter.java) consumes messages from the OdeRawEncodedRTCMJson topic and pushes the RTCM to the Asn1DecoderInput topic. Any remaining signed IEEE 1609.2 headers are removed at this point.
155155
3. The [ACM](https://github.com/usdot-jpo-ode/asn1_codec) consumes from the Asn1DecoderInput topic, decodes the RTCM, and pushes it to the Asn1DecoderOutput topic.
156156
4. The [Asn1DecodedDataRouter](/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/kafka/listeners/asn1/Asn1DecodedDataRouter.java) consumes from the Asn1DecoderOutput topic and pushes the RTCM to the OdeRtcmJson topic.
157+
158+
### RSM Data Flow
159+
1. The RSM comes in through the [RsmReceiver](/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/rsm/RsmReceiver.java) class and is pushed to the OdeRawEncodedRSMJson topic. Any IEEE 1609.3 or unsigned IEEE 1609.2 headers are stripped at this point.
160+
2. The [RawEncodedRSMJsonRouter](/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/kafka/listeners/json/RawEncodedRSMJsonRouter.java) consumes messages from the OdeRawEncodedRSMJson topic and pushes the RSM to the Asn1DecoderInput topic. Any remaining signed IEEE 1609.2 headers are removed at this point.
161+
3. The [ACM](https://github.com/usdot-jpo-ode/asn1_codec) consumes from the Asn1DecoderInput topic, decodes the RSM, and pushes it to the Asn1DecoderOutput topic.
162+
4. The [Asn1DecodedDataRouter](/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/kafka/listeners/asn1/Asn1DecodedDataRouter.java) consumes from the Asn1DecoderOutput topic and pushes the RSM to the OdeRsmJson topic.
69.9 KB
Loading

docs/schemas/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# jpo-ode JSON Output Schemas
22

3-
The jpo-ode supports receiving and decoding ASN1 messages from RSUs. The supported message types are currently BSM, TIM, MAP, SPaT, SRM, SSM, PSM, SDSM and RTCM. These are decoded into XML, deserialized into POJOs and finally serialized into JSON. This JSON output can be access from any of the corresponding message's JSON output Kafka topics:
3+
The jpo-ode supports receiving and decoding ASN1 messages from RSUs. The supported message types are currently BSM, TIM, MAP, SPaT, SRM, SSM, PSM, SDSM, RTCM and RSM. These are decoded into XML, deserialized into POJOs and finally serialized into JSON. This JSON output can be access from any of the corresponding message's JSON output Kafka topics:
44

55
- [topic.OdeBsmJson](../../jpo-ode-core/src/main/resources/schemas/schema-bsm.json)
66
- [topic.OdeTimJson](../../jpo-ode-core/src/main/resources/schemas/schema-tim.json)
@@ -11,6 +11,7 @@ The jpo-ode supports receiving and decoding ASN1 messages from RSUs. The support
1111
- [topic.OdePsmJson](../../jpo-ode-core/src/main/resources/schemas/schema-psm.json)
1212
- [topic.OdeSdsmJson](../../jpo-ode-core/src/main/resources/schemas/schema-sdsm.json)
1313
- [topic.OdeRtcmJson](../../jpo-ode-core/src/main/resources/schemas/schema-rtcm.json)
14+
- [topic.OdeRsmJson](../../jpo-ode-core/src/main/resources/schemas/schema-rsm.json)
1415

1516
The output JSON of the ODE is complex but it is similar to the official standard of J2735 with some minor differences due to the form of their deserialized POJOs. To help implement proper data validation for the JSON output of the ODE into any data pipeline infrastructure, you may use the provided validation schemas within this directory.
1617

jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdeLogMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class OdeLogMetadata extends OdeMsgMetadata {
4646
* Enum representing the possible record types for log metadata.
4747
*/
4848
public enum RecordType {
49-
bsmLogDuringEvent, rxMsg, dnMsg, bsmTx, driverAlert, mapTx, spatTx, ssmTx, srmTx, timMsg, psmTx, sdsmTx, rtcmTx, unsupported
49+
bsmLogDuringEvent, rxMsg, dnMsg, bsmTx, driverAlert, mapTx, spatTx, ssmTx, srmTx, timMsg, psmTx, sdsmTx, rtcmTx, rsmTx, unsupported
5050
}
5151

5252
/**
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
{
2+
"$schema": "http://json-schema.org/draft-07/schema",
3+
"properties": {
4+
"metadata": {
5+
"properties": {
6+
"source": {
7+
"enum": [
8+
"RSU",
9+
"UNKNOWN"
10+
],
11+
"type": "string"
12+
},
13+
"encodings": {
14+
"items": {
15+
"properties": {
16+
"elementName": {
17+
"type": "string"
18+
},
19+
"elementType": {
20+
"type": "string"
21+
},
22+
"encodingRule": {
23+
"enum": [
24+
"UPER",
25+
"COER"
26+
],
27+
"type": "string"
28+
}
29+
},
30+
"required": [
31+
"elementName",
32+
"elementType",
33+
"encodingRule"
34+
],
35+
"type": "object"
36+
},
37+
"type": [
38+
"array",
39+
"null"
40+
]
41+
},
42+
"logFileName": {
43+
"type": "string"
44+
},
45+
"maxDurationTime": {
46+
"type": [
47+
"integer",
48+
"null"
49+
]
50+
},
51+
"odePacketID": {
52+
"type": [
53+
"string",
54+
"null"
55+
]
56+
},
57+
"odeReceivedAt": {
58+
"type": "string"
59+
},
60+
"odeTimStartDateTime": {
61+
"type": [
62+
"string",
63+
"null"
64+
]
65+
},
66+
"originIp": {
67+
"type": [
68+
"string",
69+
"null"
70+
]
71+
},
72+
"payloadType": {
73+
"type": "string"
74+
},
75+
"receivedMessageDetails": {
76+
"properties": {
77+
"locationData": {
78+
"properties": {
79+
"elevation": {
80+
"type": "string"
81+
},
82+
"heading": {
83+
"type": "string"
84+
},
85+
"latitude": {
86+
"type": "string"
87+
},
88+
"longitude": {
89+
"type": "string"
90+
},
91+
"speed": {
92+
"type": "string"
93+
}
94+
},
95+
"required": [
96+
"latitude",
97+
"longitude",
98+
"elevation",
99+
"speed",
100+
"heading"
101+
],
102+
"type": "object"
103+
},
104+
"rxSource": {
105+
"enum": [
106+
"RSU",
107+
"NA",
108+
"UNKNOWN"
109+
],
110+
"type": "string"
111+
}
112+
},
113+
"required": [
114+
"rxSource"
115+
],
116+
"type": "object"
117+
},
118+
"recordGeneratedAt": {
119+
"type": "string"
120+
},
121+
"recordGeneratedBy": {
122+
"enum": [
123+
"RSU",
124+
"UNKNOWN",
125+
null
126+
],
127+
"type": [
128+
"string",
129+
"null"
130+
]
131+
},
132+
"recordType": {
133+
"enum": [
134+
"rsmTx",
135+
"unsupported"
136+
],
137+
"type": "string"
138+
},
139+
"sanitized": {
140+
"type": "boolean"
141+
},
142+
"schemaVersion": {
143+
"const": 9,
144+
"type": "integer"
145+
},
146+
"securityResultCode": {
147+
"enum": [
148+
"success",
149+
"unknown",
150+
"inconsistentInputParameters",
151+
"spduParsingInvalidInput",
152+
"spduParsingUnsupportedCriticalInformationField",
153+
"spduParsingCertificateNotFound",
154+
"spduParsingGenerationTimeNotAvailable",
155+
"spduParsingGenerationLocationNotAvailable",
156+
"spduCertificateChainNotEnoughInformationToConstructChain",
157+
"spduCertificateChainChainEndedAtUntrustedRoot",
158+
"spduCertificateChainChainWasTooLongForImplementation",
159+
"spduCertificateChainCertificateRevoked",
160+
"spduCertificateChainOverdueCRL",
161+
"spduCertificateChainInconsistentExpiryTimes",
162+
"spduCertificateChainInconsistentStartTimes",
163+
"spduCertificateChainInconsistentChainPermissions",
164+
"spduCryptoVerificationFailure",
165+
"spduConsistencyFutureCertificateAtGenerationTime",
166+
"spduConsistencyExpiredCertificateAtGenerationTime",
167+
"spduConsistencyExpiryDateTooEarly",
168+
"spduConsistencyExpiryDateTooLate",
169+
"spduConsistencyGenerationLocationOutsideValidityRegion",
170+
"spduConsistencyNoGenerationLocation",
171+
"spduConsistencyUnauthorizedPSID",
172+
"spduInternalConsistencyExpiryTimeBeforeGenerationTime",
173+
"spduInternalConsistencyextDataHashDoesntMatch",
174+
"spduInternalConsistencynoExtDataHashProvided",
175+
"spduInternalConsistencynoExtDataHashPresent",
176+
"spduLocalConsistencyPSIDsDontMatch",
177+
"spduLocalConsistencyChainWasTooLongForSDEE",
178+
"spduRelevanceGenerationTimeTooFarInPast",
179+
"spduRelevanceGenerationTimeTooFarInFuture",
180+
"spduRelevanceExpiryTimeInPast",
181+
"spduRelevanceGenerationLocationTooDistant",
182+
"spduRelevanceReplayedSpdu",
183+
"spduCertificateExpired"
184+
],
185+
"type": "string"
186+
},
187+
"serialId": {
188+
"properties": {
189+
"bundleId": {
190+
"type": "integer"
191+
},
192+
"bundleSize": {
193+
"type": "integer"
194+
},
195+
"recordId": {
196+
"type": "integer"
197+
},
198+
"serialNumber": {
199+
"type": "integer"
200+
},
201+
"streamId": {
202+
"type": "string"
203+
}
204+
},
205+
"required": [
206+
"streamId",
207+
"bundleSize",
208+
"bundleId",
209+
"recordId",
210+
"serialNumber"
211+
],
212+
"type": "object"
213+
},
214+
"asn1": {
215+
"type": [
216+
"string",
217+
"null"
218+
]
219+
},
220+
"isCertPresent": {
221+
"type": "boolean"
222+
}
223+
},
224+
"required": [
225+
"source",
226+
"logFileName",
227+
"recordType",
228+
"securityResultCode",
229+
"receivedMessageDetails",
230+
"payloadType",
231+
"serialId",
232+
"odeReceivedAt",
233+
"schemaVersion",
234+
"recordGeneratedAt",
235+
"sanitized",
236+
"asn1",
237+
"isCertPresent"
238+
],
239+
"type": "object"
240+
},
241+
"payload": {
242+
"properties": {
243+
"data": {
244+
"$ref": "https://raw.githubusercontent.com/CDOT-CV/jpo-asn-pojos/refs/heads/develop/jpo-asn-jsonschema-generator/src/main/resources/schemas/RoadSafetyMessage/RoadSafetyMessageMessageFrame.schema.json"
245+
},
246+
"dataType": {
247+
"type": "string"
248+
}
249+
},
250+
"required": [
251+
"dataType",
252+
"data"
253+
],
254+
"type": "object"
255+
}
256+
},
257+
"required": [
258+
"metadata",
259+
"payload"
260+
],
261+
"type": "object"
262+
}

jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/kafka/listeners/asn1/Asn1DecodedDataRouter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public void listen(ConsumerRecord<String, String> consumerRecord)
103103
case "signalRequestMessage" -> routeMessageFrame(consumerRecord, jsonTopics.getSrm());
104104
case "sensorDataSharingMessage" -> routeMessageFrame(consumerRecord, jsonTopics.getSdsm());
105105
case "rtcmCorrections" -> routeMessageFrame(consumerRecord, jsonTopics.getRtcm());
106+
case "roadSafetyMessage" -> routeMessageFrame(consumerRecord, jsonTopics.getRsm());
106107
default -> log.warn("Unknown message type: {}", messageName);
107108
}
108109
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package us.dot.its.jpo.ode.kafka.listeners.json;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.kafka.annotation.KafkaListener;
7+
import org.springframework.kafka.core.KafkaTemplate;
8+
import org.springframework.stereotype.Component;
9+
import us.dot.its.jpo.ode.model.OdeMessageFrameMetadata;
10+
import us.dot.its.jpo.ode.model.OdeObject;
11+
import us.dot.its.jpo.ode.uper.StartFlagNotFoundException;
12+
import us.dot.its.jpo.ode.uper.SupportedMessageType;
13+
14+
/**
15+
* A Kafka listener component that processes ASN.1 encoded RSM JSON messages from a specified Kafka
16+
* topic. It processes the raw encoded JSON messages and publishes them to be decoded by the ASN.1
17+
* codec
18+
*/
19+
@Component
20+
public class RawEncodedRSMJsonRouter {
21+
22+
private final KafkaTemplate<String, OdeObject> kafkaTemplate;
23+
private final String publishTopic;
24+
private final RawEncodedJsonService rawEncodedJsonService;
25+
26+
/**
27+
* Constructs an instance of the RawEncodedRSMJsonRouter.
28+
*
29+
* @param kafkaTemplate A KafkaTemplate for publishing messages to a Kafka topic.
30+
* @param publishTopic The name of the Kafka topic to publish the processed messages to.
31+
* @param rawEncodedJsonService A service to transform incoming data into the expected output
32+
*/
33+
public RawEncodedRSMJsonRouter(KafkaTemplate<String, OdeObject> kafkaTemplate,
34+
@Value("${ode.kafka.topics.asn1.decoder-input}") String publishTopic,
35+
RawEncodedJsonService rawEncodedJsonService) {
36+
this.kafkaTemplate = kafkaTemplate;
37+
this.publishTopic = publishTopic;
38+
this.rawEncodedJsonService = rawEncodedJsonService;
39+
}
40+
41+
/**
42+
* Consumes and processes Kafka messages containing ASN.1 encoded RSM JSON data. This method
43+
* extracts metadata and payload from the JSON message and sends it for decoding.
44+
*
45+
* @param consumerRecord The Kafka consumer record containing the message key and value. The value
46+
* includes the raw ASN.1 encoded JSON RSM data to be processed.
47+
* @throws StartFlagNotFoundException If the start flag for the RSM message type is not found
48+
* during payload processing.
49+
* @throws JsonProcessingException If there's an error while processing or deserializing JSON
50+
* data.
51+
*/
52+
@KafkaListener(id = "RawEncodedRSMJsonRouter", topics = "${ode.kafka.topics.raw-encoded-json.rsm}")
53+
public void listen(ConsumerRecord<String, String> consumerRecord)
54+
throws StartFlagNotFoundException, JsonProcessingException {
55+
var messageToPublish =
56+
rawEncodedJsonService.addEncodingAndMutateBytes(consumerRecord.value(),
57+
SupportedMessageType.RSM,
58+
OdeMessageFrameMetadata.class);
59+
kafkaTemplate.send(publishTopic, consumerRecord.key(), messageToPublish);
60+
}
61+
}

jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/kafka/topics/JsonTopics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class JsonTopics {
2020
private String tim;
2121
private String sdsm;
2222
private String rtcm;
23+
private String rsm;
2324

2425
private String driverAlert;
2526

0 commit comments

Comments
 (0)