Skip to content

Commit beb754a

Browse files
author
zengqiao
committed
修复JMX连接被关闭,抛出IOException后,未进行连接重建的问题
1 parent e067123 commit beb754a

File tree

1 file changed

+39
-0
lines changed

1 file changed

+39
-0
lines changed

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/jmx/JmxConnectorWrap.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public synchronized void close() {
9090
}
9191
try {
9292
jmxConnector.close();
93+
94+
jmxConnector = null;
9395
} catch (IOException e) {
9496
LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
9597
}
@@ -105,6 +107,11 @@ public Object getAttribute(ObjectName name, String attribute) throws
105107
acquire();
106108
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
107109
return mBeanServerConnection.getAttribute(name, attribute);
110+
} catch (IOException ioe) {
111+
// 如果是因为连接断开,则进行重新连接,并抛出异常
112+
reInitDueIOException();
113+
114+
throw ioe;
108115
} finally {
109116
atomicInteger.incrementAndGet();
110117
}
@@ -120,6 +127,11 @@ public AttributeList getAttributes(ObjectName name, String[] attributes) throws
120127
acquire();
121128
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
122129
return mBeanServerConnection.getAttributes(name, attributes);
130+
} catch (IOException ioe) {
131+
// 如果是因为连接断开,则进行重新连接,并抛出异常
132+
reInitDueIOException();
133+
134+
throw ioe;
123135
} finally {
124136
atomicInteger.incrementAndGet();
125137
}
@@ -131,6 +143,11 @@ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
131143
acquire();
132144
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
133145
return mBeanServerConnection.queryNames(name, query);
146+
} catch (IOException ioe) {
147+
// 如果是因为连接断开,则进行重新连接,并抛出异常
148+
reInitDueIOException();
149+
150+
throw ioe;
134151
} finally {
135152
atomicInteger.incrementAndGet();
136153
}
@@ -186,4 +203,26 @@ private void acquire() {
186203
}
187204
}
188205
}
206+
207+
private synchronized void reInitDueIOException() {
208+
try {
209+
if (jmxConnector == null) {
210+
return;
211+
}
212+
213+
// 检查是否正常
214+
jmxConnector.getConnectionId();
215+
216+
// 如果正常则直接返回
217+
return;
218+
} catch (Exception e) {
219+
// ignore
220+
}
221+
222+
// 关闭旧的
223+
this.close();
224+
225+
// 重新创建
226+
this.checkJmxConnectionAndInitIfNeed();
227+
}
189228
}

0 commit comments

Comments
 (0)