66import org .datadog .jmxfetch .service .ServiceNameProvider ;
77import org .yaml .snakeyaml .Yaml ;
88
9- import java .util .concurrent .CompletableFuture ;
109import static java .util .concurrent .TimeUnit .*;
1110
1211import java .io .File ;
@@ -263,6 +262,7 @@ public Instance(
263262 log .info ("collect_default_jvm_metrics is false - not collecting default JVM metrics" );
264263 }
265264
265+ this .beans = new HashSet <>();
266266 Boolean enableBeanSubscription = (Boolean ) instanceMap .get ("enable_bean_subscription" );
267267 this .enableBeanSubscription = enableBeanSubscription != null && enableBeanSubscription ;
268268 }
@@ -426,7 +426,7 @@ public synchronized void init(boolean forceNewConnection)
426426 log .info ("Trying to connect to JMX Server at " + this .toString ());
427427 connection = getConnection (instanceMap , forceNewConnection );
428428 if (this .enableBeanSubscription ) {
429- log .info ("Subscribing for bean notifications before init " );
429+ log .info ("Subscribing for bean notifications before initial bean refresh " );
430430 this .subscribeToBeans ();
431431 } else {
432432 log .info ("Bean subscription not enabled." );
@@ -723,33 +723,6 @@ public synchronized void beanUnregistered(ObjectName mBeanName) {
723723 log .info ("Bean unregistered event. {}" , mBeanName );
724724 }
725725
726- private class BeanSubscriber implements Runnable {
727- private List <String > beanScopes ;
728- private Connection connection ;
729- private BeanListener listener ;
730- public CompletableFuture <Boolean > subscriptionSuccessful ;
731-
732- BeanSubscriber (List <String > beanScopes , Connection connection , BeanListener listener ) {
733- this .beanScopes = beanScopes ;
734- this .connection = connection ;
735- this .listener = listener ;
736- this .subscriptionSuccessful = new CompletableFuture <Boolean >();
737- }
738-
739- public void run () {
740- try {
741- log .info ("Subscribing to {} bean scopes" , beanScopes .size ());
742-
743- connection .subscribeToBeanScopes (beanScopes , this .listener );
744- this .subscriptionSuccessful .complete (true );
745-
746- Thread .currentThread ().join ();
747- } catch (Exception e ) {
748- log .warn ("Exception while subscribing to beans {}" , e );
749- this .subscriptionSuccessful .complete (false );
750- }
751- }
752- }
753726
754727 private void subscribeToBeans () {
755728 List <String > beanScopes = this .getBeansScopes ();
@@ -758,10 +731,13 @@ private void subscribeToBeans() {
758731 try {
759732 new Thread (subscriber ).start ();
760733 if (subscriber .subscriptionSuccessful .get (1 , SECONDS )) {
761- log .info ("Subscribed successfully!" );
734+ log .info ("Subscribed to {} bean scopes successfully!" , beanScopes . size () );
762735 }
763736 } catch (Exception e ) {
764- log .warn ("Exception while subscribing to beans {}" , e );
737+ log .warn ("Bean subscription failed! Will rely on bean_refresh, ensure it is set "
738+ + " to an appropriate value (currently {} seconds). Exception: {}" ,
739+ this .refreshBeansPeriod , e );
740+ this .enableBeanSubscription = false ;
765741 }
766742 }
767743
@@ -778,27 +754,53 @@ public List<String> getBeansScopes() {
778754 * certain actions, and fallback if necessary.
779755 */
780756 private synchronized void refreshBeansList () throws IOException {
781- this . beans = new HashSet <ObjectName >();
757+ Set < ObjectName > newBeans = new HashSet <>();
782758 String action = appConfig .getAction ();
783759 boolean limitQueryScopes =
784760 !action .equals (AppConfig .ACTION_LIST_EVERYTHING )
785761 && !action .equals (AppConfig .ACTION_LIST_NOT_MATCHING );
786762
763+ boolean fullBeanQueryNeeded = false ;
787764 if (limitQueryScopes ) {
788765 try {
789766 List <String > beanScopes = getBeansScopes ();
790767 for (String scope : beanScopes ) {
791768 ObjectName name = new ObjectName (scope );
792- this . beans .addAll (connection .queryNames (name ));
769+ newBeans .addAll (connection .queryNames (name ));
793770 }
794771 } catch (Exception e ) {
772+ fullBeanQueryNeeded = true ;
795773 log .error (
796774 "Unable to compute a common bean scope, querying all beans as a fallback" ,
797775 e );
798776 }
799777 }
800778
801- this .beans = (this .beans .isEmpty ()) ? connection .queryNames (null ) : this .beans ;
779+ if (fullBeanQueryNeeded ) {
780+ newBeans = connection .queryNames (null );
781+ }
782+ if (this .enableBeanSubscription && !fullBeanQueryNeeded ) {
783+ // This code exists to validate the bean-subscription is working properly
784+ // If every new bean and de-registered bean properly fires an event, then
785+ // this.beans (current set that has been updated via subscription) should
786+ // always equal the new set of beans queried (unless it was a full bean query)
787+ Set <ObjectName > beansNotSeen = new HashSet <>();
788+ if (this .beans .containsAll (newBeans )) {
789+ beansNotSeen .addAll (newBeans );
790+ beansNotSeen .removeAll (this .beans );
791+ log .error ("Bean refresh found {} new beans that were not already known via subscription" , beansNotSeen .size ());
792+ }
793+ if (!newBeans .containsAll (this .beans )){
794+ beansNotSeen .addAll (this .beans );
795+ beansNotSeen .removeAll (newBeans );
796+ log .error ("Bean refresh found {} FEWER beans than already known via subscription" , beansNotSeen .size ());
797+ }
798+
799+ for (ObjectName b : beansNotSeen ) {
800+ log .error ("Bean {} is one that has never been seen before, see why subscription did not detect this bean." , b .toString ());
801+ }
802+ }
803+ this .beans = newBeans ;
802804 this .lastRefreshTime = System .currentTimeMillis ();
803805 }
804806
0 commit comments