@@ -147,46 +147,48 @@ public void disconnect() {
147147 mConnected = false ;
148148 }
149149
150- public Observable <StompMessage > topic (String destinationPath ) {
151- return Observable .<StompMessage >create (subscriber -> {
152-
153- Set <Subscriber <? super StompMessage >> subscribersSet = mSubscribers .get (destinationPath );
154- if (subscribersSet == null ) {
155- subscribersSet = new HashSet <>();
156- mSubscribers .put (destinationPath , subscribersSet );
157- subscribePath (destinationPath );
158- }
159- subscribersSet .add (subscriber );
160-
161- }).doOnUnsubscribe (() -> {
162- for (String dest : mSubscribers .keySet ()) {
163- Set <Subscriber <? super StompMessage >> set = mSubscribers .get (dest );
164- for (Subscriber <? super StompMessage > subscriber : set ) {
165- if (subscriber .isUnsubscribed ()) {
166- set .remove (subscriber );
167- if (set .size () < 1 ) {
168- mSubscribers .remove (dest );
169- unsubscribePath (dest );
170- }
171- }
172- }
173- }
174- });
175- }
176-
177- private void subscribePath (String destinationPath ) {
178- if (destinationPath == null ) return ;
179- String topicId = UUID .randomUUID ().toString ();
180- Log .d (TAG , "Subscribe path: " + destinationPath + " id: " + topicId );
181-
182- if (mTopics == null ) mTopics = new HashMap <>();
183- mTopics .put (destinationPath , topicId );
184- send (new StompMessage (StompCommand .SUBSCRIBE ,
185- Arrays .asList (
186- new StompHeader (StompHeader .ID , topicId ),
187- new StompHeader (StompHeader .DESTINATION , destinationPath ),
188- new StompHeader (StompHeader .ACK , DEFAULT_ACK )), null ));
189- }
150+ public Observable <StompMessage > topic (String destinationPath , List <StompHeader > headerList ) {
151+ return Observable .<StompMessage >create (subscriber -> {
152+ Set <Subscriber <? super StompMessage >> subscribersSet = mSubscribers .get (destinationPath );
153+ if (subscribersSet == null ) {
154+ subscribersSet = new HashSet <>();
155+ mSubscribers .put (destinationPath , subscribersSet );
156+ subscribePath (destinationPath , headerList );
157+ }
158+ subscribersSet .add (subscriber );
159+
160+ }).doOnUnsubscribe (() -> {
161+ for (String dest : mSubscribers .keySet ()) {
162+ Set <Subscriber <? super StompMessage >> set = mSubscribers .get (dest );
163+ for (Subscriber <? super StompMessage > subscriber : set ) {
164+ if (subscriber .isUnsubscribed ()) {
165+ set .remove (subscriber );
166+ if (set .size () < 1 ) {
167+ mSubscribers .remove (dest );
168+ unsubscribePath (dest );
169+ }
170+ }
171+ }
172+ }
173+ });
174+ }
175+
176+ private void subscribePath (String destinationPath , List <StompHeader > headerList ) {
177+ if (destinationPath == null ) return ;
178+ String topicId = UUID .randomUUID ().toString ();
179+
180+ if (mTopics == null ) mTopics = new HashMap <>();
181+ mTopics .put (destinationPath , topicId );
182+ List <StompHeader > headers = new ArrayList <>();
183+ headers .add (new StompHeader (StompHeader .ID , topicId ));
184+ headers .add (new StompHeader (StompHeader .DESTINATION , destinationPath ));
185+ headers .add (new StompHeader (StompHeader .ACK , DEFAULT_ACK ));
186+ for (StompHeader header : headerList ){
187+ headers .add (header );
188+ }
189+ send (new StompMessage (StompCommand .SUBSCRIBE ,
190+ headers , null ));
191+ }
190192
191193
192194 private void unsubscribePath (String dest ) {
0 commit comments