Skip to content

Commit b1d8e60

Browse files
committed
[AMQ-9687] Add metrics about error and reconnect counts to network connector and network bridges
1 parent 6871226 commit b1d8e60

File tree

13 files changed

+303
-12
lines changed

13 files changed

+303
-12
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,21 @@ public void resetStats(){
9999
}
100100
}
101101

102+
@Override
103+
public long getStartedTimestamp() {
104+
return bridge.getStartedTimestamp();
105+
}
106+
107+
@Override
108+
public long getLocalExceptionCount() {
109+
return bridge.getLocalExceptionCount();
110+
}
111+
112+
@Override
113+
public long getRemoteExceptionCount() {
114+
return bridge.getRemoteExceptionCount();
115+
}
116+
102117
public void addNetworkDestinationView(NetworkDestinationView networkDestinationView){
103118
networkDestinationViewList.add(networkDestinationView);
104119
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeViewMBean.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,9 @@ public interface NetworkBridgeViewMBean extends Service {
4040

4141
void resetStats();
4242

43+
long getStartedTimestamp();
44+
45+
long getLocalExceptionCount();
46+
47+
long getRemoteExceptionCount();
4348
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,29 @@ public void setRemoteUserName(String remoteUserName) {
203203
public boolean isAutoStart() {
204204
return connector.isAutoStart();
205205
}
206+
207+
@Override
208+
public long getStartedTimestamp() {
209+
return connector.getStartedTimestamp();
210+
}
211+
212+
@Override
213+
public long getStoppedTimestamp() {
214+
return connector.getStoppedTimestamp();
215+
}
216+
217+
@Override
218+
public long getBridgeExceptionCount() {
219+
return connector.getBridgeExceptionCounter().getCount();
220+
}
221+
222+
@Override
223+
public long getLocalExceptionCount() {
224+
return connector.getLocalExceptionCounter().getCount();
225+
}
226+
227+
@Override
228+
public long getRemoteExceptionCount() {
229+
return connector.getRemoteExceptionCounter().getCount();
230+
}
206231
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,14 @@ public interface NetworkConnectorViewMBean extends Service {
9090
void setRemotePassword(String remotePassword);
9191

9292
boolean isAutoStart();
93+
94+
long getStartedTimestamp();
95+
96+
long getStoppedTimestamp();
97+
98+
long getBridgeExceptionCount();
99+
100+
long getLocalExceptionCount();
101+
102+
long getRemoteExceptionCount();
93103
}

activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.TimeUnit;
3838
import java.util.concurrent.TimeoutException;
3939
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.concurrent.atomic.AtomicLong;
4041
import java.util.regex.Pattern;
4142

4243
import javax.management.ObjectName;
@@ -171,6 +172,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
171172
private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
172173
private Transport duplexInboundLocalBroker = null;
173174
private ProducerInfo duplexInboundLocalProducerInfo;
175+
private AtomicLong startedTimestamp = new AtomicLong(0L);
174176

175177
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
176178
this.configuration = configuration;
@@ -338,6 +340,7 @@ public void run() {
338340
startedLatch.countDown();
339341
localStartedLatch.countDown();
340342
staticDestinationsLatch.countDown();
343+
startedTimestamp.set(0L);
341344

342345
ss.throwFirstException();
343346
}
@@ -372,6 +375,7 @@ public void run() {
372375
// Once we have all required broker info we can attempt to start
373376
// the local and then remote sides of the bridge.
374377
doStartLocalAndRemoteBridges();
378+
startedTimestamp.set(System.currentTimeMillis());
375379
} finally {
376380
Thread.currentThread().setName(originalName);
377381
}
@@ -647,13 +651,16 @@ protected void startRemoteBridge() throws Exception {
647651

648652
@Override
649653
public void serviceRemoteException(Throwable error) {
654+
networkBridgeStatistics.getRemoteExceptionCount().increment();
655+
650656
if (!disposed.get()) {
651657
if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
652658
LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString());
653659
} else {
654660
LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", localBroker, remoteBroker, error.toString());
655661
}
656662
LOG.debug("The remote Exception was: {}", error, error);
663+
657664
brokerService.getTaskRunnerFactory().execute(new Runnable() {
658665
@Override
659666
public void run() {
@@ -1112,6 +1119,8 @@ public void serviceLocalException(Throwable error) {
11121119

11131120
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
11141121
LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
1122+
networkBridgeStatistics.getLocalExceptionCount().increment();
1123+
11151124
if (!disposed.get()) {
11161125
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
11171126
// not a reason to terminate the bridge - temps can disappear with
@@ -1926,6 +1935,21 @@ public long getEnqueueCounter() {
19261935
return networkBridgeStatistics.getEnqueues().getCount();
19271936
}
19281937

1938+
@Override
1939+
public long getStartedTimestamp() {
1940+
return startedTimestamp.get();
1941+
}
1942+
1943+
@Override
1944+
public long getLocalExceptionCount() {
1945+
return networkBridgeStatistics.getLocalExceptionCount().getCount();
1946+
}
1947+
1948+
@Override
1949+
public long getRemoteExceptionCount() {
1950+
return networkBridgeStatistics.getRemoteExceptionCount().getCount();
1951+
}
1952+
19291953
@Override
19301954
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
19311955
return networkBridgeStatistics;

activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public void onServiceAdd(DiscoveryEvent event) {
131131
try {
132132
remoteTransport = TransportFactory.connect(connectUri);
133133
} catch (Exception e) {
134+
remoteExceptionCounter.increment();
134135
LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage());
135136
LOG.debug("Connection failure exception: ", e);
136137
try {
@@ -143,6 +144,7 @@ public void onServiceAdd(DiscoveryEvent event) {
143144
try {
144145
localTransport = createLocalTransport();
145146
} catch (Exception e) {
147+
localExceptionCounter.increment();
146148
ServiceSupport.dispose(remoteTransport);
147149
LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage());
148150
LOG.debug("Connection failure exception: ", e);
@@ -164,6 +166,7 @@ public void onServiceAdd(DiscoveryEvent event) {
164166
}
165167
bridge.start();
166168
} catch (Exception e) {
169+
bridgeExceptionCounter.increment();
167170
ServiceSupport.dispose(localTransport);
168171
ServiceSupport.dispose(remoteTransport);
169172
LOG.warn("Could not start network bridge between: {} and: {} due to: {}", localURI, uri, e.getMessage());

activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,10 @@ public interface NetworkBridge extends Service {
9595
ObjectName getMbeanObjectName();
9696

9797
void resetStats();
98+
99+
long getStartedTimestamp();
100+
101+
long getLocalExceptionCount();
102+
103+
long getRemoteExceptionCount();
98104
}

activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeStatistics.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.activemq.network;
1919

20+
import java.util.Set;
21+
2022
import org.apache.activemq.management.CountStatisticImpl;
2123
import org.apache.activemq.management.StatsImpl;
2224

@@ -28,15 +30,17 @@ public class NetworkBridgeStatistics extends StatsImpl {
2830
protected CountStatisticImpl enqueues;
2931
protected CountStatisticImpl dequeues;
3032
protected CountStatisticImpl receivedCount;
33+
protected CountStatisticImpl localExceptionCount;
34+
protected CountStatisticImpl remoteExceptionCount;
3135

3236
public NetworkBridgeStatistics() {
3337
enqueues = new CountStatisticImpl("enqueues", "The current number of enqueues this bridge has, which is the number of potential messages to be forwarded.");
3438
dequeues = new CountStatisticImpl("dequeues", "The current number of dequeues this bridge has, which is the number of messages received by the remote broker.");
3539
receivedCount = new CountStatisticImpl("receivedCount", "The number of messages that have been received by the NetworkBridge from the remote broker. Only applies for Duplex bridges.");
40+
localExceptionCount = new CountStatisticImpl("localExceptionCount", "The number of exceptions that have been received by the NetworkBridge from the local broker.");
41+
remoteExceptionCount = new CountStatisticImpl("remoteExceptionCount", "The number of exceptions that have been received by the NetworkBridge from the remote broker.");
3642

37-
addStatistic("enqueues", enqueues);
38-
addStatistic("dequeues", dequeues);
39-
addStatistic("receivedCount", receivedCount);
43+
addStatistics(Set.of(enqueues, dequeues, receivedCount, localExceptionCount, remoteExceptionCount));
4044
}
4145

4246
/**
@@ -69,13 +73,35 @@ public CountStatisticImpl getReceivedCount() {
6973
return receivedCount;
7074
}
7175

76+
/**
77+
* The current number of exceptions this bridge has, which is the number of
78+
* exceptions received from the remote broker.
79+
*
80+
* @return
81+
*/
82+
public CountStatisticImpl getLocalExceptionCount() {
83+
return localExceptionCount;
84+
}
85+
86+
/**
87+
* The current number of exceptions this bridge has, which is the number of
88+
* exceptions received from the remote broker.
89+
*
90+
* @return
91+
*/
92+
public CountStatisticImpl getRemoteExceptionCount() {
93+
return remoteExceptionCount;
94+
}
95+
7296
@Override
7397
public void reset() {
7498
if (this.isDoReset()) {
7599
super.reset();
76100
enqueues.reset();
77101
dequeues.reset();
78102
receivedCount.reset();
103+
localExceptionCount.reset();
104+
remoteExceptionCount.reset();
79105
}
80106
}
81107

@@ -85,17 +111,23 @@ public void setEnabled(boolean enabled) {
85111
enqueues.setEnabled(enabled);
86112
dequeues.setEnabled(enabled);
87113
receivedCount.setEnabled(enabled);
114+
localExceptionCount.setEnabled(enabled);
115+
remoteExceptionCount.setEnabled(enabled);
88116
}
89117

90118
public void setParent(NetworkBridgeStatistics parent) {
91119
if (parent != null) {
92120
enqueues.setParent(parent.enqueues);
93121
dequeues.setParent(parent.dequeues);
94122
receivedCount.setParent(parent.receivedCount);
123+
localExceptionCount.setParent(parent.localExceptionCount);
124+
remoteExceptionCount.setParent(parent.remoteExceptionCount);
95125
} else {
96126
enqueues.setParent(null);
97127
dequeues.setParent(null);
98128
receivedCount.setParent(null);
129+
localExceptionCount.setParent(null);
130+
remoteExceptionCount.setParent(null);
99131
}
100132
}
101133

activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Set;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.ConcurrentMap;
27+
import java.util.concurrent.atomic.AtomicLong;
2728

2829
import javax.management.MalformedObjectNameException;
2930
import javax.management.ObjectName;
@@ -36,6 +37,8 @@
3637
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
3738
import org.apache.activemq.command.ActiveMQDestination;
3839
import org.apache.activemq.command.ConsumerId;
40+
import org.apache.activemq.management.CountStatistic;
41+
import org.apache.activemq.management.CountStatisticImpl;
3942
import org.apache.activemq.transport.Transport;
4043
import org.apache.activemq.util.ServiceStopper;
4144
import org.apache.activemq.util.ServiceSupport;
@@ -51,6 +54,11 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
5154
protected URI localURI;
5255
protected ConnectionFilter connectionFilter;
5356
protected ConcurrentMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
57+
protected final AtomicLong startedTimestamp = new AtomicLong(0l);
58+
protected final AtomicLong stoppedTimestamp = new AtomicLong(0l);
59+
protected final CountStatisticImpl bridgeExceptionCounter = new CountStatisticImpl("bridgeExceptionCount", "Count of exceptions when establishing network bridge.");
60+
protected final CountStatisticImpl localExceptionCounter = new CountStatisticImpl("localExceptionCount", "Count of exceptions when connecting to local broker.");
61+
protected final CountStatisticImpl remoteExceptionCounter = new CountStatisticImpl("remoteExceptionCount", "Count of exceptions when connecting to remote broker.");
5462

5563
protected ServiceSupport serviceSupport = new ServiceSupport() {
5664

@@ -162,22 +170,35 @@ public static ActiveMQDestination[] getDurableTopicDestinations(final Set<Active
162170
@Override
163171
public void start() throws Exception {
164172
serviceSupport.start();
173+
startedTimestamp.set(System.currentTimeMillis());
174+
stoppedTimestamp.set(0l);
165175
}
166176

167177
@Override
168178
public void stop() throws Exception {
169179
serviceSupport.stop();
180+
stoppedTimestamp.set(System.currentTimeMillis());
181+
startedTimestamp.set(0l);
170182
}
171183

172184
protected void handleStart() throws Exception {
173185
if (localURI == null) {
174186
throw new IllegalStateException("You must configure the 'localURI' property");
175187
}
176188
LOG.info("Network Connector {} started", this);
189+
bridgeExceptionCounter.setEnabled(true);
190+
localExceptionCounter.setEnabled(true);
191+
remoteExceptionCounter.setEnabled(true);
192+
bridgeExceptionCounter.setCount(0l);
193+
localExceptionCounter.setCount(0l);
194+
remoteExceptionCounter.setCount(0l);
177195
}
178196

179197
protected void handleStop(ServiceStopper stopper) throws Exception {
180198
LOG.info("Network Connector {} stopped", this);
199+
bridgeExceptionCounter.reset();
200+
localExceptionCounter.reset();
201+
remoteExceptionCounter.reset();
181202
}
182203

183204
public boolean isStarted() {
@@ -255,4 +276,24 @@ public boolean removeDemandSubscription(ConsumerId consumerId) {
255276
public Collection<NetworkBridge> activeBridges() {
256277
return bridges.values();
257278
}
279+
280+
public long getStartedTimestamp() {
281+
return startedTimestamp.get();
282+
}
283+
284+
public long getStoppedTimestamp() {
285+
return stoppedTimestamp.get();
286+
}
287+
288+
public CountStatistic getBridgeExceptionCounter() {
289+
return bridgeExceptionCounter;
290+
}
291+
292+
public CountStatistic getLocalExceptionCounter() {
293+
return localExceptionCounter;
294+
}
295+
296+
public CountStatistic getRemoteExceptionCounter() {
297+
return remoteExceptionCounter;
298+
}
258299
}

0 commit comments

Comments
 (0)