diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index d128827e6e6a..9637f13f1e59 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -50,6 +50,16 @@ * GraphQL API: metadata, topology, log and trace support query by name. * [Break Change] MQE function `sort_values` sorts according to the aggregation result and labels rather than the simple time series values. * Self Observability: add `metrics_aggregation_queue_used_percentage` and `metrics_persistent_collection_cached_size` metrics for the OAP server. +* Optimize metrics aggregate/persistent worker: separate `OAL` and `MAL` workers and consume pools. The dataflow signal drives the new MAL consumer, + the following table shows the pool size,driven mode and queue size for each worker. + +| Worker | poolSize | isSignalDrivenMode | queueChannelSize | queueBufferSize | +|-------------------------------|------------------------------------------|--------------------|------------------|-----------------| +| MetricsAggregateOALWorker | Math.ceil(availableProcessors * 2 * 1.5) | false | 2 | 10000 | +| MetricsAggregateMALWorker | availableProcessors * 2 / 8, at least 1 | true | 1 | 1000 | +| MetricsPersistentMinOALWorker | availableProcessors * 2 / 8, at least 1 | false | 1 | 2000 | +| MetricsPersistentMinMALWorker | availableProcessors * 2 / 16, at least 1 | true | 1 | 1000 | + #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java new file mode 100644 index 000000000000..6ce0bb447d3b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; + +/** + * MetricsAggregateMALWorker provides an in-memory metrics merging capability for MAL + */ +@Slf4j +public class MetricsAggregateMALWorker extends MetricsAggregateWorker { + private final static String POOL_NAME = "METRICS_L1_AGGREGATION_MAL"; + private final BulkConsumePool pool; + + MetricsAggregateMALWorker(ModuleDefineHolder moduleDefineHolder, + AbstractWorker nextWorker, + String modelName, + long l1FlushPeriod, + MetricStreamKind kind) { + super( + moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind, + POOL_NAME, + calculatePoolSize(), + true, + 1, + 1_000 + ); + this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME); + } + + /** + * MetricsAggregateWorker#in operation does include enqueue only + */ + @Override + public final void in(Metrics metrics) { + super.in(metrics); + pool.notifyConsumers(); + } + + private static int calculatePoolSize() { + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; + return size == 0 ? 1 : size; + } +} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java new file mode 100644 index 000000000000..833b24419a6f --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; + +/** + * MetricsAggregateOALWorker provides an in-memory metrics merging capability for OAL + */ +@Slf4j +public class MetricsAggregateOALWorker extends MetricsAggregateWorker { + private final static String POOL_NAME = "METRICS_L1_AGGREGATION_OAL"; + + MetricsAggregateOALWorker(ModuleDefineHolder moduleDefineHolder, + AbstractWorker nextWorker, + String modelName, + long l1FlushPeriod, + MetricStreamKind kind) { + super( + moduleDefineHolder, nextWorker, modelName, l1FlushPeriod, kind, + POOL_NAME, + (int) Math.ceil(BulkConsumePool.Creator.recommendMaxSize() * 1.5), + false, + 2, + 10_000 + ); + } +} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 46decdf43fc1..b0a8bffa3430 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -45,7 +45,7 @@ * payload. */ @Slf4j -public class MetricsAggregateWorker extends AbstractWorker { +public abstract class MetricsAggregateWorker extends AbstractWorker { public final long l1FlushPeriod; private AbstractWorker nextWorker; private final DataCarrier dataCarrier; @@ -54,39 +54,31 @@ public class MetricsAggregateWorker extends AbstractWorker { private CounterMetrics aggregationCounter; private GaugeMetrics queuePercentageGauge; private long lastSendTime = 0; - private final MetricStreamKind kind; private final int queueTotalSize; MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, String modelName, long l1FlushPeriod, - MetricStreamKind kind) { + MetricStreamKind kind, + String poolName, + int poolSize, + boolean isSignalDrivenMode, + int queueChannelSize, + int queueBufferSize + ) { super(moduleDefineHolder); this.nextWorker = nextWorker; this.mergeDataCache = new MergableBufferedData(); - this.kind = kind; - String name = "METRICS_L1_AGGREGATION"; - int queueChannelSize = 2; - int queueBufferSize = 10_000; - if (MetricStreamKind.MAL == kind) { - // In MAL meter streaming, the load of data flow is much less as they are statistics already, - // but in OAL sources, they are raw data. - // Set the buffer(size of queue) as 1/20 to reduce unnecessary resource costs. - queueChannelSize = 1; - queueBufferSize = 1_000; - } + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode); this.dataCarrier = new DataCarrier<>( - "MetricsAggregateWorker." + modelName, name, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE); - - BulkConsumePool.Creator creator = new BulkConsumePool.Creator( - name, BulkConsumePool.Creator.recommendMaxSize() * 2, 200); + "MetricsAggregateWorker." + modelName, poolName, queueChannelSize, queueBufferSize, BufferStrategy.IF_POSSIBLE); try { - ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); + ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator); } catch (Exception e) { throw new UnexpectedException(e.getMessage(), e); } - this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer()); + this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new AggregatorConsumer()); MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) .provider() @@ -116,7 +108,7 @@ public class MetricsAggregateWorker extends AbstractWorker { * MetricsAggregateWorker#in operation does include enqueue only */ @Override - public final void in(Metrics metrics) { + public void in(Metrics metrics) { if (!dataCarrier.produce(metrics)) { abandonCounter.inc(); } @@ -149,7 +141,7 @@ private void flush() { } } - private class AggregatorConsumer implements IConsumer { + protected class AggregatorConsumer implements IConsumer { @Override public void consume(List data) { queuePercentageGauge.setValue(Math.round(100 * (double) data.size() / queueTotalSize)); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java new file mode 100644 index 000000000000..6ced82869427 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; +import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; + +@Slf4j +public class MetricsPersistentMinMALWorker extends MetricsPersistentMinWorker { + private final static String POOL_NAME = "METRICS_L2_AGGREGATION_MAL"; + private final BulkConsumePool pool; + + MetricsPersistentMinMALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, + AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker, + MetricsTransWorker transWorker, boolean supportUpdate, + long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { + super( + moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate, + storageSessionTimeout, metricsDataTTL, kind, + POOL_NAME, + calculatePoolSize(), + true, + 1, + 1000 + ); + this.pool = (BulkConsumePool) ConsumerPoolFactory.INSTANCE.get(POOL_NAME); + } + + @Override + public void in(Metrics metrics) { + super.in(metrics); + pool.notifyConsumers(); + } + + private static int calculatePoolSize() { + int size = BulkConsumePool.Creator.recommendMaxSize() / 16; + return size == 0 ? 1 : size; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java new file mode 100644 index 000000000000..534b50f9f51e --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; +import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; + +@Slf4j +public class MetricsPersistentMinOALWorker extends MetricsPersistentMinWorker { + + MetricsPersistentMinOALWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, + AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker, + MetricsTransWorker transWorker, boolean supportUpdate, + long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { + super( + moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate, + storageSessionTimeout, metricsDataTTL, kind, + "METRICS_L2_AGGREGATION_OAL", + calculatePoolSize(), + false, + 1, + 2000 + ); + } + + private static int calculatePoolSize() { + int size = BulkConsumePool.Creator.recommendMaxSize() / 8; + return size == 0 ? 1 : size; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java index b44988e23fa2..2e2f66704543 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java @@ -26,7 +26,6 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.status.ServerStatusService; -import org.apache.skywalking.oap.server.core.status.ServerStatusWatcher; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; @@ -45,7 +44,7 @@ * MetricsPersistentMinWorker is an extension of {@link MetricsPersistentWorker} and focuses on the Minute Metrics data persistent. */ @Slf4j -public class MetricsPersistentMinWorker extends MetricsPersistentWorker implements ServerStatusWatcher { +public abstract class MetricsPersistentMinWorker extends MetricsPersistentWorker { private final DataCarrier dataCarrier; /** @@ -65,33 +64,22 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker implemen MetricsPersistentMinWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker, MetricsTransWorker transWorker, boolean supportUpdate, - long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) { + long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind, + String poolName, int poolSize, boolean isSignalDrivenMode, + int queueChannelSize, int queueBufferSize) { super( moduleDefineHolder, model, metricsDAO, nextAlarmWorker, nextExportWorker, transWorker, supportUpdate, storageSessionTimeout, metricsDataTTL, kind ); - String name = "METRICS_L2_AGGREGATION"; - int size = BulkConsumePool.Creator.recommendMaxSize() / 8; - if (size == 0) { - size = 1; - } - BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200); + BulkConsumePool.Creator creator = new BulkConsumePool.Creator(poolName, poolSize, 200, isSignalDrivenMode); try { - ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator); + ConsumerPoolFactory.INSTANCE.createIfAbsent(poolName, creator); } catch (Exception e) { throw new UnexpectedException(e.getMessage(), e); } - - int bufferSize = 2000; - if (MetricStreamKind.MAL == kind) { - // In MAL meter streaming, the load of data flow is much less as they are statistics already, - // but in OAL sources, they are raw data. - // Set the buffer(size of queue) as 1/2 to reduce unnecessary resource costs. - bufferSize = 1000; - } - this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, bufferSize); - this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer()); + this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), poolName, queueChannelSize, queueBufferSize); + this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(poolName), new PersistentConsumer()); MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) .provider() diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 17e41d72f405..ee1a3bdca6f0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -194,9 +194,19 @@ private void create(ModuleDefineHolder moduleDefineHolder, workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass); MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName); - MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker( - moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind); - + MetricsAggregateWorker aggregateWorker; + switch (kind) { + case OAL: + aggregateWorker = new MetricsAggregateOALWorker( + moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind); + break; + case MAL: + aggregateWorker = new MetricsAggregateMALWorker( + moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod, kind); + break; + default: + throw new IllegalArgumentException("Unsupported MetricStreamKind: " + kind); + } entryWorkers.put(metricsClass, aggregateWorker); } @@ -209,10 +219,23 @@ private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder module AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder); ExportMetricsWorker exportWorker = new ExportMetricsWorker(moduleDefineHolder); - MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentMinWorker( - moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, - supportUpdate, storageSessionTimeout, metricsDataTTL, kind - ); + MetricsPersistentWorker minutePersistentWorker; + switch (kind) { + case OAL: + minutePersistentWorker = new MetricsPersistentMinOALWorker( + moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, + supportUpdate, storageSessionTimeout, metricsDataTTL, kind + ); + break; + case MAL: + minutePersistentWorker = new MetricsPersistentMinMALWorker( + moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, + supportUpdate, storageSessionTimeout, metricsDataTTL, kind + ); + break; + default: + throw new IllegalArgumentException("Unsupported MetricStreamKind: " + kind); + } persistentWorkers.add(minutePersistentWorker); return minutePersistentWorker; diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java index 9d5bb0f6151e..3ee33f6581f0 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java +++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java @@ -34,11 +34,12 @@ public class BulkConsumePool implements ConsumerPool { private List allConsumers; private volatile boolean isStarted = false; - public BulkConsumePool(String name, int size, long consumeCycle) { + public BulkConsumePool(String name, int size, long consumeCycle, boolean isSignalDrivenMode) { size = EnvUtil.getInt(name + "_THREAD", size); allConsumers = new ArrayList<>(size); for (int i = 0; i < size; i++) { - MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle); + MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer( + "DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle, isSignalDrivenMode); multipleChannelsConsumer.setDaemon(true); allConsumers.add(multipleChannelsConsumer); } @@ -92,6 +93,12 @@ public void begin(Channels channels) { isStarted = true; } + public void notifyConsumers() { + for (MultipleChannelsConsumer consumer : allConsumers) { + consumer.setConsumeFlag(true); + } + } + /** * The creator for {@link BulkConsumePool}. */ @@ -99,16 +106,19 @@ public static class Creator implements Callable { private String name; private int size; private long consumeCycle; + // Consumer has two modes to drive consumption. 1. Polling mode. 2. Signal-Driven mode. + private final boolean isSignalDrivenMode; - public Creator(String name, int poolSize, long consumeCycle) { + public Creator(String name, int poolSize, long consumeCycle, boolean isSignalDrivenMode) { this.name = name; this.size = poolSize; this.consumeCycle = consumeCycle; + this.isSignalDrivenMode = isSignalDrivenMode; } @Override public ConsumerPool call() { - return new BulkConsumePool(name, size, consumeCycle); + return new BulkConsumePool(name, size, consumeCycle, isSignalDrivenMode); } public static int recommendMaxSize() { diff --git a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java index 69e2732f2001..1551caca692d 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java +++ b/oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import lombok.Setter; import org.apache.skywalking.oap.server.library.datacarrier.buffer.Channels; import org.apache.skywalking.oap.server.library.datacarrier.buffer.QueueBuffer; @@ -33,11 +34,17 @@ public class MultipleChannelsConsumer extends Thread { @SuppressWarnings("NonAtomicVolatileUpdate") private volatile long size; private final long consumeCycle; + // The flag to indicate whether the consumer thread should consume data. + @Setter + private volatile boolean consumeFlag = false; + // Consumer has two modes to drive consumption. 1. Polling mode. 2. Signal-Driven mode. + private final boolean isSignalDrivenMode; - public MultipleChannelsConsumer(String threadName, long consumeCycle) { + public MultipleChannelsConsumer(String threadName, long consumeCycle, boolean isSignalDrivenMode) { super(threadName); this.consumeTargets = new ArrayList<>(); this.consumeCycle = consumeCycle; + this.isSignalDrivenMode = isSignalDrivenMode; } @Override @@ -47,15 +54,29 @@ public void run() { final List consumeList = new ArrayList(2000); while (running) { boolean hasData = false; - for (Group target : consumeTargets) { - boolean consumed = consume(target, consumeList); - hasData = hasData || consumed; - } + if (!isSignalDrivenMode) { + for (Group target : consumeTargets) { + boolean consumed = consume(target, consumeList); + hasData = hasData || consumed; + } - if (!hasData) { - try { - Thread.sleep(consumeCycle); - } catch (InterruptedException e) { + if (!hasData) { + try { + Thread.sleep(consumeCycle); + } catch (InterruptedException e) { + } + } + } else { + if (consumeFlag) { + consumeFlag = false; + for (Group target : consumeTargets) { + consume(target, consumeList); + } + } else { + try { + Thread.sleep(consumeCycle); + } catch (InterruptedException e) { + } } } } diff --git a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java index 4b69309cd584..ef4b2e7e62ce 100644 --- a/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java +++ b/oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java @@ -29,7 +29,7 @@ public class ConsumerPoolFactoryTest { @BeforeEach public void createIfAbsent() throws Exception { - BulkConsumePool.Creator creator = new BulkConsumePool.Creator("my-test-pool", 10, 20); + BulkConsumePool.Creator creator = new BulkConsumePool.Creator("my-test-pool", 10, 20, false); boolean firstCreated = ConsumerPoolFactory.INSTANCE.createIfAbsent("my-test-pool", creator); assertTrue(firstCreated);