1616 */
1717package org .graylog .plugins .usagestatistics .collectors ;
1818
19- import com .google .common .collect .Lists ;
2019import com .google .common .collect .Sets ;
21- import org .elasticsearch .action .admin .cluster .node .info .NodeInfo ;
22- import org .elasticsearch .action .admin .cluster .node .info .NodesInfoResponse ;
23- import org .elasticsearch .action .admin .cluster .node .stats .NodeStats ;
24- import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
25- import org .elasticsearch .client .Client ;
26- import org .elasticsearch .client .ClusterAdminClient ;
27- import org .elasticsearch .monitor .jvm .JvmStats ;
20+ import com .google .gson .JsonElement ;
21+ import com .google .gson .JsonObject ;
22+ import io .searchbox .client .JestClient ;
23+ import io .searchbox .client .JestResult ;
24+ import io .searchbox .cluster .NodesInfo ;
2825import org .graylog .plugins .usagestatistics .dto .HostInfo ;
2926import org .graylog .plugins .usagestatistics .dto .JvmInfo ;
3027import org .graylog .plugins .usagestatistics .dto .MacAddress ;
3330import org .graylog .plugins .usagestatistics .dto .elasticsearch .ElasticsearchNodeInfo ;
3431import org .graylog .plugins .usagestatistics .dto .elasticsearch .IndicesStats ;
3532import org .graylog .plugins .usagestatistics .dto .elasticsearch .NodesStats ;
33+ import org .graylog2 .indexer .cluster .jest .JestUtils ;
34+ import org .graylog2 .indexer .gson .GsonUtils ;
3635import org .graylog2 .system .stats .ClusterStatsService ;
3736import org .graylog2 .system .stats .elasticsearch .ElasticsearchStats ;
3837import org .slf4j .Logger ;
3938import org .slf4j .LoggerFactory ;
4039
4140import javax .inject .Inject ;
41+ import java .io .IOException ;
4242import java .util .Collections ;
4343import java .util .List ;
44- import java .util .Map ;
44+ import java .util .Optional ;
4545import java .util .Set ;
46+ import java .util .stream .Collectors ;
47+ import java .util .stream .Stream ;
48+ import java .util .stream .StreamSupport ;
4649
4750public class ElasticsearchCollector {
4851 private static final Logger LOG = LoggerFactory .getLogger (ElasticsearchCollector .class );
4952
50- private final Client client ;
5153 private final ClusterStatsService clusterStatsService ;
54+ private final JestClient jestClient ;
5255
5356 @ Inject
54- public ElasticsearchCollector (Client client , ClusterStatsService clusterStatsService ) {
55- this .client = client ;
57+ public ElasticsearchCollector (ClusterStatsService clusterStatsService , JestClient jestClient ) {
5658 this .clusterStatsService = clusterStatsService ;
59+ this .jestClient = jestClient ;
5760 }
5861
5962 public Set <ElasticsearchNodeInfo > getNodeInfos () {
60- final Map <String , NodeInfo > nodeInfos = fetchNodeInfos ();
61- final Map <String , NodeStats > nodeStats = fetchNodeStats ();
62-
63- final Set <ElasticsearchNodeInfo > elasticsearchNodeInfos = Sets .newHashSetWithExpectedSize (nodeInfos .size ());
64- for (String node : nodeInfos .keySet ()) {
65- final NodeInfo info = nodeInfos .get (node );
66- final NodeStats stats = nodeStats .get (node );
67-
68- if (info == null || stats == null ) {
69- LOG .warn ("Couldn't retrieve all required information from Elasticsearch node {}, skipping." , node );
70- continue ;
71- }
72-
73- // TODO remove these as soon as the backend service treats HostInfo as optional
74- // the host info details aren't available in Elasticsearch 2.x anymore, but we still report the empty
75- // bean because the backend service still expects some data (even if it is empty)
76- final MacAddress macAddress = MacAddress .EMPTY ;
77- final HostInfo .Cpu cpu = null ;
78- final HostInfo .Memory memory = null ;
79- final HostInfo .Memory swap = null ;
80- final HostInfo hostInfo = HostInfo .create (macAddress , cpu , memory , swap );
81-
82- final List <String > garbageCollectors ;
83- if (stats .getJvm () != null ) {
84- garbageCollectors = Lists .newArrayList ();
85- for (JvmStats .GarbageCollector gc : stats .getJvm ().getGc ()) {
86- garbageCollectors .add (gc .getName ());
87- }
88- } else {
89- garbageCollectors = Collections .emptyList ();
90- }
91-
92- final JvmInfo jvmInfo ;
93- if (info .getJvm () != null ) {
94- final JvmInfo .Memory jvmMemory = JvmInfo .Memory .create (
95- info .getJvm ().getMem ().getHeapInit ().bytes (),
96- info .getJvm ().getMem ().getHeapMax ().bytes (),
97- info .getJvm ().getMem ().getNonHeapInit ().bytes (),
98- info .getJvm ().getMem ().getNonHeapMax ().bytes (),
99- info .getJvm ().getMem ().getDirectMemoryMax ().bytes ()
100- );
101- final JvmInfo .Os jvmOs = JvmInfo .Os .create (
102- info .getJvm ().getSystemProperties ().get ("os.name" ),
103- info .getJvm ().getSystemProperties ().get ("os.version" ),
104- info .getJvm ().getSystemProperties ().get ("os.arch" )
105- );
106- jvmInfo = JvmInfo .create (
107- info .getJvm ().version (),
108- info .getJvm ().getVmName (),
109- info .getJvm ().getVmVersion (),
110- info .getJvm ().getVmVendor (),
111- jvmOs ,
112- jvmMemory ,
113- garbageCollectors
114- );
115- } else {
116- jvmInfo = null ;
117- }
118-
119- final ElasticsearchNodeInfo elasticsearchNodeInfo = ElasticsearchNodeInfo .create (
120- info .getVersion ().toString (),
121- hostInfo ,
122- jvmInfo
123- );
124-
125- elasticsearchNodeInfos .add (elasticsearchNodeInfo );
63+ final JsonObject nodesMap = fetchNodeInfos ();
64+ if (nodesMap == null ) {
65+ return Collections .emptySet ();
12666 }
67+ final Set <ElasticsearchNodeInfo > elasticsearchNodeInfos = Sets .newHashSetWithExpectedSize (nodesMap .entrySet ().size ());
68+ Optional .of (nodesMap )
69+ .map (JsonObject ::entrySet )
70+ .map (Iterable ::spliterator )
71+ .map (splitr -> StreamSupport .stream (splitr , false ))
72+ .orElse (Stream .empty ())
73+ .forEach (entry -> {
74+
75+ // TODO remove these as soon as the backend service treats HostInfo as optional
76+ // the host info details aren't available in Elasticsearch 2.x anymore, but we still report the empty
77+ // bean because the backend service still expects some data (even if it is empty)
78+ final MacAddress macAddress = MacAddress .EMPTY ;
79+ final HostInfo .Cpu cpu = null ;
80+ final HostInfo .Memory memory = null ;
81+ final HostInfo .Memory swap = null ;
82+ final HostInfo hostInfo = HostInfo .create (macAddress , cpu , memory , swap );
83+
84+ final Optional <JsonObject > jvm = Optional .of (entry .getValue ())
85+ .map (JsonElement ::getAsJsonObject )
86+ .map (nodeInfo -> GsonUtils .asJsonObject (nodeInfo .get ("jvm" )));
87+
88+ final List <String > garbageCollectors = jvm
89+ .map (jvmInfo -> GsonUtils .asJsonArray (jvmInfo .get ("gc_collectors" )))
90+ .map (Iterable ::spliterator )
91+ .map (splitr -> StreamSupport .stream (splitr , false ))
92+ .orElse (Stream .empty ())
93+ .map (String ::valueOf )
94+ .collect (Collectors .toList ());
95+
96+ final Optional <JsonObject > memInfo = jvm .map (jvmInfo -> GsonUtils .asJsonObject (jvmInfo .get ("mem" )));
97+
98+ final JvmInfo .Memory jvmMemory = JvmInfo .Memory .create (
99+ memInfo .map (mem -> GsonUtils .asLong (mem .get ("heap_init_in_bytes" ))).orElse (-1L ),
100+ memInfo .map (mem -> GsonUtils .asLong (mem .get ("heap_max_in_bytes" ))).orElse (-1L ),
101+ memInfo .map (mem -> GsonUtils .asLong (mem .get ("non_heap_init_in_bytes" ))).orElse (-1L ),
102+ memInfo .map (mem -> GsonUtils .asLong (mem .get ("non_heap_max_in_bytes" ))).orElse (-1L ),
103+ memInfo .map (mem -> GsonUtils .asLong (mem .get ("direct_max_in_bytes" ))).orElse (-1L )
104+ );
105+
106+ final Optional <JsonObject > osInfo = Optional .of (entry .getValue ())
107+ .map (JsonElement ::getAsJsonObject )
108+ .map (nodeInfo -> GsonUtils .asJsonObject (nodeInfo .get ("os" )));
109+
110+ final JvmInfo .Os jvmOs = JvmInfo .Os .create (
111+ osInfo .map (os -> GsonUtils .asString (os .get ("name" ))).orElse ("<unknown>" ),
112+ osInfo .map (os -> GsonUtils .asString (os .get ("version" ))).orElse ("<unknown>" ),
113+ osInfo .map (os -> GsonUtils .asString (os .get ("arch" ))).orElse ("<unknown>" )
114+ );
115+ final JvmInfo jvmInfo = JvmInfo .create (
116+ jvm .map (j -> GsonUtils .asString (j .get ("version" ))).orElse ("<unknown>" ),
117+ jvm .map (j -> GsonUtils .asString (j .get ("vm_name" ))).orElse ("<unknown>" ),
118+ jvm .map (j -> GsonUtils .asString (j .get ("vm_version" ))).orElse ("<unknown>" ),
119+ jvm .map (j -> GsonUtils .asString (j .get ("vm_vendor" ))).orElse ("<unknown>" ),
120+ jvmOs ,
121+ jvmMemory ,
122+ garbageCollectors
123+ );
124+ final String esVersion = Optional .of (entry .getValue ())
125+ .map (JsonElement ::getAsJsonObject )
126+ .map (nodeInfo -> GsonUtils .asString (nodeInfo .get ("version" )))
127+ .orElse ("<unknown>" );
128+
129+ final ElasticsearchNodeInfo elasticsearchNodeInfo = ElasticsearchNodeInfo .create (
130+ esVersion ,
131+ hostInfo ,
132+ jvmInfo
133+ );
134+
135+ elasticsearchNodeInfos .add (elasticsearchNodeInfo );
136+ });
127137
128138 return elasticsearchNodeInfos ;
129139 }
@@ -137,15 +147,21 @@ public ElasticsearchClusterStats getClusterStats() {
137147 );
138148 }
139149
140- private Map <String , NodeInfo > fetchNodeInfos () {
141- ClusterAdminClient adminClient = this .client .admin ().cluster ();
142- NodesInfoResponse nodesInfoResponse = adminClient .nodesInfo (adminClient .prepareNodesInfo ().request ()).actionGet ();
143- return nodesInfoResponse .getNodesMap ();
144- }
145-
146- private Map <String , NodeStats > fetchNodeStats () {
147- ClusterAdminClient adminClient = this .client .admin ().cluster ();
148- NodesStatsResponse nodesStatsResponse = adminClient .nodesStats (adminClient .prepareNodesStats ().request ()).actionGet ();
149- return nodesStatsResponse .getNodesMap ();
150+ private JsonObject fetchNodeInfos () {
151+ final String errorMessage = "Unable to fetch node infos." ;
152+ final NodesInfo .Builder requestBuilder = new NodesInfo .Builder ()
153+ .withHttp ()
154+ .withJvm ()
155+ .withNetwork ()
156+ .withOs ()
157+ .withPlugins ()
158+ .withProcess ()
159+ .withSettings ()
160+ .withThreadPool ()
161+ .withTransport ();
162+ final JestResult result = JestUtils .execute (jestClient , requestBuilder .build (), () -> errorMessage );
163+ return Optional .of (result .getJsonObject ())
164+ .map (json -> GsonUtils .asJsonObject (json .get ("nodes" )))
165+ .orElseThrow (() -> new IllegalStateException (errorMessage + " Unable to parse reply: " + result .getJsonString ()));
150166 }
151167}
0 commit comments