1919import org .xmpp .packet .JID ;
2020import org .xmpp .packet .Packet ;
2121import org .xmpp .packet .PacketError ;
22+ import org .xbill .DNS .SRVRecord ;
23+ import org .xbill .DNS .Record ;
24+ import org .xbill .DNS .Type ;
25+ import org .xbill .DNS .Lookup ;
26+ import org .xbill .DNS .TextParseException ;
2227
2328public class FederatedQueueManager {
2429 private static final Logger logger = Logger
@@ -32,6 +37,8 @@ public class FederatedQueueManager {
3237 public static final String IDENTITY_TYPE_CHANNELS = "channels" ;
3338 public static final String BUDDYCLOUD_SERVER = "buddycloud-server" ;
3439
40+ public static final String SRV_PREFIX = "_buddycloud-server._tcp." ;
41+
3542 private int id = 1 ;
3643
3744 private final ChannelsEngine component ;
@@ -51,7 +58,7 @@ public class FederatedQueueManager {
5158 public FederatedQueueManager (ChannelsEngine component , String localServer ) {
5259 this .component = component ;
5360 this .localServer = localServer ;
54-
61+
5562 nodeMap .start ();
5663 sentRemotePackets .start ();
5764 }
@@ -63,9 +70,9 @@ private int getId() {
6370 }
6471
6572 public void process (Packet packet ) throws ComponentException {
66-
73+
6774 logger .debug ("Packet payload " + packet .toXML () + " going to federation." );
68-
75+
6976 String to = packet .getTo ().toString ();
7077
7178 String uniqueId = generateUniqueId (packet );
@@ -101,6 +108,7 @@ public void process(Packet packet) throws ComponentException {
101108 waitingStanzas .get (to ).add (packet );
102109 logger .debug ("Adding packet to waiting stanza list for " + to
103110 + " (size " + waitingStanzas .get (to ).size () + ")" );
111+ attemptDnsDiscovery (to );
104112 } catch (Exception e ) {
105113 logger .error (e );
106114 }
@@ -175,7 +183,7 @@ public void processDiscoItemsResponse(JID from, List<Element> items)
175183 public boolean isFederatedDiscoInfoRequest (String packetId ) {
176184 return remoteServerInfoRequestIds .containsKey (packetId );
177185 }
178-
186+
179187 private void setDiscoveredServer (String server , String handler ) {
180188 discoveredServers .put (server , handler );
181189 }
@@ -199,28 +207,53 @@ public void processDiscoInfoResponse(JID from, String id,
199207 sendFederatedRequests (originatingServer );
200208 }
201209 }
202-
210+
203211 if (remoteServerItemsToProcess .get (originatingServer ) < 1 ) {
204212 if (!discoveredServers .containsKey (originatingServer )) {
205- sendRemoteChannelServerNotFoundErrorResponses (originatingServer );
206- remoteChannelDiscoveryStatus .put (originatingServer ,
207- NO_CHANNEL_SERVER );
208- waitingStanzas .remove (originatingServer );
213+ sendRemoteChannelServerNotFoundErrorResponses (originatingServer );
214+ remoteChannelDiscoveryStatus .put (originatingServer ,
215+ NO_CHANNEL_SERVER );
216+ waitingStanzas .remove (originatingServer );
209217 } else {
210218 remoteChannelDiscoveryStatus .put (originatingServer , DISCOVERED );
211219 }
212220 }
213221 }
214222
223+ private boolean attemptDnsDiscovery (String originatingServer ) throws ComponentException {
224+ try {
225+ String query = SRV_PREFIX + originatingServer ;
226+ Record [] records = new Lookup (query , Type .SRV ).run ();
227+ if ((null == records ) || (0 == records .length )) {
228+ logger .debug ("No appropriate DNS entry found for " + originatingServer );
229+ return false ;
230+ }
231+ SRVRecord record = (SRVRecord ) records [0 ];
232+ String targetServer = record .getTarget ().toString (true );
233+ setDiscoveredServer (originatingServer , targetServer );
234+ logger .info ("DNS discovery complete for buddycloud server @ "
235+ + originatingServer + " (" + targetServer + ")" );
236+ remoteChannelDiscoveryStatus .put (originatingServer , DISCOVERED );
237+ sendFederatedRequests (originatingServer );
238+ return true ;
239+ } catch (TextParseException e ) {
240+ logger .error (e );
241+ return false ;
242+ }
243+ }
244+
215245 private void sendFederatedRequests (String originatingServer )
216246 throws ComponentException {
217247 String remoteServer = discoveredServers .get (originatingServer );
218248 List <Packet > packetsToSend = waitingStanzas .get (originatingServer );
219249 if (packetsToSend == null ) {
250+ logger .trace ("No queued federated packets to send to " + originatingServer );
220251 return ;
221252 }
253+ logger .trace ("Catching up on federated packet sending to " + originatingServer );
222254 for (Packet packet : packetsToSend ) {
223255 packet .setTo (remoteServer );
256+ logger .trace (packet .toString ());
224257 sendPacket (packet .createCopy ());
225258 }
226259 waitingStanzas .remove (originatingServer );
@@ -261,6 +294,10 @@ public void passResponseToRequester(IQ packet) throws Exception {
261294 + packet .getID () + ")" );
262295 }
263296
297+ if (packet .getType ().equals (IQ .Type .error ) && !remoteChannelDiscoveryStatus .get (packet .getFrom ()).equals (DISCOVERED )) {
298+ return ;
299+ }
300+
264301 String uniqueId = packet .getID ();
265302 packet .setID (idMap .get (uniqueId ));
266303 packet .setTo ((JID ) sentRemotePackets .get (uniqueId ));
@@ -270,7 +307,7 @@ public void passResponseToRequester(IQ packet) throws Exception {
270307
271308 component .sendPacket (packet );
272309 }
273-
310+
274311 public String getRelatedNodeForRemotePacket (IQ packet ) {
275312 String id = null ;
276313 if (nodeMap .containsKey (packet .getID ())) {
0 commit comments