Skip to content

Commit c3c42b7

Browse files
rmarticabaRoberto Martinez Caballerovy
authored
Add publish command support (#138)
Co-authored-by: Roberto Martinez Caballero <[email protected]> Co-authored-by: Volkan Yazıcı <[email protected]>
1 parent d08a71a commit c3c42b7

File tree

6 files changed

+313
-4
lines changed

6 files changed

+313
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
limitations under the License.
1515
-->
1616

17+
### (????-??-??) v0.16.0
18+
19+
- Added `publish` command support (#138)
1720

1821
### (2024-04-14) v0.15.0
1922

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,9 @@ Note that `sentinelNodes` and `sentinelMaster` have priority over `host` and
102102
| `port` | int | 6379 | Redis port |
103103
| `sentinelNodes` | String | `null` | Redis sentinel nodes as comma-separated list, e.g., `host1:port1,host2:port2`. If specified, `host` and `port` parameters are ignored. |
104104
| `sentinelMaster` | String | `null` | Redis sentinel master name |
105-
| `username` | String | `null` | Redis username |
105+
| `username` | String | `default` | Redis username |
106106
| `password` | String | `null` | Redis password |
107+
| `command` | String | `rpush` | Redis command for writing to the queue. Accepts `rpush` (default) and `publish`. |
107108
| `connectionTimeoutSeconds` | int | 2 | initial connection timeout in seconds |
108109
| `socketTimeoutSeconds` | int | 2 | socket timeout in seconds |
109110
| `ignoreExceptions` | boolean | `true` | Enabling causes exceptions encountered while appending events to be internally logged and then ignored. When set to false, exceptions will be propagated to the caller, instead. You must set this to false when wrapping this appender in a `FailoverAppender`. |

appender/src/main/java/com/vlkan/log4j2/redis/appender/RedisAppender.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.io.Serializable;
4242
import java.nio.charset.Charset;
4343
import java.nio.charset.StandardCharsets;
44+
import java.util.Arrays;
4445
import java.util.Set;
4546
import java.util.stream.Collectors;
4647
import java.util.stream.Stream;
@@ -57,6 +58,12 @@ public class RedisAppender implements Appender {
5758

5859
private static final StatusLogger LOGGER = StatusLogger.getLogger();
5960

61+
private static final String RPUSH_COMMAND = "rpush";
62+
63+
private static final String PUBLISH_COMMAND = "publish";
64+
65+
private static final Set<String> ALLOWED_COMMANDS = Stream.of(RPUSH_COMMAND, PUBLISH_COMMAND).collect(Collectors.toSet());
66+
6067
private final Configuration config;
6168

6269
private final String name;
@@ -89,6 +96,8 @@ public class RedisAppender implements Appender {
8996

9097
private final String sentinelMaster;
9198

99+
private final String command;
100+
92101
private final RedisConnectionPoolConfig poolConfig;
93102

94103
private final RedisThrottler throttler;
@@ -116,6 +125,7 @@ private RedisAppender(Builder builder) {
116125
this.ignoreExceptions = builder.ignoreExceptions;
117126
this.sentinelNodes = builder.sentinelNodes;
118127
this.sentinelMaster = builder.sentinelMaster;
128+
this.command = builder.command;
119129
this.poolConfig = builder.poolConfig;
120130
this.throttler = new RedisThrottler(builder.getThrottlerConfig(), this, ignoreExceptions);
121131
}
@@ -157,7 +167,18 @@ public State getState() {
157167
void consumeThrottledEvents(byte[]... events) {
158168
LOGGER.debug("{} consuming {} events", logPrefix, events.length);
159169
try (Jedis jedis = jedisPool.getResource()) {
160-
jedis.rpush(keyBytes, events);
170+
Arrays.stream(events).forEachOrdered(event -> sendEvent(jedis, event));
171+
}
172+
}
173+
174+
private void sendEvent(final Jedis jedis, final byte[] event) {
175+
if (RPUSH_COMMAND.equals(command)) {
176+
jedis.rpush(keyBytes, event);
177+
} else if (PUBLISH_COMMAND.equals(command)) {
178+
jedis.publish(keyBytes, event);
179+
} else {
180+
String message = String.format("unknown command: `%s`", command);
181+
throw new IllegalArgumentException(message);
161182
}
162183
}
163184

@@ -337,7 +358,7 @@ public static class Builder implements org.apache.logging.log4j.core.util.Builde
337358
private String host = "localhost";
338359

339360
@PluginBuilderAttribute
340-
private String username = null;
361+
private String username = "default";
341362

342363
@PluginBuilderAttribute
343364
private String password = null;
@@ -366,6 +387,9 @@ public static class Builder implements org.apache.logging.log4j.core.util.Builde
366387
@PluginElement("RedisThrottlerConfig")
367388
private RedisThrottlerConfig throttlerConfig = RedisThrottlerConfig.newBuilder().build();
368389

390+
@PluginBuilderAttribute
391+
private String command = RPUSH_COMMAND;
392+
369393
private Builder() {
370394
// Do nothing.
371395
}
@@ -537,6 +561,7 @@ private void check() {
537561
requireArgument(socketTimeoutSeconds > 0, "expecting: socketTimeoutSeconds > 0, found: %d", socketTimeoutSeconds);
538562
requireNonNull(poolConfig, "poolConfig");
539563
requireNonNull(throttlerConfig, "throttlerConfig");
564+
requireArgument(ALLOWED_COMMANDS.contains(command), "expecting: anyOf %s, found: %s", ALLOWED_COMMANDS, command);
540565
}
541566

542567
@Override
@@ -546,6 +571,8 @@ public String toString() {
546571
", layout='" + layout + '\'' +
547572
", database=" + database +
548573
", key='" + key + '\'' +
574+
", command='" + command + '\'' +
575+
", username='" + username + '\'' +
549576
", host='" + host + '\'' +
550577
", port=" + port +
551578
", connectionTimeoutSeconds=" + connectionTimeoutSeconds +
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2017-2024 Volkan Yazıcı
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permits and
14+
* limitations under the License.
15+
*/
16+
package com.vlkan.log4j2.redis.appender;
17+
18+
import org.apache.logging.log4j.Logger;
19+
import org.apache.logging.log4j.core.Appender;
20+
import org.apache.logging.log4j.status.StatusLogger;
21+
import org.assertj.core.api.Assertions;
22+
import org.awaitility.Awaitility;
23+
import org.junit.jupiter.api.Order;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.RegisterExtension;
26+
27+
import java.time.Duration;
28+
29+
import static com.vlkan.log4j2.redis.appender.RedisTestConstants.RANDOM;
30+
31+
class RedisAppenderPubSubTest {
32+
33+
private static final Logger LOGGER = StatusLogger.getLogger();
34+
35+
private static final String CLASS_NAME = RedisAppenderPubSubTest.class.getSimpleName();
36+
37+
private static final String LOGGER_PREFIX = "[" + CLASS_NAME + "]";
38+
39+
private final String redisHost = NetworkUtils.localHostName();
40+
41+
private final int redisPort = NetworkUtils.findUnusedPort(redisHost);
42+
43+
private final String redisPassword = String.format("%s-RedisPassword-%s:%d", CLASS_NAME, redisHost, redisPort);
44+
45+
private final String redisUsername = String.format("%s-RedisUsername-%s", CLASS_NAME, redisHost);
46+
47+
private final String redisKey = String.format("%s-RedisKey-%s:%d", CLASS_NAME, redisHost, redisPort);
48+
49+
private final String redisAppenderName = String.format("%s-RedisAppender-%s-%d", CLASS_NAME, redisHost, redisPort);
50+
51+
@Order(1)
52+
@RegisterExtension
53+
final RedisServerExtension redisServerExtension = new RedisServerExtension(redisPort, redisUsername, redisPassword);
54+
55+
@Order(2)
56+
@RegisterExtension
57+
final RedisClientExtension redisClientExtension = new RedisClientExtension(redisHost, redisPort, redisUsername, redisPassword);
58+
59+
@Order(3)
60+
@RegisterExtension
61+
final RedisSubscriberExtension redisSubscriberExtension = new RedisSubscriberExtension(redisClientExtension.getClient(), redisKey);
62+
63+
@Order(4)
64+
@RegisterExtension
65+
final LoggerContextExtension loggerContextExtension =
66+
new LoggerContextExtension(
67+
CLASS_NAME,
68+
redisAppenderName,
69+
configBuilder -> configBuilder.add(configBuilder
70+
.newAppender(redisAppenderName, "RedisAppender")
71+
.addAttribute("host", redisHost)
72+
.addAttribute("port", redisPort)
73+
.addAttribute("username", redisUsername)
74+
.addAttribute("password", redisPassword)
75+
.addAttribute("key", redisKey)
76+
.addAttribute("ignoreExceptions", false)
77+
.addAttribute("command", "publish")
78+
.add(configBuilder
79+
.newLayout("PatternLayout")
80+
.addAttribute("pattern", "%level %m"))
81+
.addComponent(configBuilder
82+
.newComponent("RedisThrottlerConfig")
83+
// Batch size needs to be greater than 1, so that we can observe a partially filled batch push.
84+
.addAttribute("batchSize", 10)
85+
.addAttribute("bufferSize", 100)
86+
.addAttribute("flushPeriodMillis", 500L)
87+
.addAttribute("maxEventCountPerSecond", 35)
88+
.addAttribute("maxErrorCountPerSecond", 0)
89+
.addAttribute("maxByteCountPerSecond", 4194304))));
90+
91+
@Test
92+
void appended_messages_should_be_persisted() {
93+
94+
// Create the logger.
95+
LOGGER.debug("creating the logger");
96+
Logger logger = loggerContextExtension
97+
.getLoggerContext()
98+
.getLogger(RedisAppenderPubSubTest.class.getCanonicalName());
99+
100+
// Create and log the messages.
101+
int minMessageCount = 1;
102+
int maxMessageCount = 100;
103+
int expectedMessageCount = minMessageCount + RANDOM.nextInt(maxMessageCount - minMessageCount);
104+
105+
LOGGER.debug("{} logging {} messages", LOGGER_PREFIX, expectedMessageCount);
106+
RedisTestMessage[] expectedLogMessages = RedisTestMessage.createRandomArray(expectedMessageCount);
107+
for (RedisTestMessage expectedLogMessage : expectedLogMessages) {
108+
logger.log(expectedLogMessage.level, expectedLogMessage.message);
109+
}
110+
111+
// Verify the logging.
112+
verifyLogging(expectedLogMessages, expectedMessageCount, expectedMessageCount);
113+
}
114+
115+
@Test
116+
void throttler_should_not_flush_same_content_twice() {
117+
118+
// Create the logger.
119+
LOGGER.debug("{} creating the logger", LOGGER_PREFIX);
120+
Logger logger = loggerContextExtension
121+
.getLoggerContext()
122+
.getLogger(RedisAppenderPubSubTest.class);
123+
124+
// Log the 1st message.
125+
RedisTestMessage[] expectedLogMessages1 = RedisTestMessage.createRandomArray(1);
126+
logger.log(expectedLogMessages1[0].level, expectedLogMessages1[0].message);
127+
128+
// Verify the 1st message persistence.
129+
verifyLogging(expectedLogMessages1, 1, 1);
130+
131+
// Log the 2nd message.
132+
RedisTestMessage[] expectedLogMessages2 = RedisTestMessage.createRandomArray(1);
133+
logger.log(expectedLogMessages2[0].level, expectedLogMessages2[0].message);
134+
135+
// Verify the 2nd message persistence.
136+
verifyLogging(expectedLogMessages2, 2, 2);
137+
138+
}
139+
140+
private void verifyLogging(
141+
RedisTestMessage[] expectedLogMessages,
142+
int expectedTotalEventCount,
143+
int expectedRedisPushSuccessCount) {
144+
145+
// Verify the amount of persisted messages.
146+
LOGGER.debug("{} waiting for the logged messages to be persisted", LOGGER_PREFIX);
147+
Awaitility
148+
.await()
149+
.pollDelay(Duration.ofMillis(100))
150+
.atMost(Duration.ofSeconds(10))
151+
.until(() -> {
152+
long persistedMessageCount = redisSubscriberExtension.messageCount();
153+
Assertions.assertThat(persistedMessageCount).isLessThanOrEqualTo(expectedTotalEventCount);
154+
return persistedMessageCount == expectedTotalEventCount;
155+
});
156+
157+
// Verify the content of persisted messages.
158+
LOGGER.debug("{} verifying the content of persisted messages", LOGGER_PREFIX);
159+
for (int messageIndex = 0; messageIndex < expectedLogMessages.length; messageIndex++) {
160+
RedisTestMessage expectedLogMessage = expectedLogMessages[messageIndex];
161+
String expectedSerializedMessage = String.format(
162+
"%s %s",
163+
expectedLogMessage.level,
164+
expectedLogMessage.message);
165+
String actualSerializedMessage = redisSubscriberExtension.nextMessage();
166+
try {
167+
Assertions.assertThat(actualSerializedMessage).isEqualTo(expectedSerializedMessage);
168+
} catch (AssertionError error) {
169+
String message = String.format(
170+
"comparison failure (messageIndex=%d, messageCount=%d)",
171+
messageIndex,
172+
expectedTotalEventCount);
173+
throw new AssertionError(message, error);
174+
}
175+
}
176+
177+
// Verify the throttler counters.
178+
Appender appender = loggerContextExtension
179+
.getConfig()
180+
.getAppender(redisAppenderName);
181+
Assertions.assertThat(appender).isInstanceOf(RedisAppender.class);
182+
RedisThrottlerJmxBean jmxBean = ((RedisAppender) appender).getJmxBean();
183+
Assertions.assertThat(jmxBean.getTotalEventCount()).isEqualTo(expectedTotalEventCount);
184+
Assertions.assertThat(jmxBean.getIgnoredEventCount()).isEqualTo(0);
185+
Assertions.assertThat(jmxBean.getEventRateLimitFailureCount()).isEqualTo(0);
186+
Assertions.assertThat(jmxBean.getByteRateLimitFailureCount()).isEqualTo(0);
187+
Assertions.assertThat(jmxBean.getUnavailableBufferSpaceFailureCount()).isEqualTo(0);
188+
Assertions.assertThat(jmxBean.getRedisPushSuccessCount()).isEqualTo(expectedRedisPushSuccessCount);
189+
Assertions.assertThat(jmxBean.getRedisPushFailureCount()).isEqualTo(0);
190+
}
191+
192+
}

appender/src/test/java/com/vlkan/log4j2/redis/appender/RedisServerExtension.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class RedisServerExtension implements BeforeEachCallback, AfterEachCallback {
3737
.builder()
3838
.port(port)
3939
.bind("0.0.0.0")
40-
.setting("user " + username + " on -debug +@all ~* >" + password)
40+
// Add user, grant access rights, and set password: https://redis.io/docs/latest/operate/oss_and_stack/management/security/acl
41+
.setting("user " + username + " on -debug +@all allkeys allchannels >" + password)
4142
.build();
4243
} catch (Exception error) {
4344
String message = String.format("failed creating Redis server (port=%d)", port);

0 commit comments

Comments
 (0)