@@ -68,8 +68,8 @@ public class SatelliteGatewayProtocol extends Protocol {
6868 private final AtomicReference <Map <String , List <byte []>>> priorityMessages ;
6969 private final CipherManager cipherManager ;
7070
71- private final String namespacePath ;
72- private final String incomingNamespacePath ;
71+ private final String mapsIncomingNamespacePath ;
72+ private final String commonIncomingNamespacePath ;
7373 private final long outgoingPollInterval ;
7474
7575 private final int maxBufferSize ;
@@ -116,18 +116,31 @@ public SatelliteGatewayProtocol(@NonNull @NotNull EndPoint endPoint, @NotNull @N
116116 setKeepAlive (millis + random .nextLong (millis ));
117117 session = SessionManager .getInstance ().create (scb .build (), this );
118118 session .resumeState ();
119- String outBoundNamespacePath = config .getOutboundNamespaceRoot ().trim ();
119+ String outBoundNamespacePath = config .getCommonOutboundPublishRoot ().trim ();
120120 if (!outBoundNamespacePath .isEmpty ()){
121121 String path = outBoundNamespacePath .replace ("{deviceId}" , primeId );
122122 path = path .replace ("{mailboxId}" , config .getMailboxId ());
123123 SubscriptionContextBuilder subBuilder = new SubscriptionContextBuilder (path , ClientAcknowledgement .AUTO );
124124 subBuilder .setQos (QualityOfService .AT_MOST_ONCE )
125125 .setReceiveMaximum (config .getMaxInflightEventsPerDevice ())
126+ .setAlias ("common_requests" )
126127 .setNoLocalMessages (true );
127128 session .addSubscription (subBuilder .build ());
128129 }
129- namespacePath = parsePath (config .getNamespace (), "" , primeId , config .getMailboxId ());
130- incomingNamespacePath = parsePath (config .getInboundNamespaceRoot (), "/{deviceId}/incoming/{sin}/{min}" , primeId , config .getMailboxId ());
130+ String mapsOutboundNamespacePath = config .getMapsOutboundPublishRoot ();
131+ if (!mapsOutboundNamespacePath .isEmpty ()){
132+ String path = mapsOutboundNamespacePath .replace ("{deviceId}" , primeId );
133+ path = path .replace ("{mailboxId}" , config .getMailboxId ());
134+ SubscriptionContextBuilder subBuilder = new SubscriptionContextBuilder (path , ClientAcknowledgement .AUTO );
135+ subBuilder .setQos (QualityOfService .AT_MOST_ONCE )
136+ .setReceiveMaximum (config .getMaxInflightEventsPerDevice ())
137+ .setAlias ("maps_requests" )
138+ .setNoLocalMessages (true );
139+ session .addSubscription (subBuilder .build ());
140+ }
141+
142+ mapsIncomingNamespacePath = parsePath (config .getMapsInboundPublishRoot (), "/{deviceId}/maps/in" , primeId , config .getMailboxId ());
143+ commonIncomingNamespacePath = parsePath (config .getCommonInboundPublishRoot (), "/{deviceId}/common/in/{sin}/{min}" , primeId , config .getMailboxId ());
131144 String bcast = config .getOutboundBroadcast ();
132145 if (bcast != null && !bcast .isEmpty ()){
133146 SubscriptionContextBuilder subBuilder = new SubscriptionContextBuilder (bcast , ClientAcknowledgement .AUTO );
@@ -200,7 +213,8 @@ public ProtocolInformationDTO getInformation() {
200213
201214 @ Override
202215 public void sendMessage (@ NotNull @ NonNull MessageEvent messageEvent ) {
203- if (messageEvent .getDestinationName ().endsWith ("request" )) {
216+ if (messageEvent .getSubscription ().getContext ().getAlias () != null &&
217+ messageEvent .getSubscription ().getContext ().getAlias ().equals ("common_requests" )){
204218 MessageData messageData = new MessageData ();
205219 byte [] tmp = messageEvent .getMessage ().getOpaqueData ();
206220 int sin = tmp [0 ] & 0xFF ;
@@ -348,7 +362,7 @@ public void handleIncomingMessage(MessageData message) throws ExecutionException
348362 private void handleCommonMessage (MessageData message , byte [] raw ) throws ExecutionException , InterruptedException {
349363 int sin = message .getSin () & 0xff ;
350364 int min = message .getMin () & 0xff ;
351- String path = incomingNamespacePath ;
365+ String path = commonIncomingNamespacePath ;
352366 path = path .replace ("{sin}" , String .valueOf (sin ));
353367 path = path .replace ("{min}" , String .valueOf (min ));
354368 logger .log (SATELLITE_RECEIVED_RAW_MESSAGE , sin , min , raw .length , path );
@@ -369,10 +383,18 @@ private void processMapsMessage(SatelliteMessage satelliteMessage, Map<String, S
369383 destinationCount ++;
370384 messageCount += entry .getValue ().size ();
371385 String topic = entry .getKey ();
372- if (namespacePath != null && !namespacePath .isEmpty ()){
373- topic = namespacePath + "/" + topic ;
386+ boolean isSchema = false ;
387+ if (topic .toLowerCase ().startsWith ("$schema" )){
388+ topic = topic .substring ("$schema" .length ());
389+ isSchema = true ;
390+ }
391+ if (mapsIncomingNamespacePath != null && !mapsIncomingNamespacePath .isEmpty ()){
392+ topic = mapsIncomingNamespacePath + "/" + topic ;
374393 topic = topic .replace ("//" , "/" ); // to be sure
375394 }
395+ if (isSchema ){
396+ topic = "$schema" +topic ;
397+ }
376398 publishEvents (topic , entry .getValue (), transformation1 , meta );
377399 }
378400 logger .log (SATELLITE_RECEIVED_PACKED_MESSAGE , destinationCount , messageCount , satelliteMessage .getMessage ().length , raw .length );
0 commit comments