Skip to content

Commit 8626000

Browse files
authored
Fix the malfunctioning alarm feature of MAL metrics (#13543)
1 parent 9c0fe37 commit 8626000

File tree

10 files changed

+264
-15
lines changed

10 files changed

+264
-15
lines changed

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
* OAP Self Observability: make Trace analysis metrics separate by label `protocol`, add Zipkin span dropped metrics.
108108
* BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations.
109109
* Self Observability: add write latency metrics for BanyanDB and ElasticSearch.
110+
* Fix the malfunctioning alarm feature of MAL metrics due to unknown metadata in L2 aggregate worker.
110111

111112
#### UI
112113

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private void create(ModuleDefineHolder moduleDefineHolder,
191191
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
192192
.provider()
193193
.getService(IWorkerInstanceSetter.class);
194-
workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);
194+
workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, kind, metricsClass);
195195

196196
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
197197
MetricsAggregateWorker aggregateWorker;

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void onNext(RemoteMessage message) {
122122
AbstractWorker nextWorker = handleWorker.getWorker();
123123
StreamData streamData;
124124
try {
125-
streamData = handleWorker.getStreamDataClass().newInstance();
125+
streamData = handleWorker.newStreamDataInstance();
126126
} catch (Throwable t) {
127127
remoteInErrorCounter.inc();
128128
LOGGER.error(t.getMessage(), t);

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.skywalking.oap.server.core.worker;
2020

2121
import org.apache.skywalking.oap.server.core.analysis.Stream;
22+
import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
2223
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
2324
import org.apache.skywalking.oap.server.library.module.Service;
2425

@@ -29,7 +30,9 @@ public interface IWorkerInstanceSetter extends Service {
2930
/**
3031
* @param remoteReceiverWorkName worker name
3132
* @param instance The worker instance processes the given streamDataClass.
33+
* @param kind Metric kind (OAL, MAL).
3234
* @param streamDataClass Type of metrics.
3335
*/
34-
void put(String remoteReceiverWorkName, AbstractWorker instance, Class<? extends StreamData> streamDataClass);
36+
void put(String remoteReceiverWorkName, AbstractWorker instance,
37+
MetricStreamKind kind, Class<? extends StreamData> streamDataClass);
3538
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,48 @@
1818

1919
package org.apache.skywalking.oap.server.core.worker;
2020

21-
import lombok.AllArgsConstructor;
2221
import lombok.Getter;
22+
import org.apache.skywalking.oap.server.core.UnexpectedException;
23+
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
24+
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
25+
import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
2326
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
2427

25-
@AllArgsConstructor
2628
@Getter
2729
public class RemoteHandleWorker {
28-
private AbstractWorker worker;
29-
private Class<? extends StreamData> streamDataClass;
30+
private final AbstractWorker worker;
31+
private final MetricStreamKind kind;
32+
private final Class<? extends StreamData> streamDataClass;
33+
34+
private AcceptableValue<?> meterClassPrototype;
35+
36+
public RemoteHandleWorker(AbstractWorker worker, MetricStreamKind kind,
37+
Class<? extends StreamData> streamDataClass) {
38+
this.worker = worker;
39+
this.kind = kind;
40+
this.streamDataClass = streamDataClass;
41+
42+
if (MetricStreamKind.MAL == kind) {
43+
try {
44+
meterClassPrototype = (AcceptableValue<?>) streamDataClass.newInstance();
45+
} catch (Exception e) {
46+
throw new UnexpectedException("Can't create mal meter prototype with stream class" + streamDataClass);
47+
}
48+
}
49+
}
50+
51+
/**
52+
* Create a new StreamData instance with metadata {@link WithMetadata} for RemoteServiceHandler to deserialize the RemoteMessage.
53+
* OAL metrics can initialize metadata through the constructor,
54+
* while MAL metrics need to initialize metadata through {@link AcceptableValue#createNew}.
55+
*/
56+
public StreamData newStreamDataInstance() throws InstantiationException, IllegalAccessException {
57+
switch (kind) {
58+
case OAL:
59+
return streamDataClass.newInstance();
60+
case MAL:
61+
return (StreamData) meterClassPrototype.createNew();
62+
}
63+
throw new UnexpectedException("Unsupported metrics stream kind" + kind);
64+
}
3065
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323
import org.apache.skywalking.oap.server.core.UnexpectedException;
24+
import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
2425
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
@@ -44,11 +45,11 @@ public RemoteHandleWorker get(String nextWorkerName) {
4445

4546
@Override
4647
public void put(String remoteReceiverWorkName, AbstractWorker instance,
47-
Class<? extends StreamData> streamDataClass) {
48+
MetricStreamKind kind, Class<? extends StreamData> streamDataClass) {
4849
if (instances.containsKey(remoteReceiverWorkName)) {
4950
throw new UnexpectedException("Duplicate worker name:" + remoteReceiverWorkName);
5051
}
51-
instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance, streamDataClass));
52+
instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance, kind, streamDataClass));
5253
LOGGER.debug("Worker {} has been registered as {}", instance.toString(), remoteReceiverWorkName);
5354
}
5455
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.oap.server.core.remote;
20+
21+
import io.grpc.ManagedChannel;
22+
import io.grpc.Server;
23+
import io.grpc.inprocess.InProcessChannelBuilder;
24+
import io.grpc.inprocess.InProcessServerBuilder;
25+
import io.grpc.stub.StreamObserver;
26+
import io.grpc.util.MutableHandlerRegistry;
27+
import java.io.IOException;
28+
import java.util.UUID;
29+
import java.util.concurrent.TimeUnit;
30+
import org.apache.skywalking.oap.server.core.CoreModule;
31+
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
32+
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
33+
import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
34+
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
35+
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
36+
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
37+
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
38+
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
39+
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
40+
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
41+
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
42+
import org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
43+
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
44+
import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
45+
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
46+
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
47+
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
48+
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
49+
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
50+
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
51+
import org.junit.jupiter.api.AfterEach;
52+
import org.junit.jupiter.api.Assertions;
53+
import org.junit.jupiter.api.BeforeEach;
54+
import org.junit.jupiter.api.Test;
55+
56+
import static org.mockito.Mockito.any;
57+
import static org.mockito.Mockito.mock;
58+
import static org.mockito.Mockito.when;
59+
60+
public class RemoteServiceHandlerMALTestCase {
61+
62+
private Server server;
63+
private ManagedChannel channel;
64+
private MutableHandlerRegistry serviceRegistry;
65+
66+
@BeforeEach
67+
public void beforeEach() throws IOException {
68+
serviceRegistry = new MutableHandlerRegistry();
69+
final String name = UUID.randomUUID().toString();
70+
InProcessServerBuilder serverBuilder =
71+
InProcessServerBuilder
72+
.forName(name)
73+
.fallbackHandlerRegistry(serviceRegistry);
74+
75+
server = serverBuilder.build();
76+
server.start();
77+
78+
channel = InProcessChannelBuilder.forName(name).build();
79+
}
80+
81+
@AfterEach
82+
public void after() {
83+
channel.shutdown();
84+
server.shutdown();
85+
86+
try {
87+
channel.awaitTermination(1L, TimeUnit.MINUTES);
88+
server.awaitTermination(1L, TimeUnit.MINUTES);
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
throw new RuntimeException(e);
92+
} finally {
93+
channel.shutdownNow();
94+
channel = null;
95+
server.shutdownNow();
96+
server = null;
97+
}
98+
}
99+
100+
@Test
101+
public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException {
102+
final String testWorkerId = "mock-worker";
103+
104+
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
105+
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
106+
moduleManager.put(CoreModule.NAME, moduleDefine);
107+
108+
WorkerInstancesService workerInstancesService = new WorkerInstancesService();
109+
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService);
110+
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
111+
112+
TestWorker worker = new TestWorker(moduleManager);
113+
workerInstancesService.put(testWorkerId, worker, MetricStreamKind.MAL, TestRemoteData.class);
114+
115+
MetricsCreator metricsCreator = mock(MetricsCreator.class);
116+
when(metricsCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetrics() {
117+
@Override
118+
public void inc() {
119+
120+
}
121+
122+
@Override
123+
public void inc(double value) {
124+
125+
}
126+
});
127+
when(metricsCreator.createHistogramMetric(any(), any(), any(), any())).thenReturn(new HistogramMetrics() {
128+
@Override
129+
public Timer createTimer() {
130+
return super.createTimer();
131+
}
132+
133+
@Override
134+
public void observe(double value) {
135+
136+
}
137+
});
138+
139+
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
140+
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
141+
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
142+
serviceRegistry.addService(new RemoteServiceHandler(moduleManager));
143+
144+
RemoteServiceGrpc.RemoteServiceStub remoteServiceStub = RemoteServiceGrpc.newStub(channel);
145+
146+
StreamObserver<RemoteMessage> streamObserver = remoteServiceStub.call(new StreamObserver<Empty>() {
147+
@Override
148+
public void onNext(Empty empty) {
149+
150+
}
151+
152+
@Override
153+
public void onError(Throwable throwable) {
154+
155+
}
156+
157+
@Override
158+
public void onCompleted() {
159+
160+
}
161+
});
162+
163+
TestRemoteData testRemoteData = new TestRemoteData();
164+
testRemoteData.setCount(10);
165+
testRemoteData.setSummation(100);
166+
testRemoteData.setTimeBucket(202510161550L);
167+
testRemoteData.setEntityId("test-entity-id");
168+
testRemoteData.setServiceId("test-service-id");
169+
170+
RemoteData.Builder remoteData = testRemoteData.serialize();
171+
172+
RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
173+
remoteMessage.setNextWorkerName(testWorkerId);
174+
remoteMessage.setRemoteData(remoteData);
175+
176+
streamObserver.onNext(remoteMessage.build());
177+
streamObserver.onCompleted();
178+
}
179+
180+
public static class TestRemoteData extends AvgFunction {
181+
182+
@Override
183+
public AcceptableValue<Long> createNew() {
184+
TestRemoteData testRemoteData = new TestRemoteData();
185+
testRemoteData.initMeta("test-avg-meter", 1);
186+
return testRemoteData;
187+
}
188+
}
189+
190+
static class TestWorker extends AbstractWorker {
191+
192+
public TestWorker(ModuleDefineHolder moduleDefineHolder) {
193+
super(moduleDefineHolder);
194+
}
195+
196+
@Override
197+
public void in(Object o) {
198+
TestRemoteData data = (TestRemoteData) o;
199+
200+
Assertions.assertEquals(10, data.getValue());
201+
Assertions.assertEquals("test-avg-meter", data.getMeta().getMetricsName());
202+
Assertions.assertEquals(1, data.getMeta().getScope());
203+
}
204+
}
205+
}

oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java renamed to oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerOALTestCase.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.grpc.stub.StreamObserver;
2626
import io.grpc.util.MutableHandlerRegistry;
2727
import org.apache.skywalking.oap.server.core.CoreModule;
28+
import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
2829
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
2930
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
3031
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -56,7 +57,7 @@
5657
import static org.mockito.Mockito.mock;
5758
import static org.mockito.Mockito.when;
5859

59-
public class RemoteServiceHandlerTestCase {
60+
public class RemoteServiceHandlerOALTestCase {
6061

6162
private Server server;
6263
private ManagedChannel channel;
@@ -109,9 +110,8 @@ public void callTest() throws DuplicateProviderException, ProviderNotFoundExcept
109110
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
110111

111112
TestWorker worker = new TestWorker(moduleManager);
112-
workerInstancesService.put(testWorkerId, worker, TestRemoteData.class);
113+
workerInstancesService.put(testWorkerId, worker, MetricStreamKind.OAL, TestRemoteData.class);
113114

114-
String serverName = InProcessServerBuilder.generateName();
115115
MetricsCreator metricsCreator = mock(MetricsCreator.class);
116116
when(metricsCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetrics() {
117117
@Override
@@ -139,6 +139,7 @@ public void observe(double value) {
139139
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
140140
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
141141
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
142+
serviceRegistry.addService(new RemoteServiceHandler(moduleManager));
142143

143144
RemoteServiceGrpc.RemoteServiceStub remoteServiceStub = RemoteServiceGrpc.newStub(channel);
144145

@@ -174,7 +175,7 @@ public void onCompleted() {
174175
streamObserver.onCompleted();
175176
}
176177

177-
static class TestRemoteData extends StreamData {
178+
public static class TestRemoteData extends StreamData {
178179

179180
private String str1;
180181
private String str2;

oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.inprocess.InProcessServerBuilder;
2525
import io.grpc.util.MutableHandlerRegistry;
2626
import org.apache.skywalking.oap.server.core.CoreModule;
27+
import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
2728
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
2829
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
2930
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -84,7 +85,7 @@ public void before() throws IOException {
8485
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
8586

8687
TestWorker worker = new TestWorker(moduleManager);
87-
workerInstancesService.put(nextWorkerName, worker, TestStreamData.class);
88+
workerInstancesService.put(nextWorkerName, worker, MetricStreamKind.OAL, TestStreamData.class);
8889
}
8990

9091
@AfterEach

0 commit comments

Comments
 (0)