Skip to content

Commit b3fea72

Browse files
zifeihanwu-sheng
andauthored
Add plugin to support aliyun-ons-1.x (#19)
* Add plugin to support aliyun-ons-1.x * Update dependencies skywalking.version to 8.3 Co-authored-by: 吴晟 Wu Sheng <[email protected]>
1 parent 618471f commit b3fea72

15 files changed

+857
-1
lines changed

aliyun-ons-1.x-plugin/pom.xml

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
~
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
25+
<parent>
26+
<groupId>org.openskywalking</groupId>
27+
<artifactId>java-plugin-extensions</artifactId>
28+
<version>2.0.0</version>
29+
</parent>
30+
31+
<groupId>org.apache.skywalking</groupId>
32+
<artifactId>apm-aliyun-ons-1.x-plugin</artifactId>
33+
<packaging>jar</packaging>
34+
35+
<name>aliyun-ons-1.x-plugin</name>
36+
<url>http://maven.apache.org</url>
37+
38+
<properties>
39+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
40+
<ons-client.version>1.8.4.Final</ons-client.version>
41+
</properties>
42+
43+
<dependencies>
44+
<dependency>
45+
<groupId>com.aliyun.openservices</groupId>
46+
<artifactId>ons-client</artifactId>
47+
<version>${ons-client.version}</version>
48+
<scope>provided</scope>
49+
</dependency>
50+
</dependencies>
51+
</project>
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.ons.v1;
20+
21+
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
22+
import java.lang.reflect.Method;
23+
import java.util.List;
24+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
25+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
26+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
27+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
28+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
29+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
30+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
31+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
32+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
33+
34+
public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {
35+
36+
public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
37+
38+
@Override
39+
public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
40+
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
41+
List<MessageExt> msgs = (List<MessageExt>) allArguments[0];
42+
43+
ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
44+
AbstractSpan span = ContextManager.createEntrySpan(
45+
CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0)
46+
.getTopic() + "/Consumer", contextCarrier);
47+
48+
span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
49+
SpanLayer.asMQ(span);
50+
for (int i = 1; i < msgs.size(); i++) {
51+
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
52+
}
53+
54+
}
55+
56+
@Override
57+
public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
58+
Class<?>[] argumentsTypes, Throwable t) {
59+
ContextManager.activeSpan().log(t);
60+
}
61+
62+
private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
63+
ContextCarrier contextCarrier = new ContextCarrier();
64+
65+
CarrierItem next = contextCarrier.items();
66+
while (next.hasNext()) {
67+
next = next.next();
68+
next.setHeadValue(message.getUserProperty(next.getHeadKey()));
69+
}
70+
71+
return contextCarrier;
72+
}
73+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.ons.v1;
20+
21+
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
22+
import java.lang.reflect.Method;
23+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
24+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
25+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
27+
28+
public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
29+
30+
@Override
31+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
32+
Object ret) throws Throwable {
33+
ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus) ret;
34+
if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
35+
AbstractSpan activeSpan = ContextManager.activeSpan();
36+
activeSpan.errorOccurred();
37+
Tags.STATUS_CODE.set(activeSpan, status.name());
38+
}
39+
ContextManager.stopSpan();
40+
return ret;
41+
}
42+
}
43+
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.ons.v1;
20+
21+
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
22+
import java.lang.reflect.Method;
23+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
24+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
25+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
27+
28+
public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
29+
30+
@Override
31+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
32+
Object ret) throws Throwable {
33+
34+
ConsumeOrderlyStatus status = (ConsumeOrderlyStatus) ret;
35+
if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
36+
AbstractSpan activeSpan = ContextManager.activeSpan();
37+
activeSpan.errorOccurred();
38+
Tags.STATUS_CODE.set(activeSpan, status.name());
39+
}
40+
ContextManager.stopSpan();
41+
return ret;
42+
}
43+
44+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.ons.v1;
20+
21+
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
22+
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
23+
import java.lang.reflect.Method;
24+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
25+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
26+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
27+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
28+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
29+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
30+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
31+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
33+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
34+
import org.apache.skywalking.apm.plugin.ons.v1.define.SendCallBackEnhanceInfo;
35+
import org.apache.skywalking.apm.util.StringUtil;
36+
37+
import static com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
38+
import static com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
39+
40+
public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {
41+
42+
public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
43+
44+
@Override
45+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
46+
MethodInterceptResult result) throws Throwable {
47+
Message message = (Message) allArguments[2];
48+
ContextCarrier contextCarrier = new ContextCarrier();
49+
String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());
50+
AbstractSpan span = ContextManager.createExitSpan(
51+
buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
52+
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
53+
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
54+
Tags.MQ_TOPIC.set(span, message.getTopic());
55+
contextCarrier.extensionInjector().injectSendingTimestamp();
56+
SpanLayer.asMQ(span);
57+
58+
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3];
59+
StringBuilder properties = new StringBuilder(requestHeader.getProperties());
60+
CarrierItem next = contextCarrier.items();
61+
while (next.hasNext()) {
62+
next = next.next();
63+
if (!StringUtil.isEmpty(next.getHeadValue())) {
64+
properties.append(next.getHeadKey());
65+
properties.append(NAME_VALUE_SEPARATOR);
66+
properties.append(next.getHeadValue());
67+
properties.append(PROPERTY_SEPARATOR);
68+
}
69+
}
70+
requestHeader.setProperties(properties.toString());
71+
72+
if (allArguments[6] != null) {
73+
((EnhancedInstance) allArguments[6]).setSkyWalkingDynamicField(
74+
new SendCallBackEnhanceInfo(message.getTopic(), ContextManager
75+
.capture()));
76+
}
77+
}
78+
79+
@Override
80+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
81+
Object ret) throws Throwable {
82+
ContextManager.stopSpan();
83+
return ret;
84+
}
85+
86+
@Override
87+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
88+
Class<?>[] argumentsTypes, Throwable t) {
89+
ContextManager.activeSpan().log(t);
90+
}
91+
92+
private String buildOperationName(String topicName) {
93+
return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
94+
}
95+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.ons.v1;
20+
21+
import java.lang.reflect.Method;
22+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
23+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
27+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
28+
import org.apache.skywalking.apm.plugin.ons.v1.define.SendCallBackEnhanceInfo;
29+
30+
public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor {
31+
32+
public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
33+
private static final String DEFAULT_TOPIC = "no_topic";
34+
35+
@Override
36+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
37+
MethodInterceptResult result) throws Throwable {
38+
SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo) objInst.getSkyWalkingDynamicField();
39+
String topicId = DEFAULT_TOPIC;
40+
// The SendCallBackEnhanceInfo could be null when there is an internal exception in the client API,
41+
// such as MQClientException("no route info of this topic")
42+
if (enhanceInfo != null) {
43+
topicId = enhanceInfo.getTopicId();
44+
}
45+
AbstractSpan activeSpan = ContextManager.createLocalSpan(
46+
CALLBACK_OPERATION_NAME_PREFIX + topicId + "/Producer/Callback");
47+
activeSpan.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
48+
activeSpan.log((Throwable) allArguments[0]);
49+
if (enhanceInfo != null && enhanceInfo.getContextSnapshot() != null) {
50+
ContextManager.continued(enhanceInfo.getContextSnapshot());
51+
}
52+
}
53+
54+
@Override
55+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
56+
Object ret) throws Throwable {
57+
ContextManager.stopSpan();
58+
return ret;
59+
}
60+
61+
@Override
62+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
63+
Class<?>[] argumentsTypes, Throwable t) {
64+
ContextManager.activeSpan().log(t);
65+
}
66+
}

0 commit comments

Comments
 (0)