Skip to content

Commit c1c0efe

Browse files
committed
初步完成log4j2 client
1 parent 336b50b commit c1c0efe

File tree

9 files changed

+373
-39
lines changed

9 files changed

+373
-39
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ gradle clean install uploadArchives
8686

8787
### dubbox
8888

89-
由于使用dubbox,为了能够采集到dubbox里面的rpc数据,需要修改dubbox的源码,见我修改的dubbox项目:[dubbox](https://github.com/JThink/dubbox/tree/skyeye-trace-1.0.0),该项目主要实现了rpc跟踪的具体实现,需要单独打包。
89+
由于使用dubbox,为了能够采集到dubbox里面的rpc数据,需要修改dubbox的源码,见我修改的dubbox项目:[dubbox](https://github.com/JThink/dubbox/tree/skyeye-trace-1.1.0),该项目主要实现了rpc跟踪的具体实现,需要单独打包。
9090

9191
```shell
9292
git clone https://github.com/JThink/dubbox.git
9393
cd dubbox
94-
git checkout skyeye-trace-1.0.0
94+
git checkout skyeye-trace-1.1.0
9595
修改相关pom中的私服地址
9696
mvn clean install deploy -Dmaven.test.skip=true
9797
```
@@ -602,11 +602,11 @@ compile "skyeye:skyeye-client-log4j:1.0.0"
602602
### 中间件
603603
如果项目中有使用到zkClient,统一使用自己打包的版本,以防日志收集出错或者异常(PS:zk必须为3.4.6版本,尽量使用gradle进行打包部署)
604604
### rpc trace
605-
使用自己打包的dubbox([dubbox](https://github.com/JThink/dubbox/tree/skyeye-trace-1.0.0)),在soa中间件dubbox中封装了rpc的跟踪
605+
使用自己打包的dubbox([dubbox](https://github.com/JThink/dubbox/tree/skyeye-trace-1.1.0)),在soa中间件dubbox中封装了rpc的跟踪
606606
607607
``` shell
608608
compile "com.101tec:zkclient:0.9.1-up"
609-
compile ("com.alibaba:dubbo:2.8.4-skyeye-trace-1.0.0") {
609+
compile ("com.alibaba:dubbo:2.8.4-skyeye-trace-1.1.0") {
610610
exclude group: 'org.springframework', module: 'spring'
611611
}
612612
```

skyeye-client/settings.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
include 'skyeye-client-core' , 'skyeye-client-log4j', 'skyeye-client-logback'
2-
//, 'skyeye-client-log4j2', 'skyeye-client-logback'
1+
include 'skyeye-client-core' , 'skyeye-client-log4j', 'skyeye-client-logback', 'skyeye-client-log4j2'

skyeye-client/skyeye-client-log4j/src/main/java/com/jthink/skyeye/client/log4j/appender/KafkaAppender.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public class KafkaAppender extends AppenderSkeleton {
5151
// 标记是否为rpc服务, 取值为RpcType.java
5252
private String rpc;
5353
// KafkaProducer类的配置
54-
private Map<String, Object> config = new HashMap<String, Object>();
54+
private Map<String, Object> config = new HashMap<>();
5555
// zk注册器
5656
private ZkRegister zkRegister;
5757
// kafka producer是否正在初始化
5858
private volatile AtomicBoolean isInitializing = new AtomicBoolean(false);
5959
// kafka producer未完成初始化之前的消息存放的队列
60-
private ConcurrentLinkedQueue<String> msgQueue = new ConcurrentLinkedQueue<String>();
60+
private ConcurrentLinkedQueue<String> msgQueue = new ConcurrentLinkedQueue<>();
6161

6262
// kafka server
6363
private String bootstrapServers;
@@ -131,7 +131,7 @@ protected void append(LoggingEvent event) {
131131
private void send(String value) {
132132
final byte[] key = ByteBuffer.allocate(4).putInt(new StringBuilder(app).append(host).toString().hashCode()).array();
133133

134-
final ProducerRecord<byte[], String> record = new ProducerRecord<byte[], String>(this.topic, key, value);
134+
final ProducerRecord<byte[], String> record = new ProducerRecord<>(this.topic, key, value);
135135
LazySingletonProducer.getInstance(this.config).send(record, new Callback() {
136136
@Override
137137
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
@@ -260,18 +260,6 @@ public void run() {
260260
}
261261
}
262262

263-
/**
264-
* 进行rpc trace注册
265-
* @param app
266-
* @param host
267-
* @param zkClient
268-
*/
269-
private void register(String app, String host, ZkClient zkClient) {
270-
RegisterDto dto = new RegisterDto(app, host, zkClient);
271-
Registry registry = new ZookeeperRegistry();
272-
IncrementIdGen.setId(registry.register(dto));
273-
}
274-
275263
/**
276264
* 监察rpc type是否正确
277265
* @param rpcType

skyeye-client/skyeye-client-log4j2/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ compileJava.options.encoding = 'UTF-8'
88
buildDir = 'target'
99

1010
ext {
11-
// log4jVersion = '1.2.17'
11+
log4j2Version = '2.7'
1212
}
1313

1414
dependencies {
1515
compile project(':skyeye-client-core')
16-
// compile "log4j:log4j:$log4jVersion"
16+
compile "org.apache.logging.log4j:log4j-core:$log4j2Version"
1717
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package com.jthink.skyeye.client.log4j2.appender;

import com.jthink.skyeye.base.constant.Constants;
import com.jthink.skyeye.client.core.constant.NodeMode;
import com.jthink.skyeye.client.core.producer.LazySingletonProducer;
import com.jthink.skyeye.client.core.util.SysUtil;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.Node;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.*;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
import org.apache.logging.log4j.core.layout.SerializedLayout;
import org.apache.logging.log4j.core.util.StringEncoder;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * JThink@JThink
 *
 * KafkaAppender for log4j2: ships formatted log events to a Kafka topic, modelled
 * on the official log4j2 KafkaAppender with extra features (zk registration and
 * one-shot failure alarm via the monitoring center).
 *
 * @author JThink
 * @version 0.0.1
 * @desc KafkaAppender, contains the log4j2 kafka appender configuration; adapted
 *       from the official appender with project-specific functionality added
 * @date 2017-08-14 09:30:45
 */
@Plugin(name = "KafkaCustomize", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
public class KafkaAppender extends AbstractAppender {

    // Holds the kafka producer config, topic, record key and the zk registration state.
    private final KafkaManager manager;

    // When the kafka cluster is completely down, onCompletion() fires for every blocked
    // record; this flag guarantees the zk alarm node is written exactly once so the
    // monitoring center raises a single alert.
    private final AtomicBoolean flag = new AtomicBoolean(true);

    /**
     * Plugin factory invoked by the log4j2 configuration system.
     *
     * @param layout        optional layout used to serialize events (falls back to the raw message)
     * @param filter        optional filter
     * @param name          appender name (required)
     * @param topic         kafka topic to publish to (required)
     * @param zkServers     zookeeper connect string (required)
     * @param mail          alert mail address (required)
     * @param rpc           rpc type marker (required)
     * @param app           application name (required)
     * @param host          host identifier (required)
     * @param properties    extra kafka producer properties
     * @param configuration current log4j2 configuration
     * @return a started-ready KafkaAppender instance
     */
    @PluginFactory
    public static KafkaAppender createAppender(
            @PluginElement("Layout") final Layout<? extends Serializable> layout,
            @PluginElement("Filter") final Filter filter,
            @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
            @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
            @Required(message = "No zkServers provided for KafkaAppender") @PluginAttribute("zkServers") final String zkServers,
            @Required(message = "No mail provided for KafkaAppender") @PluginAttribute("mail") final String mail,
            @Required(message = "No rpc provided for KafkaAppender") @PluginAttribute("rpc") final String rpc,
            @Required(message = "No app provided for KafkaAppender") @PluginAttribute("app") final String app,
            @Required(message = "No host provided for KafkaAppender") @PluginAttribute("host") final String host,
            @PluginElement("Properties") final Property[] properties,
            @PluginConfiguration final Configuration configuration) {
        final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, zkServers, mail, rpc, app, host, properties);
        return new KafkaAppender(name, layout, filter, kafkaManager);
    }

    private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final KafkaManager manager) {
        super(name, filter, layout, true);
        this.manager = manager;
    }

    /**
     * Serializes the event and sends it to kafka asynchronously. On the first send
     * failure the appender stops itself and writes an alarm node to zookeeper so the
     * monitoring center can alert (once).
     *
     * @param event the log event to ship
     * @throws AppenderLoggingException if the event cannot be handed to the producer
     */
    @Override
    public void append(LogEvent event) {
        if (!isStarted()) {
            return;
        }
        // Guard against recursive logging triggered by the kafka client itself.
        if (event.getLoggerName().startsWith("org.apache.kafka")) {
            LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
            return;
        }
        try {
            final byte[] data = encode(event);
            // Prefix with nanoTime so downstream consumers can order/dedupe records.
            // Bytes were produced as UTF-8, so decode them explicitly as UTF-8 instead
            // of relying on the platform default charset.
            String value = System.nanoTime() + Constants.SEMICOLON + new String(data, StandardCharsets.UTF_8);
            final ProducerRecord<byte[], String> record = new ProducerRecord<>(this.manager.getTopic(), this.manager.getKey(), value);
            LazySingletonProducer.getInstance(this.manager.getConfig()).send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // TODO: refine failure handling (currently mirrors RollingFileAppender.java)
                    if (null != e) {
                        // Stop collecting further events.
                        setStopped();
                        LOGGER.error("kafka send error in appender", e);
                        // compareAndSet makes the winner of the race the only writer, so the
                        // zk alarm node (watched by the monitoring center) is written exactly once
                        // even when many blocked callbacks fail concurrently.
                        if (flag.compareAndSet(true, false)) {
                            KafkaAppender.this.manager.getZkRegister().write(Constants.SLASH + KafkaAppender.this.manager.getApp() + Constants.SLASH +
                                    KafkaAppender.this.manager.getHost(), NodeMode.EPHEMERAL,
                                    String.valueOf(System.currentTimeMillis()) + Constants.SEMICOLON + SysUtil.userDir);
                        }
                    }
                }
            });
        } catch (final Exception e) {
            LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
            throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
        }
    }

    /**
     * Serializes an event to bytes using the configured layout; a SerializedLayout
     * needs its header prepended to every record, other layouts are used as-is, and
     * with no layout the raw formatted message is UTF-8 encoded.
     */
    private byte[] encode(final LogEvent event) {
        final Layout<? extends Serializable> layout = getLayout();
        if (layout == null) {
            return StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
        }
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            final byte[] data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
            return data;
        }
        return layout.toByteArray(event);
    }

    /**
     * Starts the appender and the underlying kafka manager (producer init, zk registration).
     */
    @Override
    public void start() {
        super.start();
        this.manager.startup();
    }

    /**
     * Stops the appender, then the manager, honoring the given timeout.
     *
     * @return true if both the appender and the manager stopped cleanly
     */
    @Override
    public boolean stop(long timeout, TimeUnit timeUnit) {
        setStopping();
        boolean stopped = super.stop(timeout, timeUnit, false);
        stopped &= this.manager.stop(timeout, timeUnit);
        setStopped();
        return stopped;
    }
}

0 commit comments

Comments
 (0)