Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
* GraphQL API: metadata, topology, log and trace support query by name.
* [Break Change] MQE function `sort_values` sorts according to the aggregation result and labels rather than the simple time series values.
* Self Observability: add `metrics_aggregation_queue_used_percentage` and `metrics_persistent_collection_cached_size` metrics for the OAP server.
* Optimize metrics aggregate/persistent worker: separate `OAL` and `MAL` workers and consume pools. The new MAL consumers are driven by the dataflow signal.
The following table shows the pool size, driven mode, and queue size for each worker.

| Worker | poolSize | isSignalDrivenMode | queueChannelSize | queueBufferSize |
|-------------------------------|------------------------------------------|--------------------|------------------|-----------------|
| MetricsAggregateOALWorker | Math.ceil(availableProcessors * 2 * 1.5) | false | 2 | 10000 |
| MetricsAggregateMALWorker | availableProcessors * 2 / 8, at least 1 | true | 1 | 1000 |
| MetricsPersistentMinOALWorker | availableProcessors * 2 / 8, at least 1 | false | 1 | 2000 |
| MetricsPersistentMinMALWorker | availableProcessors * 2 / 16, at least 1 | true | 1 | 1000 |


#### UI

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
 * MetricsAggregateMALWorker is the {@link MetricsAggregateWorker} used for MAL metrics. It provides the same
 * in-memory metrics merging capability, but consumes from its own pool, which is requested in signal-driven mode.
 */
@Slf4j
public class MetricsAggregateMALWorker extends MetricsAggregateWorker {
    private static final String POOL_NAME = "METRICS_L1_AGGREGATION_MAL";
    private static final boolean SIGNAL_DRIVEN_MODE = true;
    private static final int QUEUE_CHANNEL_SIZE = 1;
    private static final int QUEUE_BUFFER_SIZE = 1_000;

    /**
     * The consumer pool shared by every worker registered under {@link #POOL_NAME}.
     */
    private final BulkConsumePool pool;

    MetricsAggregateMALWorker(ModuleDefineHolder moduleDefineHolder,
                              AbstractWorker<Metrics> nextWorker,
                              String modelName,
                              long l1FlushPeriod,
                              MetricStreamKind kind) {
        super(
            moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
            POOL_NAME,
            calculatePoolSize(),
            SIGNAL_DRIVEN_MODE,
            QUEUE_CHANNEL_SIZE,
            QUEUE_BUFFER_SIZE
        );
        // The parent constructor creates the pool if it is absent, so the lookup by name happens afterwards.
        this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
    }

    /**
     * Enqueues the metrics through {@link MetricsAggregateWorker#in(Metrics)} and then notifies the consumers
     * of the pool.
     */
    @Override
    public final void in(Metrics metrics) {
        super.in(metrics);
        pool.notifyConsumers();
    }

    /**
     * @return one eighth of {@code BulkConsumePool.Creator.recommendMaxSize()}, or 1 when that division yields 0.
     */
    private static int calculatePoolSize() {
        final int eighthOfRecommended = BulkConsumePool.Creator.recommendMaxSize() / 8;
        if (eighthOfRecommended == 0) {
            return 1;
        }
        return eighthOfRecommended;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
 * MetricsAggregateOALWorker is the {@link MetricsAggregateWorker} used for OAL metrics. It provides the same
 * in-memory metrics merging capability on its own consumer pool, which is not requested in signal-driven mode.
 */
@Slf4j
public class MetricsAggregateOALWorker extends MetricsAggregateWorker {
    private static final String POOL_NAME = "METRICS_L1_AGGREGATION_OAL";
    private static final boolean SIGNAL_DRIVEN_MODE = false;
    private static final int QUEUE_CHANNEL_SIZE = 2;
    private static final int QUEUE_BUFFER_SIZE = 10_000;

    MetricsAggregateOALWorker(ModuleDefineHolder moduleDefineHolder,
                              AbstractWorker<Metrics> nextWorker,
                              String modelName,
                              long l1FlushPeriod,
                              MetricStreamKind kind) {
        super(
            moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
            POOL_NAME,
            calculatePoolSize(),
            SIGNAL_DRIVEN_MODE,
            QUEUE_CHANNEL_SIZE,
            QUEUE_BUFFER_SIZE
        );
    }

    /**
     * @return 1.5 times {@code BulkConsumePool.Creator.recommendMaxSize()}, rounded up.
     */
    private static int calculatePoolSize() {
        final double scaled = BulkConsumePool.Creator.recommendMaxSize() * 1.5;
        return (int) Math.ceil(scaled);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* payload.
*/
@Slf4j
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
public abstract class MetricsAggregateWorker extends AbstractWorker<Metrics> {
public final long l1FlushPeriod;
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
Expand All @@ -54,39 +54,31 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private CounterMetrics aggregationCounter;
private GaugeMetrics queuePercentageGauge;
private long lastSendTime = 0;
private final MetricStreamKind kind;
private final int queueTotalSize;

MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<Metrics> nextWorker,
String modelName,
long l1FlushPeriod,
MetricStreamKind kind) {
MetricStreamKind kind,
String poolName,
int poolSize,
boolean isSignalDrivenMode,
int queueChannelSize,
int queueBufferSize
) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergableBufferedData();
this.kind = kind;
String name = "METRICS_L1_AGGREGATION";
int queueChannelSize = 2;
int queueBufferSize = 10_000;
if (MetricStreamKind.MAL == kind) {
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
// but in OAL sources, they are raw data.
// Set the buffer(size of queue) as 1/20 to reduce unnecessary resource costs.
queueChannelSize = 1;
queueBufferSize = 1_000;
}
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode);
this.dataCarrier = new DataCarrier<>(
"MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE);

BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
name, BulkConsumePool.Creator.recommendMaxSize() * 2, 200);
"MetricsAggregateWorker." + modelName, poolName, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new AggregatorConsumer());

MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
Expand Down Expand Up @@ -116,7 +108,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
* MetricsAggregateWorker#in operation does include enqueue only
*/
@Override
public final void in(Metrics metrics) {
public void in(Metrics metrics) {
if (!dataCarrier.produce(metrics)) {
abandonCounter.inc();
}
Expand Down Expand Up @@ -149,7 +141,7 @@ private void flush() {
}
}

private class AggregatorConsumer implements IConsumer<Metrics> {
protected class AggregatorConsumer implements IConsumer<Metrics> {
@Override
public void consume(List<Metrics> data) {
queuePercentageGauge.setValue(Math.round(100 * (double) data.size() / queueTotalSize));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
 * MetricsPersistentMinMALWorker is the {@link MetricsPersistentMinWorker} used for MAL metrics. It consumes from
 * its own pool, which is requested in signal-driven mode, and notifies the pool's consumers after each enqueue.
 */
@Slf4j
public class MetricsPersistentMinMALWorker extends MetricsPersistentMinWorker {
    private static final String POOL_NAME = "METRICS_L2_AGGREGATION_MAL";
    private static final boolean SIGNAL_DRIVEN_MODE = true;
    private static final int QUEUE_CHANNEL_SIZE = 1;
    private static final int QUEUE_BUFFER_SIZE = 1000;

    /**
     * The consumer pool shared by every worker registered under {@link #POOL_NAME}.
     */
    private final BulkConsumePool pool;

    MetricsPersistentMinMALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                                  AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
                                  MetricsTransWorker transWorker, boolean supportUpdate,
                                  long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
        super(
            moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
            storageSessionTimeout, metricsDataTTL, kind,
            POOL_NAME,
            calculatePoolSize(),
            SIGNAL_DRIVEN_MODE,
            QUEUE_CHANNEL_SIZE,
            QUEUE_BUFFER_SIZE
        );
        // The parent constructor creates the pool if it is absent, so the lookup by name happens afterwards.
        this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
    }

    /**
     * Delegates to the parent {@code in} and then notifies the consumers of the pool.
     */
    @Override
    public void in(Metrics metrics) {
        super.in(metrics);
        pool.notifyConsumers();
    }

    /**
     * @return one sixteenth of {@code BulkConsumePool.Creator.recommendMaxSize()}, or 1 when that division
     * yields 0.
     */
    private static int calculatePoolSize() {
        final int sixteenthOfRecommended = BulkConsumePool.Creator.recommendMaxSize() / 16;
        if (sixteenthOfRecommended == 0) {
            return 1;
        }
        return sixteenthOfRecommended;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
 * MetricsPersistentMinOALWorker is the {@link MetricsPersistentMinWorker} used for OAL metrics. It consumes from
 * its own pool, which is not requested in signal-driven mode.
 */
@Slf4j
public class MetricsPersistentMinOALWorker extends MetricsPersistentMinWorker {
    private static final String POOL_NAME = "METRICS_L2_AGGREGATION_OAL";
    private static final boolean SIGNAL_DRIVEN_MODE = false;
    private static final int QUEUE_CHANNEL_SIZE = 1;
    private static final int QUEUE_BUFFER_SIZE = 2000;

    MetricsPersistentMinOALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
                                  AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
                                  MetricsTransWorker transWorker, boolean supportUpdate,
                                  long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
        super(
            moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
            storageSessionTimeout, metricsDataTTL, kind,
            POOL_NAME,
            calculatePoolSize(),
            SIGNAL_DRIVEN_MODE,
            QUEUE_CHANNEL_SIZE,
            QUEUE_BUFFER_SIZE
        );
    }

    /**
     * @return one eighth of {@code BulkConsumePool.Creator.recommendMaxSize()}, or 1 when that division yields 0.
     */
    private static int calculatePoolSize() {
        final int eighthOfRecommended = BulkConsumePool.Creator.recommendMaxSize() / 8;
        if (eighthOfRecommended == 0) {
            return 1;
        }
        return eighthOfRecommended;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.status.ServerStatusWatcher;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
Expand All @@ -45,7 +44,7 @@
* MetricsPersistentMinWorker is an extension of {@link MetricsPersistentWorker} and focuses on the Minute Metrics data persistent.
*/
@Slf4j
public class MetricsPersistentMinWorker extends MetricsPersistentWorker implements ServerStatusWatcher {
public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker {
private final DataCarrier<Metrics> dataCarrier;

/**
Expand All @@ -65,33 +64,22 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker implemen
MetricsPersistentMinWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean supportUpdate,
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind,
String poolName, int poolSize, boolean isSignalDrivenMode,
int queueChannelSize, int queueBufferSize) {
super(
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
storageSessionTimeout, metricsDataTTL, kind
);

String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
if (size == 0) {
size = 1;
}
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}

int bufferSize = 2000;
if (MetricStreamKind.MAL == kind) {
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
// but in OAL sources, they are raw data.
// Set the buffer(size of queue) as 1/2 to reduce unnecessary resource costs.
bufferSize = 1000;
}
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, bufferSize);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), poolName, queueChannelSize, queueBufferSize);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new PersistentConsumer());

MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
Expand Down
Loading
Loading