25
25
import org .zeromq .ZContext ;
26
26
import org .zeromq .ZMQ ;
27
27
28
+ import java .net .Inet6Address ;
29
+ import java .net .InetAddress ;
30
+ import java .net .UnknownHostException ;
28
31
import java .util .concurrent .ExecutorService ;
29
32
import java .util .concurrent .Executors ;
30
33
import java .util .function .Consumer ;
34
+ import java .util .logging .Level ;
31
35
import java .util .logging .Logger ;
32
36
33
37
class BoundZmqEventBus implements EventBus {
@@ -46,12 +50,12 @@ class BoundZmqEventBus implements EventBus {
46
50
LOG .info (String .format ("XPUB binding to %s, XSUB binding to %s" , xpubAddr , xsubAddr ));
47
51
48
52
xpub = context .createSocket (SocketType .XPUB );
49
- xpub .setIPv6 (true );
53
+ xpub .setIPv6 (xpubAddr . isIPv6 );
50
54
xpub .setImmediate (true );
51
55
xpub .bind (xpubAddr .bindTo );
52
56
53
57
xsub = context .createSocket (SocketType .XSUB );
54
- xsub .setIPv6 (true );
58
+ xsub .setIPv6 (xsubAddr . isIPv6 );
55
59
xsub .setImmediate (true );
56
60
xsub .bind (xsubAddr .bindTo );
57
61
@@ -89,7 +93,7 @@ public void close() {
89
93
90
94
private Addresses deriveAddresses (String host , String connection ) {
91
95
if (connection .startsWith ("inproc:" )) {
92
- return new Addresses (connection , connection );
96
+ return new Addresses (connection , connection , false );
93
97
}
94
98
95
99
if (!connection .startsWith ("tcp://" )) {
@@ -109,19 +113,35 @@ private Addresses deriveAddresses(String host, String connection) {
109
113
host = hostName ;
110
114
}
111
115
116
+ boolean isAddressIPv6 = false ;
117
+ try {
118
+ if (InetAddress .getByName (host ) instanceof Inet6Address ) {
119
+ isAddressIPv6 = true ;
120
+ if (!host .startsWith ("[" )) {
121
+ host = String .format ("[%s]" , host );
122
+ }
123
+ }
124
+ } catch (UnknownHostException e ) {
125
+ LOG .log (Level .WARNING , "Could not determine if host address is IPv6 or IPv4" , e );
126
+ }
127
+
112
128
return new Addresses (
113
129
connection ,
114
- String .format ("tcp://%s:%d" , host , port ));
130
+ String .format ("tcp://%s:%d" , host , port ),
131
+ isAddressIPv6
132
+ );
115
133
}
116
134
117
135
private static class Addresses {
118
- Addresses (String bindTo , String advertise ) {
136
+ Addresses (String bindTo , String advertise , boolean isIPv6 ) {
119
137
this .bindTo = bindTo ;
120
138
this .advertise = advertise ;
139
+ this .isIPv6 = isIPv6 ;
121
140
}
122
141
123
142
String bindTo ;
124
143
String advertise ;
144
+ boolean isIPv6 ;
125
145
126
146
@ Override
127
147
public String toString () {
0 commit comments