66import java .io .IOException ;
77import java .util .Set ;
88
9- import org .iris_events .exception .BadPayloadException ;
10- import org .slf4j .Logger ;
11- import org .slf4j .LoggerFactory ;
12-
13- import com .fasterxml .jackson .databind .ObjectMapper ;
14-
159import org .iris_events .annotations .ExchangeType ;
1610import org .iris_events .annotations .MessageHandler ;
1711import org .iris_events .annotations .Scope ;
1812import org .iris_events .common .Exchanges ;
1913import org .iris_events .common .MessagingHeaders ;
2014import org .iris_events .common .message .ResourceMessage ;
2115import org .iris_events .context .EventContext ;
16+ import org .iris_events .exception .BadPayloadException ;
2217import org .iris_events .producer .RoutingDetails ;
2318import org .iris_events .subscription .collection .RedisSnapshotCollection ;
2419import org .iris_events .subscription .collection .Snapshot ;
3126import org .iris_events .subscription .events .Unsubscribed ;
3227import org .iris_events .subscription .model .Resource ;
3328import org .iris_events .subscription .model .Subscription ;
29+ import org .slf4j .Logger ;
30+ import org .slf4j .LoggerFactory ;
3431
32+ import com .fasterxml .jackson .databind .ObjectMapper ;
3533import io .quarkus .runtime .StartupEvent ;
3634import jakarta .enterprise .context .ApplicationScoped ;
3735import jakarta .enterprise .event .Observes ;
@@ -101,7 +99,7 @@ public void resourceUpdated(final ResourceMessage resourceMessage) throws IOExce
10199 final var resourceId = resourceMessage .resourceId ();
102100 final var payloadAsBytes = objectMapper .writeValueAsBytes (resourceMessage .payload ());
103101 final var eventName = eventContext .getHeaderValue (MessagingHeaders .Message .EVENT_TYPE )
104- .orElseThrow (() -> new RuntimeException ("Missing required event type header!" ));
102+ .orElseThrow (() -> new RuntimeException ("Missing required event type header!" ));
105103 final var routingKey = String .format ("%s.%s" , eventName , Exchanges .SESSION .getValue ());
106104 final var snapshot = new Snapshot (eventName , routingKey , payloadAsBytes );
107105
@@ -116,13 +114,14 @@ public void resourceUpdated(final ResourceMessage resourceMessage) throws IOExce
116114
117115 subscriptions .forEach (subscription -> {
118116 final var routingDetails = new RoutingDetails .Builder ()
119- .eventName (eventName )
120- .exchange (Exchanges .SESSION .getValue ())
121- .exchangeType (ExchangeType .TOPIC )
122- .routingKey (routingKey )
123- .scope (Scope .SESSION )
124- .subscriptionId (subscription .id ())
125- .build ();
117+ .eventName (eventName )
118+ .exchange (Exchanges .SESSION .getValue ())
119+ .exchangeType (ExchangeType .TOPIC )
120+ .routingKey (routingKey )
121+ .scope (Scope .SESSION )
122+ .subscriptionId (subscription .id ())
123+ .sessionId (subscription .sessionId ())
124+ .build ();
126125 producer .sendResourceMessage (resourceType , resourceId , payloadAsBytes , routingDetails );
127126 });
128127 }
@@ -145,13 +144,13 @@ private void subscribe(final String resourceType, final String resourceId) throw
145144 final var routingKey = snapshot .routingKey ();
146145
147146 final var routingDetails = new RoutingDetails .Builder ()
148- .eventName (eventName )
149- .exchange (Exchanges .SESSION .getValue ())
150- .exchangeType (ExchangeType .TOPIC )
151- .routingKey (routingKey )
152- .scope (Scope .SESSION )
153- .subscriptionId (subscription .id ())
154- .build ();
147+ .eventName (eventName )
148+ .exchange (Exchanges .SESSION .getValue ())
149+ .exchangeType (ExchangeType .TOPIC )
150+ .routingKey (routingKey )
151+ .scope (Scope .SESSION )
152+ .subscriptionId (subscription .id ())
153+ .build ();
155154 producer .sendResourceMessage (resourceType , resourceId , snapshot .message (), routingDetails );
156155 }
157156
@@ -163,13 +162,13 @@ private void sendSnapshotRequested(final Subscription subscription) throws IOExc
163162 final var resourceId = subscription .resourceId ();
164163 final var exchangeName = Exchanges .SNAPSHOT_REQUESTED .getValue ();
165164 final var routingDetails = new RoutingDetails .Builder ()
166- .eventName (exchangeName )
167- .exchange (exchangeName )
168- .exchangeType (ExchangeType .TOPIC )
169- .routingKey (resourceType )
170- .scope (Scope .INTERNAL )
171- .subscriptionId (subscription .id ())
172- .build ();
165+ .eventName (exchangeName )
166+ .exchange (exchangeName )
167+ .exchangeType (ExchangeType .TOPIC )
168+ .routingKey (resourceType )
169+ .scope (Scope .INTERNAL )
170+ .subscriptionId (subscription .id ())
171+ .build ();
173172 final var snapshotRequestedMessage = new SnapshotRequested (resourceType , resourceId );
174173 final var payloadAsBytes = objectMapper .writeValueAsBytes (snapshotRequestedMessage );
175174 producer .sendResourceMessage (resourceType , resourceId , payloadAsBytes , routingDetails );
0 commit comments