Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* GraphQL API: metadata, topology, log and trace support query by name.
* [Break Change] MQE function `sort_values` sorts according to the aggregation result and labels rather than the simple time series values.
* Self Observability: add `metrics_aggregation_queue_used_percentage` and `metrics_persistent_collection_cached_size` metrics for the OAP server.
* Optimize metrics aggregate/persistent worker: seperate `OAL` and `MAL` workers and consume pools, and make the consumer thread sleep when no data in.

#### UI

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
* MetricsAggregateMALWorker provides an in-memory metrics merging capability for MAL
*/
@Slf4j
public class MetricsAggregateMALWorker extends MetricsAggregateWorker {
private final static String POOL_NAME = "METRICS_L1_AGGREGATION_MAL";
private final BulkConsumePool pool;

MetricsAggregateMALWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<Metrics> nextWorker,
String modelName,
long l1FlushPeriod,
MetricStreamKind kind) {
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
// but in OAL sources, they are raw data.
// Set the buffer(size of queue) as 1/20 to reduce unnecessary resource costs.
super(
moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
POOL_NAME,
calculatePoolSize(),
true,
1,
1_000
);
this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
}

/**
* MetricsAggregateWorker#in operation does include enqueue only
*/
@Override
public final void in(Metrics metrics) {
super.in(metrics);
pool.notifyConsumers();
}

private static int calculatePoolSize() {
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
return size == 0 ? 1 : size;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

/**
* MetricsAggregateOALWorker provides an in-memory metrics merging capability for OAL
*/
@Slf4j
public class MetricsAggregateOALWorker extends MetricsAggregateWorker {
private final static String POOL_NAME = "METRICS_L1_AGGREGATION_OAL";

MetricsAggregateOALWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<Metrics> nextWorker,
String modelName,
long l1FlushPeriod,
MetricStreamKind kind) {
super(
moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind,
POOL_NAME,
(int) Math.ceil(BulkConsumePool.Creator.recommendMaxSize() * 1.5),
false,
2,
10_000
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* payload.
*/
@Slf4j
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
public abstract class MetricsAggregateWorker extends AbstractWorker<Metrics> {
public final long l1FlushPeriod;
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
Expand All @@ -54,39 +54,31 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private CounterMetrics aggregationCounter;
private GaugeMetrics queuePercentageGauge;
private long lastSendTime = 0;
private final MetricStreamKind kind;
private final int queueTotalSize;

MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder,
AbstractWorker<Metrics> nextWorker,
String modelName,
long l1FlushPeriod,
MetricStreamKind kind) {
MetricStreamKind kind,
String poolName,
int poolSize,
boolean notifiablePool,
int queueChannelSize,
int queueBufferSize
) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergableBufferedData();
this.kind = kind;
String name = "METRICS_L1_AGGREGATION";
int queueChannelSize = 2;
int queueBufferSize = 10_000;
if (MetricStreamKind.MAL == kind) {
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
// but in OAL sources, they are raw data.
// Set the buffer(size of queue) as 1/20 to reduce unnecessary resource costs.
queueChannelSize = 1;
queueBufferSize = 1_000;
}
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, notifiablePool);
this.dataCarrier = new DataCarrier<>(
"MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE);

BulkConsumePool.Creator creator = new BulkConsumePool.Creator(
name, BulkConsumePool.Creator.recommendMaxSize() * 2, 200);
"MetricsAggregateWorker." + modelName, poolName, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new AggregatorConsumer());

MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
Expand Down Expand Up @@ -116,7 +108,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
* MetricsAggregateWorker#in operation does include enqueue only
*/
@Override
public final void in(Metrics metrics) {
public void in(Metrics metrics) {
if (!dataCarrier.produce(metrics)) {
abandonCounter.inc();
}
Expand Down Expand Up @@ -149,7 +141,7 @@ private void flush() {
}
}

private class AggregatorConsumer implements IConsumer<Metrics> {
protected class AggregatorConsumer implements IConsumer<Metrics> {
@Override
public void consume(List<Metrics> data) {
queuePercentageGauge.setValue(Math.round(100 * (double) data.size() / queueTotalSize));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

@Slf4j
public class MetricsPersistentMinMALWorker extends MetricsPersistentMinWorker {
private final static String POOL_NAME = "METRICS_L2_AGGREGATION_MAL";
private final BulkConsumePool pool;

MetricsPersistentMinMALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean supportUpdate,
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
super(
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
storageSessionTimeout, metricsDataTTL, kind,
POOL_NAME,
calculatePoolSize(),
true,
1,
1000
);
this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME);
}

@Override
public void in(Metrics metrics) {
super.in(metrics);
pool.notifyConsumers();
}

private static int calculatePoolSize() {
int size = BulkConsumePool.Creator.recommendMaxSize() / 16;
return size == 0 ? 1 : size;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.analysis.worker;

import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

@Slf4j
public class MetricsPersistentMinOALWorker extends MetricsPersistentMinWorker {

MetricsPersistentMinOALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean supportUpdate,
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
super(
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
storageSessionTimeout, metricsDataTTL, kind,
"METRICS_L2_AGGREGATION_OAL",
calculatePoolSize(),
false,
1,
2000
);
}

private static int calculatePoolSize() {
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
return size == 0 ? 1 : size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.status.ServerStatusWatcher;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
Expand All @@ -45,7 +44,7 @@
* MetricsPersistentMinWorker is an extension of {@link MetricsPersistentWorker} and focuses on the Minute Metrics data persistent.
*/
@Slf4j
public class MetricsPersistentMinWorker extends MetricsPersistentWorker implements ServerStatusWatcher {
public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker {
private final DataCarrier<Metrics> dataCarrier;

/**
Expand All @@ -65,33 +64,22 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker implemen
MetricsPersistentMinWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean supportUpdate,
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind,
String poolName, int poolSize, boolean notifiablePool,
int queueChannelSize, int queueBufferSize) {
super(
moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate,
storageSessionTimeout, metricsDataTTL, kind
);

String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
if (size == 0) {
size = 1;
}
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, notifiablePool);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator);
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}

int bufferSize = 2000;
if (MetricStreamKind.MAL == kind) {
// In MAL meter streaming, the load of data flow is much less as they are statistics already,
// but in OAL sources, they are raw data.
// Set the buffer(size of queue) as 1/2 to reduce unnecessary resource costs.
bufferSize = 1000;
}
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, bufferSize);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), poolName, queueChannelSize, queueBufferSize);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new PersistentConsumer());

MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
Expand Down
Loading
Loading