Skip to content

Commit fe26250

Browse files
authored
Optimize metrics aggregate/persistent worker: seperate OAL and MAL workers and consume pools, and make the consumer thread sleep when no data in. (apache#13414)
1 parent bf04afd commit fe26250

File tree

11 files changed

+334
-63
lines changed

11 files changed

+334
-63
lines changed

docs/en/changes/changes.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@
5050
* GraphQL API: metadata, topology, log and trace support query by name.
5151
* [Break Change] MQE function `sort_values` sorts according to the aggregation result and labels rather than the simple time series values.
5252
* Self Observability: add `metrics_aggregation_queue_used_percentage` and `metrics_persistent_collection_cached_size` metrics for the OAP server.
53+
* Optimize metrics aggregate/persistent worker: separate `OAL` and `MAL` workers and consume pools. The dataflow signal drives the new MAL consumer,
54+
the following table shows the pool size,driven mode and queue size for each worker.
55+
56+
| Worker | poolSize | isSignalDrivenMode | queueChannelSize | queueBufferSize |
57+
|-------------------------------|------------------------------------------|--------------------|------------------|-----------------|
58+
| MetricsAggregateOALWorker | Math.ceil(availableProcessors * 2 * 1.5) | false | 2 | 10000 |
59+
| MetricsAggregateMALWorker | availableProcessors * 2 / 8, at least 1 | true | 1 | 1000 |
60+
| MetricsPersistentMinOALWorker | availableProcessors * 2 / 8, at least 1 | false | 1 | 2000 |
61+
| MetricsPersistentMinMALWorker | availableProcessors * 2 / 16, at least 1 | true | 1 | 1000 |
62+
5363

5464
#### UI
5565

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.oap.server.core.analysis.worker;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
23+
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
24+
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
25+
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
26+
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
27+
28+
/**
29+
* MetricsAggregateMALWorker provides an in-memory metrics merging capability for MAL
30+
*/
31+
@Slf4j
32+
public class MetricsAggregateMALWorker extends MetricsAggregateWorker {
33+
private final static String POOL_NAME = "METRICS_L1_AGGREGATION_MAL";
34+
private final BulkConsumePool pool;
35+
36+
MetricsAggregateMALWorker(ModuleDefineHolder moduleDefineHolder,
37+
AbstractWorker<Metrics> nextWorker,
38+
String modelName,
39+
long l1FlushPeriod,
40+
MetricStreamKind kind) {
41+
super(
42+
moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
43+
POOL_NAME,
44+
calculatePoolSize(),
45+
true,
46+
1,
47+
1_000
48+
);
49+
this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
50+
}
51+
52+
/**
53+
* MetricsAggregateWorker#in operation does include enqueue only
54+
*/
55+
@Override
56+
public final void in(Metrics metrics) {
57+
super.in(metrics);
58+
pool.notifyConsumers();
59+
}
60+
61+
private static int calculatePoolSize() {
62+
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
63+
return size == 0 ? 1 : size;
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.oap.server.core.analysis.worker;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
23+
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
24+
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
25+
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
26+
27+
/**
28+
* MetricsAggregateOALWorker provides an in-memory metrics merging capability for OAL
29+
*/
30+
@Slf4j
31+
public class MetricsAggregateOALWorker extends MetricsAggregateWorker {
32+
private final static String POOL_NAME = "METRICS_L1_AGGREGATION_OAL";
33+
34+
MetricsAggregateOALWorker(ModuleDefineHolder moduleDefineHolder,
35+
AbstractWorker<Metrics> nextWorker,
36+
String modelName,
37+
long l1FlushPeriod,
38+
MetricStreamKind kind) {
39+
super(
40+
moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
41+
POOL_NAME,
42+
(int) Math.ceil(BulkConsumePool.Creator.recommendMaxSize() * 1.5),
43+
false,
44+
2,
45+
10_000
46+
);
47+
}
48+
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* payload.
4646
*/
4747
@Slf4j
48-
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
48+
public abstract class MetricsAggregateWorker extends AbstractWorker<Metrics> {
4949
public final long l1FlushPeriod;
5050
private AbstractWorker<Metrics> nextWorker;
5151
private final DataCarrier<Metrics> dataCarrier;
@@ -54,39 +54,31 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
5454
private CounterMetrics aggregationCounter;
5555
private GaugeMetrics queuePercentageGauge;
5656
private long lastSendTime = 0;
57-
private final MetricStreamKind kind;
5857
private final int queueTotalSize;
5958

6059
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
6160
AbstractWorker<Metrics> nextWorker,
6261
String modelName,
6362
long l1FlushPeriod,
64-
MetricStreamKind kind) {
63+
MetricStreamKind kind,
64+
String poolName,
65+
int poolSize,
66+
boolean isSignalDrivenMode,
67+
int queueChannelSize,
68+
int queueBufferSize
69+
) {
6570
super(moduleDefineHolder);
6671
this.nextWorker = nextWorker;
6772
this.mergeDataCache = new MergableBufferedData();
68-
this.kind = kind;
69-
String name = "METRICS_L1_AGGREGATION";
70-
int queueChannelSize = 2;
71-
int queueBufferSize = 10_000;
72-
if (MetricStreamKind.MAL == kind) {
73-
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
74-
// but in OAL sources, they are raw data.
75-
// Set the buffer(size of queue) as 1/20 to reduce unnecessary resource costs.
76-
queueChannelSize = 1;
77-
queueBufferSize = 1_000;
78-
}
73+
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode);
7974
this.dataCarrier = new DataCarrier<>(
80-
"MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE);
81-
82-
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
83-
name, BulkConsumePool.Creator.recommendMaxSize() * 2, 200);
75+
"MetricsAggregateWorker." + modelName, poolName, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE);
8476
try {
85-
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
77+
ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
8678
} catch (Exception e) {
8779
throw new UnexpectedException(e.getMessage(), e);
8880
}
89-
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
81+
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new AggregatorConsumer());
9082

9183
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
9284
.provider()
@@ -116,7 +108,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
116108
* MetricsAggregateWorker#in operation does include enqueue only
117109
*/
118110
@Override
119-
public final void in(Metrics metrics) {
111+
public void in(Metrics metrics) {
120112
if (!dataCarrier.produce(metrics)) {
121113
abandonCounter.inc();
122114
}
@@ -149,7 +141,7 @@ private void flush() {
149141
}
150142
}
151143

152-
private class AggregatorConsumer implements IConsumer<Metrics> {
144+
protected class AggregatorConsumer implements IConsumer<Metrics> {
153145
@Override
154146
public void consume(List<Metrics> data) {
155147
queuePercentageGauge.setValue(Math.round(100 * (double) data.size() / queueTotalSize));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.oap.server.core.analysis.worker;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
23+
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
24+
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
25+
import org.apache.skywalking.oap.server.core.storage.model.Model;
26+
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
27+
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
28+
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
29+
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
30+
31+
@Slf4j
32+
public class MetricsPersistentMinMALWorker extends MetricsPersistentMinWorker {
33+
private final static String POOL_NAME = "METRICS_L2_AGGREGATION_MAL";
34+
private final BulkConsumePool pool;
35+
36+
MetricsPersistentMinMALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
37+
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
38+
MetricsTransWorker transWorker, boolean supportUpdate,
39+
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
40+
super(
41+
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
42+
storageSessionTimeout, metricsDataTTL, kind,
43+
POOL_NAME,
44+
calculatePoolSize(),
45+
true,
46+
1,
47+
1000
48+
);
49+
this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
50+
}
51+
52+
@Override
53+
public void in(Metrics metrics) {
54+
super.in(metrics);
55+
pool.notifyConsumers();
56+
}
57+
58+
private static int calculatePoolSize() {
59+
int size = BulkConsumePool.Creator.recommendMaxSize() / 16;
60+
return size == 0 ? 1 : size;
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.oap.server.core.analysis.worker;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
23+
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
24+
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
25+
import org.apache.skywalking.oap.server.core.storage.model.Model;
26+
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
27+
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
28+
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
29+
30+
@Slf4j
31+
public class MetricsPersistentMinOALWorker extends MetricsPersistentMinWorker {
32+
33+
MetricsPersistentMinOALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
34+
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
35+
MetricsTransWorker transWorker, boolean supportUpdate,
36+
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
37+
super(
38+
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
39+
storageSessionTimeout, metricsDataTTL, kind,
40+
"METRICS_L2_AGGREGATION_OAL",
41+
calculatePoolSize(),
42+
false,
43+
1,
44+
2000
45+
);
46+
}
47+
48+
private static int calculatePoolSize() {
49+
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
50+
return size == 0 ? 1 : size;
51+
}
52+
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
2727
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
2828
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
29-
import org.apache.skywalking.oap.server.core.status.ServerStatusWatcher;
3029
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
3130
import org.apache.skywalking.oap.server.core.storage.model.Model;
3231
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -45,7 +44,7 @@
4544
* MetricsPersistentMinWorker is an extension of {@link MetricsPersistentWorker} and focuses on the Minute Metrics data persistent.
4645
*/
4746
@Slf4j
48-
public class MetricsPersistentMinWorker extends MetricsPersistentWorker implements ServerStatusWatcher {
47+
public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker {
4948
private final DataCarrier<Metrics> dataCarrier;
5049

5150
/**
@@ -65,33 +64,22 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker implemen
6564
MetricsPersistentMinWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
6665
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
6766
MetricsTransWorker transWorker, boolean supportUpdate,
68-
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
67+
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind,
68+
String poolName, int poolSize, boolean isSignalDrivenMode,
69+
int queueChannelSize, int queueBufferSize) {
6970
super(
7071
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
7172
storageSessionTimeout, metricsDataTTL, kind
7273
);
7374

74-
String name = "METRICS_L2_AGGREGATION";
75-
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
76-
if (size == 0) {
77-
size = 1;
78-
}
79-
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
75+
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode);
8076
try {
81-
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
77+
ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
8278
} catch (Exception e) {
8379
throw new UnexpectedException(e.getMessage(), e);
8480
}
85-
86-
int bufferSize = 2000;
87-
if (MetricStreamKind.MAL == kind) {
88-
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
89-
// but in OAL sources, they are raw data.
90-
// Set the buffer(size of queue) as 1/2 to reduce unnecessary resource costs.
91-
bufferSize = 1000;
92-
}
93-
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, bufferSize);
94-
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
81+
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), poolName, queueChannelSize, queueBufferSize);
82+
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new PersistentConsumer());
9583

9684
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
9785
.provider()

0 commit comments

Comments
 (0)