11package com .google .daq .mqtt .mapping ;
22
3- import static com .google .bos .iot .core .reconcile .SourceRepoMessageUtils .parseSourceRepoMessageData ;
3+ import static com .google .bos .iot .core .reconcile .SourceRepoMessageUtils .parseSourceRepoMessageDataToRawMap ;
44import static com .google .udmi .util .GeneralUtils .isNotEmpty ;
55
66import com .google .daq .mqtt .registrar .Registrar ;
@@ -31,7 +31,9 @@ public class MappingService extends AbstractPollingService {
3131 private static final String DISCOVERY_NODE_DEVICE_ID_FIELD = "deviceId" ;
3232 private static final String DISCOVERY_TIMESTAMP = "generation" ;
3333 private static final String TRIGGER_BRANCH = "discovery" ;
34+ private static final String TRIGGER_BRANCH_IoT = "discovery_iot" ;
3435 private static final String DEFAULT_TARGET_BRANCH = "main" ;
36+ private static final String GATEWAY_ID_FIELD = "gatewayId" ;
3537 private final String projectSpec ;
3638
3739 /**
@@ -77,60 +79,113 @@ public MappingService(String projectTarget, String projectSpec, String siteModel
7779
7880 @ Override
7981 protected void handleMessage (PubsubMessage message ) throws Exception {
80- Map <String , Object > messageData = parseSourceRepoMessageData (message );
82+ Map <String , Object > messageData = parseSourceRepoMessageDataToRawMap (message );
8183 Integer eventNumber = (Integer ) messageData .getOrDefault (EVENT_NUMBER_FIELD , 0 );
82- LOGGER .info ("Received event no. from message: {}" , eventNumber );
8384 if (eventNumber < 0 ) {
84- String mappingFamily = (String ) messageData .getOrDefault (FAMILY_FIELD , "" );
85- String registryId = message .getAttributesOrDefault (REGISTRY_ID_FIELD , "" );
85+ processDiscoveryComplete (message , messageData );
86+ } else if (message .getAttributesMap ().containsKey (GATEWAY_ID_FIELD )) {
87+ stitchDeviceProperties (message , messageData );
88+ }
89+ }
8690
87- if (registryId .isEmpty ()) {
88- LOGGER .error ("Registry Id not found for the message." );
89- return ;
90- }
91+ private void processDiscoveryComplete (PubsubMessage message , Map <String , Object > messageData )
92+ throws Exception {
93+ LOGGER .info (
94+ "Received event no. from message: {}" , messageData .getOrDefault (EVENT_NUMBER_FIELD , 0 ));
95+ String mappingFamily = (String ) messageData .getOrDefault (FAMILY_FIELD , "" );
96+ String registryId = message .getAttributesOrDefault (REGISTRY_ID_FIELD , "" );
9197
92- Instant now = Instant .now ();
93- String currentTimestamp = DateTimeFormatter .ISO_INSTANT .format (now );
94- String discoveryTimestamp = (String ) messageData .getOrDefault (DISCOVERY_TIMESTAMP ,
95- currentTimestamp );
96- String discoveryNodeDeviceId = message .getAttributesOrDefault (
97- DISCOVERY_NODE_DEVICE_ID_FIELD , "" );
98- if (discoveryNodeDeviceId .isEmpty ()) {
99- LOGGER .error ("Discovery Node device Id not found for the message received." );
100- return ;
101- }
102- LOGGER .info ("Starting Mapping process for registry: {}, family: {}, discoverNode deviceId:"
103- + " {}" , registryId , mappingFamily , discoveryNodeDeviceId );
98+ if (registryId .isEmpty ()) {
99+ LOGGER .error ("Registry Id not found for the message." );
100+ return ;
101+ }
102+
103+ Instant now = Instant .now ();
104+ String currentTimestamp = DateTimeFormatter .ISO_INSTANT .format (now );
105+ String discoveryTimestamp =
106+ (String ) messageData .getOrDefault (DISCOVERY_TIMESTAMP , currentTimestamp );
107+ String discoveryNodeDeviceId =
108+ message .getAttributesOrDefault (DISCOVERY_NODE_DEVICE_ID_FIELD , "" );
109+ if (discoveryNodeDeviceId .isEmpty ()) {
110+ LOGGER .error ("Discovery Node device Id not found for the message received." );
111+ return ;
112+ }
113+ LOGGER .info (
114+ "Starting Mapping process for registry: {}, family: {}, discoverNode deviceId:" + " {}" ,
115+ registryId , mappingFamily , discoveryNodeDeviceId );
116+
117+ withRepository (registryId , TRIGGER_BRANCH ,
118+ repository -> {
119+ String udmiModelPath = repository .getUdmiModelPath ();
120+ (new Registrar ())
121+ .processArgs (new ArrayList <>(List .of (udmiModelPath , projectSpec )))
122+ .execute ();
123+ MappingAgent mappingAgent =
124+ new MappingAgent (new ArrayList <>(List .of (udmiModelPath , projectSpec )));
125+ mappingAgent .processMapping (
126+ new ArrayList <>(List .of (discoveryNodeDeviceId , discoveryTimestamp , mappingFamily )));
127+ });
128+ }
104129
105- SourceRepository repository = initRepository (registryId );
106- if (repository .clone (DEFAULT_TARGET_BRANCH )) {
130+ private void stitchDeviceProperties (PubsubMessage message ,
131+ Map <String , Object > messageData )
132+ throws Exception {
133+ String registryId = message .getAttributesOrDefault (REGISTRY_ID_FIELD , "" );
134+
135+ if (registryId .isEmpty ()) {
136+ LOGGER .error ("Registry Id not found for the message." );
137+ return ;
138+ }
139+ LOGGER .info ("Starting Stitching process for registry: {}" , registryId );
140+
141+ withRepository (registryId , TRIGGER_BRANCH_IoT ,
142+ repository -> {
143+ Object devicesObject = messageData .get ("devices" );
144+ if (!(devicesObject instanceof Map )) {
145+ LOGGER .warn ("Skipping Processing: Received message without a valid 'devices' map." );
146+ return ;
147+ }
148+ String udmiModelPath = repository .getUdmiModelPath ();
149+ @ SuppressWarnings ("unchecked" )
150+ Map <String , Map <String , Object >> devices =
151+ (Map <String , Map <String , Object >>) devicesObject ;
152+ MappingAgent mappingAgent =
153+ new MappingAgent (new ArrayList <>(List .of (udmiModelPath , projectSpec )));
154+
155+ mappingAgent .stitchProperties (devices );
156+ });
157+ }
158+
159+ private void withRepository (String registryId , String branchPrefix , RepositoryConsumer work )
160+ throws Exception {
161+ SourceRepository repository = initRepository (registryId );
162+ if (repository .clone (DEFAULT_TARGET_BRANCH )) {
163+ try {
107164 String timestamp = LocalDateTime .now ()
108165 .format (DateTimeFormatter .ofPattern ("yyyyMMdd.HHmmss" ));
109166
110- String exportBranch = String .format ("%s/%s/%s" , TRIGGER_BRANCH , SERVICE_NAME , timestamp );
167+ String exportBranch = String .format ("%s/%s/%s" , branchPrefix , SERVICE_NAME , timestamp );
111168 if (!repository .checkoutNewBranch (exportBranch )) {
112- throw new RuntimeException ("Unable to create and checkout export branch "
113- + exportBranch );
169+ throw new RuntimeException ("Unable to create and checkout export branch " + exportBranch );
114170 }
115171
116- String udmiModelPath = repository .getUdmiModelPath ();
117- (new Registrar ()).processArgs (new ArrayList <>(List .of (udmiModelPath , projectSpec )))
118- .execute ();
119- MappingAgent mappingAgent = new MappingAgent (new ArrayList <>(
120- List .of (udmiModelPath , projectSpec )));
121- mappingAgent .processMapping (new ArrayList <>(List .of (discoveryNodeDeviceId ,
122- discoveryTimestamp , mappingFamily )));
172+ work .accept (repository );
123173
124174 LOGGER .info ("Committing and pushing changes to branch {}" , exportBranch );
125175 if (!repository .commitAndPush ("Merge changes from source: MappingService" )) {
126- throw new RuntimeException ("Unable to commit and push changes to branch "
127- + exportBranch );
176+ throw new RuntimeException ("Unable to commit and push changes to branch " + exportBranch );
128177 }
129178 LOGGER .info ("Export operation complete." );
179+ } finally {
130180 repository .delete ();
131- } else {
132- LOGGER .error ("Could not clone repository! PR message was not published!" );
133181 }
182+ } else {
183+ LOGGER .error ("Could not clone repository! PR message was not published!" );
134184 }
135185 }
186+
187+ @ FunctionalInterface
188+ private interface RepositoryConsumer {
189+ void accept (SourceRepository repository ) throws Exception ;
190+ }
136191}
0 commit comments