Skip to content

Commit b478d9f

Browse files
moesterheldtodvora
andauthored
Add data node heap check (#20168)
* add data node heap check and notification * fix template names * suppress forbidden invocation as it is handled by exception handling * add recommended heap setting to notification * changelog * switch to oshi * use floating point arithmetic * change system notification text * fixed test * add paragraphs --------- Co-authored-by: Tomas Dvorak <[email protected]>
1 parent 1862c0c commit b478d9f

File tree

10 files changed

+228
-8
lines changed

10 files changed

+228
-8
lines changed

changelog/unreleased/pr-20168.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type = "a"
2+
message = "Add system notification if heap size for data node is potentially too small."
3+
4+
issues = ["Graylog2/graylog-plugin-enterprise#6641"]
5+
pulls = ["20168"]

data-node/src/main/java/org/graylog/datanode/opensearch/OpensearchProcessImpl.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package org.graylog.datanode.opensearch;
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.github.joschi.jadconfig.util.Size;
21+
import com.google.common.annotations.VisibleForTesting;
2022
import com.google.common.eventbus.EventBus;
23+
import com.google.common.eventbus.Subscribe;
2124
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2225
import jakarta.inject.Inject;
2326
import org.apache.commons.collections4.queue.CircularFifoQueue;
2427
import org.apache.commons.exec.ExecuteException;
28+
import org.apache.commons.io.FileUtils;
2529
import org.apache.http.client.utils.URIBuilder;
2630
import org.graylog.datanode.Configuration;
2731
import org.graylog.datanode.configuration.DatanodeConfiguration;
@@ -50,11 +54,17 @@
5054
import org.graylog.storage.opensearch2.OpenSearchClient;
5155
import org.graylog2.datanode.DataNodeLifecycleEvent;
5256
import org.graylog2.datanode.DataNodeLifecycleTrigger;
57+
import org.graylog2.datanode.DataNodeNotficationEvent;
58+
import org.graylog2.events.ClusterEventBus;
59+
import org.graylog2.notifications.Notification;
5360
import org.graylog2.plugin.system.NodeId;
5461
import org.graylog2.security.CustomCAX509TrustManager;
5562
import org.graylog2.security.TrustManagerAggregator;
63+
import org.graylog2.shared.SuppressForbidden;
5664
import org.slf4j.Logger;
5765
import org.slf4j.LoggerFactory;
66+
import oshi.SystemInfo;
67+
import oshi.hardware.GlobalMemory;
5868

5969
import javax.annotation.Nonnull;
6070
import javax.net.ssl.TrustManager;
@@ -64,17 +74,20 @@
6474
import java.security.KeyStore;
6575
import java.util.List;
6676
import java.util.Locale;
77+
import java.util.Map;
6778
import java.util.Objects;
6879
import java.util.Optional;
6980
import java.util.Queue;
7081
import java.util.concurrent.Executors;
7182
import java.util.concurrent.ScheduledExecutorService;
7283
import java.util.concurrent.TimeUnit;
7384

85+
import static org.graylog2.shared.utilities.StringUtils.f;
86+
7487
public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener {
7588

7689
private static final Logger LOG = LoggerFactory.getLogger(OpensearchProcessImpl.class);
77-
90+
private static final long MEMORY_RATIO_THRESHOLD = 2;
7891

7992
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
8093
private Optional<OpensearchConfiguration> opensearchConfiguration = Optional.empty();
@@ -96,6 +109,7 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
96109
private final String nodeName;
97110
private final NodeId nodeId;
98111
private final EventBus eventBus;
112+
private final ClusterEventBus clusterEventBus;
99113

100114

101115
static final String CLUSTER_ROUTING_ALLOCATION_EXCLUDE_SETTING = "cluster.routing.allocation.exclude._name";
@@ -105,7 +119,8 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
105119
@Inject
106120
OpensearchProcessImpl(DatanodeConfiguration datanodeConfiguration, final CustomCAX509TrustManager trustManager,
107121
final Configuration configuration,
108-
ObjectMapper objectMapper, OpensearchStateMachine processState, NodeId nodeId, EventBus eventBus) {
122+
ObjectMapper objectMapper, OpensearchStateMachine processState, NodeId nodeId, EventBus eventBus,
123+
ClusterEventBus clusterEventBus) {
109124
this.datanodeConfiguration = datanodeConfiguration;
110125
this.processState = processState;
111126
this.stdout = new CircularFifoQueue<>(datanodeConfiguration.processLogsBufferSize());
@@ -116,6 +131,8 @@ public class OpensearchProcessImpl implements OpensearchProcess, ProcessListener
116131
this.nodeName = configuration.getDatanodeNodeName();
117132
this.nodeId = nodeId;
118133
this.eventBus = eventBus;
134+
this.clusterEventBus = clusterEventBus;
135+
eventBus.register(this);
119136
}
120137

121138
private RestHighLevelClient createRestClient(OpensearchConfiguration configuration) {
@@ -214,6 +231,37 @@ private void configure() {
214231
);
215232
}
216233

234+
@VisibleForTesting
235+
void checkConfiguredHeap() {
236+
Size heap = Size.parse(configuration.getOpensearchHeap());
237+
long heapBytes = heap.toBytes();
238+
final GlobalMemory memory = getGlobalMemory();
239+
long buffer = 2 * 1024 * 1024 * 1024L;
240+
long freeMemory = memory.getAvailable() - buffer;
241+
float memoryRatio = (float) freeMemory / heapBytes;
242+
if (memoryRatio > MEMORY_RATIO_THRESHOLD) {
243+
LOG.warn("There appears to be about {} times more available memory than the heap size configured for this data node.", memoryRatio);
244+
clusterEventBus.post(new DataNodeNotficationEvent(nodeId.getNodeId(), Notification.Type.DATA_NODE_HEAP_WARNING,
245+
Map.of("hostname", configuration.getHostname(),
246+
"memoryRatio", f("%.1f", memoryRatio),
247+
"totalMemory", FileUtils.byteCountToDisplaySize(memory.getTotal()),
248+
"availableMemory", FileUtils.byteCountToDisplaySize(memory.getAvailable()),
249+
"recommendedMemory", FileUtils.byteCountToDisplaySize(memory.getTotal()/2),
250+
"heapSize", FileUtils.byteCountToDisplaySize(heapBytes))));
251+
}
252+
}
253+
254+
protected GlobalMemory getGlobalMemory() {
255+
SystemInfo systemInfo = new SystemInfo();
256+
GlobalMemory memory = systemInfo.getHardware().getMemory();
257+
return memory;
258+
}
259+
260+
/**
 * Intentionally empty subscriber for {@link DataNodeNotficationEvent}.
 * <p>
 * The instance registers itself on the event bus in the constructor; this handler only exists so
 * that the event type has at least one subscriber inside the data node.
 *
 * @param event the notification event; not inspected here
 */
@Subscribe
261+
public void onNotificationEvent(DataNodeNotficationEvent event) {
262+
// we need a subscriber in the data node, otherwise this event will be ignored due to it having no subscribers
263+
}
264+
217265
@Override
218266
public synchronized void start() {
219267
opensearchConfiguration.ifPresentOrElse(
@@ -228,9 +276,11 @@ public synchronized void start() {
228276

229277
restClient = Optional.of(createRestClient(config));
230278
openSearchClient = restClient.map(c -> new OpenSearchClient(c, objectMapper));
231-
}),
232-
() -> {throw new IllegalArgumentException("Opensearch configuration required but not supplied!");}
233-
);
279+
checkConfiguredHeap();
280+
281+
}),
282+
() -> {throw new IllegalArgumentException("Opensearch configuration required but not supplied!");}
283+
);
234284
}
235285

236286
/**

data-node/src/test/java/org/graylog/datanode/opensearch/OpensearchProcessImplTest.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions;
3030
import org.graylog.shaded.opensearch2.org.opensearch.client.RestHighLevelClient;
3131
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Settings;
32+
import org.graylog2.datanode.DataNodeNotficationEvent;
33+
import org.graylog2.events.ClusterEventBus;
3234
import org.graylog2.plugin.system.NodeId;
3335
import org.graylog2.plugin.system.SimpleNodeId;
3436
import org.graylog2.security.CustomCAX509TrustManager;
@@ -38,8 +40,12 @@
3840
import org.mockito.ArgumentCaptor;
3941
import org.mockito.Mock;
4042
import org.mockito.junit.MockitoJUnitRunner;
43+
import oshi.hardware.GlobalMemory;
44+
import oshi.hardware.PhysicalMemory;
45+
import oshi.hardware.VirtualMemory;
4146

4247
import java.io.IOException;
48+
import java.util.List;
4349
import java.util.Optional;
4450
import java.util.concurrent.ScheduledExecutorService;
4551

@@ -49,7 +55,9 @@
4955
import static org.mockito.ArgumentMatchers.eq;
5056
import static org.mockito.Mockito.mock;
5157
import static org.mockito.Mockito.spy;
58+
import static org.mockito.Mockito.times;
5259
import static org.mockito.Mockito.verify;
60+
import static org.mockito.Mockito.verifyNoInteractions;
5361
import static org.mockito.Mockito.verifyNoMoreInteractions;
5462
import static org.mockito.Mockito.when;
5563

@@ -76,13 +84,15 @@ public class OpensearchProcessImplTest {
7684
RestHighLevelClient restClient;
7785
@Mock
7886
ClusterClient clusterClient;
87+
@Mock
88+
ClusterEventBus clusterEventBus;
7989

8090
@Before
8191
public void setup() throws IOException {
8292
when(datanodeConfiguration.processLogsBufferSize()).thenReturn(100);
8393
when(configuration.getDatanodeNodeName()).thenReturn(nodeName);
8494
this.opensearchProcess = spy(new OpensearchProcessImpl(datanodeConfiguration, trustmManager, configuration,
85-
objectMapper, processState, nodeId, eventBus));
95+
objectMapper, processState, nodeId, eventBus, clusterEventBus));
8696
when(opensearchProcess.restClient()).thenReturn(Optional.of(restClient));
8797
when(restClient.cluster()).thenReturn(clusterClient);
8898
}
@@ -130,4 +140,55 @@ public void testShutdownWhenRemovedSuccessfully() throws IOException {
130140
verify(executor).shutdown();
131141
}
132142

143+
@Test
144+
public void testHeapThresholdWarning() {
145+
when(configuration.getHostname()).thenReturn("datanode");
146+
when(configuration.getOpensearchHeap()).thenReturn("1g");
147+
when(opensearchProcess.getGlobalMemory()).thenReturn(mockMemory(gigabytes(8), gigabytes(16)));
148+
opensearchProcess.checkConfiguredHeap();
149+
verify(clusterEventBus, times(1)).post(any(DataNodeNotficationEvent.class));
150+
}
151+
152+
@Test
153+
public void testNoHeapThresholdWarning() {
154+
when(configuration.getOpensearchHeap()).thenReturn("1g");
155+
when(opensearchProcess.getGlobalMemory()).thenReturn(mockMemory(gigabytes(2), gigabytes(3)));
156+
opensearchProcess.checkConfiguredHeap();
157+
verifyNoInteractions(clusterEventBus);
158+
}
159+
160+
private GlobalMemory mockMemory(long availableMemory, long totalMemory) {
161+
return new GlobalMemory() {
162+
163+
@Override
164+
public long getTotal() {
165+
return totalMemory;
166+
}
167+
168+
@Override
169+
public long getAvailable() {
170+
return availableMemory;
171+
}
172+
173+
@Override
174+
public long getPageSize() {
175+
throw new UnsupportedOperationException("Not supported here");
176+
}
177+
178+
@Override
179+
public VirtualMemory getVirtualMemory() {
180+
throw new UnsupportedOperationException("Not supported here");
181+
}
182+
183+
@Override
184+
public List<PhysicalMemory> getPhysicalMemory() {
185+
throw new UnsupportedOperationException("Not supported here");
186+
}
187+
};
188+
}
189+
190+
private static long gigabytes(int i) {
191+
return i * 1024 * 1024 * 1024L;
192+
}
193+
133194
}

graylog2-server/src/main/java/org/graylog/plugins/datanode/DataNodeModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.graylog2.datanode.DataNodeCommandService;
2020
import org.graylog2.datanode.DataNodeCommandServiceImpl;
21+
import org.graylog2.datanode.DataNodeEventService;
2122
import org.graylog2.migrations.V20231107164300_CreateDataNodeManagerRole;
2223
import org.graylog2.plugin.PluginModule;
2324

@@ -26,6 +27,7 @@ public class DataNodeModule extends PluginModule {
2627
/**
 * Registers the data node related bindings and migrations of this module.
 */
@Override
2728
protected void configure() {
2829
bind(DataNodeCommandService.class).to(DataNodeCommandServiceImpl.class);
30+
// Eager singleton: the service subscribes to the event bus in its constructor,
// so it has to be instantiated even though nothing injects it.
bind(DataNodeEventService.class).asEagerSingleton();
2931
addMigration(V20231107164300_CreateDataNodeManagerRole.class);
3032
}
3133

graylog2-server/src/main/java/org/graylog2/commands/Server.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.graylog.metrics.prometheus.PrometheusExporterConfiguration;
3636
import org.graylog.metrics.prometheus.PrometheusMetricsModule;
3737
import org.graylog.plugins.cef.CEFInputModule;
38+
import org.graylog.plugins.datanode.DataNodeModule;
3839
import org.graylog.plugins.formatting.units.UnitsModule;
3940
import org.graylog.plugins.map.MapWidgetModule;
4041
import org.graylog.plugins.map.config.GeoIpProcessorConfig;
@@ -212,7 +213,8 @@ public boolean isLocal() {
212213
new DataTieringModule(),
213214
new DatanodeMigrationBindings(),
214215
new CaModule(),
215-
new TelemetryModule()
216+
new TelemetryModule(),
217+
new DataNodeModule()
216218
);
217219

218220
modules.add(new FieldTypeManagementModule());
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog2.datanode;
18+
19+
import com.google.common.eventbus.EventBus;
20+
import com.google.common.eventbus.Subscribe;
21+
import jakarta.inject.Inject;
22+
import jakarta.inject.Singleton;
23+
import org.graylog2.notifications.Notification;
24+
import org.graylog2.notifications.NotificationService;
25+
26+
@Singleton
27+
public class DataNodeEventService {
28+
29+
private final NotificationService notificationService;
30+
31+
@Inject
32+
public DataNodeEventService(EventBus eventBus, NotificationService notificationService) {
33+
this.notificationService = notificationService;
34+
eventBus.register(this);
35+
}
36+
37+
@Subscribe
38+
@SuppressWarnings("unused")
39+
public void onNotificationEvent(DataNodeNotficationEvent event) {
40+
Notification notification = notificationService.buildNow();
41+
notification.addSeverity(Notification.Severity.NORMAL);
42+
notification.addType(event.notificationType());
43+
event.details().forEach(notification::addDetail);
44+
notificationService.publishIfFirst(notification);
45+
}
46+
47+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (C) 2020 Graylog, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the Server Side Public License, version 1,
6+
* as published by MongoDB, Inc.
7+
*
8+
* This program is distributed in the hope that it will be useful,
9+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
* Server Side Public License for more details.
12+
*
13+
* You should have received a copy of the Server Side Public License
14+
* along with this program. If not, see
15+
* <http://www.mongodb.com/licensing/server-side-public-license>.
16+
*/
17+
package org.graylog2.datanode;
18+
19+
import org.graylog2.notifications.Notification;
20+
21+
import java.util.Map;
22+
23+
/**
 * Event describing a notification raised by a data node.
 *
 * @param nodeId           id of the node that raised the event
 * @param notificationType type of the system notification to create
 * @param details          key/value pairs that are copied into the notification's details
 */
public record DataNodeNotficationEvent(String nodeId, Notification.Type notificationType, Map<String, Object> details) {
24+
}

graylog2-server/src/main/java/org/graylog2/notifications/Notification.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ enum Type {
9999
REMOTE_REINDEX_RUNNING,
100100
REMOTE_REINDEX_FINISHED,
101101
DATA_NODE_VERSION_MISMATCH,
102-
DATA_TIERING_ROLLOVER_ERROR
102+
DATA_TIERING_ROLLOVER_ERROR,
103+
DATA_NODE_HEAP_WARNING
103104
}
104105

105106
enum Severity {
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<#if _title>
2+
Data Node Heap Size Warning
3+
</#if>
4+
5+
<#if _description>
6+
<p>
7+
There are data nodes in the cluster which could potentially run with a higher configured heap size for better performance.
8+
</p>
9+
<p>
10+
Data node <em>${hostname}</em> only has ${heapSize} Java Heap assigned, out of a total of ${totalMemory} RAM.<br/>
11+
Currently, there is ${availableMemory} free memory available on the node. We recommend making an additional half of this available to the Java Heap.
12+
</p>
13+
<p>
14+
<em>Note: </em>For production performance, it is recommended to configure this node to use ${recommendedMemory} Java Heap (50% of RAM).<br/>
15+
The Java Heap can be configured using the <em>opensearch_heap</em> configuration parameter in the node's configuration file (<em>datanode.conf</em>).
16+
</p>
17+
</#if>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<#if _title>
2+
Data Node Heap Size Warning
3+
</#if>
4+
5+
<#if _description>
6+
There are data nodes in the cluster which could potentially run with a higher configured heap size for better performance.
7+
Data node ${hostname} only has ${heapSize} Java Heap assigned, out of a total of ${totalMemory} RAM.
8+
Currently, there is ${availableMemory} free memory available on the node. We recommend making an additional half of this available to the Java Heap.
9+
Note: For production performance, it is recommended to configure this node to use ${recommendedMemory} Java Heap (50% of RAM).
10+
The Java Heap can be configured using the opensearch_heap configuration parameter in the node's configuration file (datanode.conf).
11+
</#if>

0 commit comments

Comments
 (0)