Skip to content

Commit 5df70c4

Browse files
authored
Added sentinel support (#9)
#9 Add Redis sentinel support.
1 parent 8466bd6 commit 5df70c4

File tree

6 files changed

+318
-15
lines changed

6 files changed

+318
-15
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ Below you can find a sample `log4j2.xml` snippet employing `RedisAppender`.
5858
</Configuration>
5959
```
6060

61+
If you have Sentinel cluster please configure RedisAppender like this:
62+
```xml
63+
<RedisAppender name="REDIS"
64+
key="log4j2-messages"
65+
sentinelNodes="localhost:63791,localhost:63792"
66+
sentinelMaster="mymaster">
67+
...
68+
</RedisAppender>
69+
```
70+
6171
`RedisAppender` is configured with the following parameters:
6272

6373
| Parameter Name | Type | Default | Description |
@@ -66,6 +76,8 @@ Below you can find a sample `log4j2.xml` snippet employing `RedisAppender`.
6676
| `key` | String | | Redis queue key |
6777
| `host` | String | `localhost` | Redis host|
6878
| `port` | int | 6379 | Redis port |
79+
| `sentinelNodes` | String | `localhost:63791,localhost:63792` | Redis sentinel nodes as comma-separated list. If specified, `host` and `port` parameters are ignored. |
80+
| `sentinelMaster` | String | `mymaster` | Redis sentinel master name |
6981
| `password` | String | `null` | Redis password |
7082
| `connectionTimeoutSeconds` | int | 2 | initial connection timeout in seconds |
7183
| `socketTimeoutSeconds` | int | 2 | socket timeout in seconds |
@@ -209,6 +221,7 @@ Contributors
209221

210222
- [t9t](https://github.com/t9t)
211223
- [Yaroslav Skopets](https://github.com/yskopets)
224+
- [Boris Faniuk](https://github.com/bfanyuk)
212225

213226
# License
214227

appender/src/main/java/com/vlkan/log4j2/redis/appender/RedisAppender.java

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@
1717
import org.apache.logging.log4j.util.Strings;
1818
import redis.clients.jedis.Jedis;
1919
import redis.clients.jedis.JedisPool;
20+
import redis.clients.jedis.JedisSentinelPool;
2021
import redis.clients.jedis.Protocol;
2122
import redis.clients.jedis.exceptions.JedisConnectionException;
23+
import redis.clients.jedis.util.Pool;
2224

2325
import java.io.Serializable;
2426
import java.nio.charset.Charset;
27+
import java.util.Set;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
2530

2631
import static com.vlkan.log4j2.redis.appender.Helpers.requireArgument;
2732
import static java.util.Objects.requireNonNull;
33+
import static org.apache.logging.log4j.util.Strings.isNotBlank;
2834

2935
@Plugin(name = "RedisAppender",
3036
category = Core.CATEGORY_NAME,
@@ -56,13 +62,17 @@ public class RedisAppender implements Appender {
5662

5763
private final boolean debugEnabled;
5864

65+
private final String sentinelNodes;
66+
67+
private final String sentinelMaster;
68+
5969
private final RedisConnectionPoolConfig poolConfig;
6070

6171
private final DebugLogger logger;
6272

6373
private final RedisThrottler throttler;
6474

65-
private volatile JedisPool jedisPool;
75+
private volatile Pool<Jedis> jedisPool;
6676

6777
private volatile State state;
6878

@@ -81,6 +91,8 @@ private RedisAppender(Builder builder) {
8191
this.socketTimeoutSeconds = builder.socketTimeoutSeconds;
8292
this.ignoreExceptions = builder.ignoreExceptions;
8393
this.debugEnabled = builder.debugEnabled;
94+
this.sentinelNodes = builder.sentinelNodes;
95+
this.sentinelMaster = builder.sentinelMaster;
8496
this.poolConfig = builder.poolConfig;
8597
this.logger = new DebugLogger(RedisAppender.class, debugEnabled);
8698
this.throttler = new RedisThrottler(builder.getThrottlerConfig(), this, ignoreExceptions, debugEnabled);
@@ -201,19 +213,37 @@ private void connect() {
201213
logger.debug("connecting");
202214
int connectionTimeoutMillis = 1_000 * connectionTimeoutSeconds;
203215
int socketTimeoutMillis = 1_000 * socketTimeoutSeconds;
204-
jedisPool = new JedisPool(
205-
poolConfig.getJedisPoolConfig(),
206-
host,
207-
port,
208-
connectionTimeoutMillis,
209-
socketTimeoutMillis,
210-
password,
211-
0, // database
212-
null, // clientName
213-
false, // ssl
214-
null, // sslSocketFactory
215-
null, // sslParameters,
216-
null); // hostnameVerifier
216+
boolean sentinel = isNotBlank(sentinelNodes);
217+
if (sentinel) {
218+
Set<String> sentinelNodesAsSet = Stream.of(sentinelNodes.split("\\s*,\\s*"))
219+
.filter(Strings::isNotBlank)
220+
.collect(Collectors.toSet());
221+
jedisPool = new JedisSentinelPool(
222+
sentinelMaster,
223+
sentinelNodesAsSet,
224+
poolConfig.getJedisPoolConfig(),
225+
connectionTimeoutMillis,
226+
socketTimeoutMillis,
227+
password,
228+
0, // database
229+
null // clientName
230+
);
231+
} else {
232+
jedisPool = new JedisPool(
233+
poolConfig.getJedisPoolConfig(),
234+
host,
235+
port,
236+
connectionTimeoutMillis,
237+
socketTimeoutMillis,
238+
password,
239+
0, // database
240+
null, // clientName
241+
false, // ssl
242+
null, // sslSocketFactory
243+
null, // sslParameters,
244+
null // hostnameVerifier
245+
);
246+
}
217247
}
218248

219249
private void disconnect() {
@@ -296,6 +326,12 @@ public static class Builder implements org.apache.logging.log4j.core.util.Builde
296326
@PluginBuilderAttribute
297327
private boolean debugEnabled = false;
298328

329+
@PluginBuilderAttribute
330+
private String sentinelNodes;
331+
332+
@PluginBuilderAttribute
333+
private String sentinelMaster;
334+
299335
@PluginElement("RedisConnectionPoolConfig")
300336
private RedisConnectionPoolConfig poolConfig = RedisConnectionPoolConfig.newBuilder().build();
301337

@@ -414,6 +450,24 @@ public Builder setDebugEnabled(boolean debugEnabled) {
414450
return this;
415451
}
416452

453+
public String getSentinelNodes() {
454+
return sentinelNodes;
455+
}
456+
457+
public Builder setSentinelNodes(String sentinelNodes) {
458+
this.sentinelNodes = sentinelNodes;
459+
return this;
460+
}
461+
462+
public String getSentinelMaster() {
463+
return sentinelMaster;
464+
}
465+
466+
public Builder setSentinelMaster(String sentinelMaster) {
467+
this.sentinelMaster = sentinelMaster;
468+
return this;
469+
}
470+
417471
public RedisConnectionPoolConfig getPoolConfig() {
418472
return poolConfig;
419473
}
@@ -446,6 +500,10 @@ private void check() {
446500
requireArgument(Strings.isNotBlank(key), "blank key");
447501
requireArgument(Strings.isNotBlank(host), "blank host");
448502
requireArgument(port > 0, "expecting: port > 0, found: %d", port);
503+
if (sentinelNodes != null) {
504+
requireArgument(Strings.isNotBlank(sentinelNodes), "blank sentinel nodes");
505+
requireArgument(Strings.isNotBlank(sentinelMaster), "blank sentinel master");
506+
}
449507
requireArgument(connectionTimeoutSeconds > 0, "expecting: connectionTimeoutSeconds > 0, found: %d", connectionTimeoutSeconds);
450508
requireArgument(socketTimeoutSeconds > 0, "expecting: socketTimeoutSeconds > 0, found: %d", socketTimeoutSeconds);
451509
requireNonNull(poolConfig, "poolConfig");

appender/src/main/java/com/vlkan/log4j2/redis/appender/RedisThrottler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.vlkan.log4j2.redis.appender;
22

33
import com.vlkan.log4j2.redis.appender.guava.GuavaRateLimiter;
4+
import org.apache.logging.log4j.LogManager;
5+
import org.apache.logging.log4j.core.LoggerContext;
46

57
import javax.management.InstanceAlreadyExistsException;
68
import javax.management.JMX;
@@ -73,9 +75,12 @@ class RedisThrottler implements AutoCloseable {
7375
private ObjectName createJmxBeanName() {
7476
String beanName = config.getJmxBeanName();
7577
if (beanName == null) {
78+
LoggerContext loggerContext = appender.getConfig().getLoggerContext();
79+
if (loggerContext == null)
80+
loggerContext = (LoggerContext) LogManager.getContext(false);
7681
beanName = String.format(
7782
"org.apache.logging.log4j2:type=%s,component=Appenders,name=%s,subtype=RedisThrottler",
78-
appender.getConfig().getLoggerContext().getName(),
83+
loggerContext.getName(),
7984
appender.getName());
8085
}
8186
try {
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package com.vlkan.log4j2.redis.appender;
2+
3+
import org.apache.logging.log4j.Level;
4+
import org.apache.logging.log4j.Logger;
5+
import org.apache.logging.log4j.core.Appender;
6+
import org.junit.ClassRule;
7+
import org.junit.ComparisonFailure;
8+
import org.junit.Test;
9+
import org.junit.rules.RuleChain;
10+
import redis.clients.jedis.Jedis;
11+
12+
import java.net.URI;
13+
import java.net.URISyntaxException;
14+
import java.util.Random;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
public class RedisAppenderSentinelTest {
20+
21+
private static final DebugLogger LOGGER = new DebugLogger(RedisAppenderSentinelTest.class, true);
22+
23+
private static final Random RANDOM = new Random(0);
24+
25+
private static final int MIN_MESSAGE_COUNT = 1;
26+
27+
private static final int MAX_MESSAGE_COUNT = 100;
28+
29+
private static final String REDIS_KEY = "log4j2-messages";
30+
31+
private static final String REDIS_HOST = "localhost";
32+
33+
static final String REDIS_PASSWORD = "toosecret";
34+
35+
static final int REDIS_PORT = 63790;
36+
37+
static final int SENTINEL_PORT = 63792;
38+
39+
static final String MASTER_NAME = "mymaster";
40+
41+
private static final RedisServerResource REDIS_SERVER_RESOURCE = new RedisServerResource(REDIS_PORT, REDIS_PASSWORD);
42+
43+
private static final RedisSentinelResource REDIS_SENTINEL_RESOURCE = new RedisSentinelResource(SENTINEL_PORT, REDIS_PORT, MASTER_NAME);
44+
45+
private static final RedisClientResource REDIS_CLIENT_RESOURCE = new RedisClientResource(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
46+
47+
static final URI CONFIG_FILE_URI = createConfigFileUri();
48+
49+
private static final LoggerContextResource LOGGER_CONTEXT_RESOURCE = new LoggerContextResource(CONFIG_FILE_URI);
50+
51+
@ClassRule
52+
public static final RuleChain RULE_CHAIN = RuleChain
53+
.outerRule(REDIS_SERVER_RESOURCE)
54+
.around(REDIS_SENTINEL_RESOURCE)
55+
.around(REDIS_CLIENT_RESOURCE)
56+
.around(LOGGER_CONTEXT_RESOURCE);
57+
58+
private static class LogMessage {
59+
60+
private static final Level[] LEVELS = Level.values();
61+
62+
private static final int MIN_MESSAGE_LENGTH = 10;
63+
64+
private static final int MAX_MESSAGE_LENGTH = 100;
65+
66+
private final Level level;
67+
68+
private final String message;
69+
70+
private static AtomicInteger COUNTER = new AtomicInteger(0);
71+
72+
private LogMessage(Level level, String message) {
73+
this.level = level;
74+
this.message = message;
75+
}
76+
77+
private static LogMessage createRandom() {
78+
int levelIndex = RANDOM.nextInt(LEVELS.length);
79+
Level level = LEVELS[levelIndex];
80+
int messageLength = MIN_MESSAGE_LENGTH + RANDOM.nextInt(MAX_MESSAGE_LENGTH - MIN_MESSAGE_LENGTH);
81+
String prefix = String.format("[%d] ", COUNTER.getAndIncrement());
82+
StringBuilder messageBuilder = new StringBuilder(prefix);
83+
while (messageBuilder.length() < messageLength) {
84+
char messageChar = (char) RANDOM.nextInt(Character.MAX_VALUE);
85+
if (Character.isLetterOrDigit(messageChar)) {
86+
messageBuilder.append(messageChar);
87+
}
88+
}
89+
String message = messageBuilder.toString();
90+
return new LogMessage(level, message);
91+
}
92+
93+
private static LogMessage[] createRandomArray(int count) {
94+
LogMessage[] messages = new LogMessage[count];
95+
for (int i = 0; i < count; i++) {
96+
messages[i] = createRandom();
97+
}
98+
return messages;
99+
}
100+
101+
}
102+
103+
private static URI createConfigFileUri() {
104+
try {
105+
return new URI("classpath:log4j2.RedisAppenderSentinelTest.xml");
106+
} catch (URISyntaxException error) {
107+
throw new RuntimeException("failed finding Log4j config", error);
108+
}
109+
}
110+
111+
@Test
112+
public void test_messages_are_enqueued_to_redis() throws InterruptedException {
113+
114+
LOGGER.debug("creating the logger");
115+
Logger logger = LOGGER_CONTEXT_RESOURCE.getLoggerContext().getLogger(RedisAppenderSentinelTest.class.getCanonicalName());
116+
117+
int expectedMessageCount = MIN_MESSAGE_COUNT + RANDOM.nextInt(MAX_MESSAGE_COUNT - MIN_MESSAGE_COUNT);
118+
LOGGER.debug("logging %d messages", expectedMessageCount);
119+
LogMessage[] expectedLogMessages = LogMessage.createRandomArray(expectedMessageCount);
120+
for (LogMessage expectedLogMessage : expectedLogMessages) {
121+
logger.log(expectedLogMessage.level, expectedLogMessage.message);
122+
}
123+
124+
LOGGER.debug("waiting for throttler to kick in");
125+
Thread.sleep(1_000);
126+
127+
LOGGER.debug("checking logged messages");
128+
Jedis redisClient = REDIS_CLIENT_RESOURCE.getClient();
129+
for (int messageIndex = 0; messageIndex < expectedMessageCount; messageIndex++) {
130+
LogMessage expectedLogMessage = expectedLogMessages[messageIndex];
131+
String expectedSerializedMessage = String.format("%s %s", expectedLogMessage.level, expectedLogMessage.message);
132+
String actualSerializedMessage = redisClient.lpop(REDIS_KEY);
133+
try {
134+
assertThat(actualSerializedMessage).isEqualTo(expectedSerializedMessage);
135+
} catch (ComparisonFailure comparisonFailure) {
136+
String message = String.format("comparison failure (messageIndex=%d, messageCount=%d)", messageIndex, expectedMessageCount);
137+
throw new RuntimeException(message, comparisonFailure);
138+
}
139+
}
140+
141+
Appender appender = LOGGER_CONTEXT_RESOURCE.getLoggerContext().getConfiguration().getAppender("REDIS");
142+
assertThat(appender).isInstanceOf(RedisAppender.class);
143+
RedisThrottlerJmxBean jmxBean = ((RedisAppender) appender).getJmxBean();
144+
assertThat(jmxBean.getTotalEventCount()).isEqualTo(expectedMessageCount);
145+
assertThat(jmxBean.getIgnoredEventCount()).isEqualTo(0);
146+
assertThat(jmxBean.getEventRateLimitFailureCount()).isEqualTo(0);
147+
assertThat(jmxBean.getByteRateLimitFailureCount()).isEqualTo(0);
148+
assertThat(jmxBean.getUnavailableBufferSpaceFailureCount()).isEqualTo(0);
149+
assertThat(jmxBean.getRedisPushSuccessCount()).isEqualTo(expectedMessageCount);
150+
assertThat(jmxBean.getRedisPushFailureCount()).isEqualTo(0);
151+
152+
}
153+
154+
}

0 commit comments

Comments
 (0)