Commit 5fbfb67

Self Observability: add BanyanDB and Elasticsearch write latency metrics and dashboards (#13544)
1 parent b81ef19 commit 5fbfb67

15 files changed: +393 -86 lines changed


docs/en/changes/changes.md

Lines changed: 3 additions & 0 deletions
@@ -106,6 +106,7 @@
 * BanyanDB: fix when setting `@BanyanDB.TimestampColumn`, the column should not be indexed.
 * OAP Self Observability: make Trace analysis metrics separate by label `protocol`, add Zipkin span dropped metrics.
 * BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations.
+* Self Observability: add write latency metrics for BanyanDB and ElasticSearch.
 
 #### UI
 
@@ -137,6 +138,7 @@
 * Adapt new trace protocol and implement new trace view.
 * Implement Trace page.
 * Support collapsing and expanding for the event widget.
+* UI-template: add BanyanDB and Elasticsearch write latency dashboards for OAP self observability.
 
 #### Documentation
 
@@ -148,6 +150,7 @@
 * Enhance the async-profiling duration options.
 * Enhance the TTL Tab on Setting page.
 * Fix the snapshot charts in alarm page.
+* Fix `Fluent Bit` dead links.
 
 All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/230?closed=1)
 

docs/en/setup/backend/backend-alarm.md

Lines changed: 1 addition & 1 deletion
@@ -309,7 +309,7 @@ slack:
 ```
 
 ### WeChat
-Note that only the WeChat Company Edition (WeCom) supports WebHooks. To use the WeChat WebHook, follow the [Wechat Webhooks guide](https://work.weixin.qq.com/help?doc_id=13376).
+Note that only the WeChat Company Edition (WeCom) supports WebHooks. To use the WeChat WebHook, follow the [Wechat Webhooks guide](https://open.work.weixin.qq.com/help2/pc/14931).
 The alarm message will be sent through HTTP post by `application/json` content type after you have set up Wechat Webhooks as follows:
 ```yml
 wechat:

docs/en/setup/backend/backend-mysql-monitoring.md

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ SkyWalking leverages [fluentbit](https://fluentbit.io/) or other log agents for
 3. The SkyWalking OAP Server parses the expression with [LAL](../../concepts-and-designs/lal.md) to parse/extract and store the results.
 
 ### Set up
-1. Set up [fluentbit](https://docs.fluentbit.io/manual/installation/docker).
+1. Set up [fluentbit](https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit).
 2. Config fluentbit from [here](../../../../test/e2e-v2/cases/mysql/mysql-slowsql/fluent-bit.conf) for MySQL or [here](../../../../test/e2e-v2/cases/mariadb/mariadb-slowsql/fluent-bit.conf) for MariaDB.
 3. Enable slow log from [here](../../../../test/e2e-v2/cases/mysql/mysql-slowsql/my.cnf) for MySQL or [here](../../../../test/e2e-v2/cases/mariadb/mariadb-slowsql/my.cnf) for MariaDB.
 

docs/en/setup/backend/backend-nginx-monitoring.md

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ SkyWalking leverages [fluentbit](https://fluentbit.io/) or other log agents for
 3. The SkyWalking OAP Server parses the expression with [LAL](../../concepts-and-designs/lal.md) to parse/extract and store the results.
 
 ### Set up
-1. Install [fluentbit](https://docs.fluentbit.io/manual/installation/docker).
+1. Install [fluentbit](https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit).
 2. Config fluent bit with fluent-bit.conf, refer to [here](../../../../test/e2e-v2/cases/nginx/fluent-bit.conf).
 
 ### Error Log Monitoring

docs/en/setup/backend/backend-postgresql-monitoring.md

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ SkyWalking leverages [fluentbit](https://fluentbit.io/) or other log agents for
 3. The SkyWalking OAP Server parses the expression with [LAL](../../concepts-and-designs/lal.md) to parse/extract and store the results.
 
 ### Set up
-1. Set up [fluentbit](https://docs.fluentbit.io/manual/installation/docker).
+1. Set up [fluentbit](https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit).
 2. Config [fluentbit](../../../../test/e2e-v2/cases/postgresql/postgres-exporter/fluent-bit.conf)
 3. Config PostgreSQL to enable slow log. [Example](../../../../test/e2e-v2/cases/postgresql/postgres-exporter/postgresql.conf).
 

docs/en/setup/backend/backend-redis-monitoring.md

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ SkyWalking leverages [fluentbit](https://fluentbit.io/) or other log agents for
 4. The SkyWalking OAP Server parses the expression with [LAL](../../concepts-and-designs/lal.md) to parse/extract and store the results.
 
 ### Set up
-1. Set up [fluentbit](https://docs.fluentbit.io/manual/installation/docker).
+1. Set up [fluentbit](https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit).
 2. Config fluentbit from [here](../../../../test/e2e-v2/cases/redis/redis-exporter/fluent-bit.conf) for Redis.
 3. Config slow log from [here](../../../../test/e2e-v2/cases/redis/redis-exporter/redis.conf) for Redis.
 4. Periodically execute the [commands](../../../../test/e2e-v2/cases/redis/redis-exporter/scripts/slowlog.sh).

oap-server/server-library/library-client/pom.xml

Lines changed: 5 additions & 1 deletion
@@ -38,12 +38,16 @@
             <artifactId>library-util</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.apache.skywalking</groupId>
             <artifactId>library-elasticsearch-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>telemetry-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>io.grpc</groupId>

oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java

Lines changed: 73 additions & 12 deletions
@@ -34,11 +34,11 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.library.elasticsearch.requests.search.Query;
 import org.apache.skywalking.library.elasticsearch.response.Documents;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.library.elasticsearch.ElasticSearch;
 import org.apache.skywalking.library.elasticsearch.ElasticSearchBuilder;
@@ -55,12 +55,15 @@
 import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
 import org.apache.skywalking.oap.server.library.util.HealthChecker;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 /**
  * ElasticSearchClient connects to the ES server by using ES client APIs.
  */
 @Slf4j
-@RequiredArgsConstructor
 public class ElasticSearchClient implements Client, HealthCheckable {
     public static final String TYPE = "type";
 
@@ -93,7 +96,14 @@ public class ElasticSearchClient implements Client, HealthCheckable {
 
     private final AtomicReference<ElasticSearch> es = new AtomicReference<>();
 
-    public ElasticSearchClient(String clusterNodes,
+    private final ModuleManager moduleManager;
+    private HistogramMetrics singleWriteHistogram;
+    private HistogramMetrics singleUpdateHistogram;
+    private HistogramMetrics singleDeleteHistogram;
+    private HistogramMetrics bulkWriteHistogram;
+
+    public ElasticSearchClient(ModuleManager moduleManager,
+                               String clusterNodes,
                                String protocol,
                                String trustStorePath,
                                String trustStorePass,
@@ -104,6 +114,7 @@ public ElasticSearchClient(String clusterNodes,
                                int socketTimeout,
                                int responseTimeout,
                                int numHttpClientThread) {
+        this.moduleManager = moduleManager;
         this.clusterNodes = clusterNodes;
         this.protocol = protocol;
         this.trustStorePath = trustStorePath;
@@ -119,6 +130,7 @@ public ElasticSearchClient(String clusterNodes,
 
     @Override
     public void connect() {
+        initTelemetry();
         final ElasticSearch oldOne = es.get();
 
         final ElasticSearchBuilder cb =
@@ -345,21 +357,27 @@ public SearchResponse searchIDs(String indexName, Iterable<String> ids) {
     }
 
     public void forceInsert(String indexName, String id, Map<String, Object> source) {
-        IndexRequestWrapper wrapper = prepareInsert(indexName, id, source);
-        Map<String, Object> params = ImmutableMap.of("refresh", "true");
-        es.get().documents().index(wrapper.getRequest(), params);
+        try (HistogramMetrics.Timer timer = singleWriteHistogram.createTimer()) {
+            IndexRequestWrapper wrapper = prepareInsert(indexName, id, source);
+            Map<String, Object> params = ImmutableMap.of("refresh", "true");
+            es.get().documents().index(wrapper.getRequest(), params);
+        }
     }
 
     public void forceUpdate(String indexName, String id, Map<String, Object> source) {
-        UpdateRequestWrapper wrapper = prepareUpdate(indexName, id, source);
-        Map<String, Object> params = ImmutableMap.of("refresh", "true");
-        es.get().documents().update(wrapper.getRequest(), params);
+        try (HistogramMetrics.Timer timer = singleUpdateHistogram.createTimer()) {
+            UpdateRequestWrapper wrapper = prepareUpdate(indexName, id, source);
+            Map<String, Object> params = ImmutableMap.of("refresh", "true");
+            es.get().documents().update(wrapper.getRequest(), params);
+        }
     }
 
     public void deleteById(String indexName, String id) {
-        indexName = indexNameConverter.apply(indexName);
-        Map<String, Object> params = ImmutableMap.of("refresh", "true");
-        es.get().documents().deleteById(indexName, TYPE, id, params);
+        try (HistogramMetrics.Timer timer = singleDeleteHistogram.createTimer()) {
+            indexName = indexNameConverter.apply(indexName);
+            Map<String, Object> params = ImmutableMap.of("refresh", "true");
+            es.get().documents().deleteById(indexName, TYPE, id, params);
+        }
     }
 
     public IndexRequestWrapper prepareInsert(String indexName, String id,
@@ -388,10 +406,53 @@ public BulkProcessor createBulkProcessor(int bulkActions,
                             .batchOfBytes(batchOfBytes)
                             .flushInterval(Duration.ofSeconds(flushInterval))
                             .concurrentRequests(concurrentRequests)
+                            .bulkMetrics(bulkWriteHistogram)
                             .build(es);
     }
 
     public String formatIndexName(String indexName) {
         return indexNameConverter.apply(indexName);
     }
+
+    private void initTelemetry() {
+        MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
+                                                     .provider()
+                                                     .getService(MetricsCreator.class);
+        if (singleWriteHistogram == null) {
+            singleWriteHistogram = metricsCreator.createHistogramMetric(
+                "elasticsearch_write_latency",
+                "Elasticsearch write/update/delete latency in seconds, bulk_write include write/update",
+                new MetricsTag.Keys("operation"),
+                new MetricsTag.Values("single_write"),
+                0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
+            );
+        }
+        if (singleUpdateHistogram == null) {
+            singleUpdateHistogram = metricsCreator.createHistogramMetric(
+                "elasticsearch_write_latency",
+                "Elasticsearch write/update/delete latency in seconds, bulk_write include write/update",
+                new MetricsTag.Keys("operation"),
+                new MetricsTag.Values("single_update"),
+                0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
+            );
+        }
+        if (singleDeleteHistogram == null) {
+            singleDeleteHistogram = metricsCreator.createHistogramMetric(
+                "elasticsearch_write_latency",
+                "Elasticsearch write/update/delete latency in seconds, bulk_write include write/update",
+                new MetricsTag.Keys("operation"),
+                new MetricsTag.Values("single_delete"),
+                0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
+            );
+        }
+        if (bulkWriteHistogram == null) {
+            bulkWriteHistogram = metricsCreator.createHistogramMetric(
+                "elasticsearch_write_latency",
+                "Elasticsearch write/update/delete latency in seconds, bulk_write include write/update",
+                new MetricsTag.Keys("operation"),
+                new MetricsTag.Values("bulk_write"),
+                0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
+            );
+        }
+    }
 }
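
The `try (HistogramMetrics.Timer timer = ...)` blocks in this file rely on try-with-resources: the timer is closed when the block exits, whether normally or through an exception, and closing it is presumably what records the elapsed time. The sketch below illustrates that pattern on its own, without any SkyWalking dependency. `LatencySketch`, its `Timer`, `observe`, and `countInBucket` are hypothetical names chosen for illustration and are not the `HistogramMetrics` API; the bucket bounds in `main` are copied from the diff.

```java
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a latency histogram; NOT SkyWalking's HistogramMetrics. */
final class LatencySketch {
    private final double[] upperBounds;     // bucket upper bounds in seconds, ascending
    private final AtomicLongArray counts;   // one extra slot catches values above the last bound
    private final LongSupplier nanoClock;   // injectable clock so the sketch can be tested

    LatencySketch(LongSupplier nanoClock, double... upperBounds) {
        this.nanoClock = nanoClock;
        this.upperBounds = upperBounds.clone();
        this.counts = new AtomicLongArray(upperBounds.length + 1);
    }

    /** Starts timing now; the observation is recorded when the timer is closed. */
    Timer createTimer() {
        return new Timer();
    }

    /** Counts the value in the first bucket whose upper bound is >= seconds. */
    void observe(double seconds) {
        int i = 0;
        while (i < upperBounds.length && seconds > upperBounds[i]) {
            i++;
        }
        counts.incrementAndGet(i);
    }

    long countInBucket(int index) {
        return counts.get(index);
    }

    final class Timer implements AutoCloseable {
        private final long start = nanoClock.getAsLong();

        @Override
        public void close() {
            // Runs on normal exit and on exceptions, so failed writes are timed too.
            observe((nanoClock.getAsLong() - start) / 1_000_000_000.0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LatencySketch writes = new LatencySketch(System::nanoTime,
            0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0);
        try (Timer timer = writes.createTimer()) {
            Thread.sleep(20); // stands in for the storage call being measured
        }
        for (int i = 0; i <= 11; i++) {
            System.out.println("bucket " + i + ": " + writes.countInBucket(i));
        }
    }
}
```

Because the measurement happens in `close()`, wrapping an existing method body this way adds timing without touching the body's own control flow, which matches how `forceInsert`, `forceUpdate`, and `deleteById` were changed.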

oap-server/server-library/library-client/src/test/java/org/apache/skywalking/library/elasticsearch/bulk/ElasticSearchIT.java

Lines changed: 58 additions & 24 deletions
@@ -31,10 +31,20 @@
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchScroller;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexRequestWrapper;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
+import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -46,8 +56,11 @@
 import java.util.UUID;
 import java.util.function.Function;
 
+import static org.mockito.Mockito.mock;
+
 @Slf4j
 public class ElasticSearchIT {
+    private ModuleManager moduleManager;
 
     public static Collection<Object[]> versions() {
         // noinspection resource
@@ -75,17 +88,33 @@ public static Collection<Object[]> versions() {
         });
     }
 
+    @BeforeEach
+    public void setUp() throws Exception {
+        moduleManager = mock(ModuleManager.class);
+        ModuleDefine storageModule = mock(ModuleDefine.class);
+        ModuleProvider provider = mock(ModuleProvider.class);
+        Mockito.when(provider.getModule()).thenReturn(storageModule);
+
+        NoneTelemetryProvider telemetryProvider = mock(NoneTelemetryProvider.class);
+        Mockito.when(telemetryProvider.getService(MetricsCreator.class))
+               .thenReturn(new MetricsCreatorNoop());
+        TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
+        Whitebox.setInternalState(telemetryModule, "loadedProvider", telemetryProvider);
+        Mockito.when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
+    }
+
     @ParameterizedTest(name = "version: {0}")
     @MethodSource("versions")
     public void indexOperate(final ElasticsearchContainer server,
                              final String namespace) {
         server.start();
 
         final ElasticSearchClient client = new ElasticSearchClient(
-            server.getHttpHostAddress(),
-            "http", "", "", "test", "test",
-            indexNameConverter(namespace), 500, 6000,
-            0, 15
+            moduleManager,
+            server.getHttpHostAddress(),
+            "http", "", "", "test", "test",
+            indexNameConverter(namespace), 500, 6000,
+            0, 15
         );
         client.connect();
 
@@ -134,10 +163,11 @@ public void documentOperate(final ElasticsearchContainer server,
         server.start();
 
         final ElasticSearchClient client = new ElasticSearchClient(
-            server.getHttpHostAddress(),
-            "http", "", "", "test", "test",
-            indexNameConverter(namespace), 500, 6000,
-            0, 15
+            moduleManager,
+            server.getHttpHostAddress(),
+            "http", "", "", "test", "test",
+            indexNameConverter(namespace), 500, 6000,
+            0, 15
         );
         client.connect();
 
@@ -209,10 +239,11 @@ public void templateOperate(final ElasticsearchContainer server,
         server.start();
 
         final ElasticSearchClient client = new ElasticSearchClient(
-            server.getHttpHostAddress(),
-            "http", "", "", "test", "test",
-            indexNameConverter(namespace), 500, 6000,
-            0, 15
+            moduleManager,
+            server.getHttpHostAddress(),
+            "http", "", "", "test", "test",
+            indexNameConverter(namespace), 500, 6000,
+            0, 15
         );
         client.connect();
 
@@ -264,10 +295,11 @@ public void bulk(final ElasticsearchContainer server,
         server.start();
 
         final ElasticSearchClient client = new ElasticSearchClient(
-            server.getHttpHostAddress(),
-            "http", "", "", "test", "test",
-            indexNameConverter(namespace), 500, 6000,
-            0, 15
+            moduleManager,
+            server.getHttpHostAddress(),
+            "http", "", "", "test", "test",
+            indexNameConverter(namespace), 500, 6000,
+            0, 15
         );
         client.connect();
 
@@ -297,10 +329,11 @@ public void bulkPer_1KB(final ElasticsearchContainer server,
         server.start();
 
         final ElasticSearchClient client = new ElasticSearchClient(
-            server.getHttpHostAddress(),
-            "http", "", "", "test", "test",
-            indexNameConverter(namespace), 500, 6000,
-            0, 15
+            moduleManager,
+            server.getHttpHostAddress(),
+            "http", "", "", "test", "test",
+            indexNameConverter(namespace), 500, 6000,
+            0, 15
         );
         client.connect();
 
@@ -326,10 +359,11 @@ public void timeSeriesOperate(final ElasticsearchContainer server,
         server.start();
 
         final ElasticSearchClient client = new ElasticSearchClient(
-            server.getHttpHostAddress(),
-            "http", "", "", "test", "test",
-            indexNameConverter(namespace), 500, 6000,
-            0, 15
+            moduleManager,
+            server.getHttpHostAddress(),
+            "http", "", "", "test", "test",
+            indexNameConverter(namespace), 500, 6000,
+            0, 15
         );
         client.connect();
 
